mail_debug.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding: utf-8 -*-
  2. from __future__ import print_function
  3. import asyncore
  4. import sys
  5. from logging import getLogger
  6. from smtpd import SMTPServer
  7. from django.core.management.base import BaseCommand, CommandError
  8. from django_extensions.management.utils import setup_logger, signalcommand
  9. logger = getLogger(__name__)
  10. class ExtensionDebuggingServer(SMTPServer):
  11. """Duplication of smtpd.DebuggingServer, but using logging instead of print."""
  12. # Do something with the gathered message
  13. def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
  14. """Output will be sent to the module logger at INFO level."""
  15. inheaders = 1
  16. lines = data.split('\n')
  17. logger.info('---------- MESSAGE FOLLOWS ----------')
  18. for line in lines:
  19. # headers first
  20. if inheaders and not line:
  21. logger.info('X-Peer: %s' % peer[0])
  22. inheaders = 0
  23. logger.info(line)
  24. logger.info('------------ END MESSAGE ------------')
  25. class Command(BaseCommand):
  26. help = "Starts a test mail server for development."
  27. args = '[optional port number or ippaddr:port]'
  28. requires_system_checks = False
  29. def add_arguments(self, parser):
  30. super().add_arguments(parser)
  31. parser.add_argument(
  32. '--output', dest='output_file', default=None,
  33. help='Specifies an output file to send a copy of all messages (not flushed immediately).'
  34. )
  35. parser.add_argument(
  36. '--use-settings', dest='use_settings',
  37. action='store_true', default=False,
  38. help='Uses EMAIL_HOST and HOST_PORT from Django settings.'
  39. )
  40. @signalcommand
  41. def handle(self, addrport='', *args, **options):
  42. if not addrport:
  43. if options['use_settings']:
  44. from django.conf import settings
  45. addr = getattr(settings, 'EMAIL_HOST', '')
  46. port = str(getattr(settings, 'EMAIL_PORT', '1025'))
  47. else:
  48. addr = ''
  49. port = '1025'
  50. else:
  51. try:
  52. addr, port = addrport.split(':')
  53. except ValueError:
  54. addr, port = '', addrport
  55. if not addr:
  56. addr = '127.0.0.1'
  57. if not port.isdigit():
  58. raise CommandError("%r is not a valid port number." % port)
  59. else:
  60. port = int(port)
  61. # Add console handler
  62. setup_logger(logger, stream=self.stdout, filename=options['output_file'])
  63. def inner_run():
  64. quit_command = (sys.platform == 'win32') and 'CTRL-BREAK' or 'CONTROL-C'
  65. print("Now accepting mail at %s:%s -- use %s to quit" % (addr, port, quit_command))
  66. if sys.version_info < (3, 5):
  67. ExtensionDebuggingServer((addr, port), None)
  68. else:
  69. ExtensionDebuggingServer((addr, port), None, decode_data=True)
  70. asyncore.loop()
  71. try:
  72. inner_run()
  73. except KeyboardInterrupt:
  74. pass