from __future__ import absolute_import, unicode_literals from datetime import datetime from nose import SkipTest from pickle import loads, dumps from celery import states from celery.exceptions import ImproperlyConfigured from celery.utils import uuid from celery.tests.case import ( AppCase, depends_on_current_app, mask_modules, skip_if_pypy, skip_if_jython, ) try: import sqlalchemy # noqa except ImportError: DatabaseBackend = Task = TaskSet = retry = None # noqa else: from celery.backends.database import DatabaseBackend, retry from celery.backends.database.models import Task, TaskSet class SomeClass(object): def __init__(self, data): self.data = data class test_DatabaseBackend(AppCase): @skip_if_pypy @skip_if_jython def setup(self): if DatabaseBackend is None: raise SkipTest('sqlalchemy not installed') self.uri = 'sqlite:///test.db' def test_retry_helper(self): from celery.backends.database import OperationalError calls = [0] @retry def raises(): calls[0] += 1 raise OperationalError(1, 2, 3) with self.assertRaises(OperationalError): raises(max_retries=5) self.assertEqual(calls[0], 5) def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self): with mask_modules('sqlalchemy'): from celery.backends.database import _sqlalchemy_installed with self.assertRaises(ImproperlyConfigured): _sqlalchemy_installed() def test_missing_dburi_raises_ImproperlyConfigured(self): self.app.conf.CELERY_RESULT_DBURI = None with self.assertRaises(ImproperlyConfigured): DatabaseBackend(app=self.app) def test_missing_task_id_is_PENDING(self): tb = DatabaseBackend(self.uri, app=self.app) self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING) def test_missing_task_meta_is_dict_with_pending(self): tb = DatabaseBackend(self.uri, app=self.app) self.assertDictContainsSubset({ 'status': states.PENDING, 'task_id': 'xxx-does-not-exist-at-all', 'result': None, 'traceback': None }, tb.get_task_meta('xxx-does-not-exist-at-all')) def test_mark_as_done(self): tb = DatabaseBackend(self.uri, app=self.app) tid = uuid() self.assertEqual(tb.get_status(tid), states.PENDING) self.assertIsNone(tb.get_result(tid)) tb.mark_as_done(tid, 42) self.assertEqual(tb.get_status(tid), states.SUCCESS) self.assertEqual(tb.get_result(tid), 42) def test_is_pickled(self): tb = DatabaseBackend(self.uri, app=self.app) tid2 = uuid() result = {'foo': 'baz', 'bar': SomeClass(12345)} tb.mark_as_done(tid2, result) # is serialized properly. rindb = tb.get_result(tid2) self.assertEqual(rindb.get('foo'), 'baz') self.assertEqual(rindb.get('bar').data, 12345) def test_mark_as_started(self): tb = DatabaseBackend(self.uri, app=self.app) tid = uuid() tb.mark_as_started(tid) self.assertEqual(tb.get_status(tid), states.STARTED) def test_mark_as_revoked(self): tb = DatabaseBackend(self.uri, app=self.app) tid = uuid() tb.mark_as_revoked(tid) self.assertEqual(tb.get_status(tid), states.REVOKED) def test_mark_as_retry(self): tb = DatabaseBackend(self.uri, app=self.app) tid = uuid() try: raise KeyError('foo') except KeyError as exception: import traceback trace = '\n'.join(traceback.format_stack()) tb.mark_as_retry(tid, exception, traceback=trace) self.assertEqual(tb.get_status(tid), states.RETRY) self.assertIsInstance(tb.get_result(tid), KeyError) self.assertEqual(tb.get_traceback(tid), trace) def test_mark_as_failure(self): tb = DatabaseBackend(self.uri, app=self.app) tid3 = uuid() try: raise KeyError('foo') except KeyError as exception: import traceback trace = '\n'.join(traceback.format_stack()) tb.mark_as_failure(tid3, exception, traceback=trace) self.assertEqual(tb.get_status(tid3), states.FAILURE) self.assertIsInstance(tb.get_result(tid3), KeyError) self.assertEqual(tb.get_traceback(tid3), trace) def test_forget(self): tb = DatabaseBackend(self.uri, backend='memory://', app=self.app) tid = uuid() tb.mark_as_done(tid, {'foo': 'bar'}) tb.mark_as_done(tid, {'foo': 'bar'}) x = self.app.AsyncResult(tid, backend=tb) x.forget() self.assertIsNone(x.result) def test_process_cleanup(self): tb = DatabaseBackend(self.uri, app=self.app) tb.process_cleanup() @depends_on_current_app def test_reduce(self): tb = DatabaseBackend(self.uri, app=self.app) self.assertTrue(loads(dumps(tb))) def test_save__restore__delete_group(self): tb = DatabaseBackend(self.uri, app=self.app) tid = uuid() res = {'something': 'special'} self.assertEqual(tb.save_group(tid, res), res) res2 = tb.restore_group(tid) self.assertEqual(res2, res) tb.delete_group(tid) self.assertIsNone(tb.restore_group(tid)) self.assertIsNone(tb.restore_group('xxx-nonexisting-id')) def test_cleanup(self): tb = DatabaseBackend(self.uri, app=self.app) for i in range(10): tb.mark_as_done(uuid(), 42) tb.save_group(uuid(), {'foo': 'bar'}) s = tb.ResultSession() for t in s.query(Task).all(): t.date_done = datetime.now() - tb.expires * 2 for t in s.query(TaskSet).all(): t.date_done = datetime.now() - tb.expires * 2 s.commit() s.close() tb.cleanup() def test_Task__repr__(self): self.assertIn('foo', repr(Task('foo'))) def test_TaskSet__repr__(self): self.assertIn('foo', repr(TaskSet('foo', None)))