# -*- coding: utf-8 -*-
# !/usr/bin/env python
from collections import namedtuple
from apilib.monetary import RMB
from apps.web.common.models import WithdrawRecord
# Open-id constant; it is an empty string in this file.
TEST_OPEN_ID = ''

# Lightweight record with three fields: nickname, gateway and money.
recharge_record = namedtuple('recharge_record', 'nickname gateway money')
def test_add():
    """Check that ``add(1, 2)`` from ``taskmanager.tasks`` equals ``3``."""
    from taskmanager.tasks import add as add_task

    expected = 3
    outcome = add_task(1, 2)
    assert outcome == expected
##################
## To end users ##
##################
def test_report_to_user_via_wechat():
    """Smoke check: ``report_to_user_via_wechat`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import report_to_user_via_wechat as _task  # noqa: F401
################
## To dealers ##
################
def test_report_to_dealer_via_wechat():
    """Smoke check: ``report_to_dealer_via_wechat`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import report_to_dealer_via_wechat as _task  # noqa: F401
def test_report_feedback_to_dealer_via_wechat():
    """Smoke check: ``report_feedback_to_dealer_via_wechat`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import report_feedback_to_dealer_via_wechat as _task  # noqa: F401
def test_report_daily_report_to_dealer_via_wechat():
    """Smoke check: ``report_daily_report_to_dealer_via_wechat`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import report_daily_report_to_dealer_via_wechat as _task  # noqa: F401
def test_report_device_abnormally_offline_to_dealer_via_wechat():
    """Smoke check: the device-abnormally-offline report task must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import (  # noqa: F401
        report_device_abnormally_offline_to_dealer_via_wechat as _task,
    )
def test_send_SIM_expired_messages():
    """Smoke check: ``send_SIM_expired_messages`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import send_SIM_expired_messages as _task  # noqa: F401
##################
### To managers ##
##################
def test_weekly_notify_finance_manager():
    """Smoke check: ``weekly_notify_finance_manager`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import weekly_notify_finance_manager as _task  # noqa: F401
def test_whale_withdraw_order_alert():
    """Smoke check: ``whale_withdraw_order_alert`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import whale_withdraw_order_alert as _task  # noqa: F401
def test_withdraw_error_alert():
    """Smoke check: ``withdraw_error_alert`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import withdraw_error_alert as _task  # noqa: F401
def check_website_is_online():
    """Smoke check: ``check_website_is_online`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import check_website_is_online as _task  # noqa: F401


# pytest's default discovery only collects functions whose names start with
# ``test``, so the function above was never run. Expose it under a collectable
# name while keeping the original name for any existing references.
test_check_website_is_online = check_website_is_online
def test_generate_simCharge_excel_report():
    """Smoke check: ``generate_simCharge_excel_report`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import generate_simCharge_excel_report as _task  # noqa: F401
def test_generate_dealerWithDraw_excel_report():
    """Smoke check: ``generate_dealerWithDraw_excel_report`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import generate_dealerWithDraw_excel_report as _task  # noqa: F401
def test_generate_manager_map_options():
    """Smoke check: ``generate_manager_map_options`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import generate_manager_map_options as _task  # noqa: F401
def test_generate_biz_stats_for_manager():
    """Smoke check: ``generate_biz_stats_for_manager`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import generate_biz_stats_for_manager as _task  # noqa: F401
