得劲滋润爽 发表于 2023-6-2 13:27:51

thonny更新固件和手机远程点灯

本帖最后由 得劲滋润爽 于 2023-6-2 13:30 编辑

thonny更新固件和手机远程点灯

板子:合宙ESP32-C3简约版
thonny:4.0.2
MQTT服务器:broker.emqx.io
在线调试助手:http://www.emqx.io/online-mqtt-client
手机APP:IOT MQTT panel
micropython固件:esp32c3-usb-20230426-v1.20.0


我的板子是已经有固件了,并且可以正常使用,如果你的条件不满足请使用乐鑫官方的flash_download_tool进行固件的烧写。(社区搜索ESP32 C3能搜到教程)


一、连接板子,烧写固件


报错了





[*]


按住boot键再插数据线试一下

OK了,如果还是失败或者刷完固件无法正常运行可以再尝试重新烧写一次,还不行就请使用乐鑫官方的烧写工具进行烧写固件。!




二、测试MQTT通信
'''
实验名称:MQTT通信
版本:v1.1
日期:2023.5
改编:maikemaker
说明:编程实现MQTT通信,实现接收到消息后打印出消息和topic并且往另一个topic发送一条消息
MQTT助手:http://www.emqx.io/online-mqtt-client
'''
import network,time
from simple import MQTTClient #导入MQTT板块
from machine import Pin,Timer


#WIFI连接函数
def WIFI_Connect():

    WIFI_LED=Pin(12, Pin.OUT) #初始化WIFI指示灯

    wlan = network.WLAN(network.STA_IF) #STA模式
    wlan.active(True)                   #激活接口
    start_time=time.time()            #记录时间做超时判断

    if not wlan.isconnected():
      print('connecting to network...')
      wlan.connect('waoo2111280', 'waoo2111280') #输入WIFI账号密码

      while not wlan.isconnected():

            #LED闪烁提示
            WIFI_LED.value(1)
            time.sleep_ms(300)
            WIFI_LED.value(0)
            time.sleep_ms(300)

            #超时判断,15秒没连接成功判定为超时
            if time.time()-start_time > 15 :
                print('WIFI Connected Timeout!')
                wlan.active(False) #反激活WiFi
                break

    if wlan.isconnected():
      #LED点亮
      WIFI_LED.value(1)

      #串口打印信息
      print('network information:', wlan.ifconfig())
      
      return True

    else:
      return False
   
#MQTT发布数据任务
def MQTT_Send(tim):
    client.publish(TOPIC2, 'Im fine.Thanks')

#设置MQTT回调函数,有信息时候执行
def MQTT_callback(topic, msg):
    print('topic: {}'.format(topic))
    print('msg: {}'.format(msg))
    MQTT_Send(None) #调用MQTT_Send()函数
   
#接收数据任务
def MQTT_Rev(tim):
    client.check_msg()



#执行WIFI连接函数并判断是否已经连接成功
if WIFI_Connect():

    SERVER = 'broker.emqx.io'# MQTT服务器
    PORT = 1883 #端口
    CLIENT_ID = '202306021143' # 客户端ID

程序效果,给接收的话题发送一个消息,接收到之后串口打印消息并给回复的话题回复一条消息。


上传MQTT库
import usocket as socket
import ustruct as struct
from ubinascii import hexlify

