core.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709
  1. # #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # <HTTPretty - HTTP client mock for Python>
  4. # Copyright (C) <2011-2018> Gabriel Falcao <gabriel@nacaolivre.org>
  5. #
  6. # Permission is hereby granted, free of charge, to any person
  7. # obtaining a copy of this software and associated documentation
  8. # files (the "Software"), to deal in the Software without
  9. # restriction, including without limitation the rights to use,
  10. # copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. # copies of the Software, and to permit persons to whom the
  12. # Software is furnished to do so, subject to the following
  13. # conditions:
  14. #
  15. # The above copyright notice and this permission notice shall be
  16. # included in all copies or substantial portions of the Software.
  17. #
  18. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  19. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  20. # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  21. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  22. # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  23. # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  24. # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  25. # OTHER DEALINGS IN THE SOFTWARE.
  26. from __future__ import unicode_literals
  27. import codecs
  28. import contextlib
  29. import functools
  30. import hashlib
  31. import inspect
  32. import itertools
  33. import json
  34. import re
  35. import socket
  36. import tempfile
  37. import threading
  38. import traceback
  39. import warnings
  40. from functools import partial
  41. from .compat import (
  42. PY3,
  43. StringIO,
  44. text_type,
  45. binary_type,
  46. BaseClass,
  47. BaseHTTPRequestHandler,
  48. quote,
  49. quote_plus,
  50. urlencode,
  51. encode_obj,
  52. urlunsplit,
  53. urlsplit,
  54. parse_qs,
  55. unquote_utf8,
  56. ClassTypes,
  57. basestring
  58. )
  59. from .http import (
  60. STATUSES,
  61. HttpBaseClass,
  62. parse_requestline,
  63. last_requestline,
  64. )
  65. from .utils import (
  66. utf8,
  67. decode_utf8,
  68. )
  69. from .errors import HTTPrettyError, UnmockedError
  70. from datetime import datetime
  71. from datetime import timedelta
  72. from errno import EAGAIN
  73. old_socket = socket.socket
  74. old_SocketType = socket.SocketType
  75. old_create_connection = socket.create_connection
  76. old_gethostbyname = socket.gethostbyname
  77. old_gethostname = socket.gethostname
  78. old_getaddrinfo = socket.getaddrinfo
  79. old_socksocket = None
  80. old_ssl_wrap_socket = None
  81. old_sslwrap_simple = None
  82. old_sslsocket = None
  83. old_sslcontext_wrap_socket = None
  84. MULTILINE_ANY_REGEX = re.compile(r'.*', re.M)
  85. hostname_re = re.compile(r'\^?(?:https?://)?[^:/]*[:/]?')
  86. try: # pragma: no cover
  87. import socks
  88. old_socksocket = socks.socksocket
  89. except ImportError:
  90. socks = None
  91. try: # pragma: no cover
  92. import ssl
  93. old_ssl_wrap_socket = ssl.wrap_socket
  94. try:
  95. old_sslcontext_wrap_socket = ssl.SSLContext.wrap_socket
  96. except AttributeError:
  97. pass
  98. if not PY3:
  99. old_sslwrap_simple = ssl.sslwrap_simple
  100. old_sslsocket = ssl.SSLSocket
  101. except ImportError: # pragma: no cover
  102. ssl = None
  103. try:
  104. import requests.packages.urllib3.connection as requests_urllib3_connection
  105. old_requests_ssl_wrap_socket = requests_urllib3_connection.ssl_wrap_socket
  106. except ImportError:
  107. requests_urllib3_connection = None
  108. DEFAULT_HTTP_PORTS = frozenset([80])
  109. POTENTIAL_HTTP_PORTS = set(DEFAULT_HTTP_PORTS)
  110. DEFAULT_HTTPS_PORTS = frozenset([443])
  111. POTENTIAL_HTTPS_PORTS = set(DEFAULT_HTTPS_PORTS)
  112. def FALLBACK_FUNCTION(x):
  113. return x
  114. class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
  115. r"""
  116. Represents a HTTP request. It takes a valid multi-line,
  117. ``\r\n`` separated string with HTTP headers and parse them out using
  118. the internal `parse_request` method.
  119. It also replaces the `rfile` and `wfile` attributes with StringIO
  120. instances so that we guarantee that it won't make any I/O, neighter
  121. for writing nor reading.
  122. It has some convenience attributes:
  123. ``headers`` -> a mimetype object that can be cast into a dictionary,
  124. contains all the request headers
  125. ``method`` -> the HTTP method used in this request
  126. ``querystring`` -> a dictionary containing lists with the
  127. attributes. Please notice that if you need a single value from a
  128. query string you will need to get it manually like:
  129. ``body`` -> the request body as a string
  130. ``parsed_body`` -> the request body parsed by ``parse_request_body``
  131. .. testcode::
  132. >>> request.querystring
  133. {'name': ['Gabriel Falcao']}
  134. >>> print request.querystring['name'][0]
  135. """
  136. def __init__(self, headers, body=''):
  137. # first of all, lets make sure that if headers or body are
  138. # unicode strings, it must be converted into a utf-8 encoded
  139. # byte string
  140. self.raw_headers = utf8(headers.strip())
  141. self._body = utf8(body)
  142. # Now let's concatenate the headers with the body, and create
  143. # `rfile` based on it
  144. self.rfile = StringIO(b'\r\n\r\n'.join([self.raw_headers, self.body]))
  145. # Creating `wfile` as an empty StringIO, just to avoid any
  146. # real I/O calls
  147. self.wfile = StringIO()
  148. # parsing the request line preemptively
  149. self.raw_requestline = self.rfile.readline()
  150. # initiating the error attributes with None
  151. self.error_code = None
  152. self.error_message = None
  153. # Parse the request based on the attributes above
  154. if not self.parse_request():
  155. return
  156. # making the HTTP method string available as the command
  157. self.method = self.command
  158. # Now 2 convenient attributes for the HTTPretty API:
  159. # `querystring` holds a dictionary with the parsed query string
  160. try:
  161. self.path = self.path.encode('iso-8859-1')
  162. except UnicodeDecodeError:
  163. pass
  164. self.path = decode_utf8(self.path)
  165. qstring = self.path.split("?", 1)[-1]
  166. self.querystring = self.parse_querystring(qstring)
  167. # And the body will be attempted to be parsed as
  168. # `application/json` or `application/x-www-form-urlencoded`
  169. """a dictionary containing parsed request body or None if
  170. HTTPrettyRequest doesn't know how to parse it. It currently
  171. supports parsing body data that was sent under the
  172. ``content`-type` headers values: ``application/json`` or
  173. ``application/x-www-form-urlencoded``
  174. """
  175. self.parsed_body = self.parse_request_body(self._body)
  176. @property
  177. def body(self):
  178. return self._body
  179. @body.setter
  180. def body(self, value):
  181. self._body = utf8(value)
  182. # And the body will be attempted to be parsed as
  183. # `application/json` or `application/x-www-form-urlencoded`
  184. self.parsed_body = self.parse_request_body(self._body)
  185. def __nonzero__(self):
  186. return bool(self.body) or bool(self.raw_headers)
  187. def __str__(self):
  188. tmpl = '<HTTPrettyRequest("{}", total_headers={}, body_length={})>'
  189. return tmpl.format(
  190. self.headers.get('content-type', ''),
  191. len(self.headers),
  192. len(self.body),
  193. )
  194. def parse_querystring(self, qs):
  195. """parses an UTF-8 encoded query string into a dict of string lists
  196. :param qs: a querystring
  197. :returns: a dict of lists
  198. """
  199. expanded = unquote_utf8(qs)
  200. parsed = parse_qs(expanded)
  201. result = {}
  202. for k in parsed:
  203. result[k] = list(map(decode_utf8, parsed[k]))
  204. return result
  205. def parse_request_body(self, body):
  206. """Attempt to parse the post based on the content-type passed.
  207. Return the regular body if not
  208. :param body: string
  209. :returns: a python object such as dict or list in case the deserialization suceeded. Else returns the given param ``body``
  210. """
  211. PARSING_FUNCTIONS = {
  212. 'application/json': json.loads,
  213. 'text/json': json.loads,
  214. 'application/x-www-form-urlencoded': self.parse_querystring,
  215. }
  216. content_type = self.headers.get('content-type', '')
  217. do_parse = PARSING_FUNCTIONS.get(content_type, FALLBACK_FUNCTION)
  218. try:
  219. body = decode_utf8(body)
  220. return do_parse(body)
  221. except (Exception, BaseException):
  222. return body
  223. class EmptyRequestHeaders(dict):
  224. """A dict subclass used as internal representation of empty request
  225. headers
  226. """
  227. class HTTPrettyRequestEmpty(object):
  228. """Represents an empty :py:class:`~httpretty.core.HTTPrettyRequest`
  229. where all its properties are somehow empty or ``None``
  230. """
  231. method = None
  232. url = None
  233. body = ''
  234. headers = EmptyRequestHeaders()
  235. class FakeSockFile(object):
  236. """Fake socket file descriptor. Under the hood all data is written in
  237. a temporary file, giving it a real file descriptor number.
  238. """
  239. def __init__(self):
  240. self.file = tempfile.TemporaryFile()
  241. self._fileno = self.file.fileno()
  242. def getvalue(self):
  243. if hasattr(self.file, 'getvalue'):
  244. return self.file.getvalue()
  245. else:
  246. return self.file.read()
  247. def close(self):
  248. self.socket.close()
  249. self.file.close()
  250. def fileno(self):
  251. return self._fileno
  252. def __getattr__(self, name):
  253. return getattr(self.file, name)
  254. class FakeSSLSocket(object):
  255. """Shorthand for :py:class:`~httpretty.core.fakesock`
  256. """
  257. def __init__(self, sock, *args, **kw):
  258. self._httpretty_sock = sock
  259. def __getattr__(self, attr):
  260. return getattr(self._httpretty_sock, attr)
  261. class fakesock(object):
  262. """
  263. fake :py:mod:`socket`
  264. """
  265. class socket(object):
  266. """drop-in replacement for :py:class:`socket.socket`
  267. """
  268. _entry = None
  269. debuglevel = 0
  270. _sent_data = []
  271. def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM,
  272. protocol=0, _sock=None):
  273. self.truesock = (old_socket(family, type, protocol)
  274. if httpretty.allow_net_connect
  275. else None)
  276. self._connected_truesock = False
  277. self._closed = True
  278. self.fd = FakeSockFile()
  279. self.fd.socket = _sock or self
  280. self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
  281. self._sock = _sock or self
  282. self.is_http = False
  283. self._bufsize = 1024
  284. def getpeercert(self, *a, **kw):
  285. now = datetime.now()
  286. shift = now + timedelta(days=30 * 12)
  287. return {
  288. 'notAfter': shift.strftime('%b %d %H:%M:%S GMT'),
  289. 'subjectAltName': (
  290. ('DNS', '*.%s' % self._host),
  291. ('DNS', self._host),
  292. ('DNS', '*'),
  293. ),
  294. 'subject': (
  295. (
  296. ('organizationName', '*.%s' % self._host),
  297. ),
  298. (
  299. ('organizationalUnitName',
  300. 'Domain Control Validated'),
  301. ),
  302. (
  303. ('commonName', '*.%s' % self._host),
  304. ),
  305. ),
  306. }
  307. def ssl(self, sock, *args, **kw):
  308. return sock
  309. def setsockopt(self, level, optname, value):
  310. if self.truesock:
  311. self.truesock.setsockopt(level, optname, value)
  312. def connect(self, address):
  313. self._closed = False
  314. try:
  315. self._address = (self._host, self._port) = address
  316. except ValueError:
  317. # We get here when the address is just a string pointing to a
  318. # unix socket path/file
  319. #
  320. # See issue #206
  321. self.is_http = False
  322. else:
  323. ports_to_check = (
  324. POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS))
  325. self.is_http = self._port in ports_to_check
  326. if not self.is_http:
  327. if self.truesock and not self._connected_truesock:
  328. self.truesock.connect(self._address)
  329. self._connected_truesock = True
  330. else:
  331. raise UnmockedError()
  332. elif self.truesock and not self._connected_truesock:
  333. matcher = httpretty.match_http_address(self._host, self._port)
  334. if matcher is None:
  335. self.truesock.connect(self._address)
  336. self._connected_truesock = True
  337. def fileno(self):
  338. if self.truesock:
  339. return self.truesock.fileno()
  340. return self.fd.fileno()
  341. def close(self):
  342. if self._connected_truesock:
  343. self.truesock.close()
  344. self._connected_truesock = False
  345. self._closed = True
  346. def makefile(self, mode='r', bufsize=-1):
  347. """Returns this fake socket's own tempfile buffer.
  348. If there is an entry associated with the socket, the file
  349. descriptor gets filled in with the entry data before being
  350. returned.
  351. """
  352. self._mode = mode
  353. self._bufsize = bufsize
  354. if self._entry:
  355. t = threading.Thread(
  356. target=self._entry.fill_filekind, args=(self.fd,)
  357. )
  358. t.start()
  359. if self.timeout == socket._GLOBAL_DEFAULT_TIMEOUT:
  360. timeout = None
  361. else:
  362. timeout = self.timeout
  363. t.join(timeout)
  364. if t.isAlive():
  365. raise socket.timeout
  366. return self.fd
  367. def real_sendall(self, data, *args, **kw):
  368. """Sends data to the remote server. This method is called
  369. when HTTPretty identifies that someone is trying to send
  370. non-http data.
  371. The received bytes are written in this socket's tempfile
  372. buffer so that HTTPretty can return it accordingly when
  373. necessary.
  374. """
  375. if not self.truesock:
  376. raise UnmockedError()
  377. if not self.is_http:
  378. return self.truesock.sendall(data, *args, **kw)
  379. if self._address[1] == 443 and old_sslsocket:
  380. sock = old_sslsocket(self.truesock)
  381. else:
  382. sock = self.truesock
  383. if not self._connected_truesock:
  384. sock.connect(self._address)
  385. sock.setblocking(1)
  386. sock.sendall(data, *args, **kw)
  387. should_continue = True
  388. while should_continue:
  389. try:
  390. received = sock.recv(self._bufsize)
  391. self.fd.write(received)
  392. should_continue = bool(received.strip())
  393. except socket.error as e:
  394. if e.errno == EAGAIN:
  395. continue
  396. break
  397. self.fd.seek(0)
  398. def sendall(self, data, *args, **kw):
  399. self._sent_data.append(data)
  400. self.fd = FakeSockFile()
  401. self.fd.socket = self
  402. try:
  403. requestline, _ = data.split(b'\r\n', 1)
  404. method, path, version = parse_requestline(
  405. decode_utf8(requestline))
  406. is_parsing_headers = True
  407. except ValueError:
  408. path = ''
  409. is_parsing_headers = False
  410. if self._entry is None:
  411. # If the previous request wasn't mocked, don't
  412. # mock the subsequent sending of data
  413. return self.real_sendall(data, *args, **kw)
  414. else:
  415. method = self._entry.method
  416. path = self._entry.info.path
  417. self.fd.seek(0)
  418. if not is_parsing_headers:
  419. if len(self._sent_data) > 1:
  420. headers = utf8(last_requestline(self._sent_data))
  421. meta = self._entry.request.headers
  422. body = utf8(self._sent_data[-1])
  423. if meta.get('transfer-encoding', '') == 'chunked':
  424. if not body.isdigit() and (body != b'\r\n') and (body != b'0\r\n\r\n'):
  425. self._entry.request.body += body
  426. else:
  427. self._entry.request.body += body
  428. httpretty.historify_request(headers, body, False)
  429. return
  430. # path might come with
  431. s = urlsplit(path)
  432. POTENTIAL_HTTP_PORTS.add(int(s.port or 80))
  433. parts = list(map(utf8, data.split(b'\r\n\r\n', 1)))
  434. if len(parts) == 2:
  435. headers, body = parts
  436. else:
  437. headers = ''
  438. body = data
  439. request = httpretty.historify_request(headers, body)
  440. info = URIInfo(
  441. hostname=self._host,
  442. port=self._port,
  443. path=s.path,
  444. query=s.query,
  445. last_request=request
  446. )
  447. matcher, entries = httpretty.match_uriinfo(info)
  448. if not entries:
  449. self._entry = None
  450. self.real_sendall(data)
  451. return
  452. self._entry = matcher.get_next_entry(method, info, request)
  453. def debug(self, truesock_func, *a, **kw):
  454. if self.is_http:
  455. frame = inspect.stack()[0][0]
  456. lines = list(map(utf8, traceback.format_stack(frame)))
  457. message = [
  458. "HTTPretty intercepted and unexpected socket method call.",
  459. ("Please open an issue at "
  460. "'https://github.com/gabrielfalcao/HTTPretty/issues'"),
  461. "And paste the following traceback:\n",
  462. "".join(decode_utf8(lines)),
  463. ]
  464. raise RuntimeError("\n".join(message))
  465. if not self.truesock:
  466. raise UnmockedError()
  467. return getattr(self.truesock, truesock_func)(*a, **kw)
  468. def settimeout(self, new_timeout):
  469. self.timeout = new_timeout
  470. def send(self, *args, **kwargs):
  471. return self.debug('send', *args, **kwargs)
  472. def sendto(self, *args, **kwargs):
  473. return self.debug('sendto', *args, **kwargs)
  474. def recvfrom_into(self, *args, **kwargs):
  475. return self.debug('recvfrom_into', *args, **kwargs)
  476. def recv_into(self, *args, **kwargs):
  477. return self.debug('recv_into', *args, **kwargs)
  478. def recvfrom(self, *args, **kwargs):
  479. return self.debug('recvfrom', *args, **kwargs)
  480. def recv(self, *args, **kwargs):
  481. return self.debug('recv', *args, **kwargs)
  482. def __getattr__(self, name):
  483. if not self.truesock:
  484. raise UnmockedError()
  485. return getattr(self.truesock, name)
  486. def fake_wrap_socket(orig_wrap_socket_fn, *args, **kw):
  487. """drop-in replacement for py:func:`ssl.wrap_socket`
  488. """
  489. server_hostname = kw.get('server_hostname')
  490. if server_hostname is not None:
  491. matcher = httpretty.match_https_hostname(server_hostname)
  492. if matcher is None:
  493. return orig_wrap_socket_fn(*args, **kw)
  494. if 'sock' in kw:
  495. return kw['sock']
  496. else:
  497. return args[0]
  498. def create_fake_connection(
  499. address,
  500. timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
  501. source_address=None):
  502. """drop-in replacement for :py:func:`socket.create_connection`"""
  503. s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
  504. if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
  505. s.settimeout(timeout)
  506. if source_address:
  507. s.bind(source_address)
  508. s.connect(address)
  509. return s
  510. def fake_gethostbyname(host):
  511. """drop-in replacement for :py:func:`socket.gethostbyname`"""
  512. return '127.0.0.1'
  513. def fake_gethostname():
  514. """drop-in replacement for :py:func:`socket.gethostname`"""
  515. return 'localhost'
  516. def fake_getaddrinfo(
  517. host, port, family=None, socktype=None, proto=None, flags=None):
  518. """drop-in replacement for :py:func:`socket.getaddrinfo`"""
  519. return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
  520. '', (host, port))]
  521. class Entry(BaseClass):
  522. """Created by :py:meth:`~httpretty.core.httpretty.register_uri` and
  523. stored in memory as internal representation of a HTTP
  524. request/response definition.
  525. :param method: string
  526. :param uri: string
  527. :param body: string
  528. :param adding_headers: dict - headers to be added to the response
  529. :param forcing_headers: dict - headers to be forcefully set in the response
  530. :param status: an integer (e.g.: ``status=200``)
  531. :param streaming: bool - whether to stream the response
  532. :param headers: keyword-args with headers to be added to the response
  533. .. warning:: When using the ``forcing_headers`` option make sure to add the header ``Content-Length`` otherwise calls using :py:mod:`requests` will try to load the response endlessly.
  534. """
  535. def __init__(self, method, uri, body,
  536. adding_headers=None,
  537. forcing_headers=None,
  538. status=200,
  539. streaming=False,
  540. **headers):
  541. self.method = method
  542. self.uri = uri
  543. self.info = None
  544. self.request = None
  545. self.body_is_callable = False
  546. if hasattr(body, "__call__"):
  547. self.callable_body = body
  548. self.body = None
  549. self.body_is_callable = True
  550. elif isinstance(body, text_type):
  551. self.body = utf8(body)
  552. else:
  553. self.body = body
  554. self.streaming = streaming
  555. if not streaming and not self.body_is_callable:
  556. self.body_length = len(self.body or '')
  557. else:
  558. self.body_length = 0
  559. self.adding_headers = adding_headers or {}
  560. self.forcing_headers = forcing_headers or {}
  561. self.status = int(status)
  562. for k, v in headers.items():
  563. name = "-".join(k.split("_")).title()
  564. self.adding_headers[name] = v
  565. self.validate()
  566. def validate(self):
  567. """validates the body size with the value of the ``Content-Length``
  568. header
  569. """
  570. content_length_keys = 'Content-Length', 'content-length'
  571. for key in content_length_keys:
  572. got = self.adding_headers.get(
  573. key, self.forcing_headers.get(key, None))
  574. if got is None:
  575. continue
  576. igot = None
  577. try:
  578. igot = int(got)
  579. except (ValueError, TypeError):
  580. warnings.warn(
  581. 'HTTPretty got to register the Content-Length header '
  582. 'with "%r" which is not a number' % got)
  583. return
  584. if igot and igot > self.body_length:
  585. raise HTTPrettyError(
  586. 'HTTPretty got inconsistent parameters. The header '
  587. 'Content-Length you registered expects size "%d" but '
  588. 'the body you registered for that has actually length '
  589. '"%d".' % (
  590. igot, self.body_length,
  591. )
  592. )
  593. def __str__(self):
  594. return r'<Entry {} {} getting {}>'.format(
  595. self.method,
  596. self.uri,
  597. self.status
  598. )
  599. def normalize_headers(self, headers):
  600. """Normalize keys in header names so that ``COntent-tyPe`` becomes ``content-type``
  601. :param headers: dict
  602. :returns: dict
  603. """
  604. new = {}
  605. for k in headers:
  606. new_k = '-'.join([s.lower() for s in k.split('-')])
  607. new[new_k] = headers[k]
  608. return new
  609. def fill_filekind(self, fk):
  610. """writes HTTP Response data to a file descriptor
  611. :parm fk: a file-like object
  612. .. warning:: **side-effect:** this method moves the cursor of the given file object to zero
  613. """
  614. now = datetime.utcnow()
  615. headers = {
  616. 'status': self.status,
  617. 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
  618. 'server': 'Python/HTTPretty',
  619. 'connection': 'close',
  620. }
  621. if self.forcing_headers:
  622. headers = self.forcing_headers
  623. if self.adding_headers:
  624. headers.update(
  625. self.normalize_headers(
  626. self.adding_headers))
  627. headers = self.normalize_headers(headers)
  628. status = headers.get('status', self.status)
  629. if self.body_is_callable:
  630. status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers)
  631. headers = self.normalize_headers(headers)
  632. # TODO: document this behavior:
  633. if 'content-length' not in headers:
  634. headers.update({
  635. 'content-length': len(self.body)
  636. })
  637. string_list = [
  638. 'HTTP/1.1 %d %s' % (status, STATUSES[status]),
  639. ]
  640. if 'date' in headers:
  641. string_list.append('date: %s' % headers.pop('date'))
  642. if not self.forcing_headers:
  643. content_type = headers.pop('content-type',
  644. 'text/plain; charset=utf-8')
  645. content_length = headers.pop('content-length',
  646. self.body_length)
  647. string_list.append('content-type: %s' % content_type)
  648. if not self.streaming:
  649. string_list.append('content-length: %s' % content_length)
  650. server = headers.pop('server', None)
  651. if server:
  652. string_list.append('server: %s' % server)
  653. for k, v in headers.items():
  654. string_list.append(
  655. '{}: {}'.format(k, v),
  656. )
  657. for item in string_list:
  658. fk.write(utf8(item) + b'\n')
  659. fk.write(b'\r\n')
  660. if self.streaming:
  661. self.body, body = itertools.tee(self.body)
  662. for chunk in body:
  663. fk.write(utf8(chunk))
  664. else:
  665. fk.write(utf8(self.body))
  666. fk.seek(0)
  667. def url_fix(s, charset=None):
  668. """escapes special characters
  669. """
  670. if charset:
  671. warnings.warn("{}.url_fix() charset argument is deprecated".format(__name__), DeprecationWarning)
  672. scheme, netloc, path, querystring, fragment = urlsplit(s)
  673. path = quote(path, b'/%')
  674. querystring = quote_plus(querystring, b':&=')
  675. return urlunsplit((scheme, netloc, path, querystring, fragment))
  676. class URIInfo(BaseClass):
  677. """Internal representation of `URIs <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_
  678. .. tip:: all arguments are optional
  679. :param username:
  680. :param password:
  681. :param hostname:
  682. :param port:
  683. :param path:
  684. :param query:
  685. :param fragment:
  686. :param scheme:
  687. :param last_request:
  688. """
  689. default_str_attrs = (
  690. 'username',
  691. 'password',
  692. 'hostname',
  693. 'port',
  694. 'path',
  695. )
  696. def __init__(self,
  697. username='',
  698. password='',
  699. hostname='',
  700. port=80,
  701. path='/',
  702. query='',
  703. fragment='',
  704. scheme='',
  705. last_request=None):
  706. self.username = username or ''
  707. self.password = password or ''
  708. self.hostname = hostname or ''
  709. if port:
  710. port = int(port)
  711. elif scheme == 'https':
  712. port = 443
  713. self.port = port or 80
  714. self.path = path or ''
  715. if query:
  716. query_items = sorted(parse_qs(query).items())
  717. self.query = urlencode(
  718. encode_obj(query_items),
  719. doseq=True,
  720. )
  721. else:
  722. self.query = ''
  723. if scheme:
  724. self.scheme = scheme
  725. elif self.port in POTENTIAL_HTTPS_PORTS:
  726. self.scheme = 'https'
  727. else:
  728. self.scheme = 'http'
  729. self.fragment = fragment or ''
  730. self.last_request = last_request
  731. def to_str(self, attrs):
  732. fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs])
  733. return r'<httpretty.URIInfo(%s)>' % fmt
  734. def __str__(self):
  735. return self.to_str(self.default_str_attrs)
  736. def str_with_query(self):
  737. attrs = self.default_str_attrs + ('query',)
  738. return self.to_str(attrs)
  739. def __hash__(self):
  740. return int(hashlib.sha1(binary_type(self, 'ascii')).hexdigest(), 16)
  741. def __eq__(self, other):
  742. self_tuple = (
  743. self.port,
  744. decode_utf8(self.hostname.lower()),
  745. url_fix(decode_utf8(self.path)),
  746. )
  747. other_tuple = (
  748. other.port,
  749. decode_utf8(other.hostname.lower()),
  750. url_fix(decode_utf8(other.path)),
  751. )
  752. return self_tuple == other_tuple
  753. def full_url(self, use_querystring=True):
  754. """
  755. :param use_querystring: bool
  756. :returns: a string with the full url with the format ``{scheme}://{credentials}{domain}{path}{query}``
  757. """
  758. credentials = ""
  759. if self.password:
  760. credentials = "{}:{}@".format(
  761. self.username, self.password)
  762. query = ""
  763. if use_querystring and self.query:
  764. query = "?{}".format(decode_utf8(self.query))
  765. result = "{scheme}://{credentials}{domain}{path}{query}".format(
  766. scheme=self.scheme,
  767. credentials=credentials,
  768. domain=self.get_full_domain(),
  769. path=decode_utf8(self.path),
  770. query=query
  771. )
  772. return result
  773. def get_full_domain(self):
  774. """
  775. :returns: a string in the form ``{domain}:{port}`` or just the domain if the port is 80 or 443
  776. """
  777. hostname = decode_utf8(self.hostname)
  778. # Port 80/443 should not be appended to the url
  779. if self.port not in DEFAULT_HTTP_PORTS | DEFAULT_HTTPS_PORTS:
  780. return ":".join([hostname, str(self.port)])
  781. return hostname
  782. @classmethod
  783. def from_uri(cls, uri, entry):
  784. """
  785. :param uri: string
  786. :param entry: an instance of :py:class:`~httpretty.core.Entry`
  787. """
  788. result = urlsplit(uri)
  789. if result.scheme == 'https':
  790. POTENTIAL_HTTPS_PORTS.add(int(result.port or 443))
  791. else:
  792. POTENTIAL_HTTP_PORTS.add(int(result.port or 80))
  793. return cls(result.username,
  794. result.password,
  795. result.hostname,
  796. result.port,
  797. result.path,
  798. result.query,
  799. result.fragment,
  800. result.scheme,
  801. entry)
  802. class URIMatcher(object):
  803. regex = None
  804. info = None
  805. def __init__(self, uri, entries, match_querystring=False, priority=0):
  806. self._match_querystring = match_querystring
  807. # CPython, Jython
  808. regex_types = ('SRE_Pattern', 'org.python.modules.sre.PatternObject',
  809. 'Pattern')
  810. is_regex = type(uri).__name__ in regex_types
  811. if is_regex:
  812. self.regex = uri
  813. result = urlsplit(uri.pattern)
  814. if result.scheme == 'https':
  815. POTENTIAL_HTTPS_PORTS.add(int(result.port or 443))
  816. else:
  817. POTENTIAL_HTTP_PORTS.add(int(result.port or 80))
  818. else:
  819. self.info = URIInfo.from_uri(uri, entries)
  820. self.entries = entries
  821. self.priority = priority
  822. # hash of current_entry pointers, per method.
  823. self.current_entries = {}
  824. def matches(self, info):
  825. if self.info:
  826. # Query string is not considered when comparing info objects, compare separately
  827. return self.info == info and (not self._match_querystring or self.info.query == info.query)
  828. else:
  829. return self.regex.search(info.full_url(
  830. use_querystring=self._match_querystring))
  831. def __str__(self):
  832. wrap = 'URLMatcher({})'
  833. if self.info:
  834. if self._match_querystring:
  835. return wrap.format(text_type(self.info.str_with_query()))
  836. else:
  837. return wrap.format(text_type(self.info))
  838. else:
  839. return wrap.format(self.regex.pattern)
  840. def get_next_entry(self, method, info, request):
  841. """Cycle through available responses, but only once.
  842. Any subsequent requests will receive the last response"""
  843. if method not in self.current_entries:
  844. self.current_entries[method] = 0
  845. # restrict selection to entries that match the requested
  846. # method
  847. entries_for_method = [e for e in self.entries if e.method == method]
  848. if self.current_entries[method] >= len(entries_for_method):
  849. self.current_entries[method] = -1
  850. if not self.entries or not entries_for_method:
  851. raise ValueError('I have no entries for method %s: %s'
  852. % (method, self))
  853. entry = entries_for_method[self.current_entries[method]]
  854. if self.current_entries[method] != -1:
  855. self.current_entries[method] += 1
  856. # Attach more info to the entry
  857. # So the callback can be more clever about what to do
  858. # This does also fix the case where the callback
  859. # would be handed a compiled regex as uri instead of the
  860. # real uri
  861. entry.info = info
  862. entry.request = request
  863. return entry
  864. def __hash__(self):
  865. return hash(text_type(self))
  866. def __eq__(self, other):
  867. return text_type(self) == text_type(other)
  868. class httpretty(HttpBaseClass):
  869. """manages HTTPretty's internal request/response registry and request matching.
  870. """
  871. _entries = {}
  872. latest_requests = []
  873. last_request = HTTPrettyRequestEmpty()
  874. _is_enabled = False
  875. allow_net_connect = True
  876. @classmethod
  877. def match_uriinfo(cls, info):
  878. """
  879. :param info: an :py:class:`~httpretty.core.URIInfo`
  880. :returns: a 2-item tuple: (:py:class:`~httpretty.core.URLMatcher`, :py:class:`~httpretty.core.URIInfo`) or ``(None, [])``
  881. """
  882. items = sorted(
  883. cls._entries.items(),
  884. key=lambda matcher_entries: matcher_entries[0].priority,
  885. reverse=True,
  886. )
  887. for matcher, value in items:
  888. if matcher.matches(info):
  889. return (matcher, info)
  890. return (None, [])
  891. @classmethod
  892. def match_https_hostname(cls, hostname):
  893. """
  894. :param hostname: a string
  895. :returns: an :py:class:`~httpretty.core.URLMatcher` or ``None``
  896. """
  897. items = sorted(
  898. cls._entries.items(),
  899. key=lambda matcher_entries: matcher_entries[0].priority,
  900. reverse=True,
  901. )
  902. for matcher, value in items:
  903. if matcher.info is None:
  904. pattern_with_port = "https://{0}:".format(hostname)
  905. pattern_without_port = "https://{0}/".format(hostname)
  906. hostname_pattern = (
  907. hostname_re
  908. .match(matcher.regex.pattern)
  909. .group(0)
  910. )
  911. for pattern in [pattern_with_port, pattern_without_port]:
  912. if re.match(hostname_pattern, pattern):
  913. return matcher
  914. elif matcher.info.hostname == hostname:
  915. return matcher
  916. return None
  917. @classmethod
  918. def match_http_address(cls, hostname, port):
  919. """
  920. :param hostname: a string
  921. :param port: an integer
  922. :returns: an :py:class:`~httpretty.core.URLMatcher` or ``None``
  923. """
  924. items = sorted(
  925. cls._entries.items(),
  926. key=lambda matcher_entries: matcher_entries[0].priority,
  927. reverse=True,
  928. )
  929. for matcher, value in items:
  930. if matcher.info is None:
  931. if port in POTENTIAL_HTTPS_PORTS:
  932. scheme = 'https://'
  933. else:
  934. scheme = 'http://'
  935. pattern_without_port = "{0}{1}/".format(scheme, hostname)
  936. pattern_with_port = "{0}{1}:{2}/".format(scheme, hostname, port)
  937. hostname_pattern = (
  938. hostname_re
  939. .match(matcher.regex.pattern)
  940. .group(0)
  941. )
  942. for pattern in [pattern_with_port, pattern_without_port]:
  943. if re.match(hostname_pattern, pattern):
  944. return matcher
  945. elif matcher.info.hostname == hostname \
  946. and matcher.info.port == port:
  947. return matcher
  948. return None
  949. @classmethod
  950. @contextlib.contextmanager
  951. def record(cls, filename, indentation=4, encoding='utf-8'):
  952. """
  953. .. testcode::
  954. import io
  955. import json
  956. import requests
  957. import httpretty
  958. with httpretty.record('/tmp/ip.json'):
  959. data = requests.get('https://httpbin.org/ip').json()
  960. with io.open('/tmp/ip.json') as fd:
  961. assert data == json.load(fd)
  962. :param filename: a string
  963. :param indentation: an integer, defaults to **4**
  964. :param encoding: a string, defaults to **"utf-8"**
  965. :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
  966. """
  967. try:
  968. import urllib3
  969. except ImportError:
  970. msg = (
  971. 'HTTPretty requires urllib3 installed '
  972. 'for recording actual requests.'
  973. )
  974. raise RuntimeError(msg)
  975. http = urllib3.PoolManager()
  976. cls.enable()
  977. calls = []
  978. def record_request(request, uri, headers):
  979. cls.disable()
  980. kw = {}
  981. kw.setdefault('body', request.body)
  982. kw.setdefault('headers', dict(request.headers))
  983. response = http.request(request.method, uri, **kw)
  984. calls.append({
  985. 'request': {
  986. 'uri': uri,
  987. 'method': request.method,
  988. 'headers': dict(request.headers),
  989. 'body': decode_utf8(request.body),
  990. 'querystring': request.querystring
  991. },
  992. 'response': {
  993. 'status': response.status,
  994. 'body': decode_utf8(response.data),
  995. # urllib3 1.10 had a bug if you just did:
  996. # dict(response.headers)
  997. # which would cause all the values to become lists
  998. # with the header name as the first item and the
  999. # true value as the second item. Workaround that
  1000. 'headers': dict(response.headers.items())
  1001. }
  1002. })
  1003. cls.enable()
  1004. return response.status, response.headers, response.data
  1005. for method in cls.METHODS:
  1006. cls.register_uri(method, MULTILINE_ANY_REGEX, body=record_request)
  1007. yield
  1008. cls.disable()
  1009. with codecs.open(filename, 'w', encoding) as f:
  1010. f.write(json.dumps(calls, indent=indentation))
  1011. @classmethod
  1012. @contextlib.contextmanager
  1013. def playback(cls, filename):
  1014. """
  1015. .. testcode::
  1016. import io
  1017. import json
  1018. import requests
  1019. import httpretty
  1020. with httpretty.record('/tmp/ip.json'):
  1021. data = requests.get('https://httpbin.org/ip').json()
  1022. with io.open('/tmp/ip.json') as fd:
  1023. assert data == json.load(fd)
  1024. :param filename: a string
  1025. :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
  1026. """
  1027. cls.enable()
  1028. data = json.loads(open(filename).read())
  1029. for item in data:
  1030. uri = item['request']['uri']
  1031. method = item['request']['method']
  1032. body = item['response']['body']
  1033. headers = item['response']['headers']
  1034. cls.register_uri(method, uri, body=body, forcing_headers=headers)
  1035. yield
  1036. cls.disable()
  1037. @classmethod
  1038. def reset(cls):
  1039. """resets the internal state of HTTPretty, unregistering all URLs
  1040. """
  1041. POTENTIAL_HTTP_PORTS.intersection_update(DEFAULT_HTTP_PORTS)
  1042. POTENTIAL_HTTPS_PORTS.intersection_update(DEFAULT_HTTPS_PORTS)
  1043. cls._entries.clear()
  1044. cls.latest_requests = []
  1045. cls.last_request = HTTPrettyRequestEmpty()
  1046. @classmethod
  1047. def historify_request(cls, headers, body='', append=True):
  1048. """appends request to a list for later retrieval
  1049. .. testcode::
  1050. import httpretty
  1051. httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body='')
  1052. with httpretty.enabled():
  1053. requests.get('https://httpbin.org/ip')
  1054. assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'
  1055. """
  1056. request = HTTPrettyRequest(headers, body)
  1057. cls.last_request = request
  1058. if append or not cls.latest_requests:
  1059. cls.latest_requests.append(request)
  1060. else:
  1061. cls.latest_requests[-1] = request
  1062. return request
  1063. @classmethod
  1064. def register_uri(cls, method, uri, body='{"message": "HTTPretty :)"}',
  1065. adding_headers=None,
  1066. forcing_headers=None,
  1067. status=200,
  1068. responses=None,
  1069. match_querystring=False,
  1070. priority=0,
  1071. **headers):
  1072. """
  1073. .. testcode::
  1074. import httpretty
  1075. def request_callback(request, uri, response_headers):
  1076. content_type = request.headers.get('Content-Type')
  1077. assert request.body == '{"nothing": "here"}', 'unexpected body: {}'.format(request.body)
  1078. assert content_type == 'application/json', 'expected application/json but received Content-Type: {}'.format(content_type)
  1079. return [200, response_headers, json.dumps({"hello": "world"})]
  1080. httpretty.register_uri(
  1081. HTTPretty.POST, "https://httpretty.example.com/api",
  1082. body=request_callback)
  1083. with httpretty.enabled():
  1084. requests.post('https://httpretty.example.com/api', data='{"nothing": "here"}', headers={'Content-Type': 'application/json'})
  1085. assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'
  1086. :param method: one of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``
  1087. :param uri: a string (e.g.: **"https://httpbin.org/ip"**)
  1088. :param body: a string, defaults to ``{"message": "HTTPretty :)"}``
  1089. :param adding_headers: dict - headers to be added to the response
  1090. :param forcing_headers: dict - headers to be forcefully set in the response
  1091. :param status: an integer, defaults to **200**
  1092. :param responses: a list of entries, ideally each created with :py:meth:`~httpretty.core.httpretty.Response`
  1093. :param priority: an integer, useful for setting higher priority over previously registered urls. defaults to zero
  1094. :param match_querystring: bool - whether to take the querystring into account when matching an URL
  1095. :param headers: headers to be added to the response
  1096. """
  1097. uri_is_string = isinstance(uri, basestring)
  1098. if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}$', uri):
  1099. uri += '/'
  1100. if isinstance(responses, list) and len(responses) > 0:
  1101. for response in responses:
  1102. response.uri = uri
  1103. response.method = method
  1104. entries_for_this_uri = responses
  1105. else:
  1106. headers[str('body')] = body
  1107. headers[str('adding_headers')] = adding_headers
  1108. headers[str('forcing_headers')] = forcing_headers
  1109. headers[str('status')] = status
  1110. entries_for_this_uri = [
  1111. cls.Response(method=method, uri=uri, **headers),
  1112. ]
  1113. matcher = URIMatcher(uri, entries_for_this_uri,
  1114. match_querystring, priority)
  1115. if matcher in cls._entries:
  1116. matcher.entries.extend(cls._entries[matcher])
  1117. del cls._entries[matcher]
  1118. cls._entries[matcher] = entries_for_this_uri
  1119. def __str__(self):
  1120. return '<HTTPretty with %d URI entries>' % len(self._entries)
  1121. @classmethod
  1122. def Response(
  1123. cls, body,
  1124. method=None,
  1125. uri=None,
  1126. adding_headers=None,
  1127. forcing_headers=None,
  1128. status=200,
  1129. streaming=False,
  1130. **kw):
  1131. """
  1132. shortcut to create an :py:class:`~httpretty.core.Entry` that takes the body as first positional argument
  1133. .. seealso:: the parameters of this function match those of the :py:class:`~httpretty.core.Entry` constructor
  1134. :param body:
  1135. :param method: one of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``
  1136. :param uri:
  1137. :param adding_headers:
  1138. :param forcing_headers:
  1139. :param status: defaults to **200**
  1140. :param streaming: defaults to **False**
  1141. :param kw: keyword-arguments passed onto the :py:class:`~httpretty.core.Entry`
  1142. :returns: an :py:class:`~httpretty.core.Entry`
  1143. """
  1144. kw['body'] = body
  1145. kw['adding_headers'] = adding_headers
  1146. kw['forcing_headers'] = forcing_headers
  1147. kw['status'] = int(status)
  1148. kw['streaming'] = streaming
  1149. return Entry(method, uri, **kw)
  1150. @classmethod
  1151. def disable(cls):
  1152. """Disables HTTPretty entirely, putting the original :py:mod:`socket`
  1153. module back in its place.
  1154. .. code::
  1155. import re, json
  1156. import httpretty
  1157. httpretty.enable()
  1158. # request passes through fake socket
  1159. response = requests.get('https://httpbin.org')
  1160. httpretty.disable()
  1161. # request uses real python socket module
  1162. response = requests.get('https://httpbin.org')
  1163. .. note:: This method does not call :py:meth:`httpretty.core.reset` automatically.
  1164. """
  1165. cls._is_enabled = False
  1166. socket.socket = old_socket
  1167. socket.SocketType = old_SocketType
  1168. socket._socketobject = old_socket
  1169. socket.create_connection = old_create_connection
  1170. socket.gethostname = old_gethostname
  1171. socket.gethostbyname = old_gethostbyname
  1172. socket.getaddrinfo = old_getaddrinfo
  1173. socket.__dict__['socket'] = old_socket
  1174. socket.__dict__['_socketobject'] = old_socket
  1175. socket.__dict__['SocketType'] = old_SocketType
  1176. socket.__dict__['create_connection'] = old_create_connection
  1177. socket.__dict__['gethostname'] = old_gethostname
  1178. socket.__dict__['gethostbyname'] = old_gethostbyname
  1179. socket.__dict__['getaddrinfo'] = old_getaddrinfo
  1180. if socks:
  1181. socks.socksocket = old_socksocket
  1182. socks.__dict__['socksocket'] = old_socksocket
  1183. if ssl:
  1184. ssl.wrap_socket = old_ssl_wrap_socket
  1185. ssl.SSLSocket = old_sslsocket
  1186. try:
  1187. ssl.SSLContext.wrap_socket = old_sslcontext_wrap_socket
  1188. except AttributeError:
  1189. pass
  1190. ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket
  1191. ssl.__dict__['SSLSocket'] = old_sslsocket
  1192. if not PY3:
  1193. ssl.sslwrap_simple = old_sslwrap_simple
  1194. ssl.__dict__['sslwrap_simple'] = old_sslwrap_simple
  1195. if requests_urllib3_connection is not None:
  1196. requests_urllib3_connection.ssl_wrap_socket = \
  1197. old_requests_ssl_wrap_socket
  1198. requests_urllib3_connection.__dict__['ssl_wrap_socket'] = \
  1199. old_requests_ssl_wrap_socket
  1200. @classmethod
  1201. def is_enabled(cls):
  1202. """Check if HTTPretty is enabled
  1203. :returns: bool
  1204. .. testcode::
  1205. import httpretty
  1206. httpretty.enable()
  1207. assert httpretty.is_enabled() == True
  1208. httpretty.disable()
  1209. assert httpretty.is_enabled() == False
  1210. """
  1211. return cls._is_enabled
  1212. @classmethod
  1213. def enable(cls, allow_net_connect=True):
  1214. """Enables HTTPretty.
  1215. When ``allow_net_connect`` is ``False`` any connection to an unregistered uri will throw :py:class:`httpretty.errors.UnmockedError`.
  1216. .. testcode::
  1217. import re, json
  1218. import httpretty
  1219. httpretty.enable()
  1220. httpretty.register_uri(
  1221. httpretty.GET,
  1222. re.compile(r'http://.*'),
  1223. body=json.dumps({'man': 'in', 'the': 'middle'})
  1224. )
  1225. response = requests.get('https://foo.bar/foo/bar')
  1226. response.json().should.equal({
  1227. "man": "in",
  1228. "the": "middle",
  1229. })
  1230. .. warning:: after calling this method the original :py:mod:`socket` is replaced with :py:class:`httpretty.core.fakesock`. Make sure to call :py:meth:`~httpretty.disable` after done with your tests or use the :py:class:`httpretty.enabled` as decorator or `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
  1231. """
  1232. cls.allow_net_connect = allow_net_connect
  1233. cls._is_enabled = True
  1234. # Some versions of python internally shadowed the
  1235. # SocketType variable incorrectly https://bugs.python.org/issue20386
  1236. bad_socket_shadow = (socket.socket != socket.SocketType)
  1237. socket.socket = fakesock.socket
  1238. socket._socketobject = fakesock.socket
  1239. if not bad_socket_shadow:
  1240. socket.SocketType = fakesock.socket
  1241. socket.create_connection = create_fake_connection
  1242. socket.gethostname = fake_gethostname
  1243. socket.gethostbyname = fake_gethostbyname
  1244. socket.getaddrinfo = fake_getaddrinfo
  1245. socket.__dict__['socket'] = fakesock.socket
  1246. socket.__dict__['_socketobject'] = fakesock.socket
  1247. if not bad_socket_shadow:
  1248. socket.__dict__['SocketType'] = fakesock.socket
  1249. socket.__dict__['create_connection'] = create_fake_connection
  1250. socket.__dict__['gethostname'] = fake_gethostname
  1251. socket.__dict__['gethostbyname'] = fake_gethostbyname
  1252. socket.__dict__['getaddrinfo'] = fake_getaddrinfo
  1253. if socks:
  1254. socks.socksocket = fakesock.socket
  1255. socks.__dict__['socksocket'] = fakesock.socket
  1256. if ssl:
  1257. new_wrap = partial(fake_wrap_socket, old_ssl_wrap_socket)
  1258. ssl.wrap_socket = new_wrap
  1259. ssl.SSLSocket = FakeSSLSocket
  1260. try:
  1261. ssl.SSLContext.wrap_socket = partial(fake_wrap_socket, old_sslcontext_wrap_socket)
  1262. except AttributeError:
  1263. pass
  1264. ssl.__dict__['wrap_socket'] = new_wrap
  1265. ssl.__dict__['SSLSocket'] = FakeSSLSocket
  1266. if not PY3:
  1267. ssl.sslwrap_simple = new_wrap
  1268. ssl.__dict__['sslwrap_simple'] = new_wrap
  1269. if requests_urllib3_connection is not None:
  1270. new_wrap = partial(fake_wrap_socket, old_requests_ssl_wrap_socket)
  1271. requests_urllib3_connection.ssl_wrap_socket = new_wrap
  1272. requests_urllib3_connection.__dict__['ssl_wrap_socket'] = new_wrap
  1273. class httprettized(object):
  1274. """`context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ for enabling HTTPretty.
  1275. .. testcode::
  1276. import json
  1277. import httpretty
  1278. httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body=json.dumps({'origin': '42.42.42.42'}))
  1279. with httpretty.enabled():
  1280. response = requests.get('https://httpbin.org/ip')
  1281. assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'
  1282. assert response.json() == {'origin': '42.42.42.42'}
  1283. """
  1284. def __init__(self, allow_net_connect=True):
  1285. self.allow_net_connect = allow_net_connect
  1286. def __enter__(self):
  1287. httpretty.reset()
  1288. httpretty.enable(allow_net_connect=self.allow_net_connect)
  1289. def __exit__(self, exc_type, exc_value, traceback):
  1290. httpretty.disable()
  1291. httpretty.reset()
  1292. def httprettified(test=None, allow_net_connect=True):
  1293. """decorator for test functions
  1294. .. tip:: Also available under the alias :py:func:`httpretty.activate`
  1295. :param test: a callable
  1296. example usage with `nosetests <https://nose.readthedocs.io/en/latest/>`_
  1297. .. testcode::
  1298. import sure
  1299. from httpretty import httprettified
  1300. @httprettified
  1301. def test_using_nosetests():
  1302. httpretty.register_uri(
  1303. httpretty.GET,
  1304. 'https://httpbin.org/ip'
  1305. )
  1306. response = requests.get('https://httpbin.org/ip')
  1307. response.json().should.equal({
  1308. "message": "HTTPretty :)"
  1309. })
  1310. example usage with `unittest module <https://docs.python.org/3/library/unittest.html>`_
  1311. .. testcode::
  1312. import unittest
  1313. from sure import expect
  1314. from httpretty import httprettified
  1315. @httprettified
  1316. class TestWithPyUnit(unittest.TestCase):
  1317. def test_httpbin(self):
  1318. httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip')
  1319. response = requests.get('https://httpbin.org/ip')
  1320. expect(response.json()).to.equal({
  1321. "message": "HTTPretty :)"
  1322. })
  1323. """
  1324. def decorate_unittest_TestCase_setUp(klass):
  1325. # Prefer addCleanup (added in python 2.7), but fall back
  1326. # to using tearDown if it isn't available
  1327. use_addCleanup = hasattr(klass, 'addCleanup')
  1328. original_setUp = (klass.setUp
  1329. if hasattr(klass, 'setUp')
  1330. else None)
  1331. def new_setUp(self):
  1332. httpretty.reset()
  1333. httpretty.enable(allow_net_connect)
  1334. if use_addCleanup:
  1335. self.addCleanup(httpretty.disable)
  1336. if original_setUp:
  1337. original_setUp(self)
  1338. klass.setUp = new_setUp
  1339. if not use_addCleanup:
  1340. original_tearDown = (klass.setUp
  1341. if hasattr(klass, 'tearDown')
  1342. else None)
  1343. def new_tearDown(self):
  1344. httpretty.disable()
  1345. httpretty.reset()
  1346. if original_tearDown:
  1347. original_tearDown(self)
  1348. klass.tearDown = new_tearDown
  1349. return klass
  1350. def decorate_test_methods(klass):
  1351. for attr in dir(klass):
  1352. if not attr.startswith('test_'):
  1353. continue
  1354. attr_value = getattr(klass, attr)
  1355. if not hasattr(attr_value, "__call__"):
  1356. continue
  1357. setattr(klass, attr, decorate_callable(attr_value))
  1358. return klass
  1359. def is_unittest_TestCase(klass):
  1360. try:
  1361. import unittest
  1362. return issubclass(klass, unittest.TestCase)
  1363. except ImportError:
  1364. return False
  1365. "A decorator for tests that use HTTPretty"
  1366. def decorate_class(klass):
  1367. if is_unittest_TestCase(klass):
  1368. return decorate_unittest_TestCase_setUp(klass)
  1369. return decorate_test_methods(klass)
  1370. def decorate_callable(test):
  1371. @functools.wraps(test)
  1372. def wrapper(*args, **kw):
  1373. with httprettized(allow_net_connect):
  1374. return test(*args, **kw)
  1375. return wrapper
  1376. if isinstance(test, ClassTypes):
  1377. return decorate_class(test)
  1378. elif callable(test):
  1379. return decorate_callable(test)
  1380. return decorate_callable