123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- from collections import namedtuple
- from apilib.monetary import RMB
- from apps.web.common.models import WithdrawRecord
# Placeholder open id for the notification tests; left empty here and not
# referenced by any test visible in this module.
TEST_OPEN_ID = ''

# Lightweight stand-in for a recharge record: (nickname, gateway, money).
recharge_record = namedtuple('recharge_record', ['nickname', 'gateway', 'money'])
def test_add():
    """Sanity check: the ``add`` task returns the sum of its two arguments."""
    from taskmanager.tasks import add

    assert add(1, 2) == 3
- ##################
- ## To end users ##
- ##################
def test_report_to_user_via_wechat():
    """Smoke test: ``report_to_user_via_wechat`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import report_to_user_via_wechat
- ################
- ## To dealers ##
- ################
def test_report_to_dealer_via_wechat():
    """Smoke test: ``report_to_dealer_via_wechat`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import report_to_dealer_via_wechat
def test_report_feedback_to_dealer_via_wechat():
    """Smoke test: ``report_feedback_to_dealer_via_wechat`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import report_feedback_to_dealer_via_wechat
def test_report_daily_report_to_dealer_via_wechat():
    """Smoke test: ``report_daily_report_to_dealer_via_wechat`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import report_daily_report_to_dealer_via_wechat
def test_report_device_abnormally_offline_to_dealer_via_wechat():
    """Smoke test: the abnormal-offline dealer report task must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import report_device_abnormally_offline_to_dealer_via_wechat
def test_send_SIM_expired_messages():
    """Smoke test: ``send_SIM_expired_messages`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import send_SIM_expired_messages
#################
## To managers ##
#################
def test_weekly_notify_finance_manager():
    """Smoke test: ``weekly_notify_finance_manager`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import weekly_notify_finance_manager
def test_whale_withdraw_order_alert():
    """Smoke test: ``whale_withdraw_order_alert`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import whale_withdraw_order_alert
def test_withdraw_error_alert():
    """Smoke test: ``withdraw_error_alert`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import withdraw_error_alert
def check_website_is_online():
    """Import ``check_website_is_online`` from ``taskmanager.tasks`` without running it.

    NOTE(review): unlike its siblings this function has no ``test_`` prefix,
    so under pytest's default naming rules it is not collected. Confirm
    whether that is intentional before renaming it.
    """
    # The local import rebinds this function's own name inside the body only.
    from taskmanager.tasks import check_website_is_online
def test_generate_simCharge_excel_report():
    """Smoke test: ``generate_simCharge_excel_report`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import generate_simCharge_excel_report
def test_generate_dealerWithDraw_excel_report():
    """Smoke test: ``generate_dealerWithDraw_excel_report`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import generate_dealerWithDraw_excel_report
def test_generate_manager_map_options():
    """Smoke test: ``generate_manager_map_options`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import generate_manager_map_options
def test_generate_biz_stats_for_manager():
    """Smoke test: ``generate_biz_stats_for_manager`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import generate_biz_stats_for_manager
- ######################
- ## system operation ##
- ######################
def test_daily_check_auto_withdraw(mocker, dealer, income_type, source_key):
    """Check that the daily auto-withdraw task empties an eligible dealer's balance.

    The WeChat ``withdraw_via_changes`` call is stubbed to return the canned
    ``WITHDRAW_SUCCEEDED`` payload. The dealer is flagged as auto-withdrawable
    and given a balance of ``RMB(100)``; after the task runs, the matching
    sub-balance is expected to be ``RMB(0)``.

    ``mocker`` is the pytest-mock fixture; ``dealer``, ``income_type`` and
    ``source_key`` are fixtures defined outside this module.
    """
    from taskmanager.tasks import daily_check_auto_withdraw
    from django.test.utils import override_settings
    from apps.web.core.payment.wechat import WechatPaymentGateway

    # Scope the feature flag to this test. Assigning to ``settings`` directly
    # would leave the flag switched on for every test that runs afterwards.
    with override_settings(SUPPORT_DEALER_AUTO_WITHDRAW=True):
        from testcase.unit.responses import WITHDRAW_SUCCEEDED

        mocker.patch.object(WechatPaymentGateway, 'withdraw_via_changes', return_value=WITHDRAW_SUCCEEDED)

        dealer.autoWithdrawAllowable = True
        dealer.set_balance(income_type, source_key, RMB(100))
        dealer.save()

        daily_check_auto_withdraw()

        # Re-read the dealer so the assertion sees what the task persisted.
        assert dealer.reload().sub_balance(income_type=income_type, source_key=source_key) == RMB(0)
def test_remove_serviceProgress_periodically():
    """Smoke test: ``remove_serviceProgress_periodically`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import remove_serviceProgress_periodically
