MicroPython新版本中已经将ussl改名为ssl,但不知道为什么官方提供的umail.py没改
把掌控板中的umail替换为以下代码即可:
- import usocket
- import gc
- import ssl # 替换 ussl 为 ssl
-
- DEFAULT_TIMEOUT = 10 # socket timeout, in seconds
- LOCAL_DOMAIN = '127.0.0.1' # domain argument sent with EHLO
- CMD_EHLO = 'EHLO' # SMTP command verbs used by the SMTP class
- CMD_STARTTLS = 'STARTTLS'
- CMD_AUTH = 'AUTH'
- CMD_MAIL = 'MAIL'
- AUTH_PLAIN = 'PLAIN' # authentication mechanisms understood by SMTP.login()
- AUTH_LOGIN = 'LOGIN'
-
def encrypt_string(message):
    """Obfuscate *message* into a ``|``-terminated sequence of numeric tokens.

    Every character becomes ``<2-char tag><decimal number>|``:

    * ``00`` - ASCII letters A-M / a-m, stored as ``(code + 13) * 2``
    * ``01`` - the remaining ASCII letters, stored as ``code - 23``
    * ``02`` - CJK ideographs U+4E00..U+9FFF, stored as ``code + 24``
    * ``03`` - anything else (digits, punctuation, ...), stored as ``code``

    The classes are decided by explicit code-point ranges rather than
    ``str.isalpha()`` so the output does not depend on how Unicode-aware the
    running interpreter's ``isalpha`` is.

    This is a reversible obfuscation, not real encryption; ``decrypt_string``
    inverts it.

    :param message: text to encode.
    :return: encoded string; empty when *message* is empty.
    """
    tokens = []
    for char in message:
        code = ord(char)
        if 64 < code < 78 or 96 < code < 110:  # A-M, a-m
            token = "00" + str((code + 13) * 2)
        elif 64 < code < 91 or 96 < code < 123:  # remaining ASCII letters
            token = "01" + str(code - 23)
        elif 0x4E00 <= code <= 0x9FFF:  # single CJK ideograph
            token = "02" + str(code + 24)
        else:  # digits, punctuation and everything else
            token = "03" + str(code)
        tokens.append(token + "|")

    # Join once instead of growing a string with += inside the loop.
    return "".join(tokens)
-
def decrypt_string(message):
    """Reverse ``encrypt_string``: turn a ``|``-separated token string into text.

    Each token is a 2-character tag followed by a decimal number; the tag
    selects how the number maps back to a code point (``00``: ``n // 2 - 13``,
    ``01``: ``n + 23``, ``02``: ``n - 24``, anything else: ``n`` unchanged).

    Empty tokens are skipped, so the input may or may not end with ``|``.

    :param message: encoded string as produced by ``encrypt_string``.
    :return: the decoded text; empty when *message* holds no tokens.
    :raises ValueError: if a token's numeric part is not a valid integer or
        does not map to a valid code point.
    """
    chars = []
    for token in message.split("|"):
        if not token:
            # Produced by the trailing (or a doubled) separator.
            continue

        tag = token[:2]
        number = int(token[2:])
        if tag == "00":
            # Integer division keeps the arithmetic exact (no float round trip).
            number = number // 2 - 13
        elif tag == "01":
            number += 23
        elif tag == "02":
            number -= 24
        # Any other tag ("03") stores the code point unchanged.

        chars.append(chr(number))

    return "".join(chars)
