pop3testserver.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. #!/usr/bin/env python
  2. # -*- test-case-name: twisted.mail.test.test_pop3client -*-
  3. # Copyright (c) Twisted Matrix Laboratories.
  4. # See LICENSE for details.
  5. from __future__ import print_function
  6. from twisted.internet.protocol import Factory
  7. from twisted.protocols import basic
  8. from twisted.internet import reactor
  9. import sys
  10. USER = "test"
  11. PASS = "twisted"
  12. PORT = 1100
  13. SSL_SUPPORT = True
  14. UIDL_SUPPORT = True
  15. INVALID_SERVER_RESPONSE = False
  16. INVALID_CAPABILITY_RESPONSE = False
  17. INVALID_LOGIN_RESPONSE = False
  18. DENY_CONNECTION = False
  19. DROP_CONNECTION = False
  20. BAD_TLS_RESPONSE = False
  21. TIMEOUT_RESPONSE = False
  22. TIMEOUT_DEFERRED = False
  23. SLOW_GREETING = False
  24. """Commands"""
  25. CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
  26. CAPABILITIES = [
  27. "TOP",
  28. "LOGIN-DELAY 180",
  29. "USER",
  30. "SASL LOGIN"
  31. ]
  32. CAPABILITIES_SSL = "STLS"
  33. CAPABILITIES_UIDL = "UIDL"
  34. INVALID_RESPONSE = "-ERR Unknown request"
  35. VALID_RESPONSE = "+OK Command Completed"
  36. AUTH_DECLINED = "-ERR LOGIN failed"
  37. AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
  38. TLS_ERROR = "-ERR server side error start TLS handshake"
  39. LOGOUT_COMPLETE = "+OK quit completed"
  40. NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
  41. STAT = "+OK 0 0"
  42. UIDL = "+OK Unique-ID listing follows\r\n."
  43. LIST = "+OK Mailbox scan listing follows\r\n."
  44. CAP_START = "+OK Capability list follows:"
  45. class POP3TestServer(basic.LineReceiver):
  46. def __init__(self, contextFactory = None):
  47. self.loggedIn = False
  48. self.caps = None
  49. self.tmpUser = None
  50. self.ctx = contextFactory
  51. def sendSTATResp(self, req):
  52. self.sendLine(STAT)
  53. def sendUIDLResp(self, req):
  54. self.sendLine(UIDL)
  55. def sendLISTResp(self, req):
  56. self.sendLine(LIST)
  57. def sendCapabilities(self):
  58. if self.caps is None:
  59. self.caps = [CAP_START]
  60. if UIDL_SUPPORT:
  61. self.caps.append(CAPABILITIES_UIDL)
  62. if SSL_SUPPORT:
  63. self.caps.append(CAPABILITIES_SSL)
  64. for cap in CAPABILITIES:
  65. self.caps.append(cap)
  66. resp = '\r\n'.join(self.caps)
  67. resp += "\r\n."
  68. self.sendLine(resp)
  69. def connectionMade(self):
  70. if DENY_CONNECTION:
  71. self.disconnect()
  72. return
  73. if SLOW_GREETING:
  74. reactor.callLater(20, self.sendGreeting)
  75. else:
  76. self.sendGreeting()
  77. def sendGreeting(self):
  78. self.sendLine(CONNECTION_MADE)
  79. def lineReceived(self, line):
  80. """Error Conditions"""
  81. uline = line.upper()
  82. find = lambda s: uline.find(s) != -1
  83. if TIMEOUT_RESPONSE:
  84. # Do not respond to clients request
  85. return
  86. if DROP_CONNECTION:
  87. self.disconnect()
  88. return
  89. elif find("CAPA"):
  90. if INVALID_CAPABILITY_RESPONSE:
  91. self.sendLine(INVALID_RESPONSE)
  92. else:
  93. self.sendCapabilities()
  94. elif find("STLS") and SSL_SUPPORT:
  95. self.startTLS()
  96. elif find("USER"):
  97. if INVALID_LOGIN_RESPONSE:
  98. self.sendLine(INVALID_RESPONSE)
  99. return
  100. resp = None
  101. try:
  102. self.tmpUser = line.split(" ")[1]
  103. resp = VALID_RESPONSE
  104. except:
  105. resp = AUTH_DECLINED
  106. self.sendLine(resp)
  107. elif find("PASS"):
  108. resp = None
  109. try:
  110. pwd = line.split(" ")[1]
  111. if self.tmpUser is None or pwd is None:
  112. resp = AUTH_DECLINED
  113. elif self.tmpUser == USER and pwd == PASS:
  114. resp = AUTH_ACCEPTED
  115. self.loggedIn = True
  116. else:
  117. resp = AUTH_DECLINED
  118. except:
  119. resp = AUTH_DECLINED
  120. self.sendLine(resp)
  121. elif find("QUIT"):
  122. self.loggedIn = False
  123. self.sendLine(LOGOUT_COMPLETE)
  124. self.disconnect()
  125. elif INVALID_SERVER_RESPONSE:
  126. self.sendLine(INVALID_RESPONSE)
  127. elif not self.loggedIn:
  128. self.sendLine(NOT_LOGGED_IN)
  129. elif find("NOOP"):
  130. self.sendLine(VALID_RESPONSE)
  131. elif find("STAT"):
  132. if TIMEOUT_DEFERRED:
  133. return
  134. self.sendLine(STAT)
  135. elif find("LIST"):
  136. if TIMEOUT_DEFERRED:
  137. return
  138. self.sendLine(LIST)
  139. elif find("UIDL"):
  140. if TIMEOUT_DEFERRED:
  141. return
  142. elif not UIDL_SUPPORT:
  143. self.sendLine(INVALID_RESPONSE)
  144. return
  145. self.sendLine(UIDL)
  146. def startTLS(self):
  147. if self.ctx is None:
  148. self.getContext()
  149. if SSL_SUPPORT and self.ctx is not None:
  150. self.sendLine('+OK Begin TLS negotiation now')
  151. self.transport.startTLS(self.ctx)
  152. else:
  153. self.sendLine('-ERR TLS not available')
  154. def disconnect(self):
  155. self.transport.loseConnection()
  156. def getContext(self):
  157. try:
  158. from twisted.internet import ssl
  159. except ImportError:
  160. self.ctx = None
  161. else:
  162. self.ctx = ssl.ClientContextFactory()
  163. self.ctx.method = ssl.SSL.TLSv1_METHOD
  164. usage = """popServer.py [arg] (default is Standard POP Server with no messages)
  165. no_ssl - Start with no SSL support
  166. no_uidl - Start with no UIDL support
  167. bad_resp - Send a non-RFC compliant response to the Client
  168. bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request
  169. bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request
  170. deny - Deny the connection
  171. drop - Drop the connection after sending the greeting
  172. bad_tls - Send a bad response to a STARTTLS
  173. timeout - Do not return a response to a Client request
  174. to_deferred - Do not return a response on a 'Select' request. This
  175. will test Deferred callback handling
  176. slow - Wait 20 seconds after the connection is made to return a Server Greeting
  177. """
  178. def printMessage(msg):
  179. print("Server Starting in %s mode" % msg)
  180. def processArg(arg):
  181. if arg.lower() == 'no_ssl':
  182. global SSL_SUPPORT
  183. SSL_SUPPORT = False
  184. printMessage("NON-SSL")
  185. elif arg.lower() == 'no_uidl':
  186. global UIDL_SUPPORT
  187. UIDL_SUPPORT = False
  188. printMessage("NON-UIDL")
  189. elif arg.lower() == 'bad_resp':
  190. global INVALID_SERVER_RESPONSE
  191. INVALID_SERVER_RESPONSE = True
  192. printMessage("Invalid Server Response")
  193. elif arg.lower() == 'bad_cap_resp':
  194. global INVALID_CAPABILITY_RESPONSE
  195. INVALID_CAPABILITY_RESPONSE = True
  196. printMessage("Invalid Capability Response")
  197. elif arg.lower() == 'bad_login_resp':
  198. global INVALID_LOGIN_RESPONSE
  199. INVALID_LOGIN_RESPONSE = True
  200. printMessage("Invalid Capability Response")
  201. elif arg.lower() == 'deny':
  202. global DENY_CONNECTION
  203. DENY_CONNECTION = True
  204. printMessage("Deny Connection")
  205. elif arg.lower() == 'drop':
  206. global DROP_CONNECTION
  207. DROP_CONNECTION = True
  208. printMessage("Drop Connection")
  209. elif arg.lower() == 'bad_tls':
  210. global BAD_TLS_RESPONSE
  211. BAD_TLS_RESPONSE = True
  212. printMessage("Bad TLS Response")
  213. elif arg.lower() == 'timeout':
  214. global TIMEOUT_RESPONSE
  215. TIMEOUT_RESPONSE = True
  216. printMessage("Timeout Response")
  217. elif arg.lower() == 'to_deferred':
  218. global TIMEOUT_DEFERRED
  219. TIMEOUT_DEFERRED = True
  220. printMessage("Timeout Deferred Response")
  221. elif arg.lower() == 'slow':
  222. global SLOW_GREETING
  223. SLOW_GREETING = True
  224. printMessage("Slow Greeting")
  225. elif arg.lower() == '--help':
  226. print(usage)
  227. sys.exit()
  228. else:
  229. print(usage)
  230. sys.exit()
  231. def main():
  232. if len(sys.argv) < 2:
  233. printMessage("POP3 with no messages")
  234. else:
  235. args = sys.argv[1:]
  236. for arg in args:
  237. processArg(arg)
  238. f = Factory()
  239. f.protocol = POP3TestServer
  240. reactor.listenTCP(PORT, f)
  241. reactor.run()
  242. if __name__ == '__main__':
  243. main()