from __future__ import absolute_import import socket from mock import Mock from pickle import loads, dumps from celery import states from celery.exceptions import ImproperlyConfigured from celery.tests.case import AppCase, mock_module, depends_on_current_app class Object(object): pass def install_exceptions(mod): # py3k: cannot catch exceptions not ineheriting from BaseException. class NotFoundException(Exception): pass class TException(Exception): pass class InvalidRequestException(Exception): pass class UnavailableException(Exception): pass class TimedOutException(Exception): pass class AllServersUnavailable(Exception): pass mod.NotFoundException = NotFoundException mod.TException = TException mod.InvalidRequestException = InvalidRequestException mod.TimedOutException = TimedOutException mod.UnavailableException = UnavailableException mod.AllServersUnavailable = AllServersUnavailable class test_CassandraBackend(AppCase): def setup(self): self.app.conf.update( CASSANDRA_SERVERS=['example.com'], CASSANDRA_KEYSPACE='keyspace', CASSANDRA_COLUMN_FAMILY='columns', ) def test_init_no_pycassa(self): with mock_module('pycassa'): from celery.backends import cassandra as mod prev, mod.pycassa = mod.pycassa, None try: with self.assertRaises(ImproperlyConfigured): mod.CassandraBackend(app=self.app) finally: mod.pycassa = prev def test_init_with_and_without_LOCAL_QUROM(self): with mock_module('pycassa'): from celery.backends import cassandra as mod mod.pycassa = Mock() install_exceptions(mod.pycassa) cons = mod.pycassa.ConsistencyLevel = Object() cons.LOCAL_QUORUM = 'foo' self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO' self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO' mod.CassandraBackend(app=self.app) cons.LOCAL_FOO = 'bar' mod.CassandraBackend(app=self.app) # no servers raises ImproperlyConfigured with self.assertRaises(ImproperlyConfigured): self.app.conf.CASSANDRA_SERVERS = None mod.CassandraBackend( app=self.app, keyspace='b', column_family='c', ) @depends_on_current_app def test_reduce(self): with mock_module('pycassa'): from celery.backends.cassandra import CassandraBackend self.assertTrue(loads(dumps(CassandraBackend(app=self.app)))) def test_get_task_meta_for(self): with mock_module('pycassa'): from celery.backends import cassandra as mod mod.pycassa = Mock() install_exceptions(mod.pycassa) mod.Thrift = Mock() install_exceptions(mod.Thrift) x = mod.CassandraBackend(app=self.app) Get_Column = x._get_column_family = Mock() get_column = Get_Column.return_value = Mock() get = get_column.get META = get.return_value = { 'task_id': 'task_id', 'status': states.SUCCESS, 'result': '1', 'date_done': 'date', 'traceback': '', 'children': None, } x.decode = Mock() x.detailed_mode = False meta = x._get_task_meta_for('task_id') self.assertEqual(meta['status'], states.SUCCESS) x.detailed_mode = True row = get.return_value = Mock() row.values.return_value = [Mock()] x.decode.return_value = META meta = x._get_task_meta_for('task_id') self.assertEqual(meta['status'], states.SUCCESS) x.decode.return_value = Mock() x.detailed_mode = False get.side_effect = KeyError() meta = x._get_task_meta_for('task_id') self.assertEqual(meta['status'], states.PENDING) calls = [0] end = [10] def work_eventually(*arg): try: if calls[0] > end[0]: return META raise socket.error() finally: calls[0] += 1 get.side_effect = work_eventually x._retry_timeout = 10 x._retry_wait = 0.01 meta = x._get_task_meta_for('task') self.assertEqual(meta['status'], states.SUCCESS) x._retry_timeout = 0.1 calls[0], end[0] = 0, 100 with self.assertRaises(socket.error): x._get_task_meta_for('task') def test_store_result(self): with mock_module('pycassa'): from celery.backends import cassandra as mod mod.pycassa = Mock() install_exceptions(mod.pycassa) mod.Thrift = Mock() install_exceptions(mod.Thrift) x = mod.CassandraBackend(app=self.app) Get_Column = x._get_column_family = Mock() cf = Get_Column.return_value = Mock() x.detailed_mode = False x._store_result('task_id', 'result', states.SUCCESS) self.assertTrue(cf.insert.called) cf.insert.reset() x.detailed_mode = True x._store_result('task_id', 'result', states.SUCCESS) self.assertTrue(cf.insert.called) def test_process_cleanup(self): with mock_module('pycassa'): from celery.backends import cassandra as mod x = mod.CassandraBackend(app=self.app) x._column_family = None x.process_cleanup() x._column_family = True x.process_cleanup() self.assertIsNone(x._column_family) def test_get_column_family(self): with mock_module('pycassa'): from celery.backends import cassandra as mod mod.pycassa = Mock() install_exceptions(mod.pycassa) x = mod.CassandraBackend(app=self.app) self.assertTrue(x._get_column_family()) self.assertIsNotNone(x._column_family) self.assertIs(x._get_column_family(), x._column_family)