Merge branch 'mlimonci/encoding' into 'main'

Try both utf-8 and windows-1252 for decoding email

See merge request kernel-firmware/linux-firmware!61
main
Josh Boyer 2023-11-17 16:54:55 +00:00
commit 7124ce30ba
1 changed files with 44 additions and 15 deletions

View File

@ -34,6 +34,8 @@ content_types = {
def classify_content(content):
# load content into the email library
msg = email.message_from_string(content)
decoded = None
body = None

# check the subject
subject = msg["Subject"]
@ -42,17 +44,28 @@ def classify_content(content):
if "PATCH" in subject:
return ContentType.PATCH

for part in msg.walk():
if part.get_content_type() == "text/plain":
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True)
else:
body = msg.get_payload(decode=True)

if body:
for encoding in ["utf-8", "windows-1252"]:
try:
body = part.get_payload(decode=True).decode("utf-8")
for key in content_types.keys():
if key in body:
return content_types[key]
break
except UnicodeDecodeError as e:
logging.warning("Failed to decode email: %s, treating as SPAM" % e)
decoded = body.decode(encoding)
break
except UnicodeDecodeError:
pass

if decoded:
for key in content_types.keys():
if key in decoded:
return content_types[key]
else:
logging.warning("Failed to decode email: %s, treating as SPAM", body)

return ContentType.SPAM


@ -68,6 +81,11 @@ def quiet_cmd(cmd):


def reply_email(content, branch):
user = None
password = None
server = None
port = None

if "SMTP_USER" in os.environ:
user = os.environ["SMTP_USER"]
if "SMTP_PASS" in os.environ:
@ -83,15 +101,26 @@ def reply_email(content, branch):
reply = email.message.EmailMessage()

orig = email.message_from_string(content)
reply["To"] = ", ".join(
email.utils.formataddr(t)
for t in email.utils.getaddresses(
orig.get_all("from", []) + orig.get_all("to", []) + orig.get_all("cc", [])
try:
reply["To"] = ", ".join(
email.utils.formataddr(t)
for t in email.utils.getaddresses(
orig.get_all("from", [])
+ orig.get_all("to", [])
+ orig.get_all("cc", [])
)
)
)
except ValueError:
logging.warning("Failed to parse email addresses, not sending email")
return

reply["From"] = "linux-firmware@kernel.org"
reply["Subject"] = "Re: {}".format(orig["Subject"])
try:
reply["Subject"] = "Re: {}".format(orig["Subject"])
except ValueError:
logging.warning("Failed to parse subject, not sending email")
return

reply["In-Reply-To"] = orig["Message-Id"]
reply["References"] = orig["Message-Id"]
reply["Thread-Topic"] = orig["Thread-Topic"]