本帖最后由 得劲滋润爽 于 2023-6-2 13:30 编辑
thonny更新固件和手机远程点灯
板子:合宙ESP32-C3简约版
thonny:4.0.2
MQTT服务器:broker.emqx.io
在线调试助手:http://www.emqx.io/online-mqtt-client
手机APP:IOT MQTT panel
micropython固件:esp32c3-usb-20230426-v1.20.0
我的板子是已经有固件了,并且可以正常使用,如果你的条件不满足请使用乐鑫官方的flash_download_tool进行固件的烧写。(社区搜索ESP32 C3能搜到教程)
一、连接板子,烧写固件
报错了
[tr][/tr]
按住boot键再插数据线试一下
OK了,如果还是失败或者刷完固件无法正常运行可以再尝试重新烧写一次,还不行就请使用乐鑫官方的烧写工具进行烧写固件。!
二、测试MQTT通信
- '''
- 实验名称:MQTT通信
- 版本:v1.1
- 日期:2023.5
- 改编:maikemaker
- 说明:编程实现MQTT通信,实现接收到消息后打印出消息和topic并且往另一个topic发送一条消息
- MQTT助手:http://www.emqx.io/online-mqtt-client
- '''
- import network,time
- from simple import MQTTClient #导入MQTT板块
- from machine import Pin,Timer
-
-
- #WIFI连接函数
- def WIFI_Connect():
-
- WIFI_LED=Pin(12, Pin.OUT) #初始化WIFI指示灯
-
- wlan = network.WLAN(network.STA_IF) #STA模式
- wlan.active(True) #激活接口
- start_time=time.time() #记录时间做超时判断
-
- if not wlan.isconnected():
- print('connecting to network...')
- wlan.connect('waoo2111280', 'waoo2111280') #输入WIFI账号密码
-
- while not wlan.isconnected():
-
- #LED闪烁提示
- WIFI_LED.value(1)
- time.sleep_ms(300)
- WIFI_LED.value(0)
- time.sleep_ms(300)
-
- #超时判断,15秒没连接成功判定为超时
- if time.time()-start_time > 15 :
- print('WIFI Connected Timeout!')
- wlan.active(False) #反激活WiFi
- break
-
- if wlan.isconnected():
- #LED点亮
- WIFI_LED.value(1)
-
- #串口打印信息
- print('network information:', wlan.ifconfig())
-
- return True
-
- else:
- return False
-
- #MQTT发布数据任务
- def MQTT_Send(tim):
- client.publish(TOPIC2, 'Im fine.Thanks')
-
- #设置MQTT回调函数,有信息时候执行
- def MQTT_callback(topic, msg):
- print('topic: {}'.format(topic))
- print('msg: {}'.format(msg))
- MQTT_Send(None) #调用MQTT_Send()函数
-
- #接收数据任务
- def MQTT_Rev(tim):
- client.check_msg()
-
-
-
- #执行WIFI连接函数并判断是否已经连接成功
- if WIFI_Connect():
-
- SERVER = 'broker.emqx.io'# MQTT服务器
- PORT = 1883 #端口
- CLIENT_ID = '202306021143' # 客户端ID
复制代码
程序效果,给接收的话题发送一个消息,接收到之后串口打印消息并给回复的话题回复一条消息。
上传MQTT库
- import usocket as socket
- import ustruct as struct
- from ubinascii import hexlify
-
- class MQTTException(Exception):
- pass
-
- class MQTTClient:
-
- def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
- ssl=False, ssl_params={}):
- if port == 0:
- port = 8883 if ssl else 1883
- self.client_id = client_id
- self.sock = None
- self.server = server
- self.port = port
- self.ssl = ssl
- self.ssl_params = ssl_params
- self.pid = 0
- self.cb = None
- self.user = user
- self.pswd = password
- self.keepalive = keepalive
- self.lw_topic = None
- self.lw_msg = None
- self.lw_qos = 0
- self.lw_retain = False
-
- def _send_str(self, s):
- self.sock.write(struct.pack("!H", len(s)))
- self.sock.write(s)
-
- def _recv_len(self):
- n = 0
- sh = 0
- while 1:
- b = self.sock.read(1)[0]
- n |= (b & 0x7f) << sh
- if not b & 0x80:
- return n
- sh += 7
-
- def set_callback(self, f):
- self.cb = f
-
- def set_last_will(self, topic, msg, retain=False, qos=0):
- assert 0 <= qos <= 2
- assert topic
- self.lw_topic = topic
- self.lw_msg = msg
- self.lw_qos = qos
- self.lw_retain = retain
-
- def connect(self, clean_session=True):
- self.sock = socket.socket()
- addr = socket.getaddrinfo(self.server, self.port)[0][-1]
- self.sock.connect(addr)
- if self.ssl:
- import ussl
- self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
- premsg = bytearray(b"\x10\0\0\0\0\0")
- msg = bytearray(b"\x04MQTT\x04\x02\0\0")
-
- sz = 10 + 2 + len(self.client_id)
- msg[6] = clean_session << 1
- if self.user is not None:
- sz += 2 + len(self.user) + 2 + len(self.pswd)
- msg[6] |= 0xC0
- if self.keepalive:
- assert self.keepalive < 65536
- msg[7] |= self.keepalive >> 8
- msg[8] |= self.keepalive & 0x00FF
- if self.lw_topic:
- sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
- msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
- msg[6] |= self.lw_retain << 5
-
- i = 1
- while sz > 0x7f:
- premsg[i] = (sz & 0x7f) | 0x80
- sz >>= 7
- i += 1
- premsg[i] = sz
-
- self.sock.write(premsg, i + 2)
- self.sock.write(msg)
- #print(hex(len(msg)), hexlify(msg, ":"))
- self._send_str(self.client_id)
- if self.lw_topic:
- self._send_str(self.lw_topic)
- self._send_str(self.lw_msg)
- if self.user is not None:
- self._send_str(self.user)
- self._send_str(self.pswd)
- resp = self.sock.read(4)
- assert resp[0] == 0x20 and resp[1] == 0x02
- if resp[3] != 0:
- raise MQTTException(resp[3])
- return resp[2] & 1
-
- def disconnect(self):
- self.sock.write(b"\xe0\0")
- self.sock.close()
-
- def ping(self):
- self.sock.write(b"\xc0\0")
-
- def publish(self, topic, msg, retain=False, qos=0):
- pkt = bytearray(b"\x30\0\0\0")
- pkt[0] |= qos << 1 | retain
- sz = 2 + len(topic) + len(msg)
- if qos > 0:
- sz += 2
- assert sz < 2097152
- i = 1
- while sz > 0x7f:
- pkt[i] = (sz & 0x7f) | 0x80
- sz >>= 7
- i += 1
- pkt[i] = sz
- #print(hex(len(pkt)), hexlify(pkt, ":"))
- self.sock.write(pkt, i + 1)
- self._send_str(topic)
- if qos > 0:
- self.pid += 1
- pid = self.pid
- struct.pack_into("!H", pkt, 0, pid)
- self.sock.write(pkt, 2)
- self.sock.write(msg)
- if qos == 1:
- while 1:
- op = self.wait_msg()
- if op == 0x40:
- sz = self.sock.read(1)
- assert sz == b"\x02"
- rcv_pid = self.sock.read(2)
- rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
- if pid == rcv_pid:
- return
- elif qos == 2:
- assert 0
-
- def subscribe(self, topic, qos=0):
- assert self.cb is not None, "Subscribe callback is not set"
- pkt = bytearray(b"\x82\0\0\0")
- self.pid += 1
- struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
- #print(hex(len(pkt)), hexlify(pkt, ":"))
- self.sock.write(pkt)
- self._send_str(topic)
- self.sock.write(qos.to_bytes(1, "little"))
- while 1:
- op = self.wait_msg()
- if op == 0x90:
- resp = self.sock.read(4)
- #print(resp)
- assert resp[1] == pkt[2] and resp[2] == pkt[3]
- if resp[3] == 0x80:
- raise MQTTException(resp[3])
- return
-
- # Wait for a single incoming MQTT message and process it.
- # Subscribed messages are delivered to a callback previously
- # set by .set_callback() method. Other (internal) MQTT
- # messages processed internally.
- def wait_msg(self):
- res = self.sock.read(1)
- self.sock.setblocking(True)
- if res is None:
- return None
- if res == b"":
- raise OSError(-1)
- if res == b"\xd0": # PINGRESP
- sz = self.sock.read(1)[0]
- assert sz == 0
- return None
- op = res[0]
- if op & 0xf0 != 0x30:
- return op
- sz = self._recv_len()
- topic_len = self.sock.read(2)
- topic_len = (topic_len[0] << 8) | topic_len[1]
- topic = self.sock.read(topic_len)
- sz -= topic_len + 2
- if op & 6:
- pid = self.sock.read(2)
- pid = pid[0] << 8 | pid[1]
- sz -= 2
- msg = self.sock.read(sz)
- self.cb(topic, msg)
- if op & 6 == 2:
- pkt = bytearray(b"\x40\x02\0\0")
- struct.pack_into("!H", pkt, 2, pid)
- self.sock.write(pkt)
- elif op & 6 == 4:
- assert 0
-
- # Checks whether a pending message from server is available.
- # If not, returns immediately with None. Otherwise, does
- # the same processing as wait_msg.
- def check_msg(self):
- self.sock.setblocking(False)
- return self.wait_msg()
-
复制代码
新建文件,输入上面的代码,点击保存,选择保存到micropython设备,文件名simple.py
打开文件视图
运行程序
打开调试助手,进行调试
订阅程序里面的两个主题(Topic)
效果
三、点灯测试
- '''
- 实验名称:MQTT通信
- 版本:v1.1
- 日期:2023.6
- 改编:maikemaker
- 说明:点灯测试
- MQTT助手:http://www.emqx.io/online-mqtt-client
- '''
- import network,time
- from simple import MQTTClient #导入MQTT板块
- from machine import Pin,Timer
-
- led=Pin(13,Pin.OUT,value=0)
- #WIFI连接函数
- def WIFI_Connect():
-
- WIFI_LED=Pin(12, Pin.OUT) #初始化WIFI指示灯
-
- wlan = network.WLAN(network.STA_IF) #STA模式
- wlan.active(True) #激活接口
- start_time=time.time() #记录时间做超时判断
-
- if not wlan.isconnected():
- print('connecting to network...')
- wlan.connect('waoo2111280', 'waoo2111280') #输入WIFI账号密码
-
- while not wlan.isconnected():
-
- #LED闪烁提示
- WIFI_LED.value(1)
- time.sleep_ms(300)
- WIFI_LED.value(0)
- time.sleep_ms(300)
-
- #超时判断,15秒没连接成功判定为超时
- if time.time()-start_time > 15 :
- print('WIFI Connected Timeout!')
- wlan.active(False) #反激活WiFi
- break
-
- if wlan.isconnected():
- #LED点亮
- WIFI_LED.value(1)
-
- #串口打印信息
- print('network information:', wlan.ifconfig())
-
- return True
-
- else:
- return False
-
- #MQTT发布数据任务
- def MQTT_Send(tim):
- client.publish(TOPIC2, 'Im fine.Thanks')
-
- #设置MQTT回调函数,有信息时候执行
- def MQTT_callback(topic, msg):
- print('topic: {}'.format(topic))
- print('msg: {}'.format(msg))
- if msg==b'on':
- led.value(1)
- if msg==b'off':
- led.value(0)
- MQTT_Send(None) #调用MQTT_Send()函数
-
- #接收数据任务
- def MQTT_Rev(tim):
- client.check_msg()
-
-
-
- #执行WIFI连接函数并判断是否已经连接成功
- if WIFI_Connect():
-
- SERVER = 'broker.emqx.io'# MQTT服务器
- PORT = 1883
- CLIENT_ID = '202306021143' # 客户端ID
- #话题1
- TOPIC = '/T1/test/1' # TOPIC名称,接收消息的主题
- #话题2
- TOPIC2 = '/T2/test/2' # TOPIC名称,回复的主题
-
- client = MQTTClient(CLIENT_ID, SERVER, PORT)
- client.set_callback(MQTT_callback) #配置回调函数
- client.connect()
- client.subscribe(TOPIC) #订阅主题
-
- #开启RTOS定时器,编号为0或2,周期1000ms,执行MQTT发布
- tim = Timer(0)
- tim.init(period=300, mode=Timer.PERIODIC,callback=MQTT_Rev)
-
复制代码
设置APP
添加两个按钮
测试
效果还不错吧
试一下开关控件
效果也不错。
不想用blynk和点灯的可以试试这个APP。
|