米菲爸爸 发表于 2024-4-12 18:54:53

FireBeetle 2 ESP32 C6开发板 Micropython连接MQTT服务

本帖最后由 米菲爸爸 于 2024-4-12 19:08 编辑

当使用ESP32-C6与MicroPython连接MQTT服务时,可以创建一个轻量级的物联网应用程序。ESP32-C6是一款低功耗、高性能和高度集成的Wi-Fi和蓝牙解决方案,而MicroPython是一种方便的Python语言子集,适用于嵌入式开发。在本文中,我们将使用MicroPython和ESP32-C6来连接并发布/订阅MQTT消息。

当然由于Micropython开发Esp32系列开发板目前还都不是主流,所以难免经常踩坑。 步骤1:准备工作在开始之前,请确保你已经具备以下几点:1.一台安装了MicroPython固件的ESP32-C6设备。(刷固件可以参考文末视频)2.一个MQTT代理(例如EMQX)的服务器地址、端口号以及连接凭据(用户名和密码)。 步骤2:连接Wi-Fi首先,我们把所有的参数都放到一个json格式的配置文件里以便之后快速切换不同的MQTT服务
# 读取JSON配置文件

def read_config_file(filename):

    try:

      with open(filename, 'r') as file:

            config = json.load(file)

            return config

    except OSError:

      print("无法读取配置文件:", filename)

      return None



# 读取配置文件

config = read_config_file("configEMQX.json")




# 检查是否成功读取配置文件

if config is not None:

    # 获取配置项

    ssid, pwd = config.get("ssid"), config.get("pwd")
我们需要将ESP32-C6连接到Wi-Fi网络,以便能够与MQTT代理进行通信。在MicroPython中,我们可以使用`network`模块来进行Wi-Fi连接。以下是一个示例代码:
wlan = network.WLAN(network.STA_IF)

    wlan.active(True)

    if not wlan.isconnected() :

      #wlan.disconnect()

      #wlan.config(dhcp_hostname="")

      wlan.connect(ssid, pwd)

      retry_limit = 10# 设置重试次数上限为10

      retry_count = 0# 初始化重试计数器

      while not wlan.isconnected() and retry_count < retry_limit:

            retry_count += 1

            time.sleep_ms(1000)

            print("try to connecting", retry_count)

      

    if not wlan.isconnected():

      print("连接尝试次数已达上限,连接失败")

    else:

      print("成功连接到网络")

      print(wlan.ifconfig())

在配置文件里将"yourSSID"和"yourPWD"替换为你的Wi-Fi网络的凭据。代码将连接到指定的Wi-Fi网络,然后等待直到连接成功。 步骤3:安装和导入MQTT库在ESP32-C6上使用MicroPython连接uMQTT服务需要使用到uMQTT库,该库提供了uMQTT客户端的功能。首先,需要安装mip,在Thonny的Tools-> Manage Packages 下搜索安装mip组件

在代码中添加import mip

mip.install("umqtt.simple")
代码将使用mip模块来安装MQTT库。安装完成后,我们可以在代码中导入MQTT库: from umqtt.simple import MQTTClient
步骤4:连接MQTT代理现在,我们可以连接到MQTT代理并发布/订阅消息。请将以下代码替换为你的MQTT服务器地址、端口号以及连接凭据: mqtt_server = "你的MQTT服务器地址"

mqtt_port = 1883

mqtt_username = "你的MQTT用户名"

mqtt_password = "你的MQTT密码"

client = MQTTClient("esp32-c6-client", mqtt_server, port=mqtt_port, user=mqtt_username, password=mqtt_password)

client.connect()

print("已成功连接到MQTT代理")
在代码中,我们首先创建了一个名为`esp32-c6-client`的MQTT客户端实例。然后,使用`client.connect()`方法连接到MQTT代理。 步骤5:发布MQTT消息要发布MQTT消息,可以使用`client.publish()`方法。以下是一个示例,将"Hello,MQTT!"作为消息发布到名为`topic/test`的主题下: message = "Hello, MQTT!"

topic = "topic/test"

client.publish(topic, message)

print("已发布消息:", message)
在代码中,我们使用`client.publish()`方法,传递了主题和消息作为参数。 步骤6:订阅MQTT消息要订阅MQTT消息,可以使用`client.subscribe()`方法。以下是一个示例,订阅名为`topic/test`的主题: topic = "topic/test"

def on_message(topic, message):

    print("收到消息,主题:", topic.decode())

    print("消息内容:", message.decode())


client.set_callback(on_message)

client.subscribe(topic)


print("已订阅主题:", topic)
在代码中,我们使用了`client.subscribe()`方法来订阅指定的主题,并在`on_message`函数中定义了接收到消息时的处理逻辑。 步骤7:保持连接与断开连接为了保持与MQTT代理的持续连接,可以使用`client.check_msg()`方法定期检查新的消息或心跳包,并引导客户端保持连接。以下是一个示例代码,在循环中每隔1秒调用一次`client.check_msg()`方法: while True:

    client.check_msg()

    time.sleep(1)
为了断开与MQTT代理的连接,可以使用`client.disconnect()`方法: client.disconnect()

print("已断开与MQTT代理的连接")
步骤8:整合所有代码完成以上步骤后,你可以将所有的代码整合到一起,并进行部署和运行。下面是一个完整的示例代码: import network

from umqtt.simple import MQTTClient

import ujson as json

import time



# 读取JSON配置文件

def read_config_file(filename):

    try:

      with open(filename, 'r') as file:

            config = json.load(file)

            return config

    except OSError:

      print("无法读取配置文件:", filename)

      return None



# 读取配置文件

config = read_config_file("configEMQX.json")



def connectMQTT():

    client = MQTTClient(

      client_id=config.get("client_id").encode(),

      server=config.get("server").encode(),

      port=config.get("port"),

      user=config.get("user").encode(),

      password=config.get("password").encode(),

      keepalive=7200,

      ssl=False

    )



    client.connect()

    return client



# 检查是否成功读取配置文件

if config is not None:

    # 获取配置项

    ssid, pwd = config.get("ssid"), config.get("pwd")

   

    wlan = network.WLAN(network.STA_IF)

    wlan.active(True)

    if not wlan.isconnected() :

      #wlan.disconnect()

      #wlan.config(dhcp_hostname="")

      wlan.connect(ssid, pwd)

      retry_limit = 10# 设置重试次数上限为10

      retry_count = 0# 初始化重试计数器

      while not wlan.isconnected() and retry_count < retry_limit:

            retry_count += 1

            time.sleep_ms(1000)

            print("try to connecting", retry_count)

      

    if not wlan.isconnected():

      print("连接尝试次数已达上限,连接失败")

    else:

      print("成功连接到网络")

      print(wlan.ifconfig())

      

      #import mip

      #mip.install("umqtt.simple")

         

      mqttClient = connectMQTT()

      mqttClient.publish("topic", "value123")

      print("publish Done")

else :

    print("Config not exist")
下面是关于Esp32C6 开箱 刷固件 点灯 点屏幕 连接MQTT服务 的视频https://www.bilibili.com/video/BV1rJ4m1p7oq/
页: [1]
查看完整版本: FireBeetle 2 ESP32 C6开发板 Micropython连接MQTT服务