fakeendpoint.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. # -*- test-case-name: twisted.internet.test.test_endpoints -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Fake client and server endpoint string parser plugins for testing purposes.
  6. """
  7. from __future__ import absolute_import, division
  8. from zope.interface.declarations import implementer
  9. from twisted.plugin import IPlugin
  10. from twisted.internet.interfaces import (
  11. IStreamClientEndpoint, IStreamServerEndpoint,
  12. IStreamServerEndpointStringParser,
  13. IStreamClientEndpointStringParserWithReactor)
  14. @implementer(IPlugin)
  15. class PluginBase(object):
  16. def __init__(self, pfx):
  17. self.prefix = pfx
  18. @implementer(IStreamClientEndpointStringParserWithReactor)
  19. class FakeClientParserWithReactor(PluginBase):
  20. def parseStreamClient(self, *a, **kw):
  21. return StreamClient(self, a, kw)
  22. @implementer(IStreamServerEndpointStringParser)
  23. class FakeParser(PluginBase):
  24. def parseStreamServer(self, *a, **kw):
  25. return StreamServer(self, a, kw)
  26. class EndpointBase(object):
  27. def __init__(self, parser, args, kwargs):
  28. self.parser = parser
  29. self.args = args
  30. self.kwargs = kwargs
  31. @implementer(IStreamClientEndpoint)
  32. class StreamClient(EndpointBase):
  33. pass
  34. @implementer(IStreamServerEndpoint)
  35. class StreamServer(EndpointBase):
  36. pass
  37. # Instantiate plugin interface providers to register them.
  38. fake = FakeParser('fake')
  39. fakeClientWithReactor = FakeClientParserWithReactor('crfake')
  40. fakeClientWithReactorAndPreference = FakeClientParserWithReactor('cpfake')