email.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from __future__ import absolute_import
  2. from django.conf import settings
  3. from django.core.mail import send_mail, EmailMessage
  4. from django.template.loader import render_to_string
  5. import email
  6. import email.utils
  7. try:
  8. from email.Iterators import typed_subpart_iterator
  9. except ImportError:
  10. from email.iterators import typed_subpart_iterator
  11. import mailbox
  12. from . import const
  13. import logging
  14. log = logging.getLogger(__name__)
  15. EMAIL_PRIVATE_ANNOUNCES = getattr(settings, "EMAIL_PRIVATE_ANNOUNCES", "nm@debian.org")
  16. def get_charset(message, default="ascii"):
  17. """Get the message charset"""
  18. if message.get_content_charset():
  19. return message.get_content_charset()
  20. if message.get_charset():
  21. return message.get_charset()
  22. return default
  23. def get_body(message):
  24. """Get the body of the email message"""
  25. if message.is_multipart():
  26. #get the plain text version only
  27. text_parts = [part
  28. for part in typed_subpart_iterator(message,
  29. 'text',
  30. 'plain')]
  31. body = []
  32. for part in text_parts:
  33. charset = get_charset(part, get_charset(message))
  34. body.append(unicode(part.get_payload(decode=True),
  35. charset,
  36. "replace"))
  37. return u"\n".join(body).strip()
  38. else: # if it is not multipart, the payload will be a string
  39. # representing the message body
  40. body = unicode(message.get_payload(decode=True),
  41. get_charset(message),
  42. "replace")
  43. return body.strip()
  44. def parse_recipient_list(s):
  45. """
  46. Parse a string like "Foo <a@b.c>, bar@example.com"
  47. and return a list like ["Foo <a@b.c>", "bar@example.com"]
  48. """
  49. res = []
  50. for name, addr in email.utils.getaddresses([s]):
  51. res.append(email.utils.formataddr((name, addr)))
  52. return res
  53. def template_to_email(template_name, context):
  54. """
  55. Render a template with its context, parse the result and build an
  56. EmailMessage from it.
  57. """
  58. context.setdefault("default_from", "nm@debian.org")
  59. context.setdefault("default_subject", "Notification from nm.debian.org")
  60. text = render_to_string(template_name, context).strip()
  61. m = email.message_from_string(text.encode('utf-8'))
  62. msg = EmailMessage()
  63. msg.from_email = m.get("From", context["default_from"])
  64. msg.to = parse_recipient_list(m.get("To", EMAIL_PRIVATE_ANNOUNCES))
  65. if "Cc" in m: msg.cc = parse_recipient_list(m.get("Cc"))
  66. if "Bcc" in m: msg.bcc = parse_recipient_list(m.get("Bcc"))
  67. rt = m.get("Reply-To", None)
  68. if rt is not None: msg.extra_headers["Reply-To"] = rt
  69. msg.subject = m.get("Subject", context["default_subject"])
  70. msg.body = m.get_payload()
  71. return msg
  72. def send_notification(template_name, log_next, log_prev=None):
  73. """
  74. Render a notification email template for a transition from log_prev to log,
  75. then send the resulting email.
  76. """
  77. try:
  78. ctx = {
  79. "process": log_next.process,
  80. "log": log_next,
  81. "log_prev": log_prev,
  82. "default_subject": "Notification from nm.debian.org",
  83. }
  84. if log_next.changed_by is not None:
  85. ctx["default_from"] = log_next.changed_by.preferred_email
  86. msg = template_to_email(template_name, ctx)
  87. msg.send()
  88. log.debug("sent mail from %s to %s cc %s bcc %s subject %s",
  89. msg.from_email,
  90. ", ".join(msg.to),
  91. ", ".join(msg.cc),
  92. ", ".join(msg.bcc),
  93. msg.subject)
  94. except:
  95. # TODO: remove raise once it works
  96. raise
  97. log.debug("failed to sent mail for log %s", log_next)
  98. def send_nonce(template_name, person, nonce=None, encrypted_nonce=None):
  99. """
  100. Render an email template to send a nonce to a person,
  101. then send the resulting email.
  102. """
  103. if nonce is None:
  104. nonce = person.pending
  105. try:
  106. ctx = {
  107. "person": person,
  108. "nonce": nonce,
  109. "encrypted_nonce": encrypted_nonce,
  110. "reply_to": "todo@example.com",
  111. "default_subject": "Confirmation from nm.debian.org",
  112. }
  113. msg = template_to_email(template_name, ctx)
  114. msg.send()
  115. log.debug("sent mail from %s to %s cc %s bcc %s subject %s",
  116. msg.from_email,
  117. ", ".join(msg.to),
  118. ", ".join(msg.cc),
  119. ", ".join(msg.bcc),
  120. msg.subject)
  121. except:
  122. # TODO: remove raise once it works
  123. raise
  124. log.debug("failed to sent mail for person %s", person)
  125. def decode_header(val):
  126. res = []
  127. for buf, charset in email.header.decode_header(val):
  128. if charset is None:
  129. res.append(buf)
  130. elif charset == "unknown-8bit":
  131. res.append(buf.decode("utf-8", errors="replace"))
  132. else:
  133. res.append(buf.decode(charset, errors="replace"))
  134. return " ".join(res)
  135. def get_mbox_as_dicts(filename):
  136. try: ## we are reading, have not to flush with close
  137. for message in mailbox.mbox(filename, create=False):
  138. msg_dict = {'Body': get_body(message)}
  139. for hkey, hval in message.items():
  140. msg_dict[hkey] = decode_header(hval)
  141. yield msg_dict
  142. except mailbox.NoSuchMailboxError:
  143. return