#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Celery configuration module.

apps.tasks.config
~~~~~~~~~~~~~~~~~

Defines the broker connection, the queues/exchanges, the static task routing
table and the celery-beat periodic schedule. Secrets (broker URL, result
backend) are read from an environment-specific dotenv file.
"""

import os
from os.path import join, abspath

from django.conf import settings
from dotenv import get_key, load_dotenv
from celery.schedules import crontab

# Parent directory of the directory that contains this file.
base_dir = abspath(join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
task_dir = join(base_dir, 'taskmanager')

# NOTE(review): when MY_ENV is not set this resolves to ".env.None";
# confirm that every deployment exports MY_ENV.
dotenv_path = abspath(join(base_dir, '.env.%s' % os.environ.get('MY_ENV')))
load_dotenv(dotenv_path)


def env(key):
    """Return the value stored under *key* in the dotenv file at ``dotenv_path``."""
    return get_key(dotenv_path, key)


def _direct_queue(exchange, binding_key):
    """Build the CELERY_QUEUES entry for a direct exchange."""
    return {
        'exchange': exchange,
        'exchange_type': 'direct',
        'binding_key': binding_key,
    }


def _route(target):
    """Build a fresh queue/routing_key dict from a ``(queue, routing_key)`` pair."""
    queue, routing_key = target
    return {'queue': queue, 'routing_key': routing_key}


def _beat_entry(task, schedule, target, args=None):
    """Build one CELERYBEAT_SCHEDULE entry.

    ``args`` is only included in the entry when it is given, so entries
    without positional arguments carry no ``'args'`` key at all.
    """
    entry = {'task': task, 'schedule': schedule, 'options': _route(target)}
    if args is not None:
        entry['args'] = args
    return entry


# ========== broker settings ===========

#: Broker URL.
BROKER_URL = env('CELERY_BROKER_URL')

#: Content types the worker accepts.
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

# Optional broker tuning, currently disabled.
# Connection retry only takes effect for the worker, not for publishing.
# BROKER_POOL_LIMIT = 10
# BROKER_CONNECTION_TIMEOUT = 4
# BROKER_CONNECTION_RETRY = True
# BROKER_CONNECTION_MAX_RETRIES = 100
# BROKER_TRANSPORT_OPTIONS

# ========== celery settings ===========

MY_CELERY_QUEUE = settings.MY_CELERY_QUEUE
MY_CELERY_ROUTING_KEY = settings.MY_CELERY_ROUTING_KEY

POLL_RECHARGE_RECORD_QUEUE = 'poll_recharge_record'
POLL_RECHARGE_RECORD_QUEUE_KEY = 'poll_recharge_record'

DEALER_DATA_ANYLYSE_QUEUE = 'dealer_data_anylyse'
DEALER_DATA_ANYLYSE_QUEUE_KEY = 'dealer_data_anylyse'

# Delayed device-offline tasks. For safety, tasks on this queue should only
# send the notification and must not access the database; the queue is
# handled as an independent unit.
DEVICE_OFFLINE_QUEUE = "device_offline"
DEVICE_OFFLINE_QUEUE_KEY = "device_offline"

OFFLINE_TASK_QUEUE = 'offline_task'
OFFLINE_TASK_QUEUE_KEY = 'offline_task'

REPORT_TASK_QUEUE = 'report_task'
REPORT_TASK_QUEUE_KEY = 'report_task'

# (queue, routing_key) pairs shared by the routing table and the beat schedule.
_MAIN_TARGET = (MY_CELERY_QUEUE, MY_CELERY_ROUTING_KEY)
_POLL_TARGET = (POLL_RECHARGE_RECORD_QUEUE, POLL_RECHARGE_RECORD_QUEUE_KEY)
_DEALER_STATS_TARGET = (DEALER_DATA_ANYLYSE_QUEUE, DEALER_DATA_ANYLYSE_QUEUE_KEY)
_DEVICE_OFFLINE_TARGET = (DEVICE_OFFLINE_QUEUE, DEVICE_OFFLINE_QUEUE_KEY)
_OFFLINE_TARGET = (OFFLINE_TASK_QUEUE, OFFLINE_TASK_QUEUE_KEY)
_REPORT_TARGET = (REPORT_TASK_QUEUE, REPORT_TASK_QUEUE_KEY)

# NOTE(review): the first queue is keyed by MY_CELERY_ROUTING_KEY, whereas
# CELERY_ROUTES refers to it by MY_CELERY_QUEUE. Kept as-is; confirm that the
# two Django settings are meant to hold the same value.
CELERY_QUEUES = {
    MY_CELERY_ROUTING_KEY: _direct_queue(MY_CELERY_QUEUE, MY_CELERY_ROUTING_KEY),
    POLL_RECHARGE_RECORD_QUEUE: _direct_queue(
        POLL_RECHARGE_RECORD_QUEUE, POLL_RECHARGE_RECORD_QUEUE_KEY),
    DEALER_DATA_ANYLYSE_QUEUE: _direct_queue(
        DEALER_DATA_ANYLYSE_QUEUE, DEALER_DATA_ANYLYSE_QUEUE_KEY),
    DEVICE_OFFLINE_QUEUE: _direct_queue(
        DEVICE_OFFLINE_QUEUE, DEVICE_OFFLINE_QUEUE_KEY),
    OFFLINE_TASK_QUEUE: _direct_queue(
        OFFLINE_TASK_QUEUE, OFFLINE_TASK_QUEUE_KEY),
    REPORT_TASK_QUEUE: _direct_queue(
        REPORT_TASK_QUEUE, REPORT_TASK_QUEUE_KEY),
}

# Static routing table: task name -> queue / routing key.
CELERY_ROUTES = {
    'tasks.device_offline_notify': _route(_DEVICE_OFFLINE_TARGET),

    'tasks.report_new_payment_to_dealer_via_wechat': _route(_MAIN_TARGET),
    'tasks.report_service_complete_to_user_via_wechat': _route(_MAIN_TARGET),
    'tasks.add': _route(_MAIN_TARGET),
    'tasks.report_to_user_via_wechat': _route(_MAIN_TARGET),
    'tasks.send_msg_to_user_via_wechat': _route(_MAIN_TARGET),
    'tasks.report_to_dealer_via_wechat': _route(_MAIN_TARGET),
    'tasks.report_feedback_to_dealer_via_wechat': _route(_MAIN_TARGET),
    'tasks.send_topic_command': _route(_MAIN_TARGET),
    'tasks.send_to_xf_falut': _route(_MAIN_TARGET),
    'tasks.send_to_xf_falut_handle': _route(_MAIN_TARGET),

    'tasks.generate_simCharge_excel_report': _route(_REPORT_TARGET),
    'tasks.generate_dealerWithDraw_excel_report': _route(_REPORT_TARGET),
    'tasks.generate_biz_stats_for_manager': _route(_REPORT_TARGET),
    'tasks.generate_business_stats_report_by_dealer': _route(_REPORT_TARGET),
    'tasks.export_charge_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_consume_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_send_coins_to_card_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_on_points_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_API_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_group_stat_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_vcard_info_excel_from_db': _route(_REPORT_TARGET),
    'tasks.export_aggregate_dealer_income': _route(_REPORT_TARGET),

    'tasks.import_simcard_excel_to_db': _route(_OFFLINE_TARGET),
    'tasks.export_simcard_excel_from_db': _route(_OFFLINE_TARGET),

    'tasks.export_device_excel_from_db': _route(_REPORT_TARGET),

    'tasks.poll_dealer_recharge_record': _route(_POLL_TARGET),
    'tasks.poll_user_recharge_record': _route(_POLL_TARGET),
    'tasks.test_sync': _route(_POLL_TARGET),

    'tasks.manager_export_charge_order_excel_from_db': _route(_REPORT_TARGET),
    'tasks.manager_export_dealer_info_excel_from_db': _route(_REPORT_TARGET),
    'tasks.manager_export_consume_order_excel_from_db': _route(_REPORT_TARGET),

    'tasks.batch_set_device_params': _route(_OFFLINE_TARGET),
    'tasks.set_device_params': _route(_OFFLINE_TARGET),
    'tasks.batch_set_server_settings': _route(_OFFLINE_TARGET),
    'tasks.set_server_settings': _route(_OFFLINE_TARGET),
}

# TODO(zjl): if a task is not acknowledged within visibility_timeout it is
# redelivered to another worker, so a delayed task whose countdown exceeds
# that value may be executed by several workers. With Redis as the broker the
# default is 3600s; no delayed task currently exceeds it, so this stays
# commented out until it is needed.
# BROKER_TRANSPORT_OPTIONS = {
#     'visibility_timeout': 10
# }

# Periodic tasks. crontab fields are evaluated in CELERY_TIMEZONE (set below).
CELERYBEAT_SCHEDULE = {
    # Every day at 09:00.
    'report_daily_report_to_dealer_via_wechat': _beat_entry(
        'tasks.report_daily_report_to_dealer_via_wechat',
        crontab(minute=0, hour=9),
        _MAIN_TARGET,
    ),
    # Wednesdays and Fridays at 08:00.
    'weekly_notify_finance_manager': _beat_entry(
        'tasks.weekly_notify_finance_manager',
        crontab(minute=0, hour=8, day_of_week='wed,fri'),
        _MAIN_TARGET,
    ),
    'remove_serviceProgress_periodically': _beat_entry(
        'tasks.remove_serviceProgress_periodically',
        crontab(minute=0, hour=2),
        _MAIN_TARGET,
    ),
    'notify_virtual_card_expired': _beat_entry(
        'tasks.notify_virtual_card_expired',
        crontab(minute=0, hour=15),
        _MAIN_TARGET,
    ),
    'handle_customer_complaints_yesterday': _beat_entry(
        'tasks.handle_customer_complaints_yesterday',
        crontab(minute=0, hour=3),
        _MAIN_TARGET,
    ),
    # The same task is scheduled twice with a different channel argument.
    'send_SIM_expired_messages_by_sms': _beat_entry(
        'tasks.send_SIM_expired_messages',
        crontab(day_of_month='1,6,11,19,23,25,26', minute=0, hour=10),
        _MAIN_TARGET,
        args=('sms',),
    ),
    'send_SIM_expired_messages_by_wechat': _beat_entry(
        'tasks.send_SIM_expired_messages',
        crontab(day_of_month='1,6,11,19,23,25,26', minute=0, hour=20),
        _MAIN_TARGET,
        args=('wechat',),
    ),
    'calc_dealer_stat_and_insert_into_db': _beat_entry(
        'tasks.calc_dealer_stat_and_insert_into_db',
        crontab(minute=10, hour=0),
        _DEALER_STATS_TARGET,
    ),
    'calc_dealer_user_count': _beat_entry(
        'tasks.calc_dealer_user_count',
        crontab(minute=30, hour=0, day_of_week='fri'),
        _DEALER_STATS_TARGET,
    ),
    'generate_manager_map_options': _beat_entry(
        'tasks.generate_manager_map_options',
        crontab(minute=0, hour=1),
        _MAIN_TARGET,
    ),
    'check_wechat_withdraw_via_bank': _beat_entry(
        'tasks.check_wechat_withdraw_via_bank',
        crontab(minute=0, hour=2),
        _POLL_TARGET,
    ),
    'check_and_retry_withdraw': _beat_entry(
        'tasks.check_and_retry_withdraw',
        crontab(minute=0, hour=4),
        _POLL_TARGET,
    ),
    "send_to_xf_all_dev_info": _beat_entry(
        "tasks.send_to_xf_all_dev_info",
        crontab(minute=10, hour=0),
        _MAIN_TARGET,
    ),

    # --- Rental devices ---
    # Generate rent orders every day at 02:00.
    "gen_daily_rent_order": _beat_entry(
        "tasks.gen_daily_rent_order",
        crontab(minute=0, hour=2),
        _MAIN_TARGET,
    ),
    # Deduct payment for rent orders every day at 15:00.
    "deduct_rent_order": _beat_entry(
        "tasks.deduct_rent_order",
        crontab(minute=0, hour=15),
        _MAIN_TARGET,
    ),

    # Send the daily time-sync message (for devices speaking the
    # YunKuaiChong charging protocol).
    "sync_device_time_for_tcpcar": _beat_entry(
        "tasks.sync_device_time_for_tcpcar",
        crontab(minute=0, hour=16),
        _MAIN_TARGET,
    ),
    # Pull refund-order information once a day at 01:00.
    'pull_refund_order': _beat_entry(
        'tasks.pull_refund_order',
        crontab(minute=0, hour=1),
        _MAIN_TARGET,
    ),
    # Reporting to the Zhejiang fire department.
    'report_to_zhejiang_fight': _beat_entry(
        'tasks.report_to_zhejiang_fight',
        crontab(minute=0, hour=2),
        _MAIN_TARGET,
    ),
    # Run the dealers' automatic withdrawal once a day at 02:00.
    'dealer_auto_withdraw': _beat_entry(
        'tasks.dealer_auto_withdraw',
        crontab(minute=0, hour=2),
        _MAIN_TARGET,
    ),
    # Runs on the 1st of every month at 00:00.
    'sum_customer': _beat_entry(
        'tasks.sum_customer',
        crontab(minute=0, hour=0, day_of_month=1),
        _MAIN_TARGET,
    ),
    'query_merchant_status': _beat_entry(
        'tasks.query_merchant_status',
        crontab(minute=0, hour=3),
        _MAIN_TARGET,
    ),
    # Runs on the 15th of every month at 10:00.
    'dealer_auto_charge_sim_card': _beat_entry(
        'tasks.dealer_auto_charge_sim_card',
        crontab(day_of_month='15', minute=0, hour=10),
        _MAIN_TARGET,
    ),
    # Handle orders older than 72 hours that have not been ledgered yet.
    'check_not_ledger_recharge_record': _beat_entry(
        'tasks.check_not_ledger_recharge_record',
        crontab(minute=0, hour=5),
        _MAIN_TARGET,
    ),
    'make_rpt_into_db': _beat_entry(
        'tasks.make_rpt_into_db',
        crontab(minute=30, hour=2),
        _MAIN_TARGET,
    ),
}

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "Asia/Shanghai"

CELERY_IGNORE_RESULT = True

#: Tasks are serialized and deserialized with msgpack.
CELERY_TASK_SERIALIZER = 'msgpack'

#: Reading task results is rarely performance-critical, so the more readable
#: JSON format is used.
CELERY_RESULT_SERIALIZER = 'json'

# Decides if publishing task messages will be retried in the case of
# connection loss or other connection errors.
CELERY_TASK_PUBLISH_RETRY = False

#: Maximum number of tasks a worker process handles before it is destroyed
#: (which also releases its memory).
CELERYD_MAX_TASKS_PER_CHILD = 100

#: Result expiry in seconds, spelled out instead of the magic number 86400.
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

#: Result backend.
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')

CELERYD_LOG_FORMAT = '[%(asctime)s] %(levelname)s[%(processName)s] %(module)s [%(name)s:%(lineno)d]: %(message)s'

CELERYD_TASK_LOG_FORMAT = (
    '[%(asctime)s] %(levelname)s[%(processName)s] %(module)s [%(name)s:%(lineno)d]:'
    ' [%(task_name)s(%(task_id)s)] %(message)s'
)