123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """
- 用户需要测试的场景
- * 扫入,正常看到所有个人信息
- * 充值
- * 启动设备
- * 直接付钱 启动设备
- """
- import json
- from urlparse import urlparse, parse_qs, ParseResult
- import pytest
- from apilib.utils_json import JsonResponse, json_dumps
- from apilib.monetary import VirtualCoin
- from apps.web.core.adapter.commonPulse import CommonPulseAdapter
- from apps.web.user.views import cn
- from apps.web.utils import ErrorResponseRedirect
- from apps.web.user import UserAuthState
- from apps.web.core.utils import JsonErrorResponse
- from common import url_fn, MY_USER_FIXTURE, json_is_the_same, is_concurrently_safe
def user_login_url_lack_parameter():
    """Build the error redirect the tests compare against when login parameters are missing.

    :return: an ``ErrorResponseRedirect`` carrying the fixed error message below
    """
    message = u'错误的二维码,可能是二维码损坏,建议直接投币,或者联系工作人员进行维修'
    redirect = ErrorResponseRedirect(error=message)
    return redirect
#: Ignore hosts; only the endpoint (path) and the query string are compared.
def url_eq(a, b):
    """Return True when two URLs have the same path and the same query parameters.

    Only the path and the parsed query string take part in the comparison;
    scheme and host are not looked at. The ``t`` query parameter is removed
    from both sides before comparing (presumably a per-request timestamp --
    confirm against ``url_fn``).

    :param a: first URL, must be ``str``
    :param b: second URL, must be ``str``
    :return: ``True`` if path and filtered query match, else ``False``
    :raises AssertionError: if either argument is not a ``str``
    """
    assert isinstance(a, str) and isinstance(b, str), 'a and b has to be Url(str)'

    ignored_keys = frozenset(['t'])

    def qs(pr):
        # type: (ParseResult) -> dict
        # ``items()`` instead of ``iteritems()``: identical result on Python 2,
        # and it keeps this helper usable on Python 3 as well.
        return {k: v for k, v in parse_qs(pr.query).items() if k not in ignored_keys}

    parsed_a, parsed_b = urlparse(a), urlparse(b)
    return parsed_a.path == parsed_b.path and qs(parsed_a) == qs(parsed_b)
- ################
- ## User Model
- ################
def test_user_get_or_create_via_wechat():
    """``MyUser.get_or_create_via_wechat`` called without any keyword arguments yields ``None``."""
    from apps.web.user.models import MyUser

    empty_kwargs = dict()
    outcome = MyUser.get_or_create_via_wechat(**empty_kwargs)
    assert outcome is None
- ################
- ## Auth related
- ################
#: Authorize endpoints keyed by client name; the auth tests below assert that
#: the matching entry appears inside the URL of the response they receive.
_foreign_gateway_uri_map = dict(
    wechat="https://open.weixin.qq.com/connect/oauth2/authorize",
    alipay="https://openauth.alipay.com/oauth2/publicAppAuthorize.htm",
)
@pytest.mark.parametrize("client_name", ['wechat', 'alipay'])
def test_userLogin(wechat_user_client, alipay_user_client, device, dealer, agent, client_name):
    """Exercise ``userLogin`` once per client type (wechat, alipay).

    First the view is requested without parameters and the response URL must
    match (per ``url_eq``) the URL of ``user_login_url_lack_parameter()``.
    Then it is requested with ``l=device.logicalCode`` and the response URL
    must contain the gateway URI registered for ``client_name``.
    """
    from apps.web.user.views import userLogin

    clients_by_name = {'wechat': wechat_user_client, 'alipay': alipay_user_client}
    client = clients_by_name[client_name]
    login_url = url_fn(userLogin)

    # Missing parameters: expect the fixed error redirect.
    bare_response = client.get(login_url())
    assert url_eq(bare_response.url, user_login_url_lack_parameter().url)

    # Normal situation: a logical code is supplied.
    scan_response = client.get(login_url(l=device.logicalCode))
    assert _foreign_gateway_uri_map[client_name] in scan_response.url
