test_pcp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. # -*- Python -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  # The literal is an RCS-style "$Revision$" keyword expansion.  Strip the
  # leading "$Revision: " and the trailing " $" so only the number remains.
  __version__ = '$Revision: 1.5 $'[len('$Revision: '):-len(' $')]
  5. from twisted.trial import unittest
  6. from twisted.protocols import pcp
  7. # Goal:
  8. # Take a Protocol instance. Own all outgoing data - anything that
  9. # would go to p.transport.write. Own all incoming data - anything
  10. # that comes to p.dataReceived.
  11. # I need:
  12. # Something with the AbstractFileDescriptor interface.
  13. # That is:
  14. # - acts as a Transport
  15. # - has a method write()
  16. # - which buffers
  17. # - acts as a Consumer
  18. # - has a registerProducer, unregisterProducer
  19. # - tells the Producer to back off (pauseProducing) when its buffer is full.
  20. # - tells the Producer to resumeProducing when its buffer is not so full.
  21. # - acts as a Producer
  22. # - calls registerProducer
  23. # - calls write() on consumers
  24. # - honors requests to pause/resume producing
  25. # - honors stopProducing, and passes it along to upstream Producers
  class DummyTransport:
      """
      Bare-bones transport stand-in that remembers every chunk written to it.

      Chunks are kept separately, in call order, so tests can check both how
      many writes happened and what the combined payload was.
      """

      def __init__(self):
          # One entry per write() call, oldest first.
          self._writes = list()

      def write(self, data):
          """
          Record C{data} as its own chunk; nothing is merged or forwarded.
          """
          self._writes += [data]

      def getvalue(self):
          """
          Return every recorded chunk concatenated into a single string.
          """
          separator = ''
          return separator.join(self._writes)
  class DummyProducer:
      """
      Fake producer that only records which producer methods were invoked.

      The flags are class-level defaults; an instance shadows a flag the first
      time the corresponding method is called on it.
      """

      # Nothing has been requested of a freshly created producer.
      resumed = stopped = paused = False

      def __init__(self, consumer):
          self.consumer = consumer

      def resumeProducing(self):
          # A resume also cancels any earlier pause.
          self.resumed, self.paused = True, False

      def pauseProducing(self):
          # Note: pausing leaves the "resumed" flag untouched.
          self.paused = True

      def stopProducing(self):
          self.stopped = True
  class DummyConsumer(DummyTransport):
      """
      A recording transport that also tracks the consumer-side calls made on
      it: registerProducer(), unregisterProducer() and finish().
      """

      # Set to a (producer, streaming) pair by registerProducer().
      producer = None
      # Flipped to True by finish().
      finished = False
      # Flipped to True by unregisterProducer().  This has to start out False:
      # with a True default, an assertion that unregisterProducer() was
      # relayed to this object would pass even if the call never happened.
      unregistered = False

      def registerProducer(self, producer, streaming):
          """
          Remember the most recent registration as a (producer, streaming) pair.
          """
          self.producer = (producer, streaming)

      def unregisterProducer(self):
          """
          Note that unregisterProducer() was called.
          """
          self.unregistered = True

      def finish(self):
          """
          Note that finish() was called.
          """
          self.finished = True
  class TransportInterfaceTests(unittest.TestCase):
      """
      The proxy must be usable as a plain transport, i.e. it accepts write().
      """

      proxyClass = pcp.BasicProducerConsumerProxy

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          self.transport = self.proxyClass(sink)

      def testWrite(self):
          # Smoke test only: the call has to succeed.  Nothing is asserted
          # here about where the bytes end up.
          self.transport.write("some bytes")
  class ConsumerInterfaceTest:
      """
      Checks that a proxy behaves as a Consumer towards its producer.

      The usual arrangement is ProducingServer -> ConsumingTransport.  A proxy
      placed in between (Server -> Shaper -> Transport) has to play the
      Consumer role convincingly for the ProducingServer.

      This is a mixin; concrete test cases supply C{proxyClass}.
      """

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          proxy = self.proxyClass(sink)
          self.consumer = proxy
          self.producer = DummyProducer(proxy)

      def testRegisterPush(self):
          self.consumer.registerProducer(self.producer, True)
          ## Registering a push producer must NOT call its resumeProducing.
          wasResumed = self.producer.resumed
          self.assertFalse(wasResumed)

      ## Open question: since I'm just a proxy, should I only do
      ## resumeProducing when I get poked myself?
      #def testRegisterPull(self):
      #    self.consumer.registerProducer(self.producer, False)
      #    ## Consumer SHOULD have called PushProducer.resumeProducing
      #    self.assertTrue(self.producer.resumed)

      def testUnregister(self):
          proxy, upstream = self.consumer, self.producer
          proxy.registerProducer(upstream, False)
          proxy.unregisterProducer()
          # From here on, a moment where the consumer would ordinarily want
          # more data must not reach the (now unregistered) producer.
          # The shortest way to provoke "want more data" is to be proxying for
          # a pull producer and have someone ask the proxy for data.
          upstream.resumed = False
          proxy.resumeProducing()
          self.assertFalse(upstream.resumed)

      def testFinish(self):
          proxy, upstream = self.consumer, self.producer
          proxy.registerProducer(upstream, False)
          proxy.finish()
          # Presumably finish should behave like unregister: the producer is
          # not asked for anything afterwards.
          upstream.resumed = False
          proxy.resumeProducing()
          self.assertFalse(upstream.resumed)
  class ProducerInterfaceTest:
      """
      Checks that a proxy behaves as a Producer towards its consumer.

      The usual arrangement is ProducingServer -> ConsumingTransport.  A proxy
      placed in between (Server -> Shaper -> Transport) has to play the
      Producer role convincingly for the ConsumingTransport.

      This is a mixin; concrete test cases supply C{proxyClass}.
      """

      def setUp(self):
          sink = DummyConsumer()
          self.consumer = sink
          self.producer = self.proxyClass(sink)

      def testRegistersProducer(self):
          # The consumer stores its registration as (producer, streaming).
          registration = self.consumer.producer
          self.assertEqual(registration[0], self.producer)

      def testPause(self):
          proxy = self.producer
          proxy.pauseProducing()
          proxy.write("yakkity yak")
          delivered = self.consumer.getvalue()
          self.assertFalse(delivered,
                           "Paused producer should not have sent data.")

      def testResume(self):
          proxy = self.producer
          proxy.pauseProducing()
          proxy.resumeProducing()
          proxy.write("yakkity yak")
          delivered = self.consumer.getvalue()
          self.assertEqual(delivered, "yakkity yak")

      def testResumeNoEmptyWrite(self):
          proxy = self.producer
          proxy.pauseProducing()
          proxy.resumeProducing()
          # With nothing buffered, resuming must not produce a write at all.
          writeCount = len(self.consumer._writes)
          self.assertEqual(writeCount, 0,
                           "Resume triggered an empty write.")

      def testResumeBuffer(self):
          proxy = self.producer
          proxy.pauseProducing()
          proxy.write("buffer this")
          # Data held back while paused is delivered on resume.
          proxy.resumeProducing()
          delivered = self.consumer.getvalue()
          self.assertEqual(delivered, "buffer this")

      def testStop(self):
          proxy = self.producer
          proxy.stopProducing()
          proxy.write("yakkity yak")
          delivered = self.consumer.getvalue()
          self.assertFalse(delivered,
                           "Stopped producer should not have sent data.")
  class PCP_ConsumerInterfaceTests(ConsumerInterfaceTest, unittest.TestCase):
      """
      Runs the Consumer-interface checks against BasicProducerConsumerProxy.
      """

      proxyClass = pcp.BasicProducerConsumerProxy
  class PCPII_ConsumerInterfaceTests(ConsumerInterfaceTest, unittest.TestCase):
      """
      Runs the Consumer-interface checks against ProducerConsumerProxy.
      """

      proxyClass = pcp.ProducerConsumerProxy
  class PCP_ProducerInterfaceTests(ProducerInterfaceTest, unittest.TestCase):
      """
      Runs the Producer-interface checks against BasicProducerConsumerProxy.
      """

      proxyClass = pcp.BasicProducerConsumerProxy
  class PCPII_ProducerInterfaceTests(ProducerInterfaceTest, unittest.TestCase):
      """
      Runs the Producer-interface checks against ProducerConsumerProxy.
      """

      proxyClass = pcp.ProducerConsumerProxy
  class ProducerProxyTests(unittest.TestCase):
      """
      Producer methods called on the proxy should be relayed to the producer
      it is proxying for.
      """

      proxyClass = pcp.BasicProducerConsumerProxy

      def setUp(self):
          # No downstream consumer is needed here, hence None.
          proxy = self.proxyClass(None)
          self.proxy = proxy
          upstream = DummyProducer(proxy)
          self.parentProducer = upstream
          proxy.registerProducer(upstream, True)

      def testStop(self):
          self.proxy.stopProducing()
          wasStopped = self.parentProducer.stopped
          self.assertTrue(wasStopped)
  class ConsumerProxyTests(unittest.TestCase):
      """
      Consumer methods called on the proxy should be relayed to the consumer
      it is proxying for.
      """

      proxyClass = pcp.BasicProducerConsumerProxy

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          self.consumer = self.proxyClass(sink)

      def testWrite(self):
          # NOTE: This test is only valid for streaming (Push) systems.
          self.consumer.write("some bytes")
          delivered = self.underlying.getvalue()
          self.assertEqual(delivered, "some bytes")

      def testFinish(self):
          self.consumer.finish()
          wasFinished = self.underlying.finished
          self.assertTrue(wasFinished)

      def testUnregister(self):
          self.consumer.unregisterProducer()
          wasUnregistered = self.underlying.unregistered
          self.assertTrue(wasUnregistered)
  class PullProducerTest:
      """
      Checks for a proxy whose consumer pulls data via resumeProducing().

      This is a mixin; concrete test cases supply C{proxyClass}.
      """

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          proxy = self.proxyClass(sink)
          self.proxy = proxy
          upstream = DummyProducer(proxy)
          self.parentProducer = upstream
          proxy.registerProducer(upstream, True)

      def testHoldWrites(self):
          self.proxy.write("hello")
          # Until the consumer says resumeProducing it should receive nothing.
          delivered = self.underlying.getvalue()
          self.assertFalse(delivered,
                           "Pulling Consumer got data before it pulled.")

      def testPull(self):
          self.proxy.write("hello")
          self.proxy.resumeProducing()
          delivered = self.underlying.getvalue()
          self.assertEqual(delivered, "hello")

      def testMergeWrites(self):
          for chunk in ("hello ", "sunshine"):
              self.proxy.write(chunk)
          self.proxy.resumeProducing()
          # Both chunks must reach the consumer as one combined write.
          writeCount = len(self.underlying._writes)
          self.assertEqual(writeCount, 1, "Pull resulted in %d writes instead "
                           "of 1." % (writeCount,))
          self.assertEqual(self.underlying.getvalue(), "hello sunshine")

      def testLateWrite(self):
          # The consumer issues its first pull before any data exists ...
          self.proxy.resumeProducing()
          self.proxy.write("data")
          # ... so this write should satisfy that outstanding pull.
          delivered = self.underlying.getvalue()
          self.assertEqual(delivered, "data")
  class PCP_PullProducerTests(PullProducerTest, unittest.TestCase):
      """
      Runs the pull-producer checks against a BasicProducerConsumerProxy
      subclass that has C{iAmStreaming} turned off.
      """

      class proxyClass(pcp.BasicProducerConsumerProxy):
          # Non-streaming variant of the proxy used by these tests.
          iAmStreaming = False
  class PCPII_PullProducerTests(PullProducerTest, unittest.TestCase):
      """
      Runs the pull-producer checks against a ProducerConsumerProxy subclass
      that has C{iAmStreaming} turned off.
      """

      class proxyClass(pcp.ProducerConsumerProxy):
          # Non-streaming variant of the proxy used by these tests.
          iAmStreaming = False
  208. # Buffering!
  class BufferedConsumerTests(unittest.TestCase):
      """
      In its consumer role the proxy should tell its producer to pause once
      too much data has piled up, and to resume once the buffer has drained.
      """

      proxyClass = pcp.ProducerConsumerProxy

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          proxy = self.proxyClass(sink)
          self.proxy = proxy
          # Small limit so the tests can exceed it with two short writes.
          proxy.bufferSize = 100
          upstream = DummyProducer(proxy)
          self.parentProducer = upstream
          proxy.registerProducer(upstream, True)

      def testRegisterPull(self):
          self.proxy.registerProducer(self.parentProducer, False)
          ## Registering a pull producer SHOULD call its resumeProducing.
          wasResumed = self.parentProducer.resumed
          self.assertTrue(wasResumed)

      def testPauseIntercept(self):
          # Pausing the proxy alone must not be passed on to the parent.
          self.proxy.pauseProducing()
          parentPaused = self.parentProducer.paused
          self.assertFalse(parentPaused)

      def testResumeIntercept(self):
          self.proxy.pauseProducing()
          self.proxy.resumeProducing()
          # For a streaming producer, the proxy being resumed is not by itself
          # a reason to resume the parent producer; that decision belongs to
          # the state of the buffer.
          parentResumed = self.parentProducer.resumed
          self.assertFalse(parentResumed)

      def testTriggerPause(self):
          """Make sure I say \"when.\""""
          # With the proxy paused, incoming data accumulates in its buffer.
          self.proxy.pauseProducing()
          self.assertFalse(self.parentProducer.paused, "don't pause yet")
          # 51 bytes is still under the 100-byte limit ...
          self.proxy.write("x" * 51)
          self.assertFalse(self.parentProducer.paused, "don't pause yet")
          # ... but a second 51 bytes pushes the total over it.
          self.proxy.write("x" * 51)
          self.assertTrue(self.parentProducer.paused)

      def testTriggerResume(self):
          """Make sure I resumeProducing when my buffer empties."""
          self.proxy.pauseProducing()
          self.proxy.write("x" * 102)
          self.assertTrue(self.parentProducer.paused, "should be paused")
          self.proxy.resumeProducing()
          # Resuming should have drained the buffer, so the parent producer
          # should have been told to resume as well.
          self.assertFalse(self.parentProducer.paused,
                           "Producer should have resumed.")
          self.assertFalse(self.proxy.producerPaused)
  class BufferedPullTests(unittest.TestCase):
      """
      Buffering behaviour of a non-streaming proxy fed by a pull producer.
      """

      class proxyClass(pcp.ProducerConsumerProxy):
          iAmStreaming = False

          def _writeSomeData(self, data):
              # Pass on at most the first 100 items of data per call and
              # report how many were passed on.
              accepted = data[:100]
              pcp.ProducerConsumerProxy._writeSomeData(self, accepted)
              return len(accepted)

      def setUp(self):
          sink = DummyConsumer()
          self.underlying = sink
          proxy = self.proxyClass(sink)
          self.proxy = proxy
          proxy.bufferSize = 100
          upstream = DummyProducer(proxy)
          self.parentProducer = upstream
          proxy.registerProducer(upstream, False)

      def testResumePull(self):
          # A proxy with nothing to send on resumeProducing had better go and
          # pull some data from its PullProducer.
          self.parentProducer.resumed = False
          self.proxy.resumeProducing()
          wasResumed = self.parentProducer.resumed
          self.assertTrue(wasResumed)

      def testLateWriteBuffering(self):
          # The consumer issues its first pull before any data exists.
          self.proxy.resumeProducing()
          # 21 * 5 = 105 characters: five more than one write lets through.
          self.proxy.write("datum" * 21)
          # The first 100 characters should answer that pull request ...
          self.assertEqual(self.underlying.getvalue(), "datum" * 20)
          # ... and the remainder should still be sitting in the buffer.
          self.assertEqual(self.proxy._buffer, ["datum"])
  278. # TODO:
  279. # test that web request finishing bug (when we weren't proxying
  280. # unregisterProducer but were proxying finish, web file transfers
  281. # would hang on the last block.)
  282. # test what happens if _writeSomeData decides to write zero bytes.