Explorar el Código

celery 队列Queue更换防止与现网的celery冲突

zjl hace 2 años
padre
commit
d6ad13fab8
Se han modificado 5 ficheros con 31 adiciones y 209 borrados
  1. 0 6
      apps/web/dealer/proxy.py
  2. 1 48
      apps/web/dealer/tasks.py
  3. 1 0
      apps/web/device/tasks.py
  4. 3 5
      configs/production.py
  5. 26 150
      taskmanager/config.py

+ 0 - 6
apps/web/dealer/proxy.py

@@ -421,12 +421,6 @@ class DealerGroupStats(Searchable):
         return self.save()
 
 
-
-
-
-
-
-
 def record_income_proxy(source, record, partitionMap=None, dateTime=None):
     # type: (str, RechargeRecord, dict, datetime.datetime)->Optional[DealerIncomeProxy]
     """

+ 1 - 48
apps/web/dealer/tasks.py

@@ -2055,53 +2055,6 @@ def auto_charge_sim_card(dealerId):
                     logger.exception(e)
 
 
-def check_not_ledger_recharge_record():
-    """
-    查询没有分账的订单对其进行分账 
-    task 任务每天执行一次, 任务窗口只查2天的时间
-    :return:
-    """
-    from apps.web.user.models import RefundMoneyRecord
-    # 昌源的厂商ID
-    managerId = "5d857a130030483f797808b4"
-    # 测试的ID
-    # managerId = "59974b4b8732d6480fb699e5"
-    # 查询结束时间是3天前
-    et = datetime.datetime.now() - datetime.timedelta(days = 3)
-    # 查询的开始时间比结束时间早2天
-    st = et - datetime.timedelta(days = 2)
-
-    agents = Agent.objects.filter(managerId = managerId)
-
-    for _agent in agents:
-        logger.info("agent is <{}>, ready to check not ledger record".format(_agent.username))
-
-        dealers = Dealer.objects.filter(agentId = str(_agent.id))
-
-        # 每个经销商单独执行 减轻对于RechargeRecord遍历的压力
-        for _dealer in dealers:
-            # 筛选出 支付成功的 该经销商的 快速启动的订单
-            records = RechargeRecord.objects.filter(
-                ownerId = str(_dealer.id),
-                via = "recharge",
-                isQuickPay = True,
-                result = RechargeRecord.PayResult.SUCCESS,
-                dateTimeAdded__gte = st,
-                dateTimeAdded__lte = et
-            )
-            for _record in records:
-                # 没有分帐的订单 并且没有退款 直接分账了事儿
-                if not DealerIncomeProxy.objects.filter(ref_id = _record.id) and not RefundMoneyRecord.objects.filter(
-                        rechargeObjId = ObjectId(_record.id)):
-                    try:
-                        # 以 订单 的当时的 groupId 为准
-                        Ledger(USER_RECHARGE_TYPE.RECHARGE, _record).execute(
-                            journal = False, stats = True, check = False)
-                    except Exception as e:
-                        logger.exception(e)
-                        continue
-
-
 def batch_set_device_params(logicalCodes, updateConf, lastSetConf, operationId):
     logger.info(
         'start batch_set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format(
@@ -2274,7 +2227,7 @@ def ledger_consume_order_stats(date=None, statsId=None):  # type: (Optional[str,
     统计经销商的每日分润的收益
     """
     # 获取时间戳的转换
-    data = date or datetime.datetime.now()
+    data = date or datetime.datetime.now() - datetime.timedelta(days=1)
     if isinstance(date, datetime.datetime):
         date = date.strftime("%Y-%m-%d")
 

+ 1 - 0
apps/web/device/tasks.py

@@ -319,6 +319,7 @@ def sync_device_time_for_tcpcar():
 groupid_from_key = lambda groupKey: groupKey.split('_')[1]
 dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1]
 
+
 # todo 凌晨2点半执行
 def make_rpt_into_db():
     from apps.web.report.models import DevReport, GroupReport, DealerReport

+ 3 - 5
configs/production.py

