stdio_test_producer.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # -*- test-case-name: twisted.test.test_stdio.StandardInputOutputTests.test_producer -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Main program for the child process run by
  6. L{twisted.test.test_stdio.StandardInputOutputTests.test_producer} to test
  7. that process transports implement IProducer properly.
  8. """
  9. from __future__ import absolute_import, division
  10. import sys
  11. from twisted.internet import stdio, protocol
  12. from twisted.python import log, reflect
  13. class ProducerChild(protocol.Protocol):
  14. _paused = False
  15. buf = b''
  16. def connectionLost(self, reason):
  17. log.msg("*****OVER*****")
  18. reactor.callLater(1, reactor.stop)
  19. def dataReceived(self, data):
  20. self.buf += data
  21. if self._paused:
  22. log.startLogging(sys.stderr)
  23. log.msg("dataReceived while transport paused!")
  24. self.transport.loseConnection()
  25. else:
  26. self.transport.write(data)
  27. if self.buf.endswith(b'\n0\n'):
  28. self.transport.loseConnection()
  29. else:
  30. self.pause()
  31. def pause(self):
  32. self._paused = True
  33. self.transport.pauseProducing()
  34. reactor.callLater(0.01, self.unpause)
  35. def unpause(self):
  36. self._paused = False
  37. self.transport.resumeProducing()
  38. if __name__ == '__main__':
  39. reflect.namedAny(sys.argv[1]).install()
  40. from twisted.internet import reactor
  41. stdio.StandardIO(ProducerChild())
  42. reactor.run()