from __future__ import absolute_import

from mock import Mock, patch
from time import time

from celery.events.dumper import (
    humanize_type,
    Dumper,
    evdump,
)

from celery.tests.case import AppCase, WhateverIO


class test_Dumper(AppCase):
    """Tests for ``humanize_type``, ``Dumper`` and ``evdump``."""

    def setup(self):
        """Create an in-memory output buffer and a Dumper writing to it."""
        self.out = WhateverIO()
        self.dumper = Dumper(out=self.out)

    def test_humanize_type(self):
        """``humanize_type`` maps raw event types to readable labels."""
        expectations = (
            ('worker-offline', 'shutdown'),
            ('task-started', 'task started'),
        )
        for raw_type, label in expectations:
            self.assertEqual(humanize_type(raw_type), label)

    def test_format_task_event(self):
        """Formatting a task event writes something to the output."""
        now = time()
        self.dumper.format_task_event(
            'worker@example.com', now, 'task-started', 'tasks.add', {},
        )
        written = self.out.getvalue()
        self.assertTrue(written)

    def test_on_event(self):
        """``on_event`` accepts task and worker events alike."""
        base_event = {
            'hostname': 'worker@example.com',
            'timestamp': time(),
            'uuid': '1ef',
            'name': 'tasks.add',
            'args': '(2, 2)',
            'kwargs': '{}',
        }

        # Output is only asserted after the first event.
        self.dumper.on_event(dict(base_event, type='task-received'))
        self.assertTrue(self.out.getvalue())

        # The remaining event types only need to be handled without error.
        for event_type in ('task-revoked', 'worker-online'):
            self.dumper.on_event(dict(base_event, type=event_type))

    @patch('celery.events.EventReceiver.capture')
    def test_evdump(self, capture):
        """``evdump`` returns normally when capture is interrupted."""
        capture.side_effect = KeyboardInterrupt()
        evdump(app=self.app)

    def test_evdump_error_handler(self):
        """``evdump`` calls ``ensure_connection`` after a connection error.

        The callback passed to ``ensure_connection`` is then invoked by hand
        to check that it asks the connection for its URI.
        """
        app = Mock(name='app')
        with patch('celery.events.dumper.Dumper') as dumper_cls:
            dumper_cls.return_value = Mock(name='dumper')

            receiver = Mock()
            app.events.Receiver.return_value = receiver

            def fail_once(*_a, **_k):
                # First capture() raises KeyError (listed below as a
                # connection error); any later capture() raises SystemExit.
                receiver.capture.side_effect = SystemExit()
                raise KeyError()

            receiver.capture.side_effect = fail_once

            base_conn = Mock(name='conn')
            app.connection.return_value = base_conn
            cloned = Mock(name='cloned_conn')
            base_conn.clone.return_value = cloned
            cloned.connection_errors = (KeyError, )
            cloned.channel_errors = ()

            evdump(app)
            self.assertTrue(cloned.ensure_connection.called)

            # First positional argument of ensure_connection() is the errback.
            positional_args = cloned.ensure_connection.call_args[0]
            errback = positional_args[0]
            errback(KeyError(), 1)
            self.assertTrue(cloned.as_uri.called)