@@ -5,20 +5,18 @@ import os
 
 os.environ.setdefault('MY_DOMAIN', 'manyi.washpayer.com')
 os.environ.setdefault('FRONT_END_DOMAIN', 'manyi.washpayer.com')
-# os.environ.setdefault('COOKIE_DOMAIN', '.washpayer.com')
 
-os.environ.setdefault('MY_ENV', 'production')
+os.environ.setdefault('MY_ENV', 'manyi')
 
 # noinspection PyUnresolvedReferences
 from .base import *
 
-MY_CELERY_ROUTING_KEY = 'production'
-MY_CELERY_QUEUE = 'production'
+MY_CELERY_ROUTING_KEY = 'manyi'
+MY_CELERY_QUEUE = 'manyi'
 
 TEST_OPEN_ID = 'on7D-0SCNKSk6B_3xqK6-WuZPL2s'
 
 MY_PRIMARY_AGENT_ID = '6417d4456f29257125ebf705'
-# MY_PRIMARY_AGENT_WALLET_KEY = 'ledger-wechat-6417d4456f29257125ebf705-1480791292'
 
 CHECK_DEVICE_SMS_EXPIRE = True
 

+ 26 - 150
taskmanager/config.py

@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-#!/usr/bin/env python
+# !/usr/bin/env python
 """
 celery的配置文件
 apps.tasks.config
@@ -7,7 +7,6 @@ apps.tasks.config
 """
 import os
 from os.path import join, abspath
-from django.conf import settings
 
 from dotenv import get_key, load_dotenv
 from celery.schedules import crontab
@@ -19,7 +18,11 @@ task_dir = join(base_dir, 'taskmanager')
 dotenv_path = abspath(join(base_dir, '.env.%s' % os.environ.get('MY_ENV')))
 
 load_dotenv(dotenv_path)
-env = lambda key: get_key(dotenv_path, key)
+
+
+def env(key):
+    return get_key(dotenv_path, key)
+
 
 # ========== broker settings ===========
 #: broker url
@@ -28,35 +31,26 @@ BROKER_URL = env('CELERY_BROKER_URL')
 # : 指定接受的内容类型
 CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
 
-# BROKER_POOL_LIMIT = 10
-
-# CONNECTION RETRY仅对WORKER生效, 对PUBLISH不生效
-# BROKER_CONNECTION_TIMEOUT = 4
-# BROKER_CONNECTION_RETRY = True
-# BROKER_CONNECTION_MAX_RETRIES = 100
-
-#BROKER_TRANSPORT_OPTIONS
-
 # ========== celery settings ===========
 
-MY_CELERY_QUEUE = settings.MY_CELERY_QUEUE
-MY_CELERY_ROUTING_KEY = settings.MY_CELERY_ROUTING_KEY
+MY_CELERY_QUEUE = "manyi_celery"
+MY_CELERY_ROUTING_KEY = "manyi_celery"
 
-POLL_RECHARGE_RECORD_QUEUE = 'poll_recharge_record'
-POLL_RECHARGE_RECORD_QUEUE_KEY = 'poll_recharge_record'
+POLL_RECHARGE_RECORD_QUEUE = 'manyi_poll_recharge_record'
+POLL_RECHARGE_RECORD_QUEUE_KEY = 'manyi_poll_recharge_record'
 
-DEALER_DATA_ANYLYSE_QUEUE = 'dealer_data_anylyse'
-DEALER_DATA_ANYLYSE_QUEUE_KEY = 'dealer_data_anylyse'
+DEALER_DATA_ANYLYSE_QUEUE = 'manyi_dealer_data_anylyse'
+DEALER_DATA_ANYLYSE_QUEUE_KEY = 'manyi_dealer_data_anylyse'
 
 # 设备离线的延时任务 安全起见这个地方仅仅区里发送任务的通知 不要访问数据库 作为独立的单元处理
-DEVICE_OFFLINE_QUEUE = "device_offline"
-DEVICE_OFFLINE_QUEUE_KEY = "device_offline"
+DEVICE_OFFLINE_QUEUE = "manyi_device_offline"
+DEVICE_OFFLINE_QUEUE_KEY = "manyi_device_offline"
 
