123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- from __future__ import absolute_import
- import socket
- from kombu import Connection, Exchange, Queue, Consumer, Producer
- from kombu.tests.case import Case
- class test_MemoryTransport(Case):
- def setUp(self):
- self.c = Connection(transport='memory')
- self.e = Exchange('test_transport_memory')
- self.q = Queue('test_transport_memory',
- exchange=self.e,
- routing_key='test_transport_memory')
- self.q2 = Queue('test_transport_memory2',
- exchange=self.e,
- routing_key='test_transport_memory2')
- self.fanout = Exchange('test_transport_memory_fanout', type='fanout')
- self.q3 = Queue('test_transport_memory_fanout1',
- exchange=self.fanout)
- self.q4 = Queue('test_transport_memory_fanout2',
- exchange=self.fanout)
- def test_driver_version(self):
- self.assertTrue(self.c.transport.driver_version())
- def test_produce_consume_noack(self):
- channel = self.c.channel()
- producer = Producer(channel, self.e)
- consumer = Consumer(channel, self.q, no_ack=True)
- for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_memory')
- _received = []
- def callback(message_data, message):
- _received.append(message)
- consumer.register_callback(callback)
- consumer.consume()
- while 1:
- if len(_received) == 10:
- break
- self.c.drain_events()
- self.assertEqual(len(_received), 10)
- def test_produce_consume_fanout(self):
- producer = self.c.Producer()
- consumer = self.c.Consumer([self.q3, self.q4])
- producer.publish(
- {'hello': 'world'},
- declare=consumer.queues,
- exchange=self.fanout,
- )
- self.assertEqual(self.q3(self.c).get().payload, {'hello': 'world'})
- self.assertEqual(self.q4(self.c).get().payload, {'hello': 'world'})
- self.assertIsNone(self.q3(self.c).get())
- self.assertIsNone(self.q4(self.c).get())
- def test_produce_consume(self):
- channel = self.c.channel()
- producer = Producer(channel, self.e)
- consumer1 = Consumer(channel, self.q)
- consumer2 = Consumer(channel, self.q2)
- self.q2(channel).declare()
- for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_memory')
- for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_memory2')
- _received1 = []
- _received2 = []
- def callback1(message_data, message):
- _received1.append(message)
- message.ack()
- def callback2(message_data, message):
- _received2.append(message)
- message.ack()
- consumer1.register_callback(callback1)
- consumer2.register_callback(callback2)
- consumer1.consume()
- consumer2.consume()
- while 1:
- if len(_received1) + len(_received2) == 20:
- break
- self.c.drain_events()
- self.assertEqual(len(_received1) + len(_received2), 20)
- # compression
- producer.publish({'compressed': True},
- routing_key='test_transport_memory',
- compression='zlib')
- m = self.q(channel).get()
- self.assertDictEqual(m.payload, {'compressed': True})
- # queue.delete
- for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_memory')
- self.assertTrue(self.q(channel).get())
- self.q(channel).delete()
- self.q(channel).declare()
- self.assertIsNone(self.q(channel).get())
- # queue.purge
- for i in range(10):
- producer.publish({'foo': i}, routing_key='test_transport_memory2')
- self.assertTrue(self.q2(channel).get())
- self.q2(channel).purge()
- self.assertIsNone(self.q2(channel).get())
- def test_drain_events(self):
- with self.assertRaises(socket.timeout):
- self.c.drain_events(timeout=0.1)
- c1 = self.c.channel()
- c2 = self.c.channel()
- with self.assertRaises(socket.timeout):
- self.c.drain_events(timeout=0.1)
- del(c1) # so pyflakes doesn't complain.
- del(c2)
- def test_drain_events_unregistered_queue(self):
- c1 = self.c.channel()
- class Cycle(object):
- def get(self, timeout=None):
- return ('foo', 'foo'), c1
- self.c.transport.cycle = Cycle()
- with self.assertRaises(KeyError):
- self.c.drain_events()
- def test_queue_for(self):
- chan = self.c.channel()
- chan.queues.clear()
- x = chan._queue_for('foo')
- self.assertTrue(x)
- self.assertIs(chan._queue_for('foo'), x)
|