def test_baseAccess(wechat_client, device, mocker, user, agent, manager, wechat_managerial_app,
                    default_wechat_payment_app):
    """Drive ``wechatAuthUser`` through several states of the user record.

    ``WechatAuthBridge.authorize`` is patched to return ``user.openId``. The
    same request (encoded device state plus ``code='CODE'``) is issued four
    times; the first three responses must contain the wechat gateway URI and
    the last one, after ``managerialAppId`` is set to
    ``default_wechat_payment_app.appid``, must contain ``user/index.html``.
    """
    from apps.web.user.views import wechatAuthUser

    url = url_fn(wechatAuthUser)
    wechat_gateway = _foreign_gateway_uri_map['wechat']

    # Without any parameters the request must end on the error page.
    assert assert_redirected_to_error_page(wechat_client.get(url).url)

    state_by_dev = UserAuthState.by_dev(devNo=device['devNo'])

    from apps.web.core.auth.wechat import WechatAuthBridge
    mocker.patch.object(WechatAuthBridge, 'authorize', return_value=user.openId)

    def request_auth():
        # The state is re-serialised and re-encoded for every request.
        encoded_state = WechatAuthBridge.encode_state(state_by_dev.to_json())
        return wechat_client.get(url(payload=encoded_state, code='CODE'))

    def bind_managerial_app(app):
        # Writes straight to the collection and hands back whatever update_one returns.
        return user.get_collection().update_one(
            {'_id': user.id},
            {'$set': {
                'avatar': 'something',
                'nickname': 'someone',
                'managerialAppId': app.appid,
                'managerialOpenId': 'test_managerial_openId',
            }})

    first_response = request_auth()
    # NOTE(review): the profile is blanked only after the first request has
    # already been made -- confirm this ordering is intended.
    assert user.update(avatar='', nickname='')
    assert wechat_gateway in first_response.url

    assert user.update(avatar='something', nickname='someone')
    second_response = request_auth()
    assert wechat_gateway in second_response.url

    # NOTE(review): presumably update_one returns a result object that is
    # always truthy, which would make these two asserts vacuous -- confirm.
    assert bind_managerial_app(wechat_managerial_app)
    third_response = request_auth()
    assert wechat_gateway in third_response.url

    assert bind_managerial_app(default_wechat_payment_app)
    final_response = request_auth()
    assert 'user/index.html' in final_response.url
def test_managerAccess(wechat_client, device, mocker, user, agent, manager):
    """``wechatManagerAuthUser`` requested without parameters must land on the error page."""
    from apps.web.user.views import wechatManagerAuthUser

    endpoint = url_fn(wechatManagerAuthUser)
    bare_response = wechat_client.get(endpoint)
    # Without any parameters the request must fail.
    assert assert_redirected_to_error_page(bare_response.url)
def test_wxpayBaseAccess(wechat_client, device, mocker, user, agent, manager):
    """``wxpayBaseAccess`` requested without parameters must land on the error page."""
    from apps.web.user.views import wxpayBaseAccess

    endpoint = url_fn(wxpayBaseAccess)
    bare_response = wechat_client.get(endpoint)
    # Without any parameters the request must fail.
    assert assert_redirected_to_error_page(bare_response.url)
- #################
- ### wxpay gateway
- #################
def test_wxpayGateway_recharge(wechat_user_client, device, sole_group, mocker, agent, default_wechat_payment_app):
    """Post four recharge requests to ``wxpayGateway`` and check each reply.

    1. No ``devNo`` in the payload: reply equals the fixed ``JsonErrorResponse``.
    2. Valid rule id with ``devNo``: ``result`` is 103.
    3. Rule id absent from the group's ``ruleDict`` (device status check
       patched out): reply is the "rule not found" JSON.
    4. Valid rule id with the payment signature patched to ``{}``: reply is
       the SUCCESS JSON with an empty payload.
    """
    from apps.web.user.views import wxpayGateway

    gateway_url = url_fn(wxpayGateway)
    post = wechat_user_client.post_json
    rule_dict = sole_group.cached['ruleDict']

    def recharge_payload(rule_id, with_device=True):
        # Keys are inserted in the same order as the original literals.
        body = {'attachParas': {}, 'ruleId': rule_id, 'type': '', 'forced': 1}
        if with_device:
            body['devNo'] = device.devNo
        return body

    def submit(body):
        return post(gateway_url(), body)

    # 1. Payload without a device number.
    reply = submit(recharge_payload(1, with_device=False))
    expected_error = JsonErrorResponse(description=u'该设备未开通网络支付,请投币或者联系客服(1008)')
    assert json_is_the_same(reply.content, expected_error.content)

    # 2. The device fixture is offline by default (original author's note).
    offline_body = recharge_payload(rule_dict.keys()[0])
    assert json.loads(submit(offline_body).content)['result'] == 103

    # 3. Smallest number below 1000 that is not one of the group's rule ids.
    known_rule_ids = set(int(key) for key in rule_dict.keys())
    invalid_rule_id = next(str(candidate) for candidate in xrange(1000) if candidate not in known_rule_ids)
    unknown_rule_body = recharge_payload(invalid_rule_id)

    from apps.web.core.payment.wechat import WechatPaymentGateway
    mocker.patch.object(CommonPulseAdapter, 'check_dev_status', return_value=None)
    rule_not_found = json_dumps({'result': 0, 'description': u'组内未找到所提供ruleId'})
    assert submit(unknown_rule_body).content == rule_not_found

    # 4. Valid rule, signature generation stubbed out.
    valid_body = recharge_payload(rule_dict.keys()[0])
    mocker.patch.object(WechatPaymentGateway, 'generate_js_payment_signature', return_value={})
    success_reply = json.loads(submit(valid_body).content)
    assert success_reply == {'result': 1, 'description': 'SUCCESS', 'payload': {}}