class MQTTException(Exception):
    pass

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
               ssl=False, ssl_params={}):
      if port == 0:
            port = 8883 if ssl else 1883
      self.client_id = client_id
      self.sock = None
      self.server = server
      self.port = port
      self.ssl = ssl
      self.ssl_params = ssl_params
      self.pid = 0
      self.cb = None
      self.user = user
      self.pswd = password
      self.keepalive = keepalive
      self.lw_topic = None
      self.lw_msg = None
      self.lw_qos = 0
      self.lw_retain = False

    def _send_str(self, s):
      self.sock.write(struct.pack("!H", len(s)))
      self.sock.write(s)

    def _recv_len(self):
      n = 0
      sh = 0
      while 1:
            b = self.sock.read(1)
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
      self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
      assert 0 <= qos <= 2
      assert topic
      self.lw_topic = topic
      self.lw_msg = msg
      self.lw_qos = qos
      self.lw_retain = retain

    def connect(self, clean_session=True):
      self.sock = socket.socket()
      addr = socket.getaddrinfo(self.server, self.port)[-1]
      self.sock.connect(addr)
      if self.ssl:
            import ussl
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
      premsg = bytearray(b"\x10\0\0\0\0\0")
      msg = bytearray(b"\x04MQTT\x04\x02\0\0")

      sz = 10 + 2 + len(self.client_id)
      msg = clean_session << 1
      if self.user is not None:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg |= 0xC0
      if self.keepalive:
            assert self.keepalive < 65536
            msg |= self.keepalive >> 8
            msg |= self.keepalive & 0x00FF
      if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg |= self.lw_retain << 5

      i = 1
      while sz > 0x7f:
            premsg = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
      premsg = sz

      self.sock.write(premsg, i + 2)
      self.sock.write(msg)
      #print(hex(len(msg)), hexlify(msg, ":"))
      self._send_str(self.client_id)
      if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
      if self.user is not None:
            self._send_str(self.user)
            self._send_str(self.pswd)
      resp = self.sock.read(4)
      assert resp == 0x20 and resp == 0x02
      if resp != 0:
            raise MQTTException(resp)
      return resp & 1

    def disconnect(self):
      self.sock.write(b"\xe0\0")
      self.sock.close()

    def ping(self):
      self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
      pkt = bytearray(b"\x30\0\0\0")
      pkt |= qos << 1 | retain
      sz = 2 + len(topic) + len(msg)
      if qos > 0:
            sz += 2
      assert sz < 2097152
      i = 1
      while sz > 0x7f:
            pkt = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
      pkt = sz
      #print(hex(len(pkt)), hexlify(pkt, ":"))
      self.sock.write(pkt, i + 1)
      self._send_str(topic)
      if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
      self.sock.write(msg)
      if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                  sz = self.sock.read(1)
                  assert sz == b"\x02"
                  rcv_pid = self.sock.read(2)
                  rcv_pid = rcv_pid << 8 | rcv_pid
                  if pid == rcv_pid:
                        return
      elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
      assert self.cb is not None, "Subscribe callback is not set"
      pkt = bytearray(b"\x82\0\0\0")
      self.pid += 1
      struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
      #print(hex(len(pkt)), hexlify(pkt, ":"))
      self.sock.write(pkt)
      self._send_str(topic)
      self.sock.write(qos.to_bytes(1, "little"))
      while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                #print(resp)
                assert resp == pkt and resp == pkt
                if resp == 0x80:
                  raise MQTTException(resp)
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
      res = self.sock.read(1)
      self.sock.setblocking(True)
      if res is None:
            return None
      if res == b"":
            raise OSError(-1)
      if res == b"\xd0":# PINGRESP
            sz = self.sock.read(1)
            assert sz == 0
            return None
      op = res
      if op & 0xf0 != 0x30:
            return op
      sz = self._recv_len()
      topic_len = self.sock.read(2)
      topic_len = (topic_len << 8) | topic_len
      topic = self.sock.read(topic_len)
      sz -= topic_len + 2
      if op & 6:
            pid = self.sock.read(2)
            pid = pid << 8 | pid
            sz -= 2
      msg = self.sock.read(sz)
      self.cb(topic, msg)
      if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
      elif op & 6 == 4:
            assert 0

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
      self.sock.setblocking(False)
      return self.wait_msg()

新建文件,输入上面的代码,点击保存,选择保存到micropython设备,文件名simple.py

打开文件视图


运行程序

打开调试助手,进行调试



订阅程序里面的两个主题(Topic)

效果



三、点灯测试
'''
实验名称:MQTT通信
版本:v1.1
日期:2023.6
改编:maikemaker
说明:点灯测试
MQTT助手:http://www.emqx.io/online-mqtt-client
'''
import network,time
from simple import MQTTClient #导入MQTT板块
from machine import Pin,Timer

led=Pin(13,Pin.OUT,value=0)
#WIFI连接函数
def WIFI_Connect():

    WIFI_LED=Pin(12, Pin.OUT) #初始化WIFI指示灯

    wlan = network.WLAN(network.STA_IF) #STA模式
    wlan.active(True)                   #激活接口
    start_time=time.time()            #记录时间做超时判断

    if not wlan.isconnected():
      print('connecting to network...')
      wlan.connect('waoo2111280', 'waoo2111280') #输入WIFI账号密码

      while not wlan.isconnected():

            #LED闪烁提示
            WIFI_LED.value(1)
            time.sleep_ms(300)
            WIFI_LED.value(0)
            time.sleep_ms(300)

            #超时判断,15秒没连接成功判定为超时
            if time.time()-start_time > 15 :
                print('WIFI Connected Timeout!')
                wlan.active(False) #反激活WiFi
                break

    if wlan.isconnected():
      #LED点亮
      WIFI_LED.value(1)

      #串口打印信息
      print('network information:', wlan.ifconfig())
      
      return True

    else:
      return False
   
#MQTT发布数据任务
def MQTT_Send(tim):
    client.publish(TOPIC2, 'Im fine.Thanks')

#设置MQTT回调函数,有信息时候执行
def MQTT_callback(topic, msg):
    print('topic: {}'.format(topic))
    print('msg: {}'.format(msg))
    if msg==b'on':
      led.value(1)
    if msg==b'off':
      led.value(0)
    MQTT_Send(None) #调用MQTT_Send()函数
   
#接收数据任务
def MQTT_Rev(tim):
    client.check_msg()



#执行WIFI连接函数并判断是否已经连接成功
if WIFI_Connect():

    SERVER = 'broker.emqx.io'# MQTT服务器
    PORT = 1883
    CLIENT_ID = '202306021143' # 客户端ID
    #话题1
    TOPIC = '/T1/test/1' # TOPIC名称,接收消息的主题
    #话题2
    TOPIC2 = '/T2/test/2' # TOPIC名称,回复的主题
   
    client = MQTTClient(CLIENT_ID, SERVER, PORT)
    client.set_callback(MQTT_callback)#配置回调函数
    client.connect()
    client.subscribe(TOPIC) #订阅主题

    #开启RTOS定时器,编号为0或2,周期1000ms,执行MQTT发布
    tim = Timer(0)
    tim.init(period=300, mode=Timer.PERIODIC,callback=MQTT_Rev)



设置APP

添加两个按钮



测试


效果还不错吧
试一下开关控件


效果也不错。


不想用blynk和点灯的可以试试这个APP。
页: [1]
查看完整版本: thonny更新固件和手机远程点灯