123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- # coding=utf-8
- import logging
- import json
- import requests
- from urlparse import urljoin
- from django.conf import settings
- from apilib.exceptions import AliBankCardParamError, AliBankCardSysError
- from apilib.utils_url import add_query
- from apps.web.utils import LimitAttemptsManager
- logger = logging.getLogger(__name__)
- class BankRecognizer(object):
- """
- 查询银行卡联行号相关信息
- helps: https://market.aliyun.com/products/57000002/cmapi017567.html?spm=5176.12901015.0.i12901015.5737525cxikPYJ#sku=yuncode1156700000
- """
- HOST = "https://cnaps.market.alicloudapi.com"
- def __init__(self, visitor, appCode=None, timeout=None):
- self._appCode = appCode or settings.ALI_BANK_CARD_APP_CODE
- self._timeout = timeout or 5
- self._visitor = visitor or "visitor"
- limiter = LimitAttemptsManager(self._visitor, self.__class__.__name__, maxTimes=30 if not settings.DEBUG else 9999)
- if limiter.is_exceeded_limit():
- raise AliBankCardParamError(u"访问频率超限")
- limiter.incr()
- def _request(self, path, callback=None):
- """
- 目前使用的两个接口比较简单, request没有必要实现的太过于复杂 get请求足矣
- """
- url = urljoin(self.HOST, path)
- headers = {
- "Authorization": "APPCODE {}".format(self._appCode)
- }
- try:
- res = requests.get(url ,headers=headers, timeout=self._timeout)
- res.raise_for_status()
- except requests.RequestException as ree:
- logger.warning("[{} _request] send error request error = {}, path={}, appCode = {}******".format(
- self.__class__.__name__, ree, path, self._appCode[:-6]
- ))
- raise AliBankCardParamError(ree.message)
- return callback(res) if callback else res
- @staticmethod
- def _handle_response(res):
- """
- 处理查询支行联行号的回调函数
- """
- try:
- result = json.loads(res.content.decode('utf-8', 'ignore'), strict=False)
- except (TypeError, ValueError):
- logger.debug('[_handle_sub_code] Can not decode response as JSON', exc_info=True)
- raise AliBankCardSysError(u"请求错误(10000)")
- if "resp" not in result:
- raise AliBankCardSysError(u"请求错误(10001)")
- if "RespCode" not in result["resp"]:
- raise AliBankCardSysError(u"请求错误(10002)")
- respCode = result["resp"]["RespCode"]
- if respCode != "200":
- raise AliBankCardParamError(result["resp"]["RespMsg"])
- return result
- def query_sub_code(self, bank=None, card=None, province=None, city=None, queryKey=None, page=None):
- """
- 多条件查询银行支行信息
- :param bank:
- :param card:
- :param province:
- :param city:
- :param queryKey:
- :param page:
- """
- path = "/lianzhuo/searchcnaps"
- params = {
- "card": card or "", "bank": str(bank or ""),
- "province": str(province or ""), "city": str(city or ""), "key": str(queryKey or ""), "page": page or 1
- }
- path = add_query(path, params)
- return self._request(path, self._handle_response)
- def query_bank(self, card):
- """
- 根据银行卡查询总行的信息
- """
- path = "/lundroid/querybankno"
- params = {
- "bankno": card
- }
- return self._request(add_query(path, params), self._handle_response)
- def query_all_bank(self, bank):
- """
- 查询支持的银行总行
- 这个接口不是很好用 建议不要使用
- bank为关键字
- """
- path = "/lundroid/querybank"
- params = {
- "bank": str(bank)
- }
- return self._request(add_query(path, params), self._handle_response)
- def query_sub_bank(self, subCode):
- """
- 根据银行的支行联行号 查询银行
- """
- path = "/lundroid/querybankcode"
- params = {
- "bankcode": subCode
- }
- return self._request(add_query(path, params), self._handle_response)
|