2025-7-29 22:45:15 [显示全部楼层]
10浏览
查看: 10|回复: 0

[求助] mpython 使用smtp发送邮件时ussl库未找到的解决方案

[复制链接]
MicroPython新版本中已经将ussl改名为ssl,但不知道为什么官方提供的umail.py没改
掌控板中的umail替换为以下代码即可:
  1. import usocket
  2. import gc
  3. import ssl  # 替换 ussl 为 ssl
  4. DEFAULT_TIMEOUT = 10  # sec
  5. LOCAL_DOMAIN = '127.0.0.1'
  6. CMD_EHLO = 'EHLO'
  7. CMD_STARTTLS = 'STARTTLS'
  8. CMD_AUTH = 'AUTH'
  9. CMD_MAIL = 'MAIL'
  10. AUTH_PLAIN = 'PLAIN'
  11. AUTH_LOGIN = 'LOGIN'
  12. # 加密过程
  13. def encrypt_string(message):
  14.     encode_result = ""
  15.     for char in message:
  16.         char_int = ord(char)
  17.         if char.isalpha():  # 判断是否为字母
  18.             if 64 < char_int < 78 or 96 < char_int < 110:  # 针对其中的部分字母进行加密
  19.                 encode_result += "00" + str((char_int + 13) * 2) + "|"
  20.             else:  # 对剩下字母进行加密
  21.                 encode_result += "01" + str(char_int - 23) + "|"
  22.         elif '\u4e00' <= char <= '\u9fff':  # 单个汉字可以这么判断
  23.             encode_result += "02" + str(char_int + 24) + "|"
  24.         else:  # 对数字、特殊字符进行加密
  25.             encode_result += "03" + str(char_int) + "|"
  26.     return encode_result
  27. # 解密过程
  28. def decrypt_string(message):
  29.     decode_result = ""
  30.     # 将message转换为list
  31.     message_list = message.split("|")
  32.     message_list.remove("")  # 移除list中的空元素
  33.     for i in message_list:
  34.         type_ = i[:2]
  35.         char_number = int(i[2:])
  36.         if type_ == "00":
  37.             char_number = int(char_number / 2 - 13)
  38.         elif type_ == "01":
  39.             char_number = char_number + 23
  40.         elif type_ == "02":
  41.             char_number = char_number - 24
  42.         else:
  43.             char_number = char_number
  44.         decode_result += chr(char_number)
  45.     return decode_result
  46. DEFAULT_EMAIL = 'zhangkongban@163.com'
  47. DEFAULT_PASSWORD = encrypt_string('NTJTTHERKCSJFEXM')
  48. class SMTP:
  49.     def cmd(self, cmd_str):
  50.         sock = self._sock
  51.         sock.write('%s\r\n' % cmd_str)
  52.         resp = []
  53.         next = True
  54.         while next:
  55.             code = sock.read(3)
  56.             if not code:  # 添加超时或断开连接检查
  57.                 break
  58.             next = sock.read(1) == b'-'
  59.             line = sock.readline().strip()
  60.             if line:
  61.                 resp.append(line.decode())
  62.         return int(code), resp
  63.     def __init__(self, host, port, ssl_mode=False, username=None, password=None):
  64.         self.username = username
  65.         addr = usocket.getaddrinfo(host, port)[0][-1]
  66.         sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
  67.         sock.settimeout(DEFAULT_TIMEOUT)
  68.         sock.connect(addr)
  69.         
  70.         # 使用 ssl 替代 ussl
  71.         if ssl_mode:
  72.             sock = ssl.wrap_socket(sock)
  73.             code = int(sock.read(3))
  74.             sock.readline()  # 消耗剩余响应
  75.         
  76.         self._sock = sock
  77.         
  78.         # 读取初始响应(如果不是 SSL 模式)
  79.         if not ssl_mode:
  80.             code = int(sock.read(3))
  81.             sock.readline()  # 消耗剩余响应
  82.             assert code == 220, '无法连接到服务器 %d' % code
  83.         code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
  84.         assert code == 250, 'EHLO 失败 %d' % code
  85.         
  86.         # 支持 STARTTLS
  87.         if not ssl_mode and any(CMD_STARTTLS in r for r in resp):
  88.             code, resp = self.cmd(CMD_STARTTLS)
  89.             assert code == 220, 'STARTTLS 失败 %d, %s' % (code, resp)
  90.             self._sock = ssl.wrap_socket(sock)
  91.         if username and password:
  92.             self.login(username, password)
  93.     def login(self, username, password):
  94.         self.username = username
  95.         code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
  96.         assert code == 250, 'EHLO 错误 %d, %s' % (code, resp)
  97.         auths = None
  98.         for feature in resp:
  99.             if feature[:4].upper() == CMD_AUTH:
  100.                 auths = feature[4:].strip('=').upper().split()
  101.         assert auths is not None, "服务器不支持认证方式"
  102.         from ubinascii import b2a_base64 as b64
  103.         if AUTH_PLAIN in auths:
  104.             creds = "\0%s\0%s" % (username, password)
  105.             cren = b64(creds.encode()).decode().strip()
  106.             code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren))
  107.         elif AUTH_LOGIN in auths:
  108.             user_b64 = b64(username.encode()).decode().strip()
  109.             code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, user_b64))
  110.             assert code == 334, '用户名错误 %d, %s' % (code, resp)
  111.             pass_b64 = b64(password.encode()).decode().strip()
  112.             code, resp = self.cmd(pass_b64)
  113.         else:
  114.             raise Exception("认证方式(%s)不被支持" % ', '.join(auths))
  115.         assert code in (235, 503), '认证错误 %d, %s' % (code, resp)
  116.         return code, resp
  117.     def to(self, addrs, mail_from=None):
  118.         mail_from = self.username if mail_from is None else mail_from
  119.         code, resp = self.cmd('MAIL FROM: <%s>' % mail_from)
  120.         assert code == 250, '发件人拒绝 %d, %s' % (code, resp)
  121.         if isinstance(addrs, str):
  122.             addrs = [addrs]
  123.         count = 0
  124.         for addr in addrs:
  125.             code, resp = self.cmd('RCPT TO: <%s>' % addr)
  126.             if code not in (250, 251):
  127.                 print('%s 拒绝, %s' % (addr, resp))
  128.                 count += 1
  129.         assert count != len(addrs), '收件人全部拒绝, %d, %s' % (code, resp)
  130.         code, resp = self.cmd('DATA')
  131.         assert code == 354, '数据拒绝, %d, %s' % (code, resp)
  132.         return code, resp
  133.     def write(self, content):
  134.         if isinstance(content, str):
  135.             content = content.encode('utf-8')
  136.         self._sock.write(content)
  137.     def send(self, content=''):
  138.         if content:
  139.             self.write(content)
  140.         self._sock.write(b'\r\n.\r\n')  # 结束标记
  141.         code = int(self._sock.read(3))
  142.         line = self._sock.readline().decode().strip()
  143.         return code, line
  144.     def quit(self):
  145.         try:
  146.             self.cmd("QUIT")
  147.         finally:
  148.             self._sock.close()
  149. def send_email(myusername, mypassword, target_email, SMTP_SERVER, subject, text):
  150.     server_map = {
  151.         1: ('smtp.office365.com', 587),
  152.         2: ('smtp.qq.com', 587),
  153.         3: ('smtp.126.com', 25),
  154.         4: ('smtp.163.com', 25)
  155.     }
  156.    
  157.     if SMTP_SERVER in server_map:
  158.         host, port = server_map[SMTP_SERVER]
  159.         ssl_mode = (port == 465)  # 465端口使用SSL
  160.     else:
  161.         raise ValueError("无效的SMTP服务器选择")
  162.     if myusername == DEFAULT_EMAIL:
  163.         mypassword = decrypt_string(mypassword)
  164.     try:
  165.         gc.collect()
  166.         smtp = SMTP(host, port, ssl_mode=ssl_mode)
  167.         smtp.login(myusername, mypassword)
  168.         smtp.to(target_email)
  169.         
  170.         # 邮件头
  171.         headers = [
  172.             "From: 掌控板 <{}>".format(myusername),
  173.             "To: <{}>".format(target_email),
  174.             "Subject: {}".format(subject),
  175.             ""
  176.         ]
  177.         
  178.         for header in headers:
  179.             smtp.write(header + "\r\n")
  180.         
  181.         smtp.write(text + "\r\n")
  182.         code, resp = smtp.send()
  183.         smtp.quit()
  184.         
  185.         if code == 250:
  186.             print("邮件发送成功!")
  187.         else:
  188.             print("邮件发送失败! 错误代码: {}, 响应: {}".format(code, resp))
  189.     except Exception as e:
  190.         print("邮件发送失败! 错误详情:")
  191.         import sys
  192.         sys.print_exception(e)
  193.         raise e
复制代码
实测有效
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

为本项目制作心愿单
购买心愿单
心愿单 编辑
[[wsData.name]]

硬件清单

  • [[d.name]]
btnicon
我也要做!
点击进入购买页面
上海智位机器人股份有限公司 沪ICP备09038501号-4 备案 沪公网安备31011502402448

© 2013-2025 Comsenz Inc. Powered by Discuz! X3.4 Licensed

mail