# -*- coding: utf-8 -*- # !/usr/bin/env python from collections import namedtuple from apilib.monetary import RMB from apps.web.common.models import WithdrawRecord TEST_OPEN_ID = '' recharge_record = namedtuple('recharge_record', ['nickname', 'gateway', 'money']) def test_add(): from taskmanager.tasks import add assert add(1, 2) == 3 ################## ## To end users ## ################## def test_report_to_user_via_wechat(): from taskmanager.tasks import report_to_user_via_wechat ################ ## To dealers ## ################ def test_report_to_dealer_via_wechat(): from taskmanager.tasks import report_to_dealer_via_wechat def test_report_feedback_to_dealer_via_wechat(): from taskmanager.tasks import report_feedback_to_dealer_via_wechat def test_report_daily_report_to_dealer_via_wechat(): from taskmanager.tasks import report_daily_report_to_dealer_via_wechat def test_report_device_abnormally_offline_to_dealer_via_wechat(): from taskmanager.tasks import report_device_abnormally_offline_to_dealer_via_wechat def test_send_SIM_expired_messages(): from taskmanager.tasks import send_SIM_expired_messages ################## ### To managers ## ################## def test_weekly_notify_finance_manager(): from taskmanager.tasks import weekly_notify_finance_manager def test_whale_withdraw_order_alert(): from taskmanager.tasks import whale_withdraw_order_alert def test_withdraw_error_alert(): from taskmanager.tasks import withdraw_error_alert def check_website_is_online(): from taskmanager.tasks import check_website_is_online def test_generate_simCharge_excel_report(): from taskmanager.tasks import generate_simCharge_excel_report def test_generate_dealerWithDraw_excel_report(): from taskmanager.tasks import generate_dealerWithDraw_excel_report def test_generate_manager_map_options(): from taskmanager.tasks import generate_manager_map_options def test_generate_biz_stats_for_manager(): from taskmanager.tasks import generate_biz_stats_for_manager ###################### ## system operation ## ###################### def test_daily_check_auto_withdraw(mocker, dealer, income_type, source_key): from taskmanager.tasks import daily_check_auto_withdraw from django.conf import settings from apps.web.core.payment.wechat import WechatPaymentGateway settings.SUPPORT_DEALER_AUTO_WITHDRAW = True from testcase.unit.responses import WITHDRAW_SUCCEEDED mocker.patch.object(WechatPaymentGateway, 'withdraw_via_changes', return_value=WITHDRAW_SUCCEEDED) dealer.autoWithdrawAllowable = True dealer.set_balance(income_type, source_key, RMB(100)) dealer.save() daily_check_auto_withdraw() assert dealer.reload().sub_balance(income_type = income_type, source_key = source_key) == RMB(0) def test_remove_serviceProgress_periodically(): from taskmanager.tasks import remove_serviceProgress_periodically def test_check_wechat_withdraw_via_bank(mocker): import xmltodict from taskmanager.tasks import check_withdraw_via_bank from apps.web.core.payment.wechat import WechatPaymentGateway success_response = xmltodict.parse(""" 500 0 """)['xml'] fail_resposne_ORDERNOTEXIST = xmltodict.parse(""" """)['xml'] fail_response_FAIL = { u'amount': u'39164', u'bank_no_md5': u'0A31660F888670C19FBA203F0C982311', u'cmms_amt': u'100', u'create_time': u'2019-04-15 15:33:49', u'err_code': u'SUCCESS', u'err_code_des': u'\u67e5\u8be2\u6210\u529f', u'mch_id': u'1480791292', u'partner_trade_no': u'TX20190415153349430564wXNtu6qKld', u'pay_succ_time': u'2019-04-16 02:10:06', u'payment_no': u'10000287536082019041500000111424399', u'reason': None, u'result_code': u'SUCCESS', u'return_code': u'SUCCESS', u'return_msg': u'\u67e5\u8be2\u6210\u529f', u'status': u'FAILED', u'true_name_md5': u'260C6052708B32F64521D2D0FD280D8E' } def test_success(): mocker.patch.object(WechatPaymentGateway, 'get_transfer_result_via_bank', return_value=success_response) mocker.patch.object(WithdrawRecord, 'get_today_via_bank', ) check_withdraw_via_bank() ## task system def test_send_topic_command(): from taskmanager.tasks import send_topic_command