def test_wxpayGateway_chargeCardWithCardNo(wechat_user_client, device, sole_group, mocker):
    """Placeholder: the body is empty, so nothing is exercised or asserted yet."""
def test_wxpayGateway_chargeCardWithDevNo(wechat_user_client, device, sole_group, mocker):
    """Placeholder: the body is empty, so nothing is exercised or asserted yet."""
- ### wechatPayNotify
- ### dev side info
def test_equipmentPara(wechat_user_client, device):
    """``equipmentPara`` answers ``result`` 0 without a device number and 1 with one."""
    from apps.web.user.views import equipmentPara

    url = url_fn(equipmentPara)

    without_device = wechat_user_client.get(url).json()
    assert without_device['result'] == 0

    with_device = wechat_user_client.get(url(devNo=device.devNo)).json()
    assert with_device['result'] == 1
def test_userBalance(wechat_user_client):
    """``userBalance`` returns HTTP 200 and reports the fixture balance in all three fields."""
    from apps.web.user.views import userBalance

    balance_url = url_fn(userBalance)
    response = wechat_user_client.get(balance_url())  # type: JsonResponse
    assert response.status_code == 200

    fixture_balance = MY_USER_FIXTURE['balance']
    expected_body = json_dumps({
        'result': 1,
        'description': '',
        'payload': {
            'balance': fixture_balance,
            'overallBalance': fixture_balance,
            'currencyCoins': fixture_balance,
        },
    })
    assert json_is_the_same(response.content, expected_body)
- ### Ad related
def test_adAccess(client, wechat_user_client, device):
    """``adAccess`` redirects (302) for both clients.

    For the wechat client, after the ``devNo`` cookie is set, the redirect
    target must match the error redirect built from the message below.
    """
    from apps.web.user.views import adAccess

    url = url_fn(adAccess)

    plain_response = client.get(url())
    assert plain_response.status_code == 302

    wechat_user_client.cookies['devNo'] = device.devNo
    wechat_response = wechat_user_client.get(url())
    assert wechat_response.status_code == 302

    expected_redirect = ErrorResponseRedirect(error=cn(u'找不到广告ID'))
    assert url_eq(wechat_response.url, expected_redirect.url)
### Promotion
### Mainly tests the concurrent scenario
def test_getPromotionalCoins(wechat_user_client, onsale, device, user):
    """Claim promotional coins through ``is_concurrently_safe`` and check the balance delta.

    After the helper has run, the reloaded user's balance must have grown by
    exactly ``VirtualCoin(onsale.detailDict.get('coins', 0))``.
    """
    from apps.web.user.views import getPromotionalCoins

    url = url_fn(getPromotionalCoins)
    starting_balance = user.balance

    def claim():
        # The URL is rebuilt on every invocation, as in the original lambda.
        target = url(logicalCode=device.logicalCode, onsaleId=str(onsale.id))
        return wechat_user_client.get(target).json()

    # Presumably the helper invokes ``claim`` concurrently -- see common.is_concurrently_safe.
    assert is_concurrently_safe(claim)

    gained = user.reload().balance - starting_balance
    assert gained == VirtualCoin(onsale.detailDict.get('coins', 0))
def test_getPromotionalDuration(wechat_user_client, onsale, device, mocker):
    """Run ``getPromotionalDuration`` through ``is_concurrently_safe`` with message sending stubbed.

    ``MessageSender.send`` is patched to return ``{'rst': 0}`` for the duration of the test.
    """
    from apps.web.user.views import getPromotionalDuration
    from apps.web.core.networking import MessageSender

    url = url_fn(getPromotionalDuration)
    mocker.patch.object(MessageSender, 'send', return_value={'rst': 0})

    def claim():
        # The URL is rebuilt on every invocation, as in the original lambda.
        target = url(logicalCode=device.logicalCode, onsaleId=str(onsale.id))
        return wechat_user_client.get(target).json()

    assert is_concurrently_safe(claim)
def test_submitFeedback(wechat_user_client, device):
    """Submit one feedback form and expect ``result`` 1, then repeat it via ``is_concurrently_safe``.

    The feedback type is drawn at random from ``FEEDBACK_TYPE.choices()``.
    """
    import random
    from apps.web.user.views import submitFeedback
    from apps.web.constant import FEEDBACK_TYPE

    feedback = {
        'phone': '13112233122',
        'logicalCode': device.logicalCode,
        'coins': 1,
        'feedType': random.choice(FEEDBACK_TYPE.choices()),
        'imgList': [],
        'description': 'test',
    }
    url = url_fn(submitFeedback)

    first_reply = wechat_user_client.post_json(url, feedback).json()
    assert first_reply['result'] == 1

    def resubmit():
        return wechat_user_client.post_json(url, feedback).json()

    assert is_concurrently_safe(resubmit)
- ########################
- # Testing about promo #
- ########################
|