test_state.py 16 KB


  1. from __future__ import absolute_import
  2. import pickle
  3. from random import shuffle
  4. from time import time
  5. from itertools import count
  6. from mock import patch
  7. from celery import states
  8. from celery.events import Event
  9. from celery.events.state import (
  10. State,
  11. Worker,
  12. Task,
  13. HEARTBEAT_EXPIRE_WINDOW,
  14. HEARTBEAT_DRIFT_MAX,
  15. )
  16. from celery.five import range
  17. from celery.utils import uuid
  18. from celery.tests.case import AppCase
  19. class replay(object):
  20. def __init__(self, state):
  21. self.state = state
  22. self.rewind()
  23. self.setup()
  24. self.current_clock = 0
  25. def setup(self):
  26. pass
  27. def next_event(self):
  28. ev = self.events[next(self.position)]
  29. ev['local_received'] = ev['timestamp']
  30. self.current_clock = ev.get('clock') or self.current_clock + 1
  31. return ev
  32. def __iter__(self):
  33. return self
  34. def __next__(self):
  35. try:
  36. self.state.event(self.next_event())
  37. except IndexError:
  38. raise StopIteration()
  39. next = __next__
  40. def rewind(self):
  41. self.position = count(0)
  42. return self
  43. def play(self):
  44. for _ in self:
  45. pass
  46. class ev_worker_online_offline(replay):
  47. def setup(self):
  48. self.events = [
  49. Event('worker-online', hostname='utest1'),
  50. Event('worker-offline', hostname='utest1'),
  51. ]
  52. class ev_worker_heartbeats(replay):
  53. def setup(self):
  54. self.events = [
  55. Event('worker-heartbeat', hostname='utest1',
  56. timestamp=time() - HEARTBEAT_EXPIRE_WINDOW * 2),
  57. Event('worker-heartbeat', hostname='utest1'),
  58. ]
  59. class ev_task_states(replay):
  60. def setup(self):
  61. tid = self.tid = uuid()
  62. self.events = [
  63. Event('task-received', uuid=tid, name='task1',
  64. args='(2, 2)', kwargs="{'foo': 'bar'}",
  65. retries=0, eta=None, hostname='utest1'),
  66. Event('task-started', uuid=tid, hostname='utest1'),
  67. Event('task-revoked', uuid=tid, hostname='utest1'),
  68. Event('task-retried', uuid=tid, exception="KeyError('bar')",
  69. traceback='line 2 at main', hostname='utest1'),
  70. Event('task-failed', uuid=tid, exception="KeyError('foo')",
  71. traceback='line 1 at main', hostname='utest1'),
  72. Event('task-succeeded', uuid=tid, result='4',
  73. runtime=0.1234, hostname='utest1'),
  74. ]
  75. def QTEV(type, uuid, hostname, clock, timestamp=None):
  76. """Quick task event."""
  77. return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,
  78. clock=clock, timestamp=timestamp or time())
  79. class ev_logical_clock_ordering(replay):
  80. def __init__(self, state, offset=0, uids=None):
  81. self.offset = offset or 0
  82. self.uids = self.setuids(uids)
  83. super(ev_logical_clock_ordering, self).__init__(state)
  84. def setuids(self, uids):
  85. uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]
  86. return uids
  87. def setup(self):
  88. offset = self.offset
  89. tA, tB, tC = self.uids
  90. self.events = [
  91. QTEV('received', tA, 'w1', clock=offset + 1),
  92. QTEV('received', tB, 'w2', clock=offset + 1),
  93. QTEV('started', tA, 'w1', clock=offset + 3),
  94. QTEV('received', tC, 'w2', clock=offset + 3),
  95. QTEV('started', tB, 'w2', clock=offset + 5),
  96. QTEV('retried', tA, 'w1', clock=offset + 7),
  97. QTEV('succeeded', tB, 'w2', clock=offset + 9),
  98. QTEV('started', tC, 'w2', clock=offset + 10),
  99. QTEV('received', tA, 'w3', clock=offset + 13),
  100. QTEV('succeded', tC, 'w2', clock=offset + 12),
  101. QTEV('started', tA, 'w3', clock=offset + 14),
  102. QTEV('succeeded', tA, 'w3', clock=offset + 16),
  103. ]
  104. def rewind_with_offset(self, offset, uids=None):
  105. self.offset = offset
  106. self.uids = self.setuids(uids or self.uids)
  107. self.setup()
  108. self.rewind()
  109. class ev_snapshot(replay):
  110. def setup(self):
  111. self.events = [
  112. Event('worker-online', hostname='utest1'),
  113. Event('worker-online', hostname='utest2'),
  114. Event('worker-online', hostname='utest3'),
  115. ]
  116. for i in range(20):
  117. worker = not i % 2 and 'utest2' or 'utest1'
  118. type = not i % 2 and 'task2' or 'task1'
  119. self.events.append(Event('task-received', name=type,
  120. uuid=uuid(), hostname=worker))
  121. class test_Worker(AppCase):
  122. def test_equality(self):
  123. self.assertEqual(Worker(hostname='foo').hostname, 'foo')
  124. self.assertEqual(
  125. Worker(hostname='foo'), Worker(hostname='foo'),
  126. )
  127. self.assertNotEqual(
  128. Worker(hostname='foo'), Worker(hostname='bar'),
  129. )
  130. self.assertEqual(
  131. hash(Worker(hostname='foo')), hash(Worker(hostname='foo')),
  132. )
  133. self.assertNotEqual(
  134. hash(Worker(hostname='foo')), hash(Worker(hostname='bar')),
  135. )
  136. def test_survives_missing_timestamp(self):
  137. worker = Worker(hostname='foo')
  138. worker.on_heartbeat(timestamp=None)
  139. self.assertEqual(worker.heartbeats, [])
  140. def test_repr(self):
  141. self.assertTrue(repr(Worker(hostname='foo')))
  142. def test_drift_warning(self):
  143. worker = Worker(hostname='foo')
  144. with patch('celery.events.state.warn') as warn:
  145. worker.update_heartbeat(time(), time() + (HEARTBEAT_DRIFT_MAX * 2))
  146. self.assertTrue(warn.called)
  147. self.assertIn('Substantial drift', warn.call_args[0][0])
  148. def test_update_heartbeat(self):
  149. worker = Worker(hostname='foo')
  150. worker.update_heartbeat(time(), time())
  151. self.assertEqual(len(worker.heartbeats), 1)
  152. worker.update_heartbeat(time() - 10, time())
  153. self.assertEqual(len(worker.heartbeats), 1)
  154. class test_Task(AppCase):
  155. def test_equality(self):
  156. self.assertEqual(Task(uuid='foo').uuid, 'foo')
  157. self.assertEqual(
  158. Task(uuid='foo'), Task(uuid='foo'),
  159. )
  160. self.assertNotEqual(
  161. Task(uuid='foo'), Task(uuid='bar'),
  162. )
  163. self.assertEqual(
  164. hash(Task(uuid='foo')), hash(Task(uuid='foo')),
  165. )
  166. self.assertNotEqual(
  167. hash(Task(uuid='foo')), hash(Task(uuid='bar')),
  168. )
  169. def test_info(self):
  170. task = Task(uuid='abcdefg',
  171. name='tasks.add',
  172. args='(2, 2)',
  173. kwargs='{}',
  174. retries=2,
  175. result=42,
  176. eta=1,
  177. runtime=0.0001,
  178. expires=1,
  179. foo=None,
  180. exception=1,
  181. received=time() - 10,
  182. started=time() - 8,
  183. exchange='celery',
  184. routing_key='celery',
  185. succeeded=time())
  186. self.assertEqual(sorted(list(task._info_fields)),
  187. sorted(task.info().keys()))
  188. self.assertEqual(sorted(list(task._info_fields + ('received', ))),
  189. sorted(task.info(extra=('received', ))))
  190. self.assertEqual(sorted(['args', 'kwargs']),
  191. sorted(task.info(['args', 'kwargs']).keys()))
  192. self.assertFalse(list(task.info('foo')))
  193. def test_ready(self):
  194. task = Task(uuid='abcdefg',
  195. name='tasks.add')
  196. task.on_received(timestamp=time())
  197. self.assertFalse(task.ready)
  198. task.on_succeeded(timestamp=time())
  199. self.assertTrue(task.ready)
  200. def test_sent(self):
  201. task = Task(uuid='abcdefg',
  202. name='tasks.add')
  203. task.on_sent(timestamp=time())
  204. self.assertEqual(task.state, states.PENDING)
  205. def test_merge(self):
  206. task = Task()
  207. task.on_failed(timestamp=time())
  208. task.on_started(timestamp=time())
  209. task.on_received(timestamp=time(), name='tasks.add', args=(2, 2))
  210. self.assertEqual(task.state, states.FAILURE)
  211. self.assertEqual(task.name, 'tasks.add')
  212. self.assertTupleEqual(task.args, (2, 2))
  213. task.on_retried(timestamp=time())
  214. self.assertEqual(task.state, states.RETRY)
  215. def test_repr(self):
  216. self.assertTrue(repr(Task(uuid='xxx', name='tasks.add')))
  217. class test_State(AppCase):
  218. def test_repr(self):
  219. self.assertTrue(repr(State()))
  220. def test_pickleable(self):
  221. self.assertTrue(pickle.loads(pickle.dumps(State())))
  222. def test_task_logical_clock_ordering(self):
  223. state = State()
  224. r = ev_logical_clock_ordering(state)
  225. tA, tB, tC = r.uids
  226. r.play()
  227. now = list(state.tasks_by_time())
  228. self.assertEqual(now[0][0], tA)
  229. self.assertEqual(now[1][0], tC)
  230. self.assertEqual(now[2][0], tB)
  231. for _ in range(1000):
  232. shuffle(r.uids)
  233. tA, tB, tC = r.uids
  234. r.rewind_with_offset(r.current_clock + 1, r.uids)
  235. r.play()
  236. now = list(state.tasks_by_time())
  237. self.assertEqual(now[0][0], tA)
  238. self.assertEqual(now[1][0], tC)
  239. self.assertEqual(now[2][0], tB)
  240. def test_worker_online_offline(self):
  241. r = ev_worker_online_offline(State())
  242. next(r)
  243. self.assertTrue(r.state.alive_workers())
  244. self.assertTrue(r.state.workers['utest1'].alive)
  245. r.play()
  246. self.assertFalse(r.state.alive_workers())
  247. self.assertFalse(r.state.workers['utest1'].alive)
  248. def test_itertasks(self):
  249. s = State()
  250. s.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
  251. self.assertEqual(len(list(s.itertasks(limit=2))), 2)
  252. def test_worker_heartbeat_expire(self):
  253. r = ev_worker_heartbeats(State())
  254. next(r)
  255. self.assertFalse(r.state.alive_workers())
  256. self.assertFalse(r.state.workers['utest1'].alive)
  257. r.play()
  258. self.assertTrue(r.state.alive_workers())
  259. self.assertTrue(r.state.workers['utest1'].alive)
  260. def test_task_states(self):
  261. r = ev_task_states(State())
  262. # RECEIVED
  263. next(r)
  264. self.assertTrue(r.tid in r.state.tasks)
  265. task = r.state.tasks[r.tid]
  266. self.assertEqual(task.state, states.RECEIVED)
  267. self.assertTrue(task.received)
  268. self.assertEqual(task.timestamp, task.received)
  269. self.assertEqual(task.worker.hostname, 'utest1')
  270. # STARTED
  271. next(r)
  272. self.assertTrue(r.state.workers['utest1'].alive,
  273. 'any task event adds worker heartbeat')
  274. self.assertEqual(task.state, states.STARTED)
  275. self.assertTrue(task.started)
  276. self.assertEqual(task.timestamp, task.started)
  277. self.assertEqual(task.worker.hostname, 'utest1')
  278. # REVOKED
  279. next(r)
  280. self.assertEqual(task.state, states.REVOKED)
  281. self.assertTrue(task.revoked)
  282. self.assertEqual(task.timestamp, task.revoked)
  283. self.assertEqual(task.worker.hostname, 'utest1')
  284. # RETRY
  285. next(r)
  286. self.assertEqual(task.state, states.RETRY)
  287. self.assertTrue(task.retried)
  288. self.assertEqual(task.timestamp, task.retried)
  289. self.assertEqual(task.worker.hostname, 'utest1')
  290. self.assertEqual(task.exception, "KeyError('bar')")
  291. self.assertEqual(task.traceback, 'line 2 at main')
  292. # FAILURE
  293. next(r)
  294. self.assertEqual(task.state, states.FAILURE)
  295. self.assertTrue(task.failed)
  296. self.assertEqual(task.timestamp, task.failed)
  297. self.assertEqual(task.worker.hostname, 'utest1')
  298. self.assertEqual(task.exception, "KeyError('foo')")
  299. self.assertEqual(task.traceback, 'line 1 at main')
  300. # SUCCESS
  301. next(r)
  302. self.assertEqual(task.state, states.SUCCESS)
  303. self.assertTrue(task.succeeded)
  304. self.assertEqual(task.timestamp, task.succeeded)
  305. self.assertEqual(task.worker.hostname, 'utest1')
  306. self.assertEqual(task.result, '4')
  307. self.assertEqual(task.runtime, 0.1234)
  308. def assertStateEmpty(self, state):
  309. self.assertFalse(state.tasks)
  310. self.assertFalse(state.workers)
  311. self.assertFalse(state.event_count)
  312. self.assertFalse(state.task_count)
  313. def assertState(self, state):
  314. self.assertTrue(state.tasks)
  315. self.assertTrue(state.workers)
  316. self.assertTrue(state.event_count)
  317. self.assertTrue(state.task_count)
  318. def test_freeze_while(self):
  319. s = State()
  320. r = ev_snapshot(s)
  321. r.play()
  322. def work():
  323. pass
  324. s.freeze_while(work, clear_after=True)
  325. self.assertFalse(s.event_count)
  326. s2 = State()
  327. r = ev_snapshot(s2)
  328. r.play()
  329. s2.freeze_while(work, clear_after=False)
  330. self.assertTrue(s2.event_count)
  331. def test_clear_tasks(self):
  332. s = State()
  333. r = ev_snapshot(s)
  334. r.play()
  335. self.assertTrue(s.tasks)
  336. s.clear_tasks(ready=False)
  337. self.assertFalse(s.tasks)
  338. def test_clear(self):
  339. r = ev_snapshot(State())
  340. r.play()
  341. self.assertTrue(r.state.event_count)
  342. self.assertTrue(r.state.workers)
  343. self.assertTrue(r.state.tasks)
  344. self.assertTrue(r.state.task_count)
  345. r.state.clear()
  346. self.assertFalse(r.state.event_count)
  347. self.assertFalse(r.state.workers)
  348. self.assertTrue(r.state.tasks)
  349. self.assertFalse(r.state.task_count)
  350. r.state.clear(False)
  351. self.assertFalse(r.state.tasks)
  352. def test_task_types(self):
  353. r = ev_snapshot(State())
  354. r.play()
  355. self.assertEqual(sorted(r.state.task_types()), ['task1', 'task2'])
  356. def test_tasks_by_timestamp(self):
  357. r = ev_snapshot(State())
  358. r.play()
  359. self.assertEqual(len(list(r.state.tasks_by_timestamp())), 20)
  360. def test_tasks_by_type(self):
  361. r = ev_snapshot(State())
  362. r.play()
  363. self.assertEqual(len(list(r.state.tasks_by_type('task1'))), 10)
  364. self.assertEqual(len(list(r.state.tasks_by_type('task2'))), 10)
  365. def test_alive_workers(self):
  366. r = ev_snapshot(State())
  367. r.play()
  368. self.assertEqual(len(r.state.alive_workers()), 3)
  369. def test_tasks_by_worker(self):
  370. r = ev_snapshot(State())
  371. r.play()
  372. self.assertEqual(len(list(r.state.tasks_by_worker('utest1'))), 10)
  373. self.assertEqual(len(list(r.state.tasks_by_worker('utest2'))), 10)
  374. def test_survives_unknown_worker_event(self):
  375. s = State()
  376. s.worker_event('worker-unknown-event-xxx', {'foo': 'bar'})
  377. s.worker_event('worker-unknown-event-xxx', {'hostname': 'xxx',
  378. 'foo': 'bar'})
  379. def test_survives_unknown_task_event(self):
  380. s = State()
  381. s.task_event('task-unknown-event-xxx', {'foo': 'bar',
  382. 'uuid': 'x',
  383. 'hostname': 'y'})
  384. def test_limits_maxtasks(self):
  385. s = State()
  386. s.max_tasks_in_memory = 1
  387. s.task_event('task-unknown-event-xxx', {'foo': 'bar',
  388. 'uuid': 'x',
  389. 'hostname': 'y',
  390. 'clock': 3})
  391. s.task_event('task-unknown-event-xxx', {'foo': 'bar',
  392. 'uuid': 'y',
  393. 'hostname': 'y',
  394. 'clock': 4})
  395. s.task_event('task-unknown-event-xxx', {'foo': 'bar',
  396. 'uuid': 'z',
  397. 'hostname': 'y',
  398. 'clock': 5})
  399. self.assertEqual(len(s._taskheap), 2)
  400. self.assertEqual(s._taskheap[0].clock, 4)
  401. self.assertEqual(s._taskheap[1].clock, 5)
  402. s._taskheap.append(s._taskheap[0])
  403. self.assertTrue(list(s.tasks_by_time()))
  404. def test_callback(self):
  405. scratch = {}
  406. def callback(state, event):
  407. scratch['recv'] = True
  408. s = State(callback=callback)
  409. s.event({'type': 'worker-online'})
  410. self.assertTrue(scratch.get('recv'))