-
- DEFAULT_EMAIL = 'zhangkongban@163.com' # built-in sender account; send_email() decodes the password with decrypt_string() when this address is used
- DEFAULT_PASSWORD = encrypt_string('NTJTTHERKCSJFEXM') # NOTE(review): hard-coded credential, only obfuscated (reversible via decrypt_string), not protected
-
class SMTP:
    """Small SMTP client for MicroPython.

    Typical use: construct (connects, reads the greeting, sends EHLO and
    upgrades via STARTTLS when offered), ``login()``, ``to()``, one or more
    ``write()`` calls, ``send()``, then ``quit()``.
    """

    def _read_reply(self):
        """Read one complete SMTP reply from the socket.

        A reply line is a 3-digit code, one separator character and text; a
        ``-`` separator means more lines of the same reply follow.

        :return: ``(code, lines)`` with the code of the final line as int and
            the non-empty text lines as str.
        :raises OSError: if the socket yields no data (timeout or closed
            connection) before the reply is complete.
        """
        sock = self._sock
        lines = []
        while True:
            code = sock.read(3)
            if not code:
                # Fail with a meaningful error instead of crashing in int().
                raise OSError('服务器无响应(超时或连接已断开)')
            more = sock.read(1) == b'-'
            line = sock.readline().strip()
            if line:
                lines.append(line.decode())
            if not more:
                return int(code), lines

    def cmd(self, cmd_str):
        """Send one command line and return the server reply.

        :param cmd_str: command without the trailing CRLF.
        :return: ``(code, lines)``, see ``_read_reply``.
        :raises OSError: if the server does not answer.
        """
        self._sock.write('%s\r\n' % cmd_str)
        return self._read_reply()

    def __init__(self, host, port, ssl_mode=False, username=None, password=None):
        """Connect to *host*:*port* and perform the SMTP handshake.

        :param host: server host name or address.
        :param port: server port.
        :param ssl_mode: wrap the socket in TLS before reading the greeting.
        :param username: optional login name; also the default sender.
        :param password: optional password; ``login()`` is called when both
            *username* and *password* are given.
        :raises AssertionError: if the server answers with an unexpected code.
        :raises OSError: on connection problems or a missing reply.
        """
        self.username = username
        addr = usocket.getaddrinfo(host, port)[0][-1]
        sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
        self._sock = sock
        try:
            sock.settimeout(DEFAULT_TIMEOUT)
            sock.connect(addr)

            if ssl_mode:
                sock = ssl.wrap_socket(sock)
                self._sock = sock

            # Server greeting (handles multi-line greetings as well).
            code, resp = self._read_reply()
            assert code == 220, '无法连接到服务器 %d' % code

            code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
            assert code == 250, 'EHLO 失败 %d' % code

            # Upgrade a plain connection when the server advertises STARTTLS.
            if not ssl_mode and any(CMD_STARTTLS in r for r in resp):
                code, resp = self.cmd(CMD_STARTTLS)
                assert code == 220, 'STARTTLS 失败 %d, %s' % (code, resp)
                self._sock = ssl.wrap_socket(sock)

            if username and password:
                self.login(username, password)
        except Exception:
            # The caller never receives the instance, so release the socket here.
            self._sock.close()
            raise

    def login(self, username, password):
        """Authenticate with AUTH PLAIN or, failing that, AUTH LOGIN.

        :param username: account name; stored as the default sender.
        :param password: account password.
        :return: ``(code, lines)`` of the final authentication reply.
        :raises AssertionError: if EHLO fails, the server advertises no AUTH
            feature, or authentication is rejected.
        :raises Exception: if no advertised mechanism is supported.
        """
        self.username = username
        code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
        assert code == 250, 'EHLO 错误 %d, %s' % (code, resp)

        auths = None
        for feature in resp:
            if feature[:4].upper() == CMD_AUTH:
                auths = feature[4:].strip('=').upper().split()
        assert auths is not None, "服务器不支持认证方式"

        # Prefer the un-prefixed module name, fall back to the legacy alias.
        try:
            from binascii import b2a_base64 as b64
        except ImportError:
            from ubinascii import b2a_base64 as b64

        if AUTH_PLAIN in auths:
            creds = "\0%s\0%s" % (username, password)
            cren = b64(creds.encode()).decode().strip()
            code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren))
        elif AUTH_LOGIN in auths:
            user_b64 = b64(username.encode()).decode().strip()
            code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, user_b64))
            assert code == 334, '用户名错误 %d, %s' % (code, resp)
            pass_b64 = b64(password.encode()).decode().strip()
            code, resp = self.cmd(pass_b64)
        else:
            raise Exception("认证方式(%s)不被支持" % ', '.join(auths))

        assert code in (235, 503), '认证错误 %d, %s' % (code, resp)
        return code, resp

    def to(self, addrs, mail_from=None):
        """Announce sender and recipients, then open the DATA phase.

        :param addrs: one recipient address (str) or an iterable of them.
        :param mail_from: envelope sender; defaults to the login user name.
        :return: ``(code, lines)`` of the reply to DATA.
        :raises AssertionError: if the sender, every recipient, or DATA is
            rejected.
        """
        mail_from = self.username if mail_from is None else mail_from
        code, resp = self.cmd('MAIL FROM: <%s>' % mail_from)
        assert code == 250, '发件人拒绝 %d, %s' % (code, resp)

        if isinstance(addrs, str):
            addrs = [addrs]
        count = 0  # number of rejected recipients
        for addr in addrs:
            code, resp = self.cmd('RCPT TO: <%s>' % addr)
            if code not in (250, 251):
                print('%s 拒绝, %s' % (addr, resp))
                count += 1
        assert count != len(addrs), '收件人全部拒绝, %d, %s' % (code, resp)

        code, resp = self.cmd('DATA')
        assert code == 354, '数据拒绝, %d, %s' % (code, resp)
        return code, resp

    def write(self, content):
        """Write raw message data; str input is encoded as UTF-8."""
        if isinstance(content, str):
            content = content.encode('utf-8')
        self._sock.write(content)

    def send(self, content=''):
        """Optionally write *content*, terminate the message and read the reply.

        :param content: final chunk of message data (str or bytes), may be empty.
        :return: ``(code, text)`` where *text* is the reply text as one str.
        :raises OSError: if the server does not answer.
        """
        if content:
            self.write(content)
        self._sock.write(b'\r\n.\r\n')  # end-of-data marker
        # Read the whole reply so no continuation line is left in the buffer.
        code, lines = self._read_reply()
        return code, ' '.join(lines)

    def quit(self):
        """Send QUIT and close the socket, even if QUIT itself fails."""
        try:
            self.cmd("QUIT")
        finally:
            self._sock.close()
