123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- import os
- import logging
- from django.http import HttpResponseRedirect
- from django.conf import settings
- from mongoengine.errors import DoesNotExist
- from apilib.utils_string import cn
- from apps.web.user import UserAuthState
- from apps.web.user.auth import response_with_login
- from apps.web.user.views import ErrorResponseRedirect
- from apps.web.user.models import MyUser
- from apps.web.device.models import Device, Group
- from apps.web.dealer.models import Dealer
- from apps.web.utils import detect_alipay_client, detect_wechat_client
- from apps.web.ad.utils import user_has_available_ads
- from apps.web.agent.models import Agent
- from .utils import not_allowed_in_prod
- logger = logging.getLogger(__name__)
- logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.DEBUG)
- class CreateUserError(Exception): pass
- def _get_test_user(groupId, gateway):
- # type: (str, str)->MyUser
- group = Group.get_group(groupId)
- dealer = Dealer.objects(id=group['ownerId']).get()
- query = {
- 'openId': os.environ['TEST_OPEN_ID'],
- 'authAppId': os.environ['TEST_AUTH_APP_ID'],
- 'groupId': groupId,
- 'gateway': gateway,
- 'agentId': dealer.agentId
- }
- try:
- return MyUser.objects(openId=query['openId'], groupId=query['groupId']).get()
- except DoesNotExist:
- user = MyUser(**query).save()
- return user
- def _get_gateway(request):
- if detect_wechat_client(request): return 'wechat'
- elif detect_alipay_client(request): return 'alipay'
- else: return ''
- @not_allowed_in_prod
- def _userLogin(request):
- """
- Fake userLogin
- :param request:
- :return:
- """
- def userLoginByAgentId(request, agentId, href):
- # state = UserAuthState(by = UserAuthState.BY.AGENT, **{
- # 'agentId': agentId,
- # 'href': href
- # })
- openId = os.environ['TEST_OPEN_ID']
- user = MyUser.objects(openId=openId).get()
- user.update(agentId=agentId)
- response = HttpResponseRedirect(href)
- return response_with_login(request, user, response)
- def userLoginByDev(request, dev, port = None):
- logger.debug('fake user login. device = (logicalCode=%s), port = %s' % (logicalCode, port))
- user = _get_test_user(groupId=dev['groupId'], gateway=_get_gateway(request)) # type: MyUser
- dealer = Dealer.objects(id=dev['ownerId']).get() # type: Dealer
- setattr(user, 'agentId', dealer.agentId)
- if user_has_available_ads(device=dev, user=user):
- url = '/pages/index.html?devNo={devNo}'
- else:
- url = '/user/index.html#/dev?logicalCode={logicalCode}'
- response = HttpResponseRedirect(url.format(logicalCode=dev['logicalCode']))
- return response_with_login(request, user, response)
- # 扫描登录
- devNo = request.GET.get('devNo', None)
- logicalCode = request.GET.get('l', None)
- port = request.GET.get('chargeIndex', None)
- # 个人中心登录
- agentId = request.GET.get('agentId', None)
- href = request.GET.get('redirect', '/user/index.html#/user/me')
- if not any([devNo, logicalCode, agentId]):
- return ErrorResponseRedirect(error = cn(u'错误的二维码,可能是二维码损坏,建议直接投币,或者联系工作人员进行维修'))
- if devNo or logicalCode:
- if not devNo:
- dev = Device.get_dev_by_l(logicalCode=logicalCode)
- else:
- dev = Device.get_dev(devNo = devNo)
- if not dev: return ErrorResponseRedirect(error=cn(u'设备不存在'))
- return userLoginByDev(request, dev, port)
- else:
- return userLoginByAgentId(request, agentId, href)
- fake_view_mapping = {
- 'userLogin': _userLogin
- }
- __all__ = ['fake_view_mapping']
|