# coding=utf-8
"""Ad-hoc developer checks for order, payment, recharge and ledger code paths.

Each function is a manual scratch test meant to be run against the
development environment selected below; most of them read or write real
documents in that environment's database.
"""
import datetime
import os
from collections import defaultdict
from pprint import pprint

from bson import ObjectId

from base import init_env

# The settings module must be chosen and the environment initialised before
# any project model is imported, hence the imports below this point.
os.environ["DJANGO_SETTINGS_MODULE"] = "configs.dev_zjl"
init_env(False)

from apps.web.user.models import ConsumeRecord, PaymentInfo, RechargeRecord
from apps.web.user.utils2 import UnifiedConsumeOrderManager, generate_net_payment, generate_refund
from apps.web.user.utils import get_consume_order
from apps.web.user.validator2 import unifiedConsumeOrderSchema, RechargeOrderSchema
from apilib.monetary import RMB
from apps.web.core.models import LedgerConsumeApp


def test_package():
    """Validate a sample consume payload, build an order and print its package."""
    info = {
        "devInfo": {
            "logicalCode": "G548284",
        },
        "userInfo": {
            "openId": "o-VzzwM-sFVkcNiq3Yf3nsi5Nbow",
            "groupId": "62188d6fbdc7de20129441eb"
        },
        "startInfo": {
            "port": 1,
            "packageId": "1",
            "startType": 1,
            "isTemporary": False,
            "consumeValue": "100"
        }
    }
    data = unifiedConsumeOrderSchema(info)
    with UnifiedConsumeOrderManager(**data) as manager:
        order = manager.buildOrder()
        # NOTE(review): original indentation was lost; assumed the print
        # belongs inside the ``with`` block -- confirm.
        pprint(order.package)


def default():
    """Return a fresh price record with both price fields set to zero."""
    return {
        "price": 0,
        "price2": 0
    }


def test_default_dict():
    """Show a defaultdict whose factory builds a record keyed by a model field name."""
    from apps.web.user.models import MyUser

    a = defaultdict(lambda: {
        MyUser.objects.first().chargeBalance.name: "",
        "price": 0,
        "price2": 0
    })
    a[1]["price"] += 10
    pprint(a[1])


def test_payment_update():
    """Attach a PaymentInfo with the current pay time to the first ConsumeRecord.

    This function used to be named ``test_payment`` and was silently
    replaced by the later definition of the same name, so it could never
    be called. It is renamed to make it reachable; ``test_payment`` still
    refers to the later definition.
    """
    paymentInfo = {
        "deduct_list": []
    }
    p = PaymentInfo(paymentInfo)
    p.payTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    order = ConsumeRecord.objects.first()
    order.update_payment(p)


def test_upset():
    """Run an upserting increment on a DealerGroupStats document and print the result."""
    from apps.web.dealer.proxy import DealerGroupStats

    result = DealerGroupStats.objects.filter(
        date="2022-12-08",
        groupId="123456789"
    ).update_one(
        upsert=True,
        inc__orderCount=1,
        inc__elecCount=10,
        inc__amount=RMB(10),
        inc__duration=1800
    )
    print(result)


def test_recharge_order():
    """Pass a sample recharge payload through RechargeOrderSchema and print the output."""
    payload = {
        "terminalInfo": {
            "logicalCode": "",
        },
        "userInfo": {
            "openId": "oFx-Z5UgniODNNmownuHv69FjSeA",
            "groupId": "123456789"
        },
        "startInfo": {
            "category": "recharge/quickpay",
            "client": "",
            "orderNo": "",
            "ruleId": ""
        }
    }
    data = RechargeOrderSchema(payload)
    pprint(data)


def test_order_pay():
    """Fetch a consume order by its order number and print ``package.dumpDict``."""
    orderNo = "1670496397941948"
    order = get_consume_order(orderNo)  # type: ConsumeRecord
    pprint(order.package.dumpDict)


def test_payment():
    """Print an empty PaymentInfo and report whether it evaluates as falsy."""
    p = PaymentInfo()
    print(p)
    if not p:
        print(1111)