-
def send_email(myusername, mypassword, target_email, SMTP_SERVER, subject, text):
    """Send a plain-text e-mail through one of the preset SMTP servers.

    :param myusername: sender account; also used for the ``From`` header.
    :param mypassword: account password. When *myusername* equals
        ``DEFAULT_EMAIL`` the value is first decoded with ``decrypt_string``.
    :param target_email: one recipient address (str) or an iterable of them.
    :param SMTP_SERVER: preset number: 1 = office365, 2 = qq, 3 = 126, 4 = 163.
    :param subject: subject line.
    :param text: message body.
    :raises ValueError: if *SMTP_SERVER* is not a known preset.
    :raises Exception: any connection/protocol error is printed and re-raised.
    """
    server_map = {
        1: ('smtp.office365.com', 587),
        2: ('smtp.qq.com', 587),
        3: ('smtp.126.com', 25),
        4: ('smtp.163.com', 25)
    }

    if SMTP_SERVER in server_map:
        host, port = server_map[SMTP_SERVER]
        ssl_mode = (port == 465)  # port 465 means implicit TLS
    else:
        raise ValueError("无效的SMTP服务器选择")

    if myusername == DEFAULT_EMAIL:
        mypassword = decrypt_string(mypassword)

    # Build the To header for a single address as well as for several.
    if isinstance(target_email, str):
        to_header = "<{}>".format(target_email)
    else:
        to_header = ", ".join("<{}>".format(addr) for addr in target_email)

    smtp = None
    try:
        gc.collect()
        smtp = SMTP(host, port, ssl_mode=ssl_mode)
        smtp.login(myusername, mypassword)
        smtp.to(target_email)

        # Message headers; the empty entry separates headers from the body.
        headers = [
            "From: 掌控板 <{}>".format(myusername),
            "To: {}".format(to_header),
            "Subject: {}".format(subject),
            ""
        ]

        for header in headers:
            smtp.write(header + "\r\n")

        smtp.write(text + "\r\n")
        code, resp = smtp.send()
        smtp.quit()
        smtp = None  # connection already closed, nothing left to clean up

        if code == 250:
            print("邮件发送成功!")
        else:
            print("邮件发送失败! 错误代码: {}, 响应: {}".format(code, resp))
    except Exception as e:
        if smtp is not None:
            # Best-effort close so a failed attempt does not leak the socket;
            # the original error is the one worth reporting.
            try:
                smtp.quit()
            except Exception:
                pass
        print("邮件发送失败! 错误详情:")
        import sys
        if hasattr(sys, 'print_exception'):
            sys.print_exception(e)
        else:
            # sys.print_exception is not available on every interpreter.
            print(repr(e))
        raise e
复制代码
实测有效
|