######################
## system operation ##
######################
def test_daily_check_auto_withdraw(mocker, dealer, income_type, source_key):
    """Run ``daily_check_auto_withdraw`` and expect the dealer's sub-balance to be zero.

    ``WechatPaymentGateway.withdraw_via_changes`` is patched to return the
    canned ``WITHDRAW_SUCCEEDED`` response instead of running the real method.

    :param mocker: pytest-mock fixture used to patch the payment gateway.
    :param dealer: dealer fixture (defined outside this file); it is mutated,
        saved and reloaded by this test.
    :param income_type: income type passed to ``set_balance`` / ``sub_balance``.
    :param source_key: source key passed to ``set_balance`` / ``sub_balance``.
    """
    from taskmanager.tasks import daily_check_auto_withdraw
    from django.test.utils import override_settings
    from apps.web.core.payment.wechat import WechatPaymentGateway
    from testcase.unit.responses import WITHDRAW_SUCCEEDED

    mocker.patch.object(
        WechatPaymentGateway, 'withdraw_via_changes', return_value=WITHDRAW_SUCCEEDED)

    # The flag used to be assigned directly on ``settings`` and never restored,
    # which leaked into every test that ran afterwards. ``override_settings``
    # puts the previous value back when the ``with`` block exits, even on failure.
    with override_settings(SUPPORT_DEALER_AUTO_WITHDRAW=True):
        dealer.autoWithdrawAllowable = True
        dealer.set_balance(income_type, source_key, RMB(100))
        dealer.save()

        daily_check_auto_withdraw()

    remaining = dealer.reload().sub_balance(income_type=income_type, source_key=source_key)
    assert remaining == RMB(0)
def test_remove_serviceProgress_periodically():
    """Smoke check: ``remove_serviceProgress_periodically`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import remove_serviceProgress_periodically as _task  # noqa: F401
def test_check_wechat_withdraw_via_bank(mocker):
    """Build canned gateway responses for exercising ``check_withdraw_via_bank``.

    :param mocker: pytest-mock fixture, used by the inner helper for patching.

    NOTE(review): the inner ``test_success`` helper is only defined; nothing in
    this function calls it, so ``check_withdraw_via_bank()`` is never run here.
    Confirm whether it was meant to be invoked.
    NOTE(review): the two XML payloads contain no markup as written -- verify
    them against the intended gateway responses.
    """
    import xmltodict
    from taskmanager.tasks import check_withdraw_via_bank
    from apps.web.core.payment.wechat import WechatPaymentGateway

    # Payload returned by the patched gateway query in the success helper.
    ok_xml = """
500
0
"""
    ok_payload = xmltodict.parse(ok_xml)['xml']

    # Payload named for the "order does not exist" case; not referenced below.
    order_missing_xml = """
"""
    order_missing_payload = xmltodict.parse(order_missing_xml)['xml']

    # Query response whose ``status`` is FAILED while ``return_code`` and
    # ``result_code`` are SUCCESS; not referenced below.
    failed_payload = {
        u'amount': u'39164',
        u'bank_no_md5': u'0A31660F888670C19FBA203F0C982311',
        u'cmms_amt': u'100',
        u'create_time': u'2019-04-15 15:33:49',
        u'err_code': u'SUCCESS',
        u'err_code_des': u'\u67e5\u8be2\u6210\u529f',
        u'mch_id': u'1480791292',
        u'partner_trade_no': u'TX20190415153349430564wXNtu6qKld',
        u'pay_succ_time': u'2019-04-16 02:10:06',
        u'payment_no': u'10000287536082019041500000111424399',
        u'reason': None,
        u'result_code': u'SUCCESS',
        u'return_code': u'SUCCESS',
        u'return_msg': u'\u67e5\u8be2\u6210\u529f',
        u'status': u'FAILED',
        u'true_name_md5': u'260C6052708B32F64521D2D0FD280D8E',
    }

    def test_success():
        # Make the gateway query return the canned payload, replace the
        # record lookup with a mock, then run the task under test.
        mocker.patch.object(
            WechatPaymentGateway,
            'get_transfer_result_via_bank',
            return_value=ok_payload,
        )
        mocker.patch.object(WithdrawRecord, 'get_today_via_bank')
        check_withdraw_via_bank()
## task system
def test_send_topic_command():
    """Smoke check: ``send_topic_command`` must be importable.

    Only the import from ``taskmanager.tasks`` is performed; the task itself
    is not executed.
    """
    from taskmanager.tasks import send_topic_command as _task  # noqa: F401