FireBeetle 2 ESP32 C6开发板 Micropython连接MQTT服务
本帖最后由 米菲爸爸 于 2024-4-12 19:08 编辑当使用ESP32-C6与MicroPython连接MQTT服务时,可以创建一个轻量级的物联网应用程序。ESP32-C6是一款低功耗、高性能和高度集成的Wi-Fi和蓝牙解决方案,而MicroPython是一种方便的Python语言子集,适用于嵌入式开发。在本文中,我们将使用MicroPython和ESP32-C6来连接并发布/订阅MQTT消息。
当然由于Micropython开发Esp32系列开发板目前还都不是主流,所以难免经常踩坑。 步骤1:准备工作在开始之前,请确保你已经具备以下几点:1.一台安装了MicroPython固件的ESP32-C6设备。(刷固件可以参考文末视频)2.一个MQTT代理(例如EMQX)的服务器地址、端口号以及连接凭据(用户名和密码)。 步骤2:连接Wi-Fi首先,我们把所有的参数都放到一个json格式的配置文件里以便之后快速切换不同的MQTT服务
# 读取JSON配置文件
def read_config_file(filename):
try:
with open(filename, 'r') as file:
config = json.load(file)
return config
except OSError:
print("无法读取配置文件:", filename)
return None
# 读取配置文件
config = read_config_file("configEMQX.json")
# 检查是否成功读取配置文件
if config is not None:
# 获取配置项
ssid, pwd = config.get("ssid"), config.get("pwd")
我们需要将ESP32-C6连接到Wi-Fi网络,以便能够与MQTT代理进行通信。在MicroPython中,我们可以使用`network`模块来进行Wi-Fi连接。以下是一个示例代码:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected() :
#wlan.disconnect()
#wlan.config(dhcp_hostname="")
wlan.connect(ssid, pwd)
retry_limit = 10# 设置重试次数上限为10
retry_count = 0# 初始化重试计数器
while not wlan.isconnected() and retry_count < retry_limit:
retry_count += 1
time.sleep_ms(1000)
print("try to connecting", retry_count)
if not wlan.isconnected():
print("连接尝试次数已达上限,连接失败")
else:
print("成功连接到网络")
print(wlan.ifconfig())
在配置文件里将"yourSSID"和"yourPWD"替换为你的Wi-Fi网络的凭据。代码将连接到指定的Wi-Fi网络,然后等待直到连接成功。 步骤3:安装和导入MQTT库在ESP32-C6上使用MicroPython连接uMQTT服务需要使用到uMQTT库,该库提供了uMQTT客户端的功能。首先,需要安装mip,在Thonny的Tools-> Manage Packages 下搜索安装mip组件
在代码中添加import mip
mip.install("umqtt.simple")
代码将使用mip模块来安装MQTT库。安装完成后,我们可以在代码中导入MQTT库: from umqtt.simple import MQTTClient
步骤4:连接MQTT代理现在,我们可以连接到MQTT代理并发布/订阅消息。请将以下代码替换为你的MQTT服务器地址、端口号以及连接凭据: mqtt_server = "你的MQTT服务器地址"
mqtt_port = 1883
mqtt_username = "你的MQTT用户名"
mqtt_password = "你的MQTT密码"
client = MQTTClient("esp32-c6-client", mqtt_server, port=mqtt_port, user=mqtt_username, password=mqtt_password)
client.connect()
print("已成功连接到MQTT代理")
在代码中,我们首先创建了一个名为`esp32-c6-client`的MQTT客户端实例。然后,使用`client.connect()`方法连接到MQTT代理。 步骤5:发布MQTT消息要发布MQTT消息,可以使用`client.publish()`方法。以下是一个示例,将"Hello,MQTT!"作为消息发布到名为`topic/test`的主题下: message = "Hello, MQTT!"
topic = "topic/test"
client.publish(topic, message)
print("已发布消息:", message)
在代码中,我们使用`client.publish()`方法,传递了主题和消息作为参数。 步骤6:订阅MQTT消息要订阅MQTT消息,可以使用`client.subscribe()`方法。以下是一个示例,订阅名为`topic/test`的主题: topic = "topic/test"
def on_message(topic, message):
print("收到消息,主题:", topic.decode())
print("消息内容:", message.decode())
client.set_callback(on_message)
client.subscribe(topic)
print("已订阅主题:", topic)
在代码中,我们使用了`client.subscribe()`方法来订阅指定的主题,并在`on_message`函数中定义了接收到消息时的处理逻辑。 步骤7:保持连接与断开连接为了保持与MQTT代理的持续连接,可以使用`client.check_msg()`方法定期检查新的消息或心跳包,并引导客户端保持连接。以下是一个示例代码,在循环中每隔1秒调用一次`client.check_msg()`方法: while True:
client.check_msg()
time.sleep(1)
为了断开与MQTT代理的连接,可以使用`client.disconnect()`方法: client.disconnect()
print("已断开与MQTT代理的连接")
步骤8:整合所有代码完成以上步骤后,你可以将所有的代码整合到一起,并进行部署和运行。下面是一个完整的示例代码: import network
from umqtt.simple import MQTTClient
import ujson as json
import time
# 读取JSON配置文件
def read_config_file(filename):
try:
with open(filename, 'r') as file:
config = json.load(file)
return config
except OSError:
print("无法读取配置文件:", filename)
return None
# 读取配置文件
config = read_config_file("configEMQX.json")
def connectMQTT():
client = MQTTClient(
client_id=config.get("client_id").encode(),
server=config.get("server").encode(),
port=config.get("port"),
user=config.get("user").encode(),
password=config.get("password").encode(),
keepalive=7200,
ssl=False
)
client.connect()
return client
# 检查是否成功读取配置文件
if config is not None:
# 获取配置项
ssid, pwd = config.get("ssid"), config.get("pwd")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected() :
#wlan.disconnect()
#wlan.config(dhcp_hostname="")
wlan.connect(ssid, pwd)
retry_limit = 10# 设置重试次数上限为10
retry_count = 0# 初始化重试计数器
while not wlan.isconnected() and retry_count < retry_limit:
retry_count += 1
time.sleep_ms(1000)
print("try to connecting", retry_count)
if not wlan.isconnected():
print("连接尝试次数已达上限,连接失败")
else:
print("成功连接到网络")
print(wlan.ifconfig())
#import mip
#mip.install("umqtt.simple")
mqttClient = connectMQTT()
mqttClient.publish("topic", "value123")
print("publish Done")
else :
print("Config not exist")
下面是关于Esp32C6 开箱 刷固件 点灯 点屏幕 连接MQTT服务 的视频https://www.bilibili.com/video/BV1rJ4m1p7oq/
页:
[1]