test_irc_service.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for IRC portions of L{twisted.words.service}.
  5. """
  6. from twisted.cred import checkers, portal
  7. from twisted.test import proto_helpers
  8. from twisted.words.protocols import irc
  9. from twisted.words.service import InMemoryWordsRealm, IRCFactory, IRCUser
  10. from twisted.words.test.test_irc import IRCTestCase
  11. class IRCUserTests(IRCTestCase):
  12. """
  13. Isolated tests for L{IRCUser}
  14. """
  15. def setUp(self):
  16. """
  17. Sets up a Realm, Portal, Factory, IRCUser, Transport, and Connection
  18. for our tests.
  19. """
  20. self.realm = InMemoryWordsRealm("example.com")
  21. self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
  22. self.portal = portal.Portal(self.realm, [self.checker])
  23. self.checker.addUser(u"john", u"pass")
  24. self.factory = IRCFactory(self.realm, self.portal)
  25. self.ircUser = self.factory.buildProtocol(None)
  26. self.stringTransport = proto_helpers.StringTransport()
  27. self.ircUser.makeConnection(self.stringTransport)
  28. def test_sendMessage(self):
  29. """
  30. Sending a message to a user after they have sent NICK, but before they
  31. have authenticated, results in a message from "example.com".
  32. """
  33. self.ircUser.irc_NICK("", ["mynick"])
  34. self.stringTransport.clear()
  35. self.ircUser.sendMessage("foo")
  36. self.assertEqualBufferValue(self.stringTransport.value(), ":example.com foo mynick\r\n")
  37. def test_utf8Messages(self):
  38. """
  39. When a UTF8 message is sent with sendMessage and the current IRCUser
  40. has a UTF8 nick and is set to UTF8 encoding, the message will be
  41. written to the transport.
  42. """
  43. expectedResult = (u":example.com \u0442\u0435\u0441\u0442 "
  44. u"\u043d\u0438\u043a\r\n").encode('utf-8')
  45. self.ircUser.irc_NICK("", [u"\u043d\u0438\u043a".encode('utf-8')])
  46. self.stringTransport.clear()
  47. self.ircUser.sendMessage(u"\u0442\u0435\u0441\u0442".encode('utf-8'))
  48. self.assertEqualBufferValue(self.stringTransport.value(), expectedResult)
  49. def test_invalidEncodingNick(self):
  50. """
  51. A NICK command sent with a nickname that cannot be decoded with the
  52. current IRCUser's encoding results in a PRIVMSG from NickServ
  53. indicating that the nickname could not be decoded.
  54. """
  55. self.ircUser.irc_NICK("", [b"\xd4\xc5\xd3\xd4"])
  56. self.assertRaises(UnicodeError)
  57. def response(self):
  58. """
  59. Grabs our responses and then clears the transport
  60. """
  61. response = self.ircUser.transport.value()
  62. self.ircUser.transport.clear()
  63. if bytes != str and isinstance(response, bytes):
  64. response = response.decode("utf-8")
  65. response = response.splitlines()
  66. return [irc.parsemsg(r) for r in response]
  67. def scanResponse(self, response, messageType):
  68. """
  69. Gets messages out of a response
  70. @param response: The parsed IRC messages of the response, as returned
  71. by L{IRCUserTests.response}
  72. @param messageType: The string type of the desired messages.
  73. @return: An iterator which yields 2-tuples of C{(index, ircMessage)}
  74. """
  75. for n, message in enumerate(response):
  76. if (message[1] == messageType):
  77. yield n, message
  78. def test_sendNickSendsGreeting(self):
  79. """
  80. Receiving NICK without authenticating sends the MOTD Start and MOTD End
  81. messages, which is required by certain popular IRC clients (such as
  82. Pidgin) before a connection is considered to be fully established.
  83. """
  84. self.ircUser.irc_NICK("", ["mynick"])
  85. response = self.response()
  86. start = list(self.scanResponse(response, irc.RPL_MOTDSTART))
  87. end = list(self.scanResponse(response, irc.RPL_ENDOFMOTD))
  88. self.assertEqual(start,
  89. [(0, ('example.com', '375', ['mynick', '- example.com Message of the Day - ']))])
  90. self.assertEqual(end,
  91. [(1, ('example.com', '376', ['mynick', 'End of /MOTD command.']))])
  92. def test_fullLogin(self):
  93. """
  94. Receiving USER, PASS, NICK will log in the user, and transmit the
  95. appropriate response messages.
  96. """
  97. self.ircUser.irc_USER("", ["john doe"])
  98. self.ircUser.irc_PASS("", ["pass"])
  99. self.ircUser.irc_NICK("", ["john"])
  100. version = ('Your host is example.com, running version %s' %
  101. (self.factory._serverInfo["serviceVersion"],))
  102. creation = ('This server was created on %s' %
  103. (self.factory._serverInfo["creationDate"],))
  104. self.assertEqual(self.response(),
  105. [('example.com', '375',
  106. ['john', '- example.com Message of the Day - ']),
  107. ('example.com', '376', ['john', 'End of /MOTD command.']),
  108. ('example.com', '001', ['john', 'connected to Twisted IRC']),
  109. ('example.com', '002', ['john', version]),
  110. ('example.com', '003', ['john', creation]),
  111. ('example.com', '004',
  112. ['john', 'example.com', self.factory._serverInfo["serviceVersion"],
  113. 'w', 'n'])])
  114. def test_PART(self):
  115. """
  116. irc_PART
  117. """
  118. self.ircUser.irc_NICK("testuser", ["mynick"])
  119. response = self.response()
  120. self.ircUser.transport.clear()
  121. self.assertEqual(response[0][1], irc.RPL_MOTDSTART)
  122. self.ircUser.irc_JOIN("testuser", ["somechannel"])
  123. response = self.response()
  124. self.ircUser.transport.clear()
  125. self.assertEqual(response[0][1], irc.ERR_NOSUCHCHANNEL)
  126. self.ircUser.irc_PART("testuser", [b"somechannel", b"booga"])
  127. response = self.response()
  128. self.ircUser.transport.clear()
  129. self.assertEqual(response[0][1], irc.ERR_NOTONCHANNEL)
  130. self.ircUser.irc_PART("testuser", [u"somechannel", u"booga"])
  131. response = self.response()
  132. self.ircUser.transport.clear()
  133. self.assertEqual(response[0][1], irc.ERR_NOTONCHANNEL)
  134. def test_NAMES(self):
  135. """
  136. irc_NAMES
  137. """
  138. self.ircUser.irc_NICK("", ["testuser"])
  139. self.ircUser.irc_JOIN("", ["somechannel"])
  140. self.ircUser.transport.clear()
  141. self.ircUser.irc_NAMES("", ["somechannel"])
  142. response = self.response()
  143. self.assertEqual(response[0][1], irc.RPL_ENDOFNAMES)
  144. class MocksyIRCUser(IRCUser):
  145. def __init__(self):
  146. self.realm = InMemoryWordsRealm("example.com")
  147. self.mockedCodes = []
  148. def sendMessage(self, code, *_, **__):
  149. self.mockedCodes.append(code)
# A lone 0xff byte, passed to the irc_* handlers below as a parameter that is
# expected to be undecodable.
BADTEXT = b'\xff'
  151. class IRCUserBadEncodingTests(IRCTestCase):
  152. """
  153. Verifies that L{IRCUser} sends the correct error messages back to clients
  154. when given indecipherable bytes
  155. """
  156. # TODO: irc_NICK -- but NICKSERV is used for that, so it isn't as easy.
  157. def setUp(self):
  158. self.ircUser = MocksyIRCUser()
  159. def assertChokesOnBadBytes(self, irc_x, error):
  160. """
  161. Asserts that IRCUser sends the relevant error code when a given irc_x
  162. dispatch method is given undecodable bytes.
  163. @param irc_x: the name of the irc_FOO method to test.
  164. For example, irc_x = 'PRIVMSG' will check irc_PRIVMSG
  165. @param error: the error code irc_x should send. For example,
  166. irc.ERR_NOTONCHANNEL
  167. """
  168. getattr(self.ircUser, 'irc_%s' % irc_x)(None, [BADTEXT])
  169. self.assertEqual(self.ircUser.mockedCodes, [error])
  170. # No such channel
  171. def test_JOIN(self):
  172. """
  173. Tests that irc_JOIN sends ERR_NOSUCHCHANNEL if the channel name can't
  174. be decoded.
  175. """
  176. self.assertChokesOnBadBytes('JOIN', irc.ERR_NOSUCHCHANNEL)
  177. def test_NAMES(self):
  178. """
  179. Tests that irc_NAMES sends ERR_NOSUCHCHANNEL if the channel name can't
  180. be decoded.
  181. """
  182. self.assertChokesOnBadBytes('NAMES', irc.ERR_NOSUCHCHANNEL)
  183. def test_TOPIC(self):
  184. """
  185. Tests that irc_TOPIC sends ERR_NOSUCHCHANNEL if the channel name can't
  186. be decoded.
  187. """
  188. self.assertChokesOnBadBytes('TOPIC', irc.ERR_NOSUCHCHANNEL)
  189. def test_LIST(self):
  190. """
  191. Tests that irc_LIST sends ERR_NOSUCHCHANNEL if the channel name can't
  192. be decoded.
  193. """
  194. self.assertChokesOnBadBytes('LIST', irc.ERR_NOSUCHCHANNEL)
  195. # No such nick
  196. def test_MODE(self):
  197. """
  198. Tests that irc_MODE sends ERR_NOSUCHNICK if the target name can't
  199. be decoded.
  200. """
  201. self.assertChokesOnBadBytes('MODE', irc.ERR_NOSUCHNICK)
  202. def test_PRIVMSG(self):
  203. """
  204. Tests that irc_PRIVMSG sends ERR_NOSUCHNICK if the target name can't
  205. be decoded.
  206. """
  207. self.assertChokesOnBadBytes('PRIVMSG', irc.ERR_NOSUCHNICK)
  208. def test_WHOIS(self):
  209. """
  210. Tests that irc_WHOIS sends ERR_NOSUCHNICK if the target name can't
  211. be decoded.
  212. """
  213. self.assertChokesOnBadBytes('WHOIS', irc.ERR_NOSUCHNICK)
  214. # Not on channel
  215. def test_PART(self):
  216. """
  217. Tests that irc_PART sends ERR_NOTONCHANNEL if the target name can't
  218. be decoded.
  219. """
  220. self.assertChokesOnBadBytes('PART', irc.ERR_NOTONCHANNEL)
  221. # Probably nothing
  222. def test_WHO(self):
  223. """
  224. Tests that irc_WHO immediately ends the WHO list if the target name
  225. can't be decoded.
  226. """
  227. self.assertChokesOnBadBytes('WHO', irc.RPL_ENDOFWHO)