websocket.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342
  1. """Implementation of the WebSocket protocol.
  2. `WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional
  3. communication between the browser and server.
  4. WebSockets are supported in the current versions of all major browsers,
  5. although older versions that do not support WebSockets are still in use
  6. (refer to http://caniuse.com/websockets for details).
  7. This module implements the final version of the WebSocket protocol as
  8. defined in `RFC 6455 <http://tools.ietf.org/html/rfc6455>`_. Certain
  9. browser versions (notably Safari 5.x) implemented an earlier draft of
  10. the protocol (known as "draft 76") and are not compatible with this module.
  11. .. versionchanged:: 4.0
  12. Removed support for the draft 76 protocol version.
  13. """
  14. from __future__ import absolute_import, division, print_function
  15. import base64
  16. import hashlib
  17. import os
  18. import sys
  19. import struct
  20. import tornado.escape
  21. import tornado.web
  22. import zlib
  23. from tornado.concurrent import Future, future_set_result_unless_cancelled
  24. from tornado.escape import utf8, native_str, to_unicode
  25. from tornado import gen, httpclient, httputil
  26. from tornado.ioloop import IOLoop, PeriodicCallback
  27. from tornado.iostream import StreamClosedError
  28. from tornado.log import gen_log
  29. from tornado import simple_httpclient
  30. from tornado.queues import Queue
  31. from tornado.tcpclient import TCPClient
  32. from tornado.util import _websocket_mask, PY3
# ``urlparse`` lives in different modules on Python 2 and Python 3.
if PY3:
    from urllib.parse import urlparse  # py3
    xrange = range  # alias the py2 name to py3's ``range``
else:
    from urlparse import urlparse  # py2

