本帖最后由 米菲爸爸 于 2024-4-12 19:08 编辑
当使用ESP32-C6与MicroPython连接MQTT服务时,可以创建一个轻量级的物联网应用程序。 ESP32-C6是一款低功耗、高性能和高度集成的Wi-Fi和蓝牙解决方案,而MicroPython是一种方便的Python语言子集,适用于嵌入式开发。 在本文中,我们将使用MicroPython和ESP32-C6来连接并发布/订阅MQTT消息。
当然由于Micropython开发Esp32系列开发板目前还都不是主流,所以难免经常踩坑。 步骤1:准备工作 在开始之前,请确保你已经具备以下几点: 1.一台安装了MicroPython固件的ESP32-C6设备。(刷固件可以参考文末视频) 2.一个MQTT代理(例如EMQX)的服务器地址、端口号以及连接凭据(用户名和密码)。 步骤2:连接Wi-Fi 首先,我们把所有的参数都放到一个json格式的配置文件里以便之后快速切换不同的MQTT服务
- # 读取JSON配置文件
-
- def read_config_file(filename):
-
- try:
-
- with open(filename, 'r') as file:
-
- config = json.load(file)
-
- return config
-
- except OSError:
-
- print("无法读取配置文件:", filename)
-
- return None
-
-
-
- # 读取配置文件
-
- config = read_config_file("configEMQX.json")
-
-
-
-
- # 检查是否成功读取配置文件
-
- if config is not None:
-
- # 获取配置项
-
- ssid, pwd = config.get("ssid"), config.get("pwd")
复制代码
我们需要将ESP32-C6连接到Wi-Fi网络,以便能够与MQTT代理进行通信。 在MicroPython中,我们可以使用`network`模块来进行Wi-Fi连接。以下是一个示例代码:
- wlan = network.WLAN(network.STA_IF)
-
- wlan.active(True)
-
- if not wlan.isconnected() :
-
- #wlan.disconnect()
-
- #wlan.config(dhcp_hostname="")
-
- wlan.connect(ssid, pwd)
-
- retry_limit = 10 # 设置重试次数上限为10
-
- retry_count = 0 # 初始化重试计数器
-
- while not wlan.isconnected() and retry_count < retry_limit:
-
- retry_count += 1
-
- time.sleep_ms(1000)
-
- print("try to connecting", retry_count)
-
-
-
- if not wlan.isconnected():
-
- print("连接尝试次数已达上限,连接失败")
-
- else:
-
- print("成功连接到网络")
-
- print(wlan.ifconfig())
复制代码
在配置文件里将"yourSSID"和"yourPWD"替换为你的Wi-Fi网络的凭据。代码将连接到指定的Wi-Fi网络,然后等待直到连接成功。 步骤3:安装和导入MQTT库 在ESP32-C6上使用MicroPython连接uMQTT服务需要使用到uMQTT库,该库提供了uMQTT客户端的功能。 首先,需要安装mip,在Thonny的Tools-> Manage Packages 下搜索安装mip组件
在代码中添加 - import mip
-
- mip.install("umqtt.simple")
复制代码
代码将使用mip模块来安装MQTT库。安装完成后,我们可以在代码中导入MQTT库: - from umqtt.simple import MQTTClient
复制代码
步骤4:连接MQTT代理 现在,我们可以连接到MQTT代理并发布/订阅消息。请将以下代码替换为你的MQTT服务器地址、端口号以及连接凭据: - mqtt_server = "你的MQTT服务器地址"
-
- mqtt_port = 1883
-
- mqtt_username = "你的MQTT用户名"
-
- mqtt_password = "你的MQTT密码"
-
- client = MQTTClient("esp32-c6-client", mqtt_server, port=mqtt_port, user=mqtt_username, password=mqtt_password)
-
- client.connect()
-
- print("已成功连接到MQTT代理")
复制代码
在代码中,我们首先创建了一个名为`esp32-c6-client`的MQTT客户端实例。然后,使用`client.connect()`方法连接到MQTT代理。 步骤5:发布MQTT消息 要发布MQTT消息,可以使用`client.publish()`方法。以下是一个示例,将"Hello,MQTT!"作为消息发布到名为`topic/test`的主题下: - message = "Hello, MQTT!"
-
- topic = "topic/test"
-
- client.publish(topic, message)
-
- print("已发布消息:", message)
复制代码
在代码中,我们使用`client.publish()`方法,传递了主题和消息作为参数。 步骤6:订阅MQTT消息 要订阅MQTT消息,可以使用`client.subscribe()`方法。以下是一个示例,订阅名为`topic/test`的主题: - topic = "topic/test"
-
- def on_message(topic, message):
-
- print("收到消息,主题:", topic.decode())
-
- print("消息内容:", message.decode())
-
-
- client.set_callback(on_message)
-
- client.subscribe(topic)
-
-
- print("已订阅主题:", topic)
复制代码
在代码中,我们使用了`client.subscribe()`方法来订阅指定的主题,并在`on_message`函数中定义了接收到消息时的处理逻辑。 步骤7:保持连接与断开连接 为了保持与MQTT代理的持续连接,可以使用`client.check_msg()`方法定期检查新的消息或心跳包,并引导客户端保持连接。以下是一个示例代码,在循环中每隔1秒调用一次`client.check_msg()`方法: - while True:
-
- client.check_msg()
-
- time.sleep(1)
复制代码
为了断开与MQTT代理的连接,可以使用`client.disconnect()`方法: - client.disconnect()
-
- print("已断开与MQTT代理的连接")
复制代码
步骤8:整合所有代码 完成以上步骤后,你可以将所有的代码整合到一起,并进行部署和运行。下面是一个完整的示例代码: - import network
-
- from umqtt.simple import MQTTClient
-
- import ujson as json
-
- import time
-
-
-
- # 读取JSON配置文件
-
- def read_config_file(filename):
-
- try:
-
- with open(filename, 'r') as file:
-
- config = json.load(file)
-
- return config
-
- except OSError:
-
- print("无法读取配置文件:", filename)
-
- return None
-
-
-
- # 读取配置文件
-
- config = read_config_file("configEMQX.json")
-
-
-
- def connectMQTT():
-
- client = MQTTClient(
-
- client_id=config.get("client_id").encode(),
-
- server=config.get("server").encode(),
-
- port=config.get("port"),
-
- user=config.get("user").encode(),
-
- password=config.get("password").encode(),
-
- keepalive=7200,
-
- ssl=False
-
- )
-
-
-
- client.connect()
-
- return client
-
-
-
- # 检查是否成功读取配置文件
-
- if config is not None:
-
- # 获取配置项
-
- ssid, pwd = config.get("ssid"), config.get("pwd")
-
-
-
- wlan = network.WLAN(network.STA_IF)
-
- wlan.active(True)
-
- if not wlan.isconnected() :
-
- #wlan.disconnect()
-
- #wlan.config(dhcp_hostname="")
-
- wlan.connect(ssid, pwd)
-
- retry_limit = 10 # 设置重试次数上限为10
-
- retry_count = 0 # 初始化重试计数器
-
- while not wlan.isconnected() and retry_count < retry_limit:
-
- retry_count += 1
-
- time.sleep_ms(1000)
-
- print("try to connecting", retry_count)
-
-
-
- if not wlan.isconnected():
-
- print("连接尝试次数已达上限,连接失败")
-
- else:
-
- print("成功连接到网络")
-
- print(wlan.ifconfig())
-
-
-
- #import mip
-
- #mip.install("umqtt.simple")
-
-
-
- mqttClient = connectMQTT()
-
- mqttClient.publish("topic", "value123")
-
- print("publish Done")
-
- else :
-
- print("Config not exist")
复制代码
下面是关于Esp32C6 开箱 刷固件 点灯 点屏幕 连接MQTT服务 的视频
|