def test_s():
    """Raise a ServiceException with a dict payload and print its description."""
    from apps.web.core.exceptions import ServiceException

    try:
        raise ServiceException({"result": 2, "description": 2})
    except ServiceException as se:
        print(se.result["description"])


def test_post_pay():
    """Call ``recharge_start_device`` for a fixed recharge record and its user."""
    order = RechargeRecord.objects.get(id="63957920211b612ccebfad68")
    from apps.web.user.transaction import recharge_start_device

    recharge_start_device(order.user, order)


def refund_order():
    """Call ``refund_cash`` for a fixed recharge record with its full ``money`` amount."""
    order = RechargeRecord.objects.get(id="63958524da9594c65b8678d7")
    from apps.web.user.transaction_deprecated import refund_cash

    refund_cash(order, order.money, deductCoins=RMB(1))


def rep_mer(username):
    """Resubmit the merchant authorisation for the dealer named ``username``.

    The dealer's MerchantSourceInfo must currently have status 6. Its
    ``wxSource.applymentId`` is cleared, the status is set to 3, and the
    record is submitted again through ``MerchantApplyProxy.submit_auth``.
    If the reloaded record then has status 5, it is set back to 6.

    :param username: username used to look up the Dealer document.
    :raises AssertionError: if the merchant record's status is not 6.
    """
    from apps.web.dealer.models import Dealer
    from apps.web.merchant.models import MerchantSourceInfo
    from apps.web.merchant.utils import MerchantApplyProxy

    d = Dealer.objects.get(username=username)
    mer = MerchantSourceInfo.objects.get(ownerId=str(d.id))  # type: MerchantSourceInfo
    assert mer.status == 6

    # Clear the stored application id before submitting again.
    w = mer.wxSource
    w.applymentId = ""
    w.save()

    mer.status = 3
    mer.save()

    p = MerchantApplyProxy(mer)
    p.submit_auth()

    mer.reload()
    if mer.status == 5:
        mer.status = 6
        # NOTE(review): original indentation was lost; assumed the save only
        # happens when the status was changed -- confirm.
        mer.save()


def test_ledger_consume():
    """Execute LedgerConsumeOrder for one fixed DealerGroupStats document."""
    from apps.web.dealer.proxy import DealerGroupStats

    stats = DealerGroupStats.objects.get(id="6397f6f769f046a084b34dc1")
    from apps.web.report.ledger import LedgerConsumeOrder

    LedgerConsumeOrder(stats).execute()


def test_namedtuple():
    """Print the ``as_dict()`` form of a Stats value with every field set to zero."""
    from apps.web.dealer.utils2 import Stats

    print(Stats(
        amount=0,
        bestowAmount=0,
        profit=0,
        elecCount=0,
        elecFee=0,
        duration=0,
        orderCount=0
    ).as_dict())


def test_day():
    """Print every date from ``start`` to ``end`` inclusive, one per line.

    An empty ``start`` or ``end`` string falls back to today's date. With
    the values hard-coded here, ``start`` is today and ``end`` is
    2022-12-14, so running this on any later day raises ``ValueError("re")``.

    :raises ValueError: "gt 7" if the range covers more than 7 days,
        "re" if ``start`` is later than ``end``.
    """
    start = ""
    end = "2022-12-14"

    if not start:
        start = datetime.date.today()
    else:
        start = datetime.datetime.strptime(start, "%Y-%m-%d").date()

    if not end:
        end = datetime.date.today()
    else:
        end = datetime.datetime.strptime(end, "%Y-%m-%d").date()

    # Both endpoints are counted, hence the +1.
    dateInterval = (end - start).days + 1
    if dateInterval > 7:
        raise ValueError("gt 7")

    if start > end:
        raise ValueError("re")

    for _day in range(dateInterval):
        print(start.strftime("%Y-%m-%d"))
        start += datetime.timedelta(days=1)


if __name__ == '__main__':
    test_day()