from __future__ import absolute_import import errno import signal import sys from mock import Mock, patch from celery.bin.multi import ( main, MultiTool, findsig, abbreviations, parse_ns_range, format_opt, quote, NamespacedOptionParser, multi_args, __doc__ as doc, ) from celery.tests.case import AppCase, WhateverIO, SkipTest class test_functions(AppCase): def test_findsig(self): self.assertEqual(findsig(['a', 'b', 'c', '-1']), 1) self.assertEqual(findsig(['--foo=1', '-9']), 9) self.assertEqual(findsig(['-INT']), signal.SIGINT) self.assertEqual(findsig([]), signal.SIGTERM) self.assertEqual(findsig(['-s']), signal.SIGTERM) self.assertEqual(findsig(['-log']), signal.SIGTERM) def test_abbreviations(self): expander = abbreviations({'%s': 'START', '%x': 'STOP'}) self.assertEqual(expander('foo%s'), 'fooSTART') self.assertEqual(expander('foo%x'), 'fooSTOP') self.assertEqual(expander('foo%y'), 'foo%y') self.assertIsNone(expander(None)) def test_parse_ns_range(self): self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3']) self.assertEqual(parse_ns_range('1-3', False), ['1-3']) self.assertEqual(parse_ns_range( '1-3,10,11,20', True), ['1', '2', '3', '10', '11', '20'], ) def test_format_opt(self): self.assertEqual(format_opt('--foo', None), '--foo') self.assertEqual(format_opt('-c', 1), '-c 1') self.assertEqual(format_opt('--log', 'foo'), '--log=foo') def test_quote(self): self.assertEqual(quote("the 'quick"), "'the '\\''quick'") class test_NamespacedOptionParser(AppCase): def test_parse(self): x = NamespacedOptionParser(['-c:1,3', '4']) self.assertEqual(x.namespaces.get('1,3'), {'-c': '4'}) x = NamespacedOptionParser(['-c:jerry,elaine', '5', '--loglevel:kramer=DEBUG', '--flag', '--logfile=foo', '-Q', 'bar', 'a', 'b', '--', '.disable_rate_limits=1']) self.assertEqual(x.options, {'--logfile': 'foo', '-Q': 'bar', '--flag': None}) self.assertEqual(x.values, ['a', 'b']) self.assertEqual(x.namespaces.get('jerry,elaine'), {'-c': '5'}) self.assertEqual(x.namespaces.get('kramer'), {'--loglevel': 'DEBUG'}) self.assertEqual(x.passthrough, '-- .disable_rate_limits=1') class test_multi_args(AppCase): @patch('socket.gethostname') def test_parse(self, gethostname): p = NamespacedOptionParser([ '-c:jerry,elaine', '5', '--loglevel:kramer=DEBUG', '--flag', '--logfile=foo', '-Q', 'bar', 'jerry', 'elaine', 'kramer', '--', '.disable_rate_limits=1', ]) it = multi_args(p, cmd='COMMAND', append='*AP*', prefix='*P*', suffix='*S*') names = list(it) def assert_line_in(name, args): self.assertIn(name, [tup[0] for tup in names]) argv = None for item in names: if item[0] == name: argv = item[1] self.assertTrue(argv) for arg in args: self.assertIn(arg, argv) assert_line_in( '*P*jerry@*S*', ['COMMAND', '-n *P*jerry@*S*', '-Q bar', '-c 5', '--flag', '--logfile=foo', '-- .disable_rate_limits=1', '*AP*'], ) assert_line_in( '*P*elaine@*S*', ['COMMAND', '-n *P*elaine@*S*', '-Q bar', '-c 5', '--flag', '--logfile=foo', '-- .disable_rate_limits=1', '*AP*'], ) assert_line_in( '*P*kramer@*S*', ['COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*', '-Q bar', '--flag', '--logfile=foo', '-- .disable_rate_limits=1', '*AP*'], ) expand = names[0][2] self.assertEqual(expand('%h'), '*P*jerry@*S*') self.assertEqual(expand('%n'), 'jerry') names2 = list(multi_args(p, cmd='COMMAND', append='', prefix='*P*', suffix='*S*')) self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1') gethostname.return_value = 'example.com' p2 = NamespacedOptionParser(['10', '-c:1', '5']) names3 = list(multi_args(p2, cmd='COMMAND')) self.assertEqual(len(names3), 10) self.assertEqual( names3[0][0:2], ('celery1@example.com', ['COMMAND', '-n celery1@example.com', '-c 5', '']), ) for i, worker in enumerate(names3[1:]): self.assertEqual( worker[0:2], ('celery%s@example.com' % (i + 2), ['COMMAND', '-n celery%s@example.com' % (i + 2), '']), ) names4 = list(multi_args(p2, cmd='COMMAND', suffix='""')) self.assertEqual(len(names4), 10) self.assertEqual( names4[0][0:2], ('celery1@', ['COMMAND', '-n celery1@', '-c 5', '']), ) p3 = NamespacedOptionParser(['foo@', '-c:foo', '5']) names5 = list(multi_args(p3, cmd='COMMAND', suffix='""')) self.assertEqual( names5[0][0:2], ('foo@', ['COMMAND', '-n foo@', '-c 5', '']), ) class test_MultiTool(AppCase): def setup(self): self.fh = WhateverIO() self.env = {} self.t = MultiTool(env=self.env, fh=self.fh) def test_note(self): self.t.note('hello world') self.assertEqual(self.fh.getvalue(), 'hello world\n') def test_note_quiet(self): self.t.quiet = True self.t.note('hello world') self.assertFalse(self.fh.getvalue()) def test_info(self): self.t.verbose = True self.t.info('hello info') self.assertEqual(self.fh.getvalue(), 'hello info\n') def test_info_not_verbose(self): self.t.verbose = False self.t.info('hello info') self.assertFalse(self.fh.getvalue()) def test_error(self): self.t.say = Mock() self.t.usage = Mock() self.assertEqual(self.t.error('foo'), 1) self.t.say.assert_called_with('foo') self.t.usage.assert_called_with() self.t.say = Mock() self.assertEqual(self.t.error(), 1) self.assertFalse(self.t.say.called) self.assertEqual(self.t.retcode, 1) @patch('celery.bin.multi.Popen') def test_waitexec(self, Popen): self.t.note = Mock() pipe = Popen.return_value = Mock() pipe.wait.return_value = -10 self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 10) Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env) self.t.note.assert_called_with('* Child was terminated by signal 10') pipe.wait.return_value = 2 self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 2) self.t.note.assert_called_with( '* Child terminated with errorcode 2', ) pipe.wait.return_value = 0 self.assertFalse(self.t.waitexec(['-m', 'foo', 'path'])) def test_nosplash(self): self.t.nosplash = True self.t.splash() self.assertFalse(self.fh.getvalue()) def test_splash(self): self.t.nosplash = False self.t.splash() self.assertIn('celery multi', self.fh.getvalue()) def test_usage(self): self.t.usage() self.assertTrue(self.fh.getvalue()) def test_help(self): self.t.help([]) self.assertIn(doc, self.fh.getvalue()) def test_expand(self): self.t.expand(['foo%n', 'ask', 'klask', 'dask']) self.assertEqual( self.fh.getvalue(), 'fooask\nfooklask\nfoodask\n', ) def test_restart(self): stop = self.t._stop_nodes = Mock() self.t.restart(['jerry', 'george'], 'celery worker') waitexec = self.t.waitexec = Mock() self.assertTrue(stop.called) callback = stop.call_args[1]['callback'] self.assertTrue(callback) waitexec.return_value = 0 callback('jerry', ['arg'], 13) waitexec.assert_called_with(['arg']) self.assertIn('OK', self.fh.getvalue()) self.fh.seek(0) self.fh.truncate() waitexec.return_value = 1 callback('jerry', ['arg'], 13) self.assertIn('FAILED', self.fh.getvalue()) def test_stop(self): self.t.getpids = Mock() self.t.getpids.return_value = [2, 3, 4] self.t.shutdown_nodes = Mock() self.t.stop(['a', 'b', '-INT'], 'celery worker') self.t.shutdown_nodes.assert_called_with( [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None, ) def test_kill(self): if not hasattr(signal, 'SIGKILL'): raise SkipTest('SIGKILL not supported by this platform') self.t.getpids = Mock() self.t.getpids.return_value = [ ('a', None, 10), ('b', None, 11), ('c', None, 12) ] sig = self.t.signal_node = Mock() self.t.kill(['a', 'b', 'c'], 'celery worker') sigs = sig.call_args_list self.assertEqual(len(sigs), 3) self.assertEqual(sigs[0][0], ('a', 10, signal.SIGKILL)) self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL)) self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL)) def prepare_pidfile_for_getpids(self, Pidfile): class pids(object): def __init__(self, path): self.path = path def read_pid(self): try: return {'foo.pid': 10, 'bar.pid': 11}[self.path] except KeyError: raise ValueError() Pidfile.side_effect = pids @patch('celery.bin.multi.Pidfile') @patch('socket.gethostname') def test_getpids(self, gethostname, Pidfile): gethostname.return_value = 'e.com' self.prepare_pidfile_for_getpids(Pidfile) callback = Mock() p = NamespacedOptionParser(['foo', 'bar', 'baz']) nodes = self.t.getpids(p, 'celery worker', callback=callback) node_0, node_1 = nodes self.assertEqual(node_0[0], 'foo@e.com') self.assertEqual( sorted(node_0[1]), sorted(('celery worker', '--pidfile=foo.pid', '-n foo@e.com', '')), ) self.assertEqual(node_0[2], 10) self.assertEqual(node_1[0], 'bar@e.com') self.assertEqual( sorted(node_1[1]), sorted(('celery worker', '--pidfile=bar.pid', '-n bar@e.com', '')), ) self.assertEqual(node_1[2], 11) self.assertTrue(callback.called) cargs, _ = callback.call_args self.assertEqual(cargs[0], 'baz@e.com') self.assertItemsEqual( cargs[1], ['celery worker', '--pidfile=baz.pid', '-n baz@e.com', ''], ) self.assertIsNone(cargs[2]) self.assertIn('DOWN', self.fh.getvalue()) # without callback, should work nodes = self.t.getpids(p, 'celery worker', callback=None) @patch('celery.bin.multi.Pidfile') @patch('socket.gethostname') @patch('celery.bin.multi.sleep') def test_shutdown_nodes(self, slepp, gethostname, Pidfile): gethostname.return_value = 'e.com' self.prepare_pidfile_for_getpids(Pidfile) self.assertIsNone(self.t.shutdown_nodes([])) self.t.signal_node = Mock() node_alive = self.t.node_alive = Mock() self.t.node_alive.return_value = False callback = Mock() self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=callback) sigs = sorted(self.t.signal_node.call_args_list) self.assertEqual(len(sigs), 2) self.assertIn( ('foo@e.com', 10, signal.SIGTERM), [tup[0] for tup in sigs], ) self.assertIn( ('bar@e.com', 11, signal.SIGTERM), [tup[0] for tup in sigs], ) self.t.signal_node.return_value = False self.assertTrue(callback.called) self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=None) def on_node_alive(pid): if node_alive.call_count > 4: return True return False self.t.signal_node.return_value = True self.t.node_alive.side_effect = on_node_alive self.t.stop(['foo', 'bar', 'baz'], 'celery worker', retry=True) @patch('os.kill') def test_node_alive(self, kill): kill.return_value = True self.assertTrue(self.t.node_alive(13)) esrch = OSError() esrch.errno = errno.ESRCH kill.side_effect = esrch self.assertFalse(self.t.node_alive(13)) kill.assert_called_with(13, 0) enoent = OSError() enoent.errno = errno.ENOENT kill.side_effect = enoent with self.assertRaises(OSError): self.t.node_alive(13) @patch('os.kill') def test_signal_node(self, kill): kill.return_value = True self.assertTrue(self.t.signal_node('foo', 13, 9)) esrch = OSError() esrch.errno = errno.ESRCH kill.side_effect = esrch self.assertFalse(self.t.signal_node('foo', 13, 9)) kill.assert_called_with(13, 9) self.assertIn('Could not signal foo', self.fh.getvalue()) enoent = OSError() enoent.errno = errno.ENOENT kill.side_effect = enoent with self.assertRaises(OSError): self.t.signal_node('foo', 13, 9) def test_start(self): self.t.waitexec = Mock() self.t.waitexec.return_value = 0 self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker')) self.t.waitexec.return_value = 1 self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker')) def test_show(self): self.t.show(['foo', 'bar', 'baz'], 'celery worker') self.assertTrue(self.fh.getvalue()) @patch('socket.gethostname') def test_get(self, gethostname): gethostname.return_value = 'e.com' self.t.get(['xuzzy@e.com', 'foo', 'bar', 'baz'], 'celery worker') self.assertFalse(self.fh.getvalue()) self.t.get(['foo@e.com', 'foo', 'bar', 'baz'], 'celery worker') self.assertTrue(self.fh.getvalue()) @patch('socket.gethostname') def test_names(self, gethostname): gethostname.return_value = 'e.com' self.t.names(['foo', 'bar', 'baz'], 'celery worker') self.assertIn('foo@e.com\nbar@e.com\nbaz@e.com', self.fh.getvalue()) def test_execute_from_commandline(self): start = self.t.commands['start'] = Mock() self.t.error = Mock() self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar']) self.assertFalse(self.t.error.called) start.assert_called_with(['foo', 'bar'], 'celery worker') self.t.error = Mock() self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar']) self.t.error.assert_called_with('Invalid command: frob') self.t.error = Mock() self.t.execute_from_commandline(['multi']) self.t.error.assert_called_with() self.t.error = Mock() self.t.execute_from_commandline(['multi', '-foo']) self.t.error.assert_called_with() self.t.execute_from_commandline( ['multi', 'start', 'foo', '--nosplash', '--quiet', '-q', '--verbose', '--no-color'], ) self.assertTrue(self.t.nosplash) self.assertTrue(self.t.quiet) self.assertTrue(self.t.verbose) self.assertTrue(self.t.no_color) def test_stopwait(self): self.t._stop_nodes = Mock() self.t.stopwait(['foo', 'bar', 'baz'], 'celery worker') self.assertEqual(self.t._stop_nodes.call_args[1]['retry'], 2) @patch('celery.bin.multi.MultiTool') def test_main(self, MultiTool): m = MultiTool.return_value = Mock() with self.assertRaises(SystemExit): main() m.execute_from_commandline.assert_called_with(sys.argv)