streaming.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. # Tweepy
  2. # Copyright 2009-2010 Joshua Roesslein
  3. # See LICENSE for details.
  4. # Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
  5. from __future__ import absolute_import, print_function
  6. import logging
  7. import re
  8. import requests
  9. from requests.exceptions import Timeout
  10. from threading import Thread
  11. from time import sleep
  12. import six
  13. import ssl
  14. from tweepy.models import Status
  15. from tweepy.api import API
  16. from tweepy.error import TweepError
  17. from tweepy.utils import import_simplejson
  18. json = import_simplejson()
  19. STREAM_VERSION = '1.1'
  20. class StreamListener(object):
  21. def __init__(self, api=None):
  22. self.api = api or API()
  23. def on_connect(self):
  24. """Called once connected to streaming server.
  25. This will be invoked once a successful response
  26. is received from the server. Allows the listener
  27. to perform some work prior to entering the read loop.
  28. """
  29. pass
  30. def on_data(self, raw_data):
  31. """Called when raw data is received from connection.
  32. Override this method if you wish to manually handle
  33. the stream data. Return False to stop stream and close connection.
  34. """
  35. data = json.loads(raw_data)
  36. if 'in_reply_to_status_id' in data:
  37. status = Status.parse(self.api, data)
  38. if self.on_status(status) is False:
  39. return False
  40. elif 'delete' in data:
  41. delete = data['delete']['status']
  42. if self.on_delete(delete['id'], delete['user_id']) is False:
  43. return False
  44. elif 'event' in data:
  45. status = Status.parse(self.api, data)
  46. if self.on_event(status) is False:
  47. return False
  48. elif 'direct_message' in data:
  49. status = Status.parse(self.api, data)
  50. if self.on_direct_message(status) is False:
  51. return False
  52. elif 'friends' in data:
  53. if self.on_friends(data['friends']) is False:
  54. return False
  55. elif 'limit' in data:
  56. if self.on_limit(data['limit']['track']) is False:
  57. return False
  58. elif 'disconnect' in data:
  59. if self.on_disconnect(data['disconnect']) is False:
  60. return False
  61. elif 'warning' in data:
  62. if self.on_warning(data['warning']) is False:
  63. return False
  64. else:
  65. logging.error("Unknown message type: " + str(raw_data))
  66. def keep_alive(self):
  67. """Called when a keep-alive arrived"""
  68. return
  69. def on_status(self, status):
  70. """Called when a new status arrives"""
  71. return
  72. def on_exception(self, exception):
  73. """Called when an unhandled exception occurs."""
  74. return
  75. def on_delete(self, status_id, user_id):
  76. """Called when a delete notice arrives for a status"""
  77. return
  78. def on_event(self, status):
  79. """Called when a new event arrives"""
  80. return
  81. def on_direct_message(self, status):
  82. """Called when a new direct message arrives"""
  83. return
  84. def on_friends(self, friends):
  85. """Called when a friends list arrives.
  86. friends is a list that contains user_id
  87. """
  88. return
  89. def on_limit(self, track):
  90. """Called when a limitation notice arrives"""
  91. return
  92. def on_error(self, status_code):
  93. """Called when a non-200 status code is returned"""
  94. return False
  95. def on_timeout(self):
  96. """Called when stream connection times out"""
  97. return
  98. def on_disconnect(self, notice):
  99. """Called when twitter sends a disconnect notice
  100. Disconnect codes are listed here:
  101. https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
  102. """
  103. return
  104. def on_warning(self, notice):
  105. """Called when a disconnection warning message arrives"""
  106. return
  107. class ReadBuffer(object):
  108. """Buffer data from the response in a smarter way than httplib/requests can.
  109. Tweets are roughly in the 2-12kb range, averaging around 3kb.
  110. Requests/urllib3/httplib/socket all use socket.read, which blocks
  111. until enough data is returned. On some systems (eg google appengine), socket
  112. reads are quite slow. To combat this latency we can read big chunks,
  113. but the blocking part means we won't get results until enough tweets
  114. have arrived. That may not be a big deal for high throughput systems.
  115. For low throughput systems we don't want to sacrafice latency, so we
  116. use small chunks so it can read the length and the tweet in 2 read calls.
  117. """
  118. def __init__(self, stream, chunk_size, encoding='utf-8'):
  119. self._stream = stream
  120. self._buffer = six.b('')
  121. self._chunk_size = chunk_size
  122. self._encoding = encoding
  123. def read_len(self, length):
  124. while not self._stream.closed:
  125. if len(self._buffer) >= length:
  126. return self._pop(length)
  127. read_len = max(self._chunk_size, length - len(self._buffer))
  128. self._buffer += self._stream.read(read_len)
  129. def read_line(self, sep=six.b('\n')):
  130. """Read the data stream until a given separator is found (default \n)
  131. :param sep: Separator to read until. Must by of the bytes type (str in python 2,
  132. bytes in python 3)
  133. :return: The str of the data read until sep
  134. """
  135. start = 0
  136. while not self._stream.closed:
  137. loc = self._buffer.find(sep, start)
  138. if loc >= 0:
  139. return self._pop(loc + len(sep))
  140. else:
  141. start = len(self._buffer)
  142. self._buffer += self._stream.read(self._chunk_size)
  143. def _pop(self, length):
  144. r = self._buffer[:length]
  145. self._buffer = self._buffer[length:]
  146. return r.decode(self._encoding)
  147. class Stream(object):
  148. host = 'stream.twitter.com'
  149. def __init__(self, auth, listener, **options):
  150. self.auth = auth
  151. self.listener = listener
  152. self.running = False
  153. self.timeout = options.get("timeout", 300.0)
  154. self.retry_count = options.get("retry_count")
  155. # values according to
  156. # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
  157. self.retry_time_start = options.get("retry_time", 5.0)
  158. self.retry_420_start = options.get("retry_420", 60.0)
  159. self.retry_time_cap = options.get("retry_time_cap", 320.0)
  160. self.snooze_time_step = options.get("snooze_time", 0.25)
  161. self.snooze_time_cap = options.get("snooze_time_cap", 16)
  162. # The default socket.read size. Default to less than half the size of
  163. # a tweet so that it reads tweets with the minimal latency of 2 reads
  164. # per tweet. Values higher than ~1kb will increase latency by waiting
  165. # for more data to arrive but may also increase throughput by doing
  166. # fewer socket read calls.
  167. self.chunk_size = options.get("chunk_size", 512)
  168. self.verify = options.get("verify", True)
  169. self.api = API()
  170. self.headers = options.get("headers") or {}
  171. self.new_session()
  172. self.body = None
  173. self.retry_time = self.retry_time_start
  174. self.snooze_time = self.snooze_time_step
  175. def new_session(self):
  176. self.session = requests.Session()
  177. self.session.headers = self.headers
  178. self.session.params = None
  179. def _run(self):
  180. # Authenticate
  181. url = "https://%s%s" % (self.host, self.url)
  182. # Connect and process the stream
  183. error_counter = 0
  184. resp = None
  185. exception = None
  186. while self.running:
  187. if self.retry_count is not None:
  188. if error_counter > self.retry_count:
  189. # quit if error count greater than retry count
  190. break
  191. try:
  192. auth = self.auth.apply_auth()
  193. resp = self.session.request('POST',
  194. url,
  195. data=self.body,
  196. timeout=self.timeout,
  197. stream=True,
  198. auth=auth,
  199. verify=self.verify)
  200. if resp.status_code != 200:
  201. if self.listener.on_error(resp.status_code) is False:
  202. break
  203. error_counter += 1
  204. if resp.status_code == 420:
  205. self.retry_time = max(self.retry_420_start,
  206. self.retry_time)
  207. sleep(self.retry_time)
  208. self.retry_time = min(self.retry_time * 2,
  209. self.retry_time_cap)
  210. else:
  211. error_counter = 0
  212. self.retry_time = self.retry_time_start
  213. self.snooze_time = self.snooze_time_step
  214. self.listener.on_connect()
  215. self._read_loop(resp)
  216. except (Timeout, ssl.SSLError) as exc:
  217. # This is still necessary, as a SSLError can actually be
  218. # thrown when using Requests
  219. # If it's not time out treat it like any other exception
  220. if isinstance(exc, ssl.SSLError):
  221. if not (exc.args and 'timed out' in str(exc.args[0])):
  222. exception = exc
  223. break
  224. if self.listener.on_timeout() is False:
  225. break
  226. if self.running is False:
  227. break
  228. sleep(self.snooze_time)
  229. self.snooze_time = min(self.snooze_time + self.snooze_time_step,
  230. self.snooze_time_cap)
  231. except Exception as exc:
  232. exception = exc
  233. # any other exception is fatal, so kill loop
  234. break
  235. # cleanup
  236. self.running = False
  237. if resp:
  238. resp.close()
  239. self.new_session()
  240. if exception:
  241. # call a handler first so that the exception can be logged.
  242. self.listener.on_exception(exception)
  243. raise exception
  244. def _data(self, data):
  245. if self.listener.on_data(data) is False:
  246. self.running = False
  247. def _read_loop(self, resp):
  248. charset = resp.headers.get('content-type', default='')
  249. enc_search = re.search('charset=(?P<enc>\S*)', charset)
  250. if enc_search is not None:
  251. encoding = enc_search.group('enc')
  252. else:
  253. encoding = 'utf-8'
  254. buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)
  255. while self.running and not resp.raw.closed:
  256. length = 0
  257. while not resp.raw.closed:
  258. line = buf.read_line().strip()
  259. if not line:
  260. self.listener.keep_alive() # keep-alive new lines are expected
  261. elif line.isdigit():
  262. length = int(line)
  263. break
  264. else:
  265. raise TweepError('Expecting length, unexpected value found')
  266. next_status_obj = buf.read_len(length)
  267. if self.running:
  268. self._data(next_status_obj)
  269. # # Note: keep-alive newlines might be inserted before each length value.
  270. # # read until we get a digit...
  271. # c = b'\n'
  272. # for c in resp.iter_content(decode_unicode=True):
  273. # if c == b'\n':
  274. # continue
  275. # break
  276. #
  277. # delimited_string = c
  278. #
  279. # # read rest of delimiter length..
  280. # d = b''
  281. # for d in resp.iter_content(decode_unicode=True):
  282. # if d != b'\n':
  283. # delimited_string += d
  284. # continue
  285. # break
  286. #
  287. # # read the next twitter status object
  288. # if delimited_string.decode('utf-8').strip().isdigit():
  289. # status_id = int(delimited_string)
  290. # next_status_obj = resp.raw.read(status_id)
  291. # if self.running:
  292. # self._data(next_status_obj.decode('utf-8'))
  293. if resp.raw.closed:
  294. self.on_closed(resp)
  295. def _start(self, async):
  296. self.running = True
  297. if async:
  298. self._thread = Thread(target=self._run)
  299. self._thread.start()
  300. else:
  301. self._run()
  302. def on_closed(self, resp):
  303. """ Called when the response has been closed by Twitter """
  304. pass
  305. def userstream(self,
  306. stall_warnings=False,
  307. _with=None,
  308. replies=None,
  309. track=None,
  310. locations=None,
  311. async=False,
  312. encoding='utf8'):
  313. self.session.params = {'delimited': 'length'}
  314. if self.running:
  315. raise TweepError('Stream object already connected!')
  316. self.url = '/%s/user.json' % STREAM_VERSION
  317. self.host = 'userstream.twitter.com'
  318. if stall_warnings:
  319. self.session.params['stall_warnings'] = stall_warnings
  320. if _with:
  321. self.session.params['with'] = _with
  322. if replies:
  323. self.session.params['replies'] = replies
  324. if locations and len(locations) > 0:
  325. if len(locations) % 4 != 0:
  326. raise TweepError("Wrong number of locations points, "
  327. "it has to be a multiple of 4")
  328. self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
  329. if track:
  330. self.session.params['track'] = u','.join(track).encode(encoding)
  331. self._start(async)
  332. def firehose(self, count=None, async=False):
  333. self.session.params = {'delimited': 'length'}
  334. if self.running:
  335. raise TweepError('Stream object already connected!')
  336. self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
  337. if count:
  338. self.url += '&count=%s' % count
  339. self._start(async)
  340. def retweet(self, async=False):
  341. self.session.params = {'delimited': 'length'}
  342. if self.running:
  343. raise TweepError('Stream object already connected!')
  344. self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
  345. self._start(async)
  346. def sample(self, async=False, languages=None):
  347. self.session.params = {'delimited': 'length'}
  348. if self.running:
  349. raise TweepError('Stream object already connected!')
  350. self.url = '/%s/statuses/sample.json' % STREAM_VERSION
  351. if languages:
  352. self.session.params['language'] = ','.join(map(str, languages))
  353. self._start(async)
  354. def filter(self, follow=None, track=None, async=False, locations=None,
  355. stall_warnings=False, languages=None, encoding='utf8', filter_level=None):
  356. self.body = {}
  357. self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
  358. if self.running:
  359. raise TweepError('Stream object already connected!')
  360. self.url = '/%s/statuses/filter.json' % STREAM_VERSION
  361. if follow:
  362. self.body['follow'] = u','.join(follow).encode(encoding)
  363. if track:
  364. self.body['track'] = u','.join(track).encode(encoding)
  365. if locations and len(locations) > 0:
  366. if len(locations) % 4 != 0:
  367. raise TweepError("Wrong number of locations points, "
  368. "it has to be a multiple of 4")
  369. self.body['locations'] = u','.join(['%.4f' % l for l in locations])
  370. if stall_warnings:
  371. self.body['stall_warnings'] = stall_warnings
  372. if languages:
  373. self.body['language'] = u','.join(map(str, languages))
  374. if filter_level:
  375. self.body['filter_level'] = unicode(filter_level, encoding)
  376. self.session.params = {'delimited': 'length'}
  377. self.host = 'stream.twitter.com'
  378. self._start(async)
  379. def sitestream(self, follow, stall_warnings=False,
  380. with_='user', replies=False, async=False):
  381. self.body = {}
  382. if self.running:
  383. raise TweepError('Stream object already connected!')
  384. self.url = '/%s/site.json' % STREAM_VERSION
  385. self.body['follow'] = u','.join(map(six.text_type, follow))
  386. self.body['delimited'] = 'length'
  387. if stall_warnings:
  388. self.body['stall_warnings'] = stall_warnings
  389. if with_:
  390. self.body['with'] = with_
  391. if replies:
  392. self.body['replies'] = replies
  393. self._start(async)
  394. def disconnect(self):
  395. if self.running is False:
  396. return
  397. self.running = False