test_database.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.news.database}.
  5. """
  6. __metaclass__ = type
  7. from email.Parser import Parser
  8. from socket import gethostname
  9. from twisted.trial.unittest import TestCase
  10. from twisted.internet.defer import succeed
  11. from twisted.mail.smtp import messageid
  12. from twisted.news.database import Article, PickleStorage, NewsShelf
  13. class ModerationTestsMixin:
  14. """
  15. Tests for the moderation features of L{INewsStorage} implementations.
  16. """
  17. def setUp(self):
  18. self._email = []
  19. def sendmail(self, smtphost, from_addr, to_addrs, msg,
  20. senderDomainName=None, port=25):
  21. """
  22. Fake of L{twisted.mail.smtp.sendmail} which records attempts to send
  23. email and immediately pretends success.
  24. Subclasses should arrange for their storage implementation to call this
  25. instead of the real C{sendmail} function.
  26. """
  27. self._email.append((
  28. smtphost, from_addr, to_addrs, msg, senderDomainName, port))
  29. return succeed(None)
  30. _messageTemplate = """\
  31. From: some dude
  32. To: another person
  33. Subject: activities etc
  34. Message-ID: %(articleID)s
  35. Newsgroups: %(newsgroup)s
  36. %(approved)s
  37. Body of the message is such.
  38. """.replace('\n', '\r\n')
  39. def getApprovedMessage(self, articleID, group):
  40. """
  41. Return a C{str} containing an RFC 2822 formatted message including an
  42. I{Approved} header indicating it has passed through moderation.
  43. """
  44. return self._messageTemplate % {
  45. 'articleID': articleID,
  46. 'newsgroup': group,
  47. 'approved': 'Approved: yup\r\n'}
  48. def getUnapprovedMessage(self, articleID, group):
  49. """
  50. Return a C{str} containing an RFC 2822 formatted message with no
  51. I{Approved} header indicating it may require moderation.
  52. """
  53. return self._messageTemplate % {
  54. 'articleID': articleID,
  55. 'newsgroup': group,
  56. 'approved': '\r\n'}
  57. def getStorage(self, groups, moderators, mailhost, sender):
  58. """
  59. Override in a subclass to return a L{INewsStorage} provider to test for
  60. correct moderation behavior.
  61. @param groups: A C{list} of C{str} naming the groups which should exist
  62. in the resulting storage object.
  63. @param moderators: A C{dict} mapping C{str} each group name to a C{list}
  64. of C{str} giving moderator email (RFC 2821) addresses.
  65. """
  66. raise NotImplementedError()
  67. def test_postApproved(self):
  68. """
  69. L{INewsStorage.postRequest} posts the message if it includes an
  70. I{Approved} header.
  71. """
  72. group = "example.group"
  73. moderator = "alice@example.com"
  74. mailhost = "127.0.0.1"
  75. sender = "bob@example.org"
  76. articleID = messageid()
  77. storage = self.getStorage(
  78. [group], {group: [moderator]}, mailhost, sender)
  79. message = self.getApprovedMessage(articleID, group)
  80. result = storage.postRequest(message)
  81. def cbPosted(ignored):
  82. self.assertEqual(self._email, [])
  83. exists = storage.articleExistsRequest(articleID)
  84. exists.addCallback(self.assertTrue)
  85. return exists
  86. result.addCallback(cbPosted)
  87. return result
  88. def test_postModerated(self):
  89. """
  90. L{INewsStorage.postRequest} forwards a message to the moderator if it
  91. does not include an I{Approved} header.
  92. """
  93. group = "example.group"
  94. moderator = "alice@example.com"
  95. mailhost = "127.0.0.1"
  96. sender = "bob@example.org"
  97. articleID = messageid()
  98. storage = self.getStorage(
  99. [group], {group: [moderator]}, mailhost, sender)
  100. message = self.getUnapprovedMessage(articleID, group)
  101. result = storage.postRequest(message)
  102. def cbModerated(ignored):
  103. self.assertEqual(len(self._email), 1)
  104. self.assertEqual(self._email[0][0], mailhost)
  105. self.assertEqual(self._email[0][1], sender)
  106. self.assertEqual(self._email[0][2], [moderator])
  107. self._checkModeratorMessage(
  108. self._email[0][3], sender, moderator, group, message)
  109. self.assertEqual(self._email[0][4], None)
  110. self.assertEqual(self._email[0][5], 25)
  111. exists = storage.articleExistsRequest(articleID)
  112. exists.addCallback(self.assertFalse)
  113. return exists
  114. result.addCallback(cbModerated)
  115. return result
  116. def _checkModeratorMessage(self, messageText, sender, moderator, group, postingText):
  117. p = Parser()
  118. msg = p.parsestr(messageText)
  119. headers = dict(msg.items())
  120. del headers['Message-ID']
  121. self.assertEqual(
  122. headers,
  123. {'From': sender,
  124. 'To': moderator,
  125. 'Subject': 'Moderate new %s message: activities etc' % (group,),
  126. 'Content-Type': 'message/rfc822'})
  127. posting = p.parsestr(postingText)
  128. attachment = msg.get_payload()[0]
  129. for header in ['from', 'to', 'subject', 'message-id', 'newsgroups']:
  130. self.assertEqual(posting[header], attachment[header])
  131. self.assertEqual(posting.get_payload(), attachment.get_payload())
  132. class PickleStorageTests(ModerationTestsMixin, TestCase):
  133. """
  134. Tests for L{PickleStorage}.
  135. """
  136. def getStorage(self, groups, moderators, mailhost, sender):
  137. """
  138. Create and return a L{PickleStorage} instance configured to require
  139. moderation.
  140. """
  141. storageFilename = self.mktemp()
  142. storage = PickleStorage(
  143. storageFilename, groups, moderators, mailhost, sender)
  144. storage.sendmail = self.sendmail
  145. self.addCleanup(PickleStorage.sharedDBs.pop, storageFilename)
  146. return storage
  147. class NewsShelfTests(ModerationTestsMixin, TestCase):
  148. """
  149. Tests for L{NewsShelf}.
  150. """
  151. def getStorage(self, groups, moderators, mailhost, sender):
  152. """
  153. Create and return a L{NewsShelf} instance configured to require
  154. moderation.
  155. """
  156. storageFilename = self.mktemp()
  157. shelf = NewsShelf(mailhost, storageFilename, sender)
  158. for name in groups:
  159. shelf.addGroup(name, 'm') # Dial 'm' for moderator
  160. for address in moderators.get(name, []):
  161. shelf.addModerator(name, address)
  162. shelf.sendmail = self.sendmail
  163. return shelf
  164. def test_notifyModerator(self):
  165. """
  166. L{NewsShelf.notifyModerator} sends a moderation email to a single
  167. moderator.
  168. """
  169. shelf = NewsShelf('example.com', self.mktemp(), 'alice@example.com')
  170. shelf.sendmail = self.sendmail
  171. shelf.notifyModerator('bob@example.org', Article('Foo: bar', 'Some text'))
  172. self.assertEqual(len(self._email), 1)
  173. def test_defaultSender(self):
  174. """
  175. If no sender is specified to L{NewsShelf.notifyModerators}, a default
  176. address based on the system hostname is used for both the envelope and
  177. RFC 2822 sender addresses.
  178. """
  179. shelf = NewsShelf('example.com', self.mktemp())
  180. shelf.sendmail = self.sendmail
  181. shelf.notifyModerators(['bob@example.org'], Article('Foo: bar', 'Some text'))
  182. self.assertEqual(self._email[0][1], 'twisted-news@' + gethostname())
  183. self.assertIn('From: twisted-news@' + gethostname(), self._email[0][3])