# Default upper bound on an incoming message, in bytes (10 MiB); used when the
# ``websocket_max_message_size`` application setting is not provided.
_default_max_message_size = 10 * 1024 * 1024
  39. class WebSocketError(Exception):
  40. pass
  41. class WebSocketClosedError(WebSocketError):
  42. """Raised by operations on a closed connection.
  43. .. versionadded:: 3.2
  44. """
  45. pass
  46. class _DecompressTooLargeError(Exception):
  47. pass
  48. class WebSocketHandler(tornado.web.RequestHandler):
  49. """Subclass this class to create a basic WebSocket handler.
  50. Override `on_message` to handle incoming messages, and use
  51. `write_message` to send messages to the client. You can also
  52. override `open` and `on_close` to handle opened and closed
  53. connections.
  54. Custom upgrade response headers can be sent by overriding
  55. `~tornado.web.RequestHandler.set_default_headers` or
  56. `~tornado.web.RequestHandler.prepare`.
  57. See http://dev.w3.org/html5/websockets/ for details on the
  58. JavaScript interface. The protocol is specified at
  59. http://tools.ietf.org/html/rfc6455.
  60. Here is an example WebSocket handler that echos back all received messages
  61. back to the client:
  62. .. testcode::
  63. class EchoWebSocket(tornado.websocket.WebSocketHandler):
  64. def open(self):
  65. print("WebSocket opened")
  66. def on_message(self, message):
  67. self.write_message(u"You said: " + message)
  68. def on_close(self):
  69. print("WebSocket closed")
  70. .. testoutput::
  71. :hide:
  72. WebSockets are not standard HTTP connections. The "handshake" is
  73. HTTP, but after the handshake, the protocol is
  74. message-based. Consequently, most of the Tornado HTTP facilities
  75. are not available in handlers of this type. The only communication
  76. methods available to you are `write_message()`, `ping()`, and
  77. `close()`. Likewise, your request handler class should implement
  78. `open()` method rather than ``get()`` or ``post()``.
  79. If you map the handler above to ``/websocket`` in your application, you can
  80. invoke it in JavaScript with::
  81. var ws = new WebSocket("ws://localhost:8888/websocket");
  82. ws.onopen = function() {
  83. ws.send("Hello, world");
  84. };
  85. ws.onmessage = function (evt) {
  86. alert(evt.data);
  87. };
  88. This script pops up an alert box that says "You said: Hello, world".
  89. Web browsers allow any site to open a websocket connection to any other,
  90. instead of using the same-origin policy that governs other network
  91. access from javascript. This can be surprising and is a potential
  92. security hole, so since Tornado 4.0 `WebSocketHandler` requires
  93. applications that wish to receive cross-origin websockets to opt in
  94. by overriding the `~WebSocketHandler.check_origin` method (see that
  95. method's docs for details). Failure to do so is the most likely
  96. cause of 403 errors when making a websocket connection.
  97. When using a secure websocket connection (``wss://``) with a self-signed
  98. certificate, the connection from a browser may fail because it wants
  99. to show the "accept this certificate" dialog but has nowhere to show it.
  100. You must first visit a regular HTML page using the same certificate
  101. to accept it before the websocket connection will succeed.
  102. If the application setting ``websocket_ping_interval`` has a non-zero
  103. value, a ping will be sent periodically, and the connection will be
  104. closed if a response is not received before the ``websocket_ping_timeout``.
  105. Messages larger than the ``websocket_max_message_size`` application setting
  106. (default 10MiB) will not be accepted.
  107. .. versionchanged:: 4.5
  108. Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and
  109. ``websocket_max_message_size``.
  110. """
  111. def __init__(self, application, request, **kwargs):
  112. super(WebSocketHandler, self).__init__(application, request, **kwargs)
  113. self.ws_connection = None
  114. self.close_code = None
  115. self.close_reason = None
  116. self.stream = None
  117. self._on_close_called = False
  118. def get(self, *args, **kwargs):
  119. self.open_args = args
  120. self.open_kwargs = kwargs
  121. # Upgrade header should be present and should be equal to WebSocket
  122. if self.request.headers.get("Upgrade", "").lower() != 'websocket':
  123. self.set_status(400)
  124. log_msg = "Can \"Upgrade\" only to \"WebSocket\"."
  125. self.finish(log_msg)
  126. gen_log.debug(log_msg)
  127. return
  128. # Connection header should be upgrade.
  129. # Some proxy servers/load balancers
  130. # might mess with it.
  131. headers = self.request.headers
  132. connection = map(lambda s: s.strip().lower(),
  133. headers.get("Connection", "").split(","))
  134. if 'upgrade' not in connection:
  135. self.set_status(400)
  136. log_msg = "\"Connection\" must be \"Upgrade\"."
  137. self.finish(log_msg)
  138. gen_log.debug(log_msg)
  139. return
  140. # Handle WebSocket Origin naming convention differences
  141. # The difference between version 8 and 13 is that in 8 the
  142. # client sends a "Sec-Websocket-Origin" header and in 13 it's
  143. # simply "Origin".
  144. if "Origin" in self.request.headers:
  145. origin = self.request.headers.get("Origin")
  146. else:
  147. origin = self.request.headers.get("Sec-Websocket-Origin", None)
  148. # If there was an origin header, check to make sure it matches
  149. # according to check_origin. When the origin is None, we assume it
  150. # did not come from a browser and that it can be passed on.
  151. if origin is not None and not self.check_origin(origin):
  152. self.set_status(403)
  153. log_msg = "Cross origin websockets not allowed"
  154. self.finish(log_msg)
  155. gen_log.debug(log_msg)
  156. return
  157. self.ws_connection = self.get_websocket_protocol()
  158. if self.ws_connection:
  159. self.ws_connection.accept_connection()
  160. else:
  161. self.set_status(426, "Upgrade Required")
  162. self.set_header("Sec-WebSocket-Version", "7, 8, 13")
  163. self.finish()
  164. stream = None
  165. @property
  166. def ping_interval(self):
  167. """The interval for websocket keep-alive pings.
  168. Set websocket_ping_interval = 0 to disable pings.
  169. """
  170. return self.settings.get('websocket_ping_interval', None)
  171. @property
  172. def ping_timeout(self):
  173. """If no ping is received in this many seconds,
  174. close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
  175. Default is max of 3 pings or 30 seconds.
  176. """
  177. return self.settings.get('websocket_ping_timeout', None)
  178. @property
  179. def max_message_size(self):
  180. """Maximum allowed message size.
  181. If the remote peer sends a message larger than this, the connection
  182. will be closed.
  183. Default is 10MiB.
  184. """
  185. return self.settings.get('websocket_max_message_size', _default_max_message_size)
  186. def write_message(self, message, binary=False):
  187. """Sends the given message to the client of this Web Socket.
  188. The message may be either a string or a dict (which will be
  189. encoded as json). If the ``binary`` argument is false, the
  190. message will be sent as utf8; in binary mode any byte string
  191. is allowed.
  192. If the connection is already closed, raises `WebSocketClosedError`.
  193. Returns a `.Future` which can be used for flow control.
  194. .. versionchanged:: 3.2
  195. `WebSocketClosedError` was added (previously a closed connection
  196. would raise an `AttributeError`)
  197. .. versionchanged:: 4.3
  198. Returns a `.Future` which can be used for flow control.
  199. .. versionchanged:: 5.0
  200. Consistently raises `WebSocketClosedError`. Previously could
  201. sometimes raise `.StreamClosedError`.
  202. """
  203. if self.ws_connection is None:
  204. raise WebSocketClosedError()
  205. if isinstance(message, dict):
  206. message = tornado.escape.json_encode(message)
  207. return self.ws_connection.write_message(message, binary=binary)
  208. def select_subprotocol(self, subprotocols):
  209. """Override to implement subprotocol negotiation.
  210. ``subprotocols`` is a list of strings identifying the
  211. subprotocols proposed by the client. This method may be
  212. overridden to return one of those strings to select it, or
  213. ``None`` to not select a subprotocol.
  214. Failure to select a subprotocol does not automatically abort
  215. the connection, although clients may close the connection if
  216. none of their proposed subprotocols was selected.
  217. The list may be empty, in which case this method must return
  218. None. This method is always called exactly once even if no
  219. subprotocols were proposed so that the handler can be advised
  220. of this fact.
  221. .. versionchanged:: 5.1
  222. Previously, this method was called with a list containing
  223. an empty string instead of an empty list if no subprotocols
  224. were proposed by the client.
  225. """
  226. return None
  227. @property
  228. def selected_subprotocol(self):
  229. """The subprotocol returned by `select_subprotocol`.
  230. .. versionadded:: 5.1
  231. """
  232. return self.ws_connection.selected_subprotocol
  233. def get_compression_options(self):
  234. """Override to return compression options for the connection.
  235. If this method returns None (the default), compression will
  236. be disabled. If it returns a dict (even an empty one), it
  237. will be enabled. The contents of the dict may be used to
  238. control the following compression options:
  239. ``compression_level`` specifies the compression level.
  240. ``mem_level`` specifies the amount of memory used for the internal compression state.
  241. These parameters are documented in details here:
  242. https://docs.python.org/3.6/library/zlib.html#zlib.compressobj
  243. .. versionadded:: 4.1
  244. .. versionchanged:: 4.5
  245. Added ``compression_level`` and ``mem_level``.
  246. """
  247. # TODO: Add wbits option.
  248. return None
  249. def open(self, *args, **kwargs):
  250. """Invoked when a new WebSocket is opened.
  251. The arguments to `open` are extracted from the `tornado.web.URLSpec`
  252. regular expression, just like the arguments to
  253. `tornado.web.RequestHandler.get`.
  254. `open` may be a coroutine. `on_message` will not be called until
  255. `open` has returned.
  256. .. versionchanged:: 5.1
  257. ``open`` may be a coroutine.
  258. """
  259. pass
  260. def on_message(self, message):
  261. """Handle incoming messages on the WebSocket
  262. This method must be overridden.
  263. .. versionchanged:: 4.5
  264. ``on_message`` can be a coroutine.
  265. """
  266. raise NotImplementedError
  267. def ping(self, data=b''):
  268. """Send ping frame to the remote end.
  269. The data argument allows a small amount of data (up to 125
  270. bytes) to be sent as a part of the ping message. Note that not
  271. all websocket implementations expose this data to
  272. applications.
  273. Consider using the ``websocket_ping_interval`` application
  274. setting instead of sending pings manually.
  275. .. versionchanged:: 5.1
  276. The data argument is now optional.
  277. """
  278. data = utf8(data)
  279. if self.ws_connection is None:
  280. raise WebSocketClosedError()
  281. self.ws_connection.write_ping(data)
  282. def on_pong(self, data):
  283. """Invoked when the response to a ping frame is received."""
  284. pass
  285. def on_ping(self, data):
  286. """Invoked when the a ping frame is received."""
  287. pass
  288. def on_close(self):
  289. """Invoked when the WebSocket is closed.
  290. If the connection was closed cleanly and a status code or reason
  291. phrase was supplied, these values will be available as the attributes
  292. ``self.close_code`` and ``self.close_reason``.
  293. .. versionchanged:: 4.0
  294. Added ``close_code`` and ``close_reason`` attributes.
  295. """
  296. pass
  297. def close(self, code=None, reason=None):
  298. """Closes this Web Socket.
  299. Once the close handshake is successful the socket will be closed.
  300. ``code`` may be a numeric status code, taken from the values
  301. defined in `RFC 6455 section 7.4.1
  302. <https://tools.ietf.org/html/rfc6455#section-7.4.1>`_.
  303. ``reason`` may be a textual message about why the connection is
  304. closing. These values are made available to the client, but are
  305. not otherwise interpreted by the websocket protocol.
  306. .. versionchanged:: 4.0
  307. Added the ``code`` and ``reason`` arguments.
  308. """
  309. if self.ws_connection:
  310. self.ws_connection.close(code, reason)
  311. self.ws_connection = None
  312. def check_origin(self, origin):
  313. """Override to enable support for allowing alternate origins.
  314. The ``origin`` argument is the value of the ``Origin`` HTTP
  315. header, the url responsible for initiating this request. This
  316. method is not called for clients that do not send this header;
  317. such requests are always allowed (because all browsers that
  318. implement WebSockets support this header, and non-browser
  319. clients do not have the same cross-site security concerns).
  320. Should return True to accept the request or False to reject it.
  321. By default, rejects all requests with an origin on a host other
  322. than this one.
  323. This is a security protection against cross site scripting attacks on
  324. browsers, since WebSockets are allowed to bypass the usual same-origin
  325. policies and don't use CORS headers.
  326. .. warning::
  327. This is an important security measure; don't disable it
  328. without understanding the security implications. In
  329. particular, if your authentication is cookie-based, you
  330. must either restrict the origins allowed by
  331. ``check_origin()`` or implement your own XSRF-like
  332. protection for websocket connections. See `these
  333. <https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>`_
  334. `articles
  335. <https://devcenter.heroku.com/articles/websocket-security>`_
  336. for more.
  337. To accept all cross-origin traffic (which was the default prior to
  338. Tornado 4.0), simply override this method to always return true::
  339. def check_origin(self, origin):
  340. return True
  341. To allow connections from any subdomain of your site, you might
  342. do something like::
  343. def check_origin(self, origin):
  344. parsed_origin = urllib.parse.urlparse(origin)
  345. return parsed_origin.netloc.endswith(".mydomain.com")
  346. .. versionadded:: 4.0
  347. """
  348. parsed_origin = urlparse(origin)
  349. origin = parsed_origin.netloc
  350. origin = origin.lower()
  351. host = self.request.headers.get("Host")
  352. # Check to see that origin matches host directly, including ports
  353. return origin == host
  354. def set_nodelay(self, value):
  355. """Set the no-delay flag for this stream.
  356. By default, small messages may be delayed and/or combined to minimize
  357. the number of packets sent. This can sometimes cause 200-500ms delays
  358. due to the interaction between Nagle's algorithm and TCP delayed
  359. ACKs. To reduce this delay (at the expense of possibly increasing
  360. bandwidth usage), call ``self.set_nodelay(True)`` once the websocket
  361. connection is established.
  362. See `.BaseIOStream.set_nodelay` for additional details.
  363. .. versionadded:: 3.1
  364. """
  365. self.stream.set_nodelay(value)
  366. def on_connection_close(self):
  367. if self.ws_connection:
  368. self.ws_connection.on_connection_close()
  369. self.ws_connection = None
  370. if not self._on_close_called:
  371. self._on_close_called = True
  372. self.on_close()
  373. self._break_cycles()
  374. def _break_cycles(self):
  375. # WebSocketHandlers call finish() early, but we don't want to
  376. # break up reference cycles (which makes it impossible to call
  377. # self.render_string) until after we've really closed the
  378. # connection (if it was established in the first place,
  379. # indicated by status code 101).
  380. if self.get_status() != 101 or self._on_close_called:
  381. super(WebSocketHandler, self)._break_cycles()
  382. def send_error(self, *args, **kwargs):
  383. if self.stream is None:
  384. super(WebSocketHandler, self).send_error(*args, **kwargs)
  385. else:
  386. # If we get an uncaught exception during the handshake,
  387. # we have no choice but to abruptly close the connection.
  388. # TODO: for uncaught exceptions after the handshake,
  389. # we can close the connection more gracefully.
  390. self.stream.close()
  391. def get_websocket_protocol(self):
  392. websocket_version = self.request.headers.get("Sec-WebSocket-Version")
  393. if websocket_version in ("7", "8", "13"):
  394. return WebSocketProtocol13(
  395. self, compression_options=self.get_compression_options())
  396. def _attach_stream(self):
  397. self.stream = self.detach()
  398. self.stream.set_close_callback(self.on_connection_close)
  399. # disable non-WS methods
  400. for method in ["write", "redirect", "set_header", "set_cookie",
  401. "set_status", "flush", "finish"]:
  402. setattr(self, method, _raise_not_supported_for_websockets)
  403. def _raise_not_supported_for_websockets(*args, **kwargs):
  404. raise RuntimeError("Method not supported for Web Sockets")
  405. class WebSocketProtocol(object):
  406. """Base class for WebSocket protocol versions.
  407. """
  408. def __init__(self, handler):
  409. self.handler = handler
  410. self.request = handler.request
  411. self.stream = handler.stream
  412. self.client_terminated = False
  413. self.server_terminated = False
  414. def _run_callback(self, callback, *args, **kwargs):
  415. """Runs the given callback with exception handling.
  416. If the callback is a coroutine, returns its Future. On error, aborts the
  417. websocket connection and returns None.
  418. """
  419. try:
  420. result = callback(*args, **kwargs)
  421. except Exception:
  422. self.handler.log_exception(*sys.exc_info())
  423. self._abort()
  424. else:
  425. if result is not None:
  426. result = gen.convert_yielded(result)
  427. self.stream.io_loop.add_future(result, lambda f: f.result())
  428. return result
  429. def on_connection_close(self):
  430. self._abort()
  431. def _abort(self):
  432. """Instantly aborts the WebSocket connection by closing the socket"""
  433. self.client_terminated = True
  434. self.server_terminated = True
  435. self.stream.close() # forcibly tear down the connection
  436. self.close() # let the subclass cleanup
  437. class _PerMessageDeflateCompressor(object):
  438. def __init__(self, persistent, max_wbits, compression_options=None):
  439. if max_wbits is None:
  440. max_wbits = zlib.MAX_WBITS
  441. # There is no symbolic constant for the minimum wbits value.
  442. if not (8 <= max_wbits <= zlib.MAX_WBITS):
  443. raise ValueError("Invalid max_wbits value %r; allowed range 8-%d",
  444. max_wbits, zlib.MAX_WBITS)
  445. self._max_wbits = max_wbits
  446. if compression_options is None or 'compression_level' not in compression_options:
  447. self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL
  448. else:
  449. self._compression_level = compression_options['compression_level']
  450. if compression_options is None or 'mem_level' not in compression_options:
  451. self._mem_level = 8
  452. else:
  453. self._mem_level = compression_options['mem_level']
  454. if persistent:
  455. self._compressor = self._create_compressor()
  456. else:
  457. self._compressor = None
  458. def _create_compressor(self):
  459. return zlib.compressobj(self._compression_level,
  460. zlib.DEFLATED, -self._max_wbits, self._mem_level)
  461. def compress(self, data):
  462. compressor = self._compressor or self._create_compressor()
  463. data = (compressor.compress(data) +
  464. compressor.flush(zlib.Z_SYNC_FLUSH))
  465. assert data.endswith(b'\x00\x00\xff\xff')
  466. return data[:-4]
  467. class _PerMessageDeflateDecompressor(object):
  468. def __init__(self, persistent, max_wbits, max_message_size, compression_options=None):
  469. self._max_message_size = max_message_size
  470. if max_wbits is None:
  471. max_wbits = zlib.MAX_WBITS
  472. if not (8 <= max_wbits <= zlib.MAX_WBITS):
  473. raise ValueError("Invalid max_wbits value %r; allowed range 8-%d",
  474. max_wbits, zlib.MAX_WBITS)
  475. self._max_wbits = max_wbits
  476. if persistent:
  477. self._decompressor = self._create_decompressor()
  478. else:
  479. self._decompressor = None
  480. def _create_decompressor(self):
  481. return zlib.decompressobj(-self._max_wbits)
  482. def decompress(self, data):
  483. decompressor = self._decompressor or self._create_decompressor()
  484. result = decompressor.decompress(data + b'\x00\x00\xff\xff', self._max_message_size)
  485. if decompressor.unconsumed_tail:
  486. raise _DecompressTooLargeError()
  487. return result
  488. class WebSocketProtocol13(WebSocketProtocol):
  489. """Implementation of the WebSocket protocol from RFC 6455.
  490. This class supports versions 7 and 8 of the protocol in addition to the
  491. final version 13.
  492. """
  493. # Bit masks for the first byte of a frame.
  494. FIN = 0x80
  495. RSV1 = 0x40
  496. RSV2 = 0x20
  497. RSV3 = 0x10
  498. RSV_MASK = RSV1 | RSV2 | RSV3
  499. OPCODE_MASK = 0x0f
  500. def __init__(self, handler, mask_outgoing=False,
  501. compression_options=None):
  502. WebSocketProtocol.__init__(self, handler)
  503. self.mask_outgoing = mask_outgoing
  504. self._final_frame = False
  505. self._frame_opcode = None
  506. self._masked_frame = None
  507. self._frame_mask = None
  508. self._frame_length = None
  509. self._fragmented_message_buffer = None
  510. self._fragmented_message_opcode = None
  511. self._waiting = None
  512. self._compression_options = compression_options
  513. self._decompressor = None
  514. self._compressor = None
  515. self._frame_compressed = None
  516. # The total uncompressed size of all messages received or sent.
  517. # Unicode messages are encoded to utf8.
  518. # Only for testing; subject to change.
  519. self._message_bytes_in = 0
  520. self._message_bytes_out = 0
  521. # The total size of all packets received or sent. Includes
  522. # the effect of compression, frame overhead, and control frames.
  523. self._wire_bytes_in = 0
  524. self._wire_bytes_out = 0
  525. self.ping_callback = None
  526. self.last_ping = 0
  527. self.last_pong = 0
  528. def accept_connection(self):
  529. try:
  530. self._handle_websocket_headers()
  531. except ValueError:
  532. self.handler.set_status(400)
  533. log_msg = "Missing/Invalid WebSocket headers"
  534. self.handler.finish(log_msg)
  535. gen_log.debug(log_msg)
  536. return
  537. try:
  538. self._accept_connection()
  539. except ValueError:
  540. gen_log.debug("Malformed WebSocket request received",
  541. exc_info=True)
  542. self._abort()
  543. return
  544. def _handle_websocket_headers(self):
  545. """Verifies all invariant- and required headers
  546. If a header is missing or have an incorrect value ValueError will be
  547. raised
  548. """
  549. fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
  550. if not all(map(lambda f: self.request.headers.get(f), fields)):
  551. raise ValueError("Missing/Invalid WebSocket headers")
  552. @staticmethod
  553. def compute_accept_value(key):
  554. """Computes the value for the Sec-WebSocket-Accept header,
  555. given the value for Sec-WebSocket-Key.
  556. """
  557. sha1 = hashlib.sha1()
  558. sha1.update(utf8(key))
  559. sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value
  560. return native_str(base64.b64encode(sha1.digest()))
  561. def _challenge_response(self):
  562. return WebSocketProtocol13.compute_accept_value(
  563. self.request.headers.get("Sec-Websocket-Key"))
    @gen.coroutine
    def _accept_connection(self):
        """Complete the server side of the handshake and start reading frames.

        Negotiates the subprotocol and the ``permessage-deflate`` extension,
        sends the 101 response, takes over the stream, starts pinging,
        runs the handler's ``open`` callback and then enters the
        frame-receiving loop.
        """
        # Subprotocol negotiation: the handler picks one of the client's
        # proposals (or nothing).
        subprotocol_header = self.request.headers.get("Sec-WebSocket-Protocol")
        if subprotocol_header:
            subprotocols = [s.strip() for s in subprotocol_header.split(',')]
        else:
            subprotocols = []
        self.selected_subprotocol = self.handler.select_subprotocol(subprotocols)
        if self.selected_subprotocol:
            assert self.selected_subprotocol in subprotocols
            self.handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)

        # Extension negotiation: only the first permessage-deflate offer is
        # considered, and only when compression is enabled.
        extensions = self._parse_extensions_header(self.request.headers)
        for ext in extensions:
            if (ext[0] == 'permessage-deflate' and
                    self._compression_options is not None):
                # TODO: negotiate parameters if compression_options
                # specifies limits.
                self._create_compressors('server', ext[1], self._compression_options)
                if ('client_max_window_bits' in ext[1] and
                        ext[1]['client_max_window_bits'] is None):
                    # Don't echo an offered client_max_window_bits
                    # parameter with no value.
                    del ext[1]['client_max_window_bits']
                self.handler.set_header("Sec-WebSocket-Extensions",
                                        httputil._encode_header(
                                            'permessage-deflate', ext[1]))
                break

        # Send the "101 Switching Protocols" response.
        self.handler.clear_header("Content-Type")
        self.handler.set_status(101)
        self.handler.set_header("Upgrade", "websocket")
        self.handler.set_header("Connection", "Upgrade")
        self.handler.set_header("Sec-WebSocket-Accept", self._challenge_response())
        self.handler.finish()

        # Must follow finish(): attaching the stream disables the handler's
        # HTTP response methods.
        self.handler._attach_stream()
        self.stream = self.handler.stream

        self.start_pinging()
        open_result = self._run_callback(self.handler.open, *self.handler.open_args,
                                         **self.handler.open_kwargs)
        # If ``open`` returned something awaitable, wait for it before
        # reading any frames.
        if open_result is not None:
            yield open_result
        yield self._receive_frame_loop()
  605. def _parse_extensions_header(self, headers):
  606. extensions = headers.get("Sec-WebSocket-Extensions", '')
  607. if extensions:
  608. return [httputil._parse_header(e.strip())
  609. for e in extensions.split(',')]
  610. return []
  611. def _process_server_headers(self, key, headers):
  612. """Process the headers sent by the server to this client connection.
  613. 'key' is the websocket handshake challenge/response key.
  614. """
  615. assert headers['Upgrade'].lower() == 'websocket'
  616. assert headers['Connection'].lower() == 'upgrade'
  617. accept = self.compute_accept_value(key)
  618. assert headers['Sec-Websocket-Accept'] == accept
  619. extensions = self._parse_extensions_header(headers)
  620. for ext in extensions:
  621. if (ext[0] == 'permessage-deflate' and
  622. self._compression_options is not None):
  623. self._create_compressors('client', ext[1])
  624. else:
  625. raise ValueError("unsupported extension %r", ext)
  626. self.selected_subprotocol = headers.get('Sec-WebSocket-Protocol', None)
  627. def _get_compressor_options(self, side, agreed_parameters, compression_options=None):
  628. """Converts a websocket agreed_parameters set to keyword arguments
  629. for our compressor objects.
  630. """
  631. options = dict(
  632. persistent=(side + '_no_context_takeover') not in agreed_parameters)
  633. wbits_header = agreed_parameters.get(side + '_max_window_bits', None)
  634. if wbits_header is None:
  635. options['max_wbits'] = zlib.MAX_WBITS
  636. else:
  637. options['max_wbits'] = int(wbits_header)
  638. options['compression_options'] = compression_options
  639. return options
  640. def _create_compressors(self, side, agreed_parameters, compression_options=None):
  641. # TODO: handle invalid parameters gracefully
  642. allowed_keys = set(['server_no_context_takeover',
  643. 'client_no_context_takeover',
  644. 'server_max_window_bits',
  645. 'client_max_window_bits'])
  646. for key in agreed_parameters:
  647. if key not in allowed_keys:
  648. raise ValueError("unsupported compression parameter %r" % key)
  649. other_side = 'client' if (side == 'server') else 'server'
  650. self._compressor = _PerMessageDeflateCompressor(
  651. **self._get_compressor_options(side, agreed_parameters, compression_options))
  652. self._decompressor = _PerMessageDeflateDecompressor(
  653. max_message_size=self.handler.max_message_size,
  654. **self._get_compressor_options(other_side, agreed_parameters, compression_options))
  655. def _write_frame(self, fin, opcode, data, flags=0):
  656. data_len = len(data)
  657. if opcode & 0x8:
  658. # All control frames MUST have a payload length of 125
  659. # bytes or less and MUST NOT be fragmented.
  660. if not fin:
  661. raise ValueError("control frames may not be fragmented")
  662. if data_len > 125:
  663. raise ValueError("control frame payloads may not exceed 125 bytes")
  664. if fin:
  665. finbit = self.FIN
  666. else:
  667. finbit = 0
  668. frame = struct.pack("B", finbit | opcode | flags)
  669. if self.mask_outgoing:
  670. mask_bit = 0x80
  671. else:
  672. mask_bit = 0
  673. if data_len < 126:
  674. frame += struct.pack("B", data_len | mask_bit)
  675. elif data_len <= 0xFFFF:
  676. frame += struct.pack("!BH", 126 | mask_bit, data_len)
  677. else:
  678. frame += struct.pack("!BQ", 127 | mask_bit, data_len)
  679. if self.mask_outgoing:
  680. mask = os.urandom(4)
  681. data = mask + _websocket_mask(mask, data)
  682. frame += data
  683. self._wire_bytes_out += len(frame)
  684. return self.stream.write(frame)
  def write_message(self, message, binary=False):
      """Sends the given message to the client of this Web Socket.

      The message is UTF-8 encoded, optionally compressed, and written as a
      single final frame (binary opcode when ``binary`` is true, text
      otherwise). Returns a Future; ``StreamClosedError`` raised either
      synchronously or through that Future is converted to
      ``WebSocketClosedError``.
      """
      opcode = 0x2 if binary else 0x1
      message = tornado.escape.utf8(message)
      assert isinstance(message, bytes)
      self._message_bytes_out += len(message)

      rsv_flags = 0
      if self._compressor:
          message = self._compressor.compress(message)
          rsv_flags = rsv_flags | self.RSV1

      # For historical reasons, write methods in Tornado operate in a
      # semi-synchronous mode in which awaiting the Future they return is
      # optional (but errors can still be raised). The frame is therefore
      # written eagerly here, and a small coroutine translates any error
      # that surfaces later through the Future.
      try:
          write_future = self._write_frame(
              True, opcode, message, flags=rsv_flags)
      except StreamClosedError:
          raise WebSocketClosedError()

      @gen.coroutine
      def wrapper():
          try:
              yield write_future
          except StreamClosedError:
              raise WebSocketClosedError()

      return wrapper()
  def write_ping(self, data):
      """Send a ping control frame whose payload is ``data`` (bytes)."""
      ping_opcode = 0x9
      assert isinstance(data, bytes)
      self._write_frame(True, ping_opcode, data)
  @gen.coroutine
  def _receive_frame_loop(self):
      """Keep receiving frames until the client side has terminated.

      A ``StreamClosedError`` raised while reading aborts the connection.
      """
      try:
          while True:
              if self.client_terminated:
                  break
              yield self._receive_frame()
      except StreamClosedError:
          self._abort()
  725. def _read_bytes(self, n):
  726. self._wire_bytes_in += n
  727. return self.stream.read_bytes(n)
  @gen.coroutine
  def _receive_frame(self):
      """Read a single frame from the stream and act on it.

      Reads the two-byte header, any extended length, the mask key and the
      payload; aborts the connection on the protocol violations checked
      below; accumulates fragmented data messages in
      ``self._fragmented_message_buffer``; and hands each completed message
      or control frame to ``self._handle_message``, yielding its result if
      one is returned.
      """
      # Read the frame header.
      data = yield self._read_bytes(2)
      header, mask_payloadlen = struct.unpack("BB", data)
      is_final_frame = header & self.FIN
      reserved_bits = header & self.RSV_MASK
      opcode = header & self.OPCODE_MASK
      opcode_is_control = opcode & 0x8
      if self._decompressor is not None and opcode != 0:
          # Compression flag is present in the first frame's header,
          # but we can't decompress until we have all the frames of
          # the message.
          # NOTE(review): this branch also runs for control frames
          # (their opcode is non-zero), so a control frame arriving
          # between fragments overwrites _frame_compressed -- confirm
          # this is intended.
          self._frame_compressed = bool(reserved_bits & self.RSV1)
          # RSV1 has been consumed; any bit still set is unsupported.
          reserved_bits &= ~self.RSV1
      if reserved_bits:
          # client is using as-yet-undefined extensions; abort
          self._abort()
          return
      is_masked = bool(mask_payloadlen & 0x80)
      payloadlen = mask_payloadlen & 0x7f

      # Parse and validate the length.
      if opcode_is_control and payloadlen >= 126:
          # control frames must have payload < 126
          self._abort()
          return
      if payloadlen < 126:
          self._frame_length = payloadlen
      elif payloadlen == 126:
          # 126 announces a 16-bit big-endian extended length.
          data = yield self._read_bytes(2)
          payloadlen = struct.unpack("!H", data)[0]
      elif payloadlen == 127:
          # 127 announces a 64-bit big-endian extended length.
          data = yield self._read_bytes(8)
          payloadlen = struct.unpack("!Q", data)[0]
      # Enforce the size limit on the whole message so far (buffered
      # fragments plus this frame) before reading the payload.
      new_len = payloadlen
      if self._fragmented_message_buffer is not None:
          new_len += len(self._fragmented_message_buffer)
      if new_len > self.handler.max_message_size:
          self.close(1009, "message too big")
          self._abort()
          return

      # Read the payload, unmasking if necessary.
      if is_masked:
          self._frame_mask = yield self._read_bytes(4)
      data = yield self._read_bytes(payloadlen)
      if is_masked:
          data = _websocket_mask(self._frame_mask, data)

      # Decide what to do with this frame.
      if opcode_is_control:
          # control frames may be interleaved with a series of fragmented
          # data frames, so control frames must not interact with
          # self._fragmented_*
          if not is_final_frame:
              # control frames must not be fragmented
              self._abort()
              return
      elif opcode == 0:  # continuation frame
          if self._fragmented_message_buffer is None:
              # nothing to continue
              self._abort()
              return
          self._fragmented_message_buffer += data
          if is_final_frame:
              # Message complete: dispatch it under the opcode recorded
              # from its first frame and reset the buffer.
              opcode = self._fragmented_message_opcode
              data = self._fragmented_message_buffer
              self._fragmented_message_buffer = None
      else:  # start of new data message
          if self._fragmented_message_buffer is not None:
              # can't start new message until the old one is finished
              self._abort()
              return
          if not is_final_frame:
              self._fragmented_message_opcode = opcode
              self._fragmented_message_buffer = data

      if is_final_frame:
          handled_future = self._handle_message(opcode, data)
          if handled_future is not None:
              yield handled_future
  806. def _handle_message(self, opcode, data):
  807. """Execute on_message, returning its Future if it is a coroutine."""
  808. if self.client_terminated:
  809. return
  810. if self._frame_compressed:
  811. try:
  812. data = self._decompressor.decompress(data)
  813. except _DecompressTooLargeError:
  814. self.close(1009, "message too big after decompression")
  815. self._abort()
  816. return
  817. if opcode == 0x1:
  818. # UTF-8 data
  819. self._message_bytes_in += len(data)
  820. try:
  821. decoded = data.decode("utf-8")
  822. except UnicodeDecodeError:
  823. self._abort()
  824. return
  825. return self._run_callback(self.handler.on_message, decoded)
  826. elif opcode == 0x2:
  827. # Binary data
  828. self._message_bytes_in += len(data)
  829. return self._run_callback(self.handler.on_message, data)
  830. elif opcode == 0x8:
  831. # Close
  832. self.client_terminated = True
  833. if len(data) >= 2:
  834. self.handler.close_code = struct.unpack('>H', data[:2])[0]
  835. if len(data) > 2:
  836. self.handler.close_reason = to_unicode(data[2:])
  837. # Echo the received close code, if any (RFC 6455 section 5.5.1).
  838. self.close(self.handler.close_code)
  839. elif opcode == 0x9:
  840. # Ping
  841. try:
  842. self._write_frame(True, 0xA, data)
  843. except StreamClosedError:
  844. self._abort()
  845. self._run_callback(self.handler.on_ping, data)
  846. elif opcode == 0xA:
  847. # Pong
  848. self.last_pong = IOLoop.current().time()
  849. return self._run_callback(self.handler.on_pong, data)
  850. else:
  851. self._abort()
  def close(self, code=None, reason=None):
      """Closes the WebSocket connection.

      Sends a close frame (unless one was already sent or the stream is
      closed), then either closes the stream, if the client has already
      terminated, or schedules an abort five seconds from now.
      ``code`` defaults to 1000 when only ``reason`` is supplied.
      """
      if not self.server_terminated:
          if not self.stream.closed():
              if code is None and reason is not None:
                  code = 1000  # "normal closure" status code
              payload = b'' if code is None else struct.pack('>H', code)
              if reason is not None:
                  payload += utf8(reason)
              try:
                  self._write_frame(True, 0x8, payload)
              except StreamClosedError:
                  self._abort()
          self.server_terminated = True

      if not self.client_terminated:
          if self._waiting is None:
              # Give the client a few seconds to complete a clean shutdown,
              # otherwise just close the connection.
              self._waiting = self.stream.io_loop.add_timeout(
                  self.stream.io_loop.time() + 5, self._abort)
          return

      # Both sides are done: cancel any pending abort and drop the stream.
      if self._waiting is not None:
          self.stream.io_loop.remove_timeout(self._waiting)
          self._waiting = None
      self.stream.close()
  @property
  def ping_interval(self):
      """The handler's ping interval, or 0 when the handler has none set."""
      configured = self.handler.ping_interval
      return 0 if configured is None else configured
  @property
  def ping_timeout(self):
      """The handler's ping timeout.

      When the handler has none set, falls back to three times the ping
      interval, but never less than 30.
      """
      configured = self.handler.ping_timeout
      if configured is None:
          return max(3 * self.ping_interval, 30)
      return configured
  def start_pinging(self):
      """Start sending periodic pings to keep the connection alive

      Does nothing unless ``self.ping_interval`` is greater than zero.
      The interval is multiplied by 1000 for ``PeriodicCallback``.
      """
      if not (self.ping_interval > 0):
          return
      self.last_ping = self.last_pong = IOLoop.current().time()
      period = self.ping_interval * 1000
      self.ping_callback = PeriodicCallback(self.periodic_ping, period)
      self.ping_callback.start()
  def periodic_ping(self):
      """Send a ping to keep the websocket alive

      Called periodically if the websocket_ping_interval is set and non-zero.
      Stops the periodic callback once the stream is closed, and closes the
      connection when no pong has arrived within the ping timeout.
      """
      if self.stream.closed() and self.ping_callback is not None:
          self.ping_callback.stop()
          return

      # Check for timeout on pong. Make sure that we really have
      # sent a recent ping in case the machine with both server and
      # client has been suspended since the last ping.
      now = IOLoop.current().time()
      pong_age = now - self.last_pong
      ping_age = now - self.last_ping
      pinged_recently = ping_age < 2 * self.ping_interval
      if pinged_recently and pong_age > self.ping_timeout:
          self.close()
          return

      self.write_ping(b'')
      self.last_ping = now
  class WebSocketClientConnection(simple_httpclient._HTTPConnection):
      """WebSocket client connection.

      This class should not be instantiated directly; use the
      `websocket_connect` function instead.
      """
      def __init__(self, request, on_message_callback=None,
                   compression_options=None, ping_interval=None, ping_timeout=None,
                   max_message_size=None, subprotocols=None):
          """Prepare the upgrade request and start the HTTP connection.

          ``request`` is modified in place: its ``ws``/``wss`` scheme is
          rewritten to ``http``/``https`` and the websocket handshake headers
          are added. ``subprotocols``, when non-empty, is an iterable of
          strings offered in ``Sec-WebSocket-Protocol``.

          Raises ``KeyError`` if the URL scheme is neither ``ws`` nor ``wss``.
          """
          self.compression_options = compression_options
          self.connect_future = Future()
          self.protocol = None
          self.read_queue = Queue(1)
          self.key = base64.b64encode(os.urandom(16))
          self._on_message_callback = on_message_callback
          self.close_code = self.close_reason = None
          self.ping_interval = ping_interval
          self.ping_timeout = ping_timeout
          self.max_message_size = max_message_size

          scheme, sep, rest = request.url.partition(':')
          scheme = {'ws': 'http', 'wss': 'https'}[scheme]
          request.url = scheme + sep + rest
          request.headers.update({
              'Upgrade': 'websocket',
              'Connection': 'Upgrade',
              'Sec-WebSocket-Key': self.key,
              'Sec-WebSocket-Version': '13',
          })
          # Only advertise subprotocols when there is at least one; an
          # empty Sec-WebSocket-Protocol header is not a valid offer.
          if subprotocols:
              request.headers['Sec-WebSocket-Protocol'] = ','.join(subprotocols)
          if self.compression_options is not None:
              # Always offer to let the server set our max_wbits (and even though
              # we don't offer it, we will accept a client_no_context_takeover
              # from the server).
              # TODO: set server parameters for deflate extension
              # if requested in self.compression_options.
              request.headers['Sec-WebSocket-Extensions'] = (
                  'permessage-deflate; client_max_window_bits')

          self.tcp_client = TCPClient()
          # Positional arguments follow simple_httpclient._HTTPConnection's
          # constructor; see that signature for the meaning of the numeric
          # limits passed here.
          super(WebSocketClientConnection, self).__init__(
              None, request, lambda: None, self._on_http_response,
              104857600, self.tcp_client, 65536, 104857600)

      def close(self, code=None, reason=None):
          """Closes the websocket connection.

          ``code`` and ``reason`` are documented under
          `WebSocketHandler.close`.

          .. versionadded:: 3.2

          .. versionchanged:: 4.0

             Added the ``code`` and ``reason`` arguments.
          """
          if self.protocol is not None:
              self.protocol.close(code, reason)
              self.protocol = None

      def on_connection_close(self):
          """Handle the underlying stream closing.

          Fails ``connect_future`` with ``StreamClosedError`` if the handshake
          had not completed, delivers a ``None`` message to signal the close,
          and closes the TCP client.
          """
          if not self.connect_future.done():
              self.connect_future.set_exception(StreamClosedError())
          self.on_message(None)
          self.tcp_client.close()
          super(WebSocketClientConnection, self).on_connection_close()

      def _on_http_response(self, response):
          """Fail ``connect_future`` when a plain HTTP response is received.

          The future receives ``response.error`` if set, otherwise a
          ``WebSocketError``. Does nothing if the future is already done.
          """
          if not self.connect_future.done():
              if response.error:
                  self.connect_future.set_exception(response.error)
              else:
                  self.connect_future.set_exception(WebSocketError(
                      "Non-websocket response"))

      def headers_received(self, start_line, headers):
          """Take over the connection once the server answers with 101.

          Any other status code is delegated to the base class.
          """
          if start_line.code != 101:
              return super(WebSocketClientConnection, self).headers_received(
                  start_line, headers)

          self.headers = headers
          self.protocol = self.get_websocket_protocol()
          self.protocol._process_server_headers(self.key, self.headers)
          self.protocol.start_pinging()
          IOLoop.current().add_callback(self.protocol._receive_frame_loop)

          if self._timeout is not None:
              self.io_loop.remove_timeout(self._timeout)
              self._timeout = None

          self.stream = self.connection.detach()
          self.stream.set_close_callback(self.on_connection_close)
          # Once we've taken over the connection, clear the final callback
          # we set on the http request.  This deactivates the error handling
          # in simple_httpclient that would otherwise interfere with our
          # ability to see exceptions.
          self.final_callback = None

          future_set_result_unless_cancelled(self.connect_future, self)

      def write_message(self, message, binary=False):
          """Sends a message to the WebSocket server.

          If the stream is closed, raises `WebSocketClosedError`.
          Returns a `.Future` which can be used for flow control.

          .. versionchanged:: 5.0
             Exception raised on a closed stream changed from `.StreamClosedError`
             to `WebSocketClosedError`.
          """
          # ``close()`` resets ``self.protocol`` to None; report that as a
          # closed websocket instead of failing with AttributeError.
          if self.protocol is None:
              raise WebSocketClosedError("Client connection has been closed")
          return self.protocol.write_message(message, binary=binary)

      def read_message(self, callback=None):
          """Reads a message from the WebSocket server.

          If on_message_callback was specified at WebSocket
          initialization, this function will never return messages

          Returns a future whose result is the message, or None
          if the connection is closed.  If a callback argument
          is given it will be called with the future when it is
          ready.
          """
          future = self.read_queue.get()
          if callback is not None:
              self.io_loop.add_future(future, callback)
          return future

      def on_message(self, message):
          """Deliver ``message`` to the callback, or queue it for readers.

          Returns the result of ``read_queue.put`` when no callback is set.
          """
          if self._on_message_callback:
              self._on_message_callback(message)
          else:
              return self.read_queue.put(message)

      def ping(self, data=b''):
          """Send ping frame to the remote end.

          The data argument allows a small amount of data (up to 125
          bytes) to be sent as a part of the ping message. Note that not
          all websocket implementations expose this data to
          applications.

          Consider using the ``ping_interval`` argument to
          `websocket_connect` instead of sending pings manually.

          .. versionadded:: 5.1

          """
          data = utf8(data)
          if self.protocol is None:
              raise WebSocketClosedError()
          self.protocol.write_ping(data)

      def on_pong(self, data):
          """Hook called with the payload of a received pong; no-op here."""
          pass

      def on_ping(self, data):
          """Hook called with the payload of a received ping; no-op here."""
          pass

      def get_websocket_protocol(self):
          """Create the protocol object used for this client connection."""
          return WebSocketProtocol13(self, mask_outgoing=True,
                                     compression_options=self.compression_options)

      @property
      def selected_subprotocol(self):
          """The subprotocol selected by the server.

          .. versionadded:: 5.1
          """
          return self.protocol.selected_subprotocol
  def websocket_connect(url, callback=None, connect_timeout=None,
                        on_message_callback=None, compression_options=None,
                        ping_interval=None, ping_timeout=None,
                        max_message_size=_default_max_message_size, subprotocols=None):
      """Client-side websocket support.

      Takes a url and returns a Future whose result is a
      `WebSocketClientConnection`.

      ``compression_options`` is interpreted in the same way as the
      return value of `.WebSocketHandler.get_compression_options`.

      The connection supports two styles of operation. In the coroutine
      style, the application typically calls
      `~.WebSocketClientConnection.read_message` in a loop::

          conn = yield websocket_connect(url)
          while True:
              msg = yield conn.read_message()
              if msg is None: break
              # Do something with msg

      In the callback style, pass an ``on_message_callback`` to
      ``websocket_connect``. In both styles, a message of ``None``
      indicates that the connection has been closed.

      ``subprotocols`` may be a list of strings specifying proposed
      subprotocols. The selected protocol may be found on the
      ``selected_subprotocol`` attribute of the connection object
      when the connection is complete.

      If ``callback`` is given, it is attached to the returned Future on
      the current IOLoop. ``connect_timeout`` must be left unset when an
      ``HTTPRequest`` is passed instead of a url.

      .. versionchanged:: 3.2
         Also accepts ``HTTPRequest`` objects in place of urls.

      .. versionchanged:: 4.1
         Added ``compression_options`` and ``on_message_callback``.

      .. versionchanged:: 4.5
         Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size``
         arguments, which have the same meaning as in `WebSocketHandler`.

      .. versionchanged:: 5.0
         The ``io_loop`` argument (deprecated since version 4.1) has been removed.

      .. versionchanged:: 5.1
         Added the ``subprotocols`` argument.
      """
      if not isinstance(url, httpclient.HTTPRequest):
          request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout)
      else:
          assert connect_timeout is None
          request = url
          # Copy and convert the headers dict/object (see comments in
          # AsyncHTTPClient.fetch)
          request.headers = httputil.HTTPHeaders(request.headers)
      request = httpclient._RequestProxy(
          request, httpclient.HTTPRequest._DEFAULTS)

      connection = WebSocketClientConnection(
          request,
          on_message_callback=on_message_callback,
          compression_options=compression_options,
          ping_interval=ping_interval,
          ping_timeout=ping_timeout,
          max_message_size=max_message_size,
          subprotocols=subprotocols)
      if callback is not None:
          IOLoop.current().add_future(connection.connect_future, callback)
      return connection.connect_future