行空板M10 小智AI 聊天机器人
本帖最后由 云天 于 2025-2-26 11:04 编辑【项目背景】
近期,网络上涌现出众多基于 ESP32-S3 设备的“小智 AI聊天机器人”复刻项目。然而,这些项目大多在组装完成后直接下载固件,用户无法对程序进行进一步修改。(也可以在Windows上搭建 ESP IDF 5.3开发环境编译小智)
最近,有人开源了一种使用 Python 实现“小智 AI”的方案,该方案可在 Windows 电脑和树莓派上运行。鉴于行空板 M10 搭载了 Python 编程环境,我将相关代码移植至该平台,并进行了适配优化,使其能够更好地运行。
【小智AI ESP32S3】
小智AI聊天机器人是一款基于人工智能技术的交互式语音助手,具备语音识别、自然语言处理、语音合成等功能,能够通过语音与用户进行交流,通常基于ESP32-S3开发板等硬件平台实现,支持Wi-Fi或4G联网,并配备麦克风、扬声器、显示屏等模块。 开源平台:https://xiaozhi.me/
【小智AI Python】
开源网址:https://github.com/zhh827/py-xiaozhi,用python实现的小智客户端,用于代码学习和在没有硬件条件下体验AI小智的语音功能。大家可以选使用Mind+的python环境下试运行。
注意事项:1.通过Mind+的库管理,安装requirements.txt文件中的相关库。2.将opus.dll拷贝到至C:\Windows\System32目录中。3.需要手动修改py-xiaozhi.py脚本中的全局变量MAC_ADDR(字母小写),运行py-xiaozhi.py后,根据窗口中打印出的注册码,到https://xiaozhi.me/的控制台中进行注册。
【小智AI 行空板】
1.使用行空板上A、B键,实现开始录音、停止录音,与“小智”进行对话。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import time
import requests
import paho.mqtt.client as mqtt
import threading
import pyaudio
import opuslib# windwos平台需要将opus.dll 拷贝到C:\Windows\System32
import socket
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from os import urandom
import logging
from pynput import keyboard as pynput_keyboard
from unihiker import GUI
import webrtcvad
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import NeoPixel
# 初始化行空板硬件接口
Board().begin()
gui=GUI()
pin1 = Pin(Pin.D23)
np1 = NeoPixel(pin1,24)
np1.brightness(128)
np1.clear()
gui.clear()
fontSize=20
max_lines = 16
max_chars=8
# 图形界面元素
status_label = gui.draw_text(x=80, y=10, text='初始化中...', color='red')
log_text = gui.draw_text(x=10, y=100, text='', font_size=fontSize,color='blue')
emotion = gui.draw_text(x=110, y=50, text='', font_size=fontSize,color='green')
OTA_VERSION_URL = 'https://api.tenclass.net/xiaozhi/ota/'
MAC_ADDR = '7a:da:6b:5c:76:50'
# {"mqtt":{"endpoint":"post-cn-apg3xckag01.mqtt.aliyuncs.com","client_id":"GID_test@@@cc_ba_97_20_b4_bc",
# "username":"Signature|LTAI5tF8J3CrdWmRiuTjxHbF|post-cn-apg3xckag01","password":"0mrkMFELXKyelhuYy2FpGDeCigU=",
# "publish_topic":"device-server","subscribe_topic":"devices"},"firmware":{"version":"0.9.9","url":""}}
mqtt_info = {}
aes_opus_info = {"type": "hello", "version": 3, "transport": "udp",
"udp": {"server": "120.24.160.13", "port": 8884, "encryption": "aes-128-ctr",
"key": "263094c3aa28cb42f3965a1020cb21a7", "nonce": "01000000ccba9720b4bc268100000000"},
"audio_params": {"format": "opus", "sample_rate": 24000, "channels": 1, "frame_duration": 60},
"session_id": "b23ebfe9"}
iot_msg = {"session_id": "635aa42d", "type": "iot",
"descriptors": [{"name": "Speaker", "description": "当前 AI 机器人的扬声器",
"properties": {"volume": {"description": "当前音量值", "type": "number"}},
"methods": {"SetVolume": {"description": "设置音量",
"parameters": {
"volume": {"description": "0到100之间的整数", "type": "number"}
}
}
}
},
{"name": "Lamp", "description": "一个测试用的灯",
"properties": {"power": {"description": "灯是否打开", "type": "boolean"}},
"methods": {"TurnOn": {"description": "打开灯", "parameters": {}},
"TurnOff": {"description": "关闭灯", "parameters": {}}
}
}
]
}
iot_status_msg = {"session_id": "635aa42d", "type": "iot", "states": [
{"name": "Speaker", "state": {"volume": 50}}, {"name": "Lamp", "state": {"power": False}}]}
goodbye_msg = {"session_id": "b23ebfe9", "type": "goodbye"}
local_sequence = 0
listen_state = None
tts_state = None
key_state = None
audio = None
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# udp_socket.setblocking(False)
conn_state = False
recv_audio_thread = threading.Thread()
send_audio_thread = threading.Thread()
mqttc = None
def get_ota_version():
global mqtt_info
header = {
'Device-Id': MAC_ADDR,
'Content-Type': 'application/json'
}
post_data = {"flash_size": 16777216, "minimum_free_heap_size": 8318916, "mac_address": f"{MAC_ADDR}",
"chip_model_name": "esp32s3", "chip_info": {"model": 9, "cores": 2, "revision": 2, "features": 18},
"application": {"name": "xiaozhi", "version": "0.9.9", "compile_time": "Jan 22 2025T20:40:23Z",
"idf_version": "v5.3.2-dirty",
"elf_sha256": "22986216df095587c42f8aeb06b239781c68ad8df80321e260556da7fcf5f522"},
"partition_table": [{"label": "nvs", "type": 1, "subtype": 2, "address": 36864, "size": 16384},
{"label": "otadata", "type": 1, "subtype": 0, "address": 53248, "size": 8192},
{"label": "phy_init", "type": 1, "subtype": 1, "address": 61440, "size": 4096},
{"label": "model", "type": 1, "subtype": 130, "address": 65536, "size": 983040},
{"label": "storage", "type": 1, "subtype": 130, "address": 1048576,
"size": 1048576},
{"label": "factory", "type": 0, "subtype": 0, "address": 2097152, "size": 4194304},
{"label": "ota_0", "type": 0, "subtype": 16, "address": 6291456, "size": 4194304},
{"label": "ota_1", "type": 0, "subtype": 17, "address": 10485760,
"size": 4194304}],
"ota": {"label": "factory"},
"board": {"type": "bread-compact-wifi", "ssid": "mzy", "rssi": -58, "channel": 6,
"ip": "192.168.124.38", "mac": "cc:ba:97:20:b4:bc"}}
response = requests.post(OTA_VERSION_URL, headers=header, data=json.dumps(post_data))
print('=========================')
print(response.text)
logging.info(f"get version: {response}")
mqtt_info = response.json()['mqtt']
def aes_ctr_encrypt(key, nonce, plaintext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()
def aes_ctr_decrypt(key, nonce, ciphertext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
def send_audio():
global aes_opus_info, udp_socket, local_sequence, listen_state, audio
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
server_ip = aes_opus_info['udp']['server']
server_port = aes_opus_info['udp']['port']
# 初始化Opus编码器
encoder = opuslib.Encoder(16000, 1, opuslib.APPLICATION_AUDIO)
# 打开麦克风流, 帧大小,应该与Opus帧大小匹配
mic = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=960)
try:
while True:
if listen_state == "stop":
continue
time.sleep(0.1)
# 读取音频数据
data = mic.read(960)
# 编码音频数据
encoded_data = encoder.encode(data, 960)
# 打印音频数据
# print(f"Encoded data: {len(encoded_data)}")
# nonce插入data.size local_sequence_
local_sequence += 1
new_nonce = nonce + format(len(encoded_data), '04x') + nonce + format(local_sequence, '08x')
# 加密数据,添加nonce
encrypt_encoded_data = aes_ctr_encrypt(bytes.fromhex(key), bytes.fromhex(new_nonce), bytes(encoded_data))
data = bytes.fromhex(new_nonce) + encrypt_encoded_data
sent = udp_socket.sendto(data, (server_ip, server_port))
except Exception as e:
print(f"send audio err: {e}")
finally:
print("send audio exit()")
local_sequence = 0
udp_socket = None
# 关闭流和PyAudio
mic.stop_stream()
mic.close()
def recv_audio():
global aes_opus_info, udp_socket, audio
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
sample_rate = aes_opus_info['audio_params']['sample_rate']
frame_duration = aes_opus_info['audio_params']['frame_duration']
frame_num = int(frame_duration / (1000 / sample_rate))
print(f"recv audio: sample_rate -> {sample_rate}, frame_duration -> {frame_duration}, frame_num -> {frame_num}")
# 初始化Opus编码器
decoder = opuslib.Decoder(sample_rate, 1)
spk = audio.open(format=pyaudio.paInt16, channels=1, rate=sample_rate, output=True, frames_per_buffer=frame_num)
try:
while True:
data, server = udp_socket.recvfrom(4096)
# print(f"Received from server {server}: {len(data)}")
encrypt_encoded_data = data
# 解密数据,分离nonce
split_encrypt_encoded_data_nonce = encrypt_encoded_data[:16]
# 十六进制格式打印nonce
# print(f"split_encrypt_encoded_data_nonce: {split_encrypt_encoded_data_nonce.hex()}")
split_encrypt_encoded_data = encrypt_encoded_data
decrypt_data = aes_ctr_decrypt(bytes.fromhex(key),
split_encrypt_encoded_data_nonce,
split_encrypt_encoded_data)
# 解码播放音频数据
spk.write(decoder.decode(decrypt_data, frame_num))
# except BlockingIOError:
# # 无数据时短暂休眠以减少CPU占用
# time.sleep(0.1)
except Exception as e:
print(f"recv audio err: {e}")
finally:
udp_socket = None
spk.stop_stream()
spk.close()
def wrap_hanzi(text, first_line_width=5, other_line_width=16):
"""将字符串格式化为第一行指定宽度,后续行指定宽度"""
lines = []
# 处理第一行
if len(text) > first_line_width:
lines.append(text[:first_line_width])
remaining_text = text
else:
lines.append(text)
remaining_text = ""
# 处理后续行
for i in range(0, len(remaining_text), other_line_width):
lines.append(remaining_text)
return "\n".join(lines)
def get_ascii_emotion(emotion):
"""根据情绪类型返回对应的 ASCII 表情符号"""
if emotion == "happy":
return ":)"
elif emotion == "sad":
return ":("
elif emotion == "winking":
return ";)"
elif emotion == "surprised":
return ":O"
elif emotion == "angry":
return ">:(("
elif emotion == "laughing":
return ":D"
elif emotion == "cool":
return "B-)"
elif emotion == "crying":
return ":'("
elif emotion == "shy":
return "^_^"
elif emotion == "thinking":
return ":|"
elif emotion == "love":
return "<3"
elif emotion == "sleepy":
return "-.-"
elif emotion == "neutral":
return ":|"
elif emotion == "excited":
return ":D"
elif emotion == "confused":
return ":S"
else:
return ":("# 默认表情
def on_message(client, userdata, message):
global aes_opus_info, udp_socket, tts_state, recv_audio_thread, send_audio_thread,max_chars
msg = json.loads(message.payload)
print(f"recv msg: {msg}")
if msg['type'] == 'hello':
aes_opus_info = msg
udp_socket.connect((msg['udp']['server'], msg['udp']['port']))
# 检查recv_audio_thread线程是否启动
if not recv_audio_thread.is_alive():
# 启动一个线程,用于接收音频数据
recv_audio_thread = threading.Thread(target=recv_audio)
recv_audio_thread.start()
else:
print("recv_audio_thread is alive")
# 检查send_audio_thread线程是否启动
if not send_audio_thread.is_alive():
# 启动一个线程,用于发送音频数据
send_audio_thread = threading.Thread(target=send_audio)
send_audio_thread.start()
else:
print("send_audio_thread is alive")
if msg['type'] == 'llm':
ascii_emotion = get_ascii_emotion(msg['emotion'])
emotion.config(text=ascii_emotion)
if msg['type'] == 'tts' and msg['state']=='start':
status_label.config(text="讲话中……")
if msg['type'] == 'tts' and msg['state']=='stop':
status_label.config(text="就绪")
if msg['type'] == 'tts' and msg['state']=='sentence_start':
tts_state = msg['state']
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="小智: " + text)
if msg['text'] == '开灯':
np1.range_color(0,23,0x0000FF)
if msg['text'] == '关灯':
np1.clear()
if msg['type'] == 'stt':
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="我: " + text)
if msg['type'] == 'goodbye' and udp_socket and msg['session_id'] == aes_opus_info['session_id']:
print(f"recv good bye msg")
aes_opus_info['session_id'] = None
def on_connect(client, userdata, flags, rs, pr):
subscribe_topic = mqtt_info['subscribe_topic'].split("/") + '/p2p/GID_test@@@' + MAC_ADDR.replace(':', '_')
print(f"subscribe topic: {subscribe_topic}")
# 订阅主题
client.subscribe(subscribe_topic)
def push_mqtt_msg(message):
global mqtt_info, mqttc
mqttc.publish(mqtt_info['publish_topic'], json.dumps(message))
def listen_start():
global key_state, udp_socket, aes_opus_info, listen_state, conn_state
if key_state == "press":
return
key_state = "press"
# 判断是否需要发送hello消息
if conn_state is False or aes_opus_info['session_id'] is None:
conn_state = True
# 发送hello消息,建立udp连接
hello_msg = {"type": "hello", "version": 3, "transport": "udp",
"audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60}}
push_mqtt_msg(hello_msg)
print(f"send hello message: {hello_msg}")
if tts_state == "start" or tts_state == "entence_start":
# 在播放状态下发送abort消息
push_mqtt_msg({"type": "abort"})
print(f"send abort message")
if aes_opus_info['session_id'] is not None:
# 发送start listen消息
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "start", "mode": "manual"}
print(f"send start listen message: {msg}")
status_label.config(text="聆听中……")
push_mqtt_msg(msg)
def listen_stop():
global aes_opus_info, key_state
key_state = "release"
# 发送stop listen消息
if aes_opus_info['session_id'] is not None:
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "stop"}
print(f"send stop listen message: {msg}")
push_mqtt_msg(msg)
def run():
global mqtt_info, mqttc
# 获取mqtt与版本信息
get_ota_version()
# 创建客户端实例
mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=mqtt_info['client_id'])
mqttc.username_pw_set(username=mqtt_info['username'], password=mqtt_info['password'])
mqttc.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_REQUIRED,
tls_version=mqtt.ssl.PROTOCOL_TLS, ciphers=None)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect(host=mqtt_info['endpoint'], port=8883)
gui.on_a_click(listen_start)
gui.on_b_click(listen_stop)
mqttc.loop_forever()
if __name__ == "__main__":
audio = pyaudio.PyAudio()
run()
https://www.bilibili.com/video/BV17ZAGeYEmH/?share_source=copy_web
(演示视频中,让小智根据对话情景开关灯)
2.通过行空板引脚24外接按钮实现与“小智”对话
#增加了24引脚按键,使用舵机模拟双手
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import time
import requests
import paho.mqtt.client as mqtt
import threading
import pyaudio
import opuslib# windwos平台需要将opus.dll 拷贝到C:\Windows\System32
import socket
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from os import urandom
import logging
from pynput import keyboard as pynput_keyboard
from unihiker import GUI
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Servo
# 初始化行空板硬件接口
Board().begin()
gui=GUI()
gui.clear()
#舵机引脚及按键引脚设置
pin2 = Pin(Pin.D21)
pin3 = Pin(Pin.D22)
servo1 = Servo(pin2)
servo2 = Servo(pin3)
p_p24_in=Pin(Pin.P24, Pin.IN)
#显示内容设置
fontSize=20
max_lines = 16
max_chars=8
# 图形界面元素
status_label = gui.draw_text(x=80, y=10, text='初始化中...', color='red')
log_text = gui.draw_text(x=10, y=100, text='', font_size=fontSize,color='blue')
emotion = gui.draw_text(x=110, y=50, text='', font_size=fontSize,color='green')
OTA_VERSION_URL = 'https://api.tenclass.net/xiaozhi/ota/'
MAC_ADDR = '7a:da:6b:5c:76:50'
# {"mqtt":{"endpoint":"post-cn-apg3xckag01.mqtt.aliyuncs.com","client_id":"GID_test@@@cc_ba_97_20_b4_bc",
# "username":"Signature|LTAI5tF8J3CrdWmRiuTjxHbF|post-cn-apg3xckag01","password":"0mrkMFELXKyelhuYy2FpGDeCigU=",
# "publish_topic":"device-server","subscribe_topic":"devices"},"firmware":{"version":"0.9.9","url":""}}
mqtt_info = {}
aes_opus_info = {"type": "hello", "version": 3, "transport": "udp",
"udp": {"server": "120.24.160.13", "port": 8884, "encryption": "aes-128-ctr",
"key": "263094c3aa28cb42f3965a1020cb21a7", "nonce": "01000000ccba9720b4bc268100000000"},
"audio_params": {"format": "opus", "sample_rate": 24000, "channels": 1, "frame_duration": 60},
"session_id": None}
iot_msg = {"session_id": "635aa42d", "type": "iot",
"descriptors": [{"name": "Speaker", "description": "当前 AI 机器人的扬声器",
"properties": {"volume": {"description": "当前音量值", "type": "number"}},
"methods": {"SetVolume": {"description": "设置音量",
"parameters": {
"volume": {"description": "0到100之间的整数", "type": "number"}
}
}
}
},
{"name": "Lamp", "description": "一个测试用的灯",
"properties": {"power": {"description": "灯是否打开", "type": "boolean"}},
"methods": {"TurnOn": {"description": "打开灯", "parameters": {}},
"TurnOff": {"description": "关闭灯", "parameters": {}}
}
}
]
}
iot_status_msg = {"session_id": "635aa42d", "type": "iot", "states": [
{"name": "Speaker", "state": {"volume": 50}}, {"name": "Lamp", "state": {"power": False}}]}
goodbye_msg = {"session_id": "b23ebfe9", "type": "goodbye"}
local_sequence = 0
listen_state = None
tts_state = None
key_state = None
audio = None
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# udp_socket.setblocking(False)
conn_state = False
recv_audio_thread = threading.Thread()
send_audio_thread = threading.Thread()
anjian_thread = threading.Thread()
mqttc = None
# 抬手 放手函数
def handup():
servo1.write_angle(170)
servo2.write_angle(10)
def handdown():
servo1.write_angle(10)
servo2.write_angle(170)
def get_ota_version():
global mqtt_info
header = {
'Device-Id': MAC_ADDR,
'Content-Type': 'application/json'
}
post_data = {"flash_size": 16777216, "minimum_free_heap_size": 8318916, "mac_address": f"{MAC_ADDR}",
"chip_model_name": "esp32s3", "chip_info": {"model": 9, "cores": 2, "revision": 2, "features": 18},
"application": {"name": "xiaozhi", "version": "0.9.9", "compile_time": "Jan 22 2025T20:40:23Z",
"idf_version": "v5.3.2-dirty",
"elf_sha256": "22986216df095587c42f8aeb06b239781c68ad8df80321e260556da7fcf5f522"},
"partition_table": [{"label": "nvs", "type": 1, "subtype": 2, "address": 36864, "size": 16384},
{"label": "otadata", "type": 1, "subtype": 0, "address": 53248, "size": 8192},
{"label": "phy_init", "type": 1, "subtype": 1, "address": 61440, "size": 4096},
{"label": "model", "type": 1, "subtype": 130, "address": 65536, "size": 983040},
{"label": "storage", "type": 1, "subtype": 130, "address": 1048576,
"size": 1048576},
{"label": "factory", "type": 0, "subtype": 0, "address": 2097152, "size": 4194304},
{"label": "ota_0", "type": 0, "subtype": 16, "address": 6291456, "size": 4194304},
{"label": "ota_1", "type": 0, "subtype": 17, "address": 10485760,
"size": 4194304}],
"ota": {"label": "factory"},
"board": {"type": "bread-compact-wifi", "ssid": "mzy", "rssi": -58, "channel": 6,
"ip": "192.168.124.38", "mac": "cc:ba:97:20:b4:bc"}}
response = requests.post(OTA_VERSION_URL, headers=header, data=json.dumps(post_data))
print('=========================')
print(response.text)
logging.info(f"get version: {response}")
mqtt_info = response.json()['mqtt']
def aes_ctr_encrypt(key, nonce, plaintext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()
def aes_ctr_decrypt(key, nonce, ciphertext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
def send_audio():
global aes_opus_info, udp_socket, local_sequence, listen_state, audio
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
server_ip = aes_opus_info['udp']['server']
server_port = aes_opus_info['udp']['port']
# 初始化Opus编码器
encoder = opuslib.Encoder(16000, 1, opuslib.APPLICATION_AUDIO)
# 打开麦克风流, 帧大小,应该与Opus帧大小匹配
mic = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=960)
try:
while True:
if listen_state == "stop":
continue
time.sleep(0.1)
# 读取音频数据
data = mic.read(960)
# 编码音频数据
encoded_data = encoder.encode(data, 960)
# 打印音频数据
# print(f"Encoded data: {len(encoded_data)}")
# nonce插入data.size local_sequence_
local_sequence += 1
new_nonce = nonce + format(len(encoded_data), '04x') + nonce + format(local_sequence, '08x')
# 加密数据,添加nonce
encrypt_encoded_data = aes_ctr_encrypt(bytes.fromhex(key), bytes.fromhex(new_nonce), bytes(encoded_data))
data = bytes.fromhex(new_nonce) + encrypt_encoded_data
sent = udp_socket.sendto(data, (server_ip, server_port))
except Exception as e:
print(f"send audio err: {e}")
finally:
print("send audio exit()")
local_sequence = 0
udp_socket = None
# 关闭流和PyAudio
mic.stop_stream()
mic.close()
def recv_audio():
global aes_opus_info, udp_socket, audio
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
sample_rate = aes_opus_info['audio_params']['sample_rate']
frame_duration = aes_opus_info['audio_params']['frame_duration']
frame_num = int(frame_duration / (1000 / sample_rate))
print(f"recv audio: sample_rate -> {sample_rate}, frame_duration -> {frame_duration}, frame_num -> {frame_num}")
# 初始化Opus编码器
decoder = opuslib.Decoder(sample_rate, 1)
spk = audio.open(format=pyaudio.paInt16, channels=1, rate=sample_rate, output=True, frames_per_buffer=frame_num)
try:
while True:
data, server = udp_socket.recvfrom(4096)
# print(f"Received from server {server}: {len(data)}")
encrypt_encoded_data = data
# 解密数据,分离nonce
split_encrypt_encoded_data_nonce = encrypt_encoded_data[:16]
# 十六进制格式打印nonce
# print(f"split_encrypt_encoded_data_nonce: {split_encrypt_encoded_data_nonce.hex()}")
split_encrypt_encoded_data = encrypt_encoded_data
decrypt_data = aes_ctr_decrypt(bytes.fromhex(key),
split_encrypt_encoded_data_nonce,
split_encrypt_encoded_data)
# 解码播放音频数据
spk.write(decoder.decode(decrypt_data, frame_num))
# except BlockingIOError:
# # 无数据时短暂休眠以减少CPU占用
# time.sleep(0.1)
except Exception as e:
print(f"recv audio err: {e}")
finally:
udp_socket = None
spk.stop_stream()
spk.close()
def wrap_hanzi(text, first_line_width=5, other_line_width=16):
"""将字符串格式化为第一行指定宽度,后续行指定宽度"""
lines = []
# 处理第一行
if len(text) > first_line_width:
lines.append(text[:first_line_width])
remaining_text = text
else:
lines.append(text)
remaining_text = ""
# 处理后续行
for i in range(0, len(remaining_text), other_line_width):
lines.append(remaining_text)
return "\n".join(lines)
def get_ascii_emotion(emotion):
"""根据情绪类型返回对应的 ASCII 表情符号"""
if emotion == "happy":
return ":)"
elif emotion == "sad":
return ":("
elif emotion == "winking":
return ";)"
elif emotion == "surprised":
return ":O"
elif emotion == "angry":
return ">:(("
elif emotion == "laughing":
return ":D"
elif emotion == "cool":
return "B-)"
elif emotion == "crying":
return ":'("
elif emotion == "shy":
return "^_^"
elif emotion == "thinking":
return ":|"
elif emotion == "love":
return "<3"
elif emotion == "sleepy":
return "-.-"
elif emotion == "neutral":
return ":|"
elif emotion == "excited":
return ":D"
elif emotion == "confused":
return ":S"
else:
return ":("# 默认表情
def on_message(client, userdata, message):
global aes_opus_info, udp_socket, tts_state, recv_audio_thread, send_audio_thread,max_chars,listen_state
msg = json.loads(message.payload)
print(f"recv msg: {msg}")
if msg['type'] == 'hello':
aes_opus_info = msg
udp_socket.connect((msg['udp']['server'], msg['udp']['port']))
# 检查recv_audio_thread线程是否启动
if not recv_audio_thread.is_alive():
# 启动一个线程,用于接收音频数据
recv_audio_thread = threading.Thread(target=recv_audio)
recv_audio_thread.start()
else:
print("recv_audio_thread is alive")
# 检查send_audio_thread线程是否启动
if not send_audio_thread.is_alive():
# 启动一个线程,用于发送音频数据
send_audio_thread = threading.Thread(target=send_audio)
send_audio_thread.start()
else:
print("send_audio_thread is alive")
if listen_state isNone:
listen_state="hello"
# 发送start listen消息
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "start", "mode": "manual"}
print(f"send start listen message: {msg}")
status_label.config(text="聆听中……")
push_mqtt_msg(msg)
if msg['type'] == 'tts':
tts_state = msg['state']
if msg['type'] == 'llm':
ascii_emotion = get_ascii_emotion(msg['emotion'])
emotion.config(text=ascii_emotion)
if msg['type'] == 'tts' and msg['state']=='start':
status_label.config(text="讲话中……")
if msg['type'] == 'tts' and msg['state']=='stop':
status_label.config(text="就绪")
if msg['type'] == 'tts' and msg['state']=='sentence_start':
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="小智: " + text)
if msg['text'] == '举手':
handup()
if msg['text'] == '放下':
handdown()
if msg['type'] == 'stt':
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="我: " + text)
if msg['type'] == 'goodbye' and udp_socket and msg['session_id'] == aes_opus_info['session_id']:
print(f"recv good bye msg")
aes_opus_info['session_id'] = None
log_text.config(text="")
status_label.config(text="休息中")
def on_connect(client, userdata, flags, rs, pr):
subscribe_topic = mqtt_info['subscribe_topic'].split("/") + '/p2p/GID_test@@@' + MAC_ADDR.replace(':', '_')
print(f"subscribe topic: {subscribe_topic}")
# 订阅主题
client.subscribe(subscribe_topic)
status_label.config(text="就绪")
def push_mqtt_msg(message):
global mqtt_info, mqttc
mqttc.publish(mqtt_info['publish_topic'], json.dumps(message))
def listen_start():
global key_state, udp_socket, aes_opus_info, listen_state, conn_state
if key_state == "press":
return
key_state = "press"
# 判断是否需要发送hello消息
if conn_state is False or aes_opus_info['session_id'] is None:
conn_state = True
# 发送hello消息,建立udp连接
hello_msg = {"type": "hello", "version": 3, "transport": "udp",
"audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60}}
push_mqtt_msg(hello_msg)
print(f"send hello message: {hello_msg}")
if tts_state == "start" or tts_state == "entence_start":
# 在播放状态下发送abort消息
push_mqtt_msg({"type": "abort"})
print(f"send abort message")
if aes_opus_info['session_id'] is not None:
# 发送start listen消息
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "start", "mode": "manual"}
print(f"send start listen message: {msg}")
status_label.config(text="聆听中……")
push_mqtt_msg(msg)
def listen_stop():
global aes_opus_info, key_state
key_state = "release"
# 发送stop listen消息
if aes_opus_info['session_id'] is not None:
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "stop"}
print(f"send stop listen message: {msg}")
push_mqtt_msg(msg)
def run():
global mqtt_info, mqttc
# 获取mqtt与版本信息
get_ota_version()
# 创建客户端实例
mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=mqtt_info['client_id'])
mqttc.username_pw_set(username=mqtt_info['username'], password=mqtt_info['password'])
mqttc.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_REQUIRED,
tls_version=mqtt.ssl.PROTOCOL_TLS, ciphers=None)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect(host=mqtt_info['endpoint'], port=8883)
anjian_thread = threading.Thread(target=anjian)
anjian_thread.start()
mqttc.loop_forever()
def anjian():
bs=1
while True:
if p_p24_in.read_digital()==True:
if bs==1:
listen_start()
else:
listen_stop()
bs=1-bs
time.sleep(1)
if __name__ == "__main__":
audio = pyaudio.PyAudio()
run()
https://www.bilibili.com/video/BV1yaPceyExH/?share_source=copy_web
(演示视频中让小智根据对话情景实现“举手”、“放下”)
3.使用webrtcvad库进行人声检测,当检测到人声时,开始录音,当“静音”时,停止录音,实现不用手动操作,完成与小智连续对话。
## pip install paho-mqtt pyaudio opuslib cryptography webrtcvad
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import time
import requests
import paho.mqtt.client as mqtt
import threading
import pyaudio
import opuslib# windwos平台需要将opus.dll 拷贝到C:\Windows\System32
import socket
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from os import urandom
import logging
from pynput import keyboard as pynput_keyboard
from unihiker import GUI
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Servo
import webrtcvad
vad = webrtcvad.Vad(3)
# 初始化行空板硬件接口
Board().begin()
gui=GUI()
gui.clear()
#舵机引脚及按键引脚设置
pin2 = Pin(Pin.D21)
pin3 = Pin(Pin.D22)
servo1 = Servo(pin2)
servo2 = Servo(pin3)
p_p24_in=Pin(Pin.P24, Pin.IN)
#显示内容设置
fontSize=20
max_lines = 16
max_chars=8
# 图形界面元素
status_label = gui.draw_text(x=80, y=10, text='初始化中...', color='red')
log_text = gui.draw_text(x=10, y=100, text='', font_size=fontSize,color='blue')
emotion = gui.draw_text(x=110, y=50, text='', font_size=fontSize,color='green')
OTA_VERSION_URL = 'https://api.tenclass.net/xiaozhi/ota/'
MAC_ADDR = '7a:da:6b:5c:76:50'
# {"mqtt":{"endpoint":"post-cn-apg3xckag01.mqtt.aliyuncs.com","client_id":"GID_test@@@cc_ba_97_20_b4_bc",
# "username":"Signature|LTAI5tF8J3CrdWmRiuTjxHbF|post-cn-apg3xckag01","password":"0mrkMFELXKyelhuYy2FpGDeCigU=",
# "publish_topic":"device-server","subscribe_topic":"devices"},"firmware":{"version":"0.9.9","url":""}}
mqtt_info = {}
aes_opus_info = {"type": "hello", "version": 3, "transport": "udp",
"udp": {"server": "120.24.160.13", "port": 8884, "encryption": "aes-128-ctr",
"key": "263094c3aa28cb42f3965a1020cb21a7", "nonce": "01000000ccba9720b4bc268100000000"},
"audio_params": {"format": "opus", "sample_rate": 24000, "channels": 1, "frame_duration": 60},
"session_id": None}
iot_msg = {"session_id": "635aa42d", "type": "iot",
"descriptors": [{"name": "Speaker", "description": "当前 AI 机器人的扬声器",
"properties": {"volume": {"description": "当前音量值", "type": "number"}},
"methods": {"SetVolume": {"description": "设置音量",
"parameters": {
"volume": {"description": "0到100之间的整数", "type": "number"}
}
}
}
},
{"name": "Lamp", "description": "一个测试用的灯",
"properties": {"power": {"description": "灯是否打开", "type": "boolean"}},
"methods": {"TurnOn": {"description": "打开灯", "parameters": {}},
"TurnOff": {"description": "关闭灯", "parameters": {}}
}
}
]
}
iot_status_msg = {"session_id": "635aa42d", "type": "iot", "states": [
{"name": "Speaker", "state": {"volume": 50}}, {"name": "Lamp", "state": {"power": False}}]}
goodbye_msg = {"session_id": "b23ebfe9", "type": "goodbye"}
local_sequence = 0
listen_state = None
tts_state = None
key_state = None
audio = None
vaddata=None
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# udp_socket.setblocking(False)
conn_state = False
recv_audio_thread = threading.Thread()
send_audio_thread = threading.Thread()
anjian_thread = threading.Thread()
mqttc = None
# 抬手 放手函数
def handup():
servo1.write_angle(170)
servo2.write_angle(10)
def handdown():
servo1.write_angle(10)
servo2.write_angle(170)
def get_ota_version():
global mqtt_info
header = {
'Device-Id': MAC_ADDR,
'Content-Type': 'application/json'
}
post_data = {"flash_size": 16777216, "minimum_free_heap_size": 8318916, "mac_address": f"{MAC_ADDR}",
"chip_model_name": "esp32s3", "chip_info": {"model": 9, "cores": 2, "revision": 2, "features": 18},
"application": {"name": "xiaozhi", "version": "0.9.9", "compile_time": "Jan 22 2025T20:40:23Z",
"idf_version": "v5.3.2-dirty",
"elf_sha256": "22986216df095587c42f8aeb06b239781c68ad8df80321e260556da7fcf5f522"},
"partition_table": [{"label": "nvs", "type": 1, "subtype": 2, "address": 36864, "size": 16384},
{"label": "otadata", "type": 1, "subtype": 0, "address": 53248, "size": 8192},
{"label": "phy_init", "type": 1, "subtype": 1, "address": 61440, "size": 4096},
{"label": "model", "type": 1, "subtype": 130, "address": 65536, "size": 983040},
{"label": "storage", "type": 1, "subtype": 130, "address": 1048576,
"size": 1048576},
{"label": "factory", "type": 0, "subtype": 0, "address": 2097152, "size": 4194304},
{"label": "ota_0", "type": 0, "subtype": 16, "address": 6291456, "size": 4194304},
{"label": "ota_1", "type": 0, "subtype": 17, "address": 10485760,
"size": 4194304}],
"ota": {"label": "factory"},
"board": {"type": "bread-compact-wifi", "ssid": "mzy", "rssi": -58, "channel": 6,
"ip": "192.168.124.38", "mac": "cc:ba:97:20:b4:bc"}}
response = requests.post(OTA_VERSION_URL, headers=header, data=json.dumps(post_data))
print('=========================')
print(response.text)
logging.info(f"get version: {response}")
mqtt_info = response.json()['mqtt']
def aes_ctr_encrypt(key, nonce, plaintext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()
def aes_ctr_decrypt(key, nonce, ciphertext):
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
def send_audio():
global aes_opus_info, udp_socket, local_sequence, listen_state, audio,vaddata
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
server_ip = aes_opus_info['udp']['server']
server_port = aes_opus_info['udp']['port']
# 初始化Opus编码器
encoder = opuslib.Encoder(16000, 1, opuslib.APPLICATION_AUDIO)
# 打开麦克风流, 帧大小,应该与Opus帧大小匹配
mic = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=480)
try:
while True:
if listen_state == "stop":
continue
time.sleep(0.1)
# 读取音频数据
vaddata = mic.read(480)
data=vaddata+mic.read(480)
# 编码音频数据
encoded_data = encoder.encode(data, 960)
# 打印音频数据
#print(f"Encoded data: {len(encoded_data)}")
# nonce插入data.size local_sequence_
local_sequence += 1
new_nonce = nonce + format(len(encoded_data), '04x') + nonce + format(local_sequence, '08x')
# 加密数据,添加nonce
encrypt_encoded_data = aes_ctr_encrypt(bytes.fromhex(key), bytes.fromhex(new_nonce), bytes(encoded_data))
data = bytes.fromhex(new_nonce) + encrypt_encoded_data
sent = udp_socket.sendto(data, (server_ip, server_port))
except Exception as e:
print(f"send audio err: {e}")
finally:
print("send audio exit()")
local_sequence = 0
udp_socket = None
# 关闭流和PyAudio
mic.stop_stream()
mic.close()
def recv_audio():
global aes_opus_info, udp_socket, audio,speekstoptime
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
sample_rate = aes_opus_info['audio_params']['sample_rate']
frame_duration = aes_opus_info['audio_params']['frame_duration']
frame_num = int(frame_duration / (1000 / sample_rate))
print(f"recv audio: sample_rate -> {sample_rate}, frame_duration -> {frame_duration}, frame_num -> {frame_num}")
# 初始化Opus编码器
decoder = opuslib.Decoder(sample_rate, 1)
spk = audio.open(format=pyaudio.paInt16, channels=1, rate=sample_rate, output=True, frames_per_buffer=frame_num)
try:
while True:
data, server = udp_socket.recvfrom(4096)
print(f"Received from server {server}: {len(data)}")
encrypt_encoded_data = data
# 解密数据,分离nonce
split_encrypt_encoded_data_nonce = encrypt_encoded_data[:16]
# 十六进制格式打印nonce
# print(f"split_encrypt_encoded_data_nonce: {split_encrypt_encoded_data_nonce.hex()}")
split_encrypt_encoded_data = encrypt_encoded_data
decrypt_data = aes_ctr_decrypt(bytes.fromhex(key),
split_encrypt_encoded_data_nonce,
split_encrypt_encoded_data)
# 解码播放音频数据
spk.write(decoder.decode(decrypt_data, frame_num))
speekstoptime=time.time()
# except BlockingIOError:
# # 无数据时短暂休眠以减少CPU占用
# time.sleep(0.1)
except Exception as e:
print(f"recv audio err: {e}")
finally:
udp_socket = None
spk.stop_stream()
spk.close()
def wrap_hanzi(text, first_line_width=5, other_line_width=16):
"""将字符串格式化为第一行指定宽度,后续行指定宽度"""
lines = []
# 处理第一行
if len(text) > first_line_width:
lines.append(text[:first_line_width])
remaining_text = text
else:
lines.append(text)
remaining_text = ""
# 处理后续行
for i in range(0, len(remaining_text), other_line_width):
lines.append(remaining_text)
return "\n".join(lines)
def get_ascii_emotion(emotion):
"""根据情绪类型返回对应的 ASCII 表情符号"""
if emotion == "happy":
return ":)"
elif emotion == "sad":
return ":("
elif emotion == "winking":
return ";)"
elif emotion == "surprised":
return ":O"
elif emotion == "angry":
return ">:(("
elif emotion == "laughing":
return ":D"
elif emotion == "cool":
return "B-)"
elif emotion == "crying":
return ":'("
elif emotion == "shy":
return "^_^"
elif emotion == "thinking":
return ":|"
elif emotion == "love":
return "<3"
elif emotion == "sleepy":
return "-.-"
elif emotion == "neutral":
return ":|"
elif emotion == "excited":
return ":D"
elif emotion == "confused":
return ":S"
else:
return ":("# 默认表情
def on_message(client, userdata, message):
global aes_opus_info, udp_socket, tts_state, recv_audio_thread, send_audio_thread,max_chars,listen_state,speekstate,speekstoptime
msg = json.loads(message.payload)
print(f"recv msg: {msg}")
if msg['type'] == 'hello':
aes_opus_info = msg
udp_socket.connect((msg['udp']['server'], msg['udp']['port']))
# 检查recv_audio_thread线程是否启动
if not recv_audio_thread.is_alive():
# 启动一个线程,用于接收音频数据
recv_audio_thread = threading.Thread(target=recv_audio)
recv_audio_thread.start()
else:
print("recv_audio_thread is alive")
# 检查send_audio_thread线程是否启动
if not send_audio_thread.is_alive():
# 启动一个线程,用于发送音频数据
send_audio_thread = threading.Thread(target=send_audio)
send_audio_thread.start()
else:
print("send_audio_thread is alive")
if listen_state isNone:
listen_state="hello"
# 发送start listen消息
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "start", "mode": "manual"}
print(f"send start listen message: {msg}")
status_label.config(text="聆听中……")
push_mqtt_msg(msg)
if msg['type'] == 'tts':
tts_state = msg['state']
if msg['type'] == 'llm':
ascii_emotion = get_ascii_emotion(msg['emotion'])
emotion.config(text=ascii_emotion)
if msg['type'] == 'tts' and msg['state']=='start':
status_label.config(text="讲话中……")
speekstate=0
if msg['type'] == 'tts' and msg['state']=='stop':
status_label.config(text="就绪")
speekstate=1
if msg['type'] == 'tts' and msg['state']=='sentence_start':
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="小智: " + text)
if msg['text'] == '举手':
handup()
if msg['text'] == '放下':
handdown()
if msg['type'] == 'stt':
text=msg['text']
text=wrap_hanzi(text, 5,max_chars)
log_text.config(text="我: " + text)
if msg['type'] == 'goodbye' and udp_socket and msg['session_id'] == aes_opus_info['session_id']:
print(f"recv good bye msg")
aes_opus_info['session_id'] = None
listen_state= None
log_text.config(text="")
status_label.config(text="休息中")
# 关闭 UDP 连接
if udp_socket:
udp_socket.close()
udp_socket = None
for thread in (recv_audio_thread, send_audio_thread):
if thread and thread.is_alive():
thread.join(timeout=1)
def on_connect(client, userdata, flags, rs, pr):
subscribe_topic = mqtt_info['subscribe_topic'].split("/") + '/p2p/GID_test@@@' + MAC_ADDR.replace(':', '_')
print(f"subscribe topic: {subscribe_topic}")
# 订阅主题
client.subscribe(subscribe_topic)
status_label.config(text="就绪")
def push_mqtt_msg(message):
global mqtt_info, mqttc
mqttc.publish(mqtt_info['publish_topic'], json.dumps(message))
def listen_start():
global key_state, udp_socket, aes_opus_info, listen_state, conn_state
if key_state == "press":
return
key_state = "press"
# 判断是否需要发送hello消息
if conn_state is False or aes_opus_info['session_id'] is None:
# 清理旧连接
if udp_socket:
udp_socket.close()
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn_state = True
# 发送hello消息,建立udp连接
hello_msg = {"type": "hello", "version": 3, "transport": "udp",
"audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60}}
push_mqtt_msg(hello_msg)
print(f"send hello message: {hello_msg}")
if tts_state == "start" or tts_state == "entence_start":
# 在播放状态下发送abort消息
push_mqtt_msg({"type": "abort"})
print(f"send abort message")
if aes_opus_info['session_id'] is not None:
# 发送start listen消息
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "start", "mode": "manual"}
print(f"send start listen message: {msg}")
push_mqtt_msg(msg)
def listen_stop():
global aes_opus_info, key_state
key_state = "release"
# 发送stop listen消息
if aes_opus_info['session_id'] is not None:
msg = {"session_id": aes_opus_info['session_id'], "type": "listen", "state": "stop"}
print(f"send stop listen message: {msg}")
push_mqtt_msg(msg)
def run():
global mqtt_info, mqttc
# 获取mqtt与版本信息
get_ota_version()
# 创建客户端实例
mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=mqtt_info['client_id'])
mqttc.username_pw_set(username=mqtt_info['username'], password=mqtt_info['password'])
mqttc.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=mqtt.ssl.CERT_REQUIRED,
tls_version=mqtt.ssl.PROTOCOL_TLS, ciphers=None)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect(host=mqtt_info['endpoint'], port=8883)
anjian_thread = threading.Thread(target=anjian)
anjian_thread.start()
mqttc.loop_forever()
speekstate=0
speekstoptime=0
def anjian():
global vaddata,speekstate,speekstoptime
bs=0
last_voice_time=0
while True:
if vaddata is not None:
if vad.is_speech(vaddata, 16000) and speekstate==1 and time.time()-speekstoptime>1.5:
status_label.config(text="聆听中……")
print(".",end="")
bs=1
listen_start()
last_voice_time = time.time()
elif time.time() - last_voice_time >1.5 and bs==1:
bs=0
listen_stop()
else:
time.sleep(2)
bs=1
listen_start()
last_voice_time = time.time()
if __name__ == "__main__":
audio = pyaudio.PyAudio()
run()
https://www.bilibili.com/video/BV1iuPGeFEVG/?share_source=copy_web
(webrtcvad库检测效果一般,我使用的是模式 3:非常激进模式,灵敏度最高。)
4.借助行空板上的触摸屏,点击“按钮”,与小智对话
开源网址:https://github.com/Huang-junsen/py-xiaozhi.git,主要修改了“mqtt_client.py”、“gui.py”。
https://www.bilibili.com/video/BV1vuPGeFExD/?share_source=copy_web&vd_source=98855d5b99ff76982639c5ca6ff6f528
修改的代码文件:
页:
[1]