-OFFLINE_TASK_QUEUE = 'offline_task'
-OFFLINE_TASK_QUEUE_KEY = 'offline_task'
+OFFLINE_TASK_QUEUE = 'manyi_offline_task'
+OFFLINE_TASK_QUEUE_KEY = 'manyi_offline_task'
 
-REPORT_TASK_QUEUE = 'report_task'
-REPORT_TASK_QUEUE_KEY = 'report_task'
+REPORT_TASK_QUEUE = 'manyi_report_task'
+REPORT_TASK_QUEUE_KEY = 'manyi_report_task'
 
 CELERY_QUEUES = {
     MY_CELERY_ROUTING_KEY: {
@@ -147,11 +141,6 @@ CELERY_ROUTES = {
         'routing_key': MY_CELERY_ROUTING_KEY
     },
 
-    'tasks.send_to_xf_falut_handle': {
-        'queue': MY_CELERY_QUEUE,
-        'routing_key': MY_CELERY_ROUTING_KEY
-    },
-
     'tasks.generate_simCharge_excel_report': {
         'queue': REPORT_TASK_QUEUE,
         'routing_key': REPORT_TASK_QUEUE_KEY
@@ -237,11 +226,6 @@ CELERY_ROUTES = {
         'routing_key': POLL_RECHARGE_RECORD_QUEUE_KEY
     },
 
-    'tasks.test_sync': {
-        'queue': POLL_RECHARGE_RECORD_QUEUE,
-        'routing_key': POLL_RECHARGE_RECORD_QUEUE_KEY
-    },
-
     'tasks.manager_export_charge_order_excel_from_db': {
         'queue': REPORT_TASK_QUEUE,
         'routing_key': REPORT_TASK_QUEUE_KEY
@@ -278,64 +262,19 @@ CELERY_ROUTES = {
     },
 }
 
-# TODO zjl如果一个任务没有在visibility_timeout内被确认 则会将此任务分发给另一个worker执行
-# TODO zjl那么当延时任务的countdown时间长度超过此 时间的时候,有可能会造成多个worker共同执行同一个任务
-# TODO zjl使用redis作为消息队列的时候,此值的默认 是 3600s ,目前并没有超过这个时间的延时任务,暂时注释,需要的时候打开
-# BROKER_TRANSPORT_OPTIONS = {
-#     'visibility_timeout': 10
-# }
-
-
 CELERYBEAT_SCHEDULE = {
-    'report_daily_report_to_dealer_via_wechat': {
-        'task': 'tasks.report_daily_report_to_dealer_via_wechat',
-        # : 每日9点运行
-        'schedule': crontab(minute = 0, hour = 9),
-        # 'args': () ,
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
-    'weekly_notify_finance_manager': {
-        'task': 'tasks.weekly_notify_finance_manager',
-
-        'schedule': crontab(minute = 0, hour = 8, day_of_week = 'wed,fri'),
-
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
     'remove_serviceProgress_periodically': {
         'task': 'tasks.remove_serviceProgress_periodically',
-
-        'schedule': crontab(minute = 0, hour = 2),
-
+        'schedule': crontab(minute=0, hour=2),
         'options': {
             'queue': MY_CELERY_QUEUE,
             'routing_key': MY_CELERY_ROUTING_KEY
         }
     },
 
-    'notify_virtual_card_expired': {
-            'task': 'tasks.notify_virtual_card_expired',
-
-            'schedule': crontab(minute = 0, hour = 15),
-
-            'options': {
-                'queue': MY_CELERY_QUEUE,
-                'routing_key': MY_CELERY_ROUTING_KEY
-            }
-        },
-
     'handle_customer_complaints_yesterday': {
         'task': 'tasks.handle_customer_complaints_yesterday',
-
         'schedule': crontab(minute=0, hour=3),
-
         'options': {
             'queue': MY_CELERY_QUEUE,
             'routing_key': MY_CELERY_ROUTING_KEY
@@ -344,14 +283,11 @@ CELERYBEAT_SCHEDULE = {
 
     'send_SIM_expired_messages_by_sms': {
         'task': 'tasks.send_SIM_expired_messages',
-
-        'schedule': crontab(day_of_month = '1,6,11,19,23,25,26', minute = 0, hour = 10),
-
+        'schedule': crontab(day_of_month='1,6,11,19,23,25,26', minute = 0, hour = 10),
         'options': {
             'queue': MY_CELERY_QUEUE,
             'routing_key': MY_CELERY_ROUTING_KEY
         },
-
         'args': ('sms',)
     },
 
@@ -368,17 +304,6 @@ CELERYBEAT_SCHEDULE = {
         'args': ('wechat',)
     },
 
-    'calc_dealer_stat_and_insert_into_db': {
-        'task': 'tasks.calc_dealer_stat_and_insert_into_db',
-
-        'schedule': crontab(minute = 10, hour = 00),
-
-        'options': {
-            'queue': DEALER_DATA_ANYLYSE_QUEUE,
-            'routing_key': DEALER_DATA_ANYLYSE_QUEUE_KEY
-        }
-    },
-
     'calc_dealer_user_count': {
         'task': 'tasks.calc_dealer_user_count',
 
@@ -423,17 +348,6 @@ CELERYBEAT_SCHEDULE = {
         }
     },
 
-    "send_to_xf_all_dev_info": {
-        "task": "tasks.send_to_xf_all_dev_info",
-
-        "schedule": crontab(minute = 10, hour = 0),
-        "options": {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
-    # 租用设备相关的
     # 每日晚上凌晨2点生成订单
     "gen_daily_rent_order": {
         "task": "tasks.gen_daily_rent_order",
@@ -456,17 +370,18 @@ CELERYBEAT_SCHEDULE = {
         }
     },
 
-    # 每天发送同步时间的消息。支持的是云快充协议的设备
-    "sync_device_time_for_tcpcar": {
-        "task": "tasks.sync_device_time_for_tcpcar",
+    # 每日统计
+    "ledger_consume_order_stats": {
+        "task": "tasks.ledger_consume_order_stats",
 
-        "schedule": crontab(minute=0, hour=16),
+        "schedule": crontab(hour=1, minute=0),
         "options": {
             'queue': MY_CELERY_QUEUE,
             'routing_key': MY_CELERY_ROUTING_KEY
         }
     },
-	
+
+
     # 每天凌晨 1 点 对于退款订单进行一次拉取信息
     'pull_refund_order': {
         'task': 'tasks.pull_refund_order',
@@ -478,17 +393,6 @@ CELERYBEAT_SCHEDULE = {
         }
     },
 
-    # 浙江的消防局的消防
-    'report_to_zhejiang_fight': {
-        'task': 'tasks.report_to_zhejiang_fight',
-        'schedule': crontab(minute=0, hour=2),
-
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
     # 每天凌晨 2 点 运行一次经销商的自动提现
     'dealer_auto_withdraw': {
         'task': 'tasks.dealer_auto_withdraw',
@@ -511,34 +415,6 @@ CELERYBEAT_SCHEDULE = {
             'routing_key': MY_CELERY_ROUTING_KEY
         }
     },
-                       
-    'query_merchant_status': {
-        'task': 'tasks.query_merchant_status',
-        'schedule': crontab(minute=0, hour=3),
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
-    'dealer_auto_charge_sim_card':{
-        'task': 'tasks.dealer_auto_charge_sim_card',
-        'schedule': crontab(day_of_month = '15', minute = 0, hour = 10),
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
-
-    # 处理超过72小时的没有分账的订单
-    'check_not_ledger_recharge_record': {
-        'task': 'tasks.check_not_ledger_recharge_record',
-        'schedule': crontab(minute=0, hour=5),
-        'options': {
-            'queue': MY_CELERY_QUEUE,
-            'routing_key': MY_CELERY_ROUTING_KEY
-        }
-    },
 
     'make_rpt_into_db': {
         'task': 'tasks.make_rpt_into_db',