def test_check_wechat_withdraw_via_bank(mocker):
    """Run ``check_withdraw_via_bank`` with WeChat's bank-transfer query stubbed out.

    Three canned query responses are prepared below: a transfer that is still
    being processed, an unknown order, and a failed payout. Only the first is
    wired into a scenario so far.

    ``mocker`` is the pytest-mock fixture.
    """
    import xmltodict
    from taskmanager.tasks import check_withdraw_via_bank
    from apps.web.core.payment.wechat import WechatPaymentGateway

    # Query succeeded; the <status> text is the Chinese for "processing".
    success_response = xmltodict.parse("""<xml>
<return_code><![CDATA[SUCCESS]]></return_code>
<return_msg><![CDATA[ok]]></return_msg>
<result_code><![CDATA[SUCCESS]]></result_code>
<err_code><![CDATA[SUCCESS]]></err_code>
<err_code_des><![CDATA[ok]]></err_code_des>
<mch_id><![CDATA[2302758702]]></mch_id>
<partner_trade_no><![CDATA[1212121221278]]></partner_trade_no>
<payment_no><![CDATA[10000600500852017030900000020006012]]></payment_no>
<bank_no_md5><![CDATA[2260AB5EF3D290E28EFD3F74FF7A29A0]]></bank_no_md5>
<true_name_md5><![CDATA[7F25B325D37790764ABA55DAD8D09B76]]></true_name_md5>
<amount>500</amount>
<status><![CDATA[处理中]]></status>
<cmms_amt>0</cmms_amt>
<create_time><![CDATA[2017-03-09 15:04:04]]></create_time>
<reason><![CDATA[]]></reason>
</xml>""")['xml']

    # Query rejected with err_code ORDERNOTEXIST (order does not exist).
    # NOTE(review): not yet fed into any scenario below.
    fail_response_ORDERNOTEXIST = xmltodict.parse("""<xml>
<return_code><![CDATA[SUCCESS]]></return_code>
<return_msg><![CDATA[订单不存在,请核实后再查]]></return_msg>
<result_code><![CDATA[FAIL]]></result_code>
<err_code><![CDATA[ORDERNOTEXIST]]></err_code>
<err_code_des><![CDATA[订单不存在,请核实后再查]]></err_code_des>
</xml>""")['xml']

    # Query itself succeeded (result_code SUCCESS) but the payout status is FAILED.
    # NOTE(review): not yet fed into any scenario below.
    fail_response_FAIL = {
        u'amount': u'39164',
        u'bank_no_md5': u'0A31660F888670C19FBA203F0C982311',
        u'cmms_amt': u'100',
        u'create_time': u'2019-04-15 15:33:49',
        u'err_code': u'SUCCESS',
        u'err_code_des': u'\u67e5\u8be2\u6210\u529f',
        u'mch_id': u'1480791292',
        u'partner_trade_no': u'TX20190415153349430564wXNtu6qKld',
        u'pay_succ_time': u'2019-04-16 02:10:06',
        u'payment_no': u'10000287536082019041500000111424399',
        u'reason': None,
        u'result_code': u'SUCCESS',
        u'return_code': u'SUCCESS',
        u'return_msg': u'\u67e5\u8be2\u6210\u529f',
        u'status': u'FAILED',
        u'true_name_md5': u'260C6052708B32F64521D2D0FD280D8E'
    }

    def run_success_case():
        # Any call to get_transfer_result_via_bank returns the in-progress payload.
        mocker.patch.object(WechatPaymentGateway, 'get_transfer_result_via_bank', return_value=success_response)
        # Replaced with a bare mock so the real record lookup is never used.
        mocker.patch.object(WithdrawRecord, 'get_today_via_bank')
        check_withdraw_via_bank()

    # pytest does not collect nested functions, so the scenario must be
    # invoked explicitly; otherwise this test would run nothing at all.
    run_success_case()
- ## task system
def test_send_topic_command():
    """Smoke test: ``send_topic_command`` must be importable from ``taskmanager.tasks``."""
    # Import only; the task itself is not executed here.
    from taskmanager.tasks import send_topic_command
|