5064浏览
查看: 5064|回复: 3

[M10项目] 行空板 星火认知大模型 桌面助手AI兔

[复制链接]
本帖最后由 云天 于 2023-8-6 19:57 编辑

【项目背景】
7月17日,我带学生到浙江诸暨参加第二十四届全国学生信息素养提升实践活动,在科技之夜,我们聆听了科大讯飞对星火认知大模型的介绍。讯飞星火认知大模型是以中文为核心的新一代认知智能大模型,拥有跨领域的知识和语言理解能力,能够基于自然对话方式理解与执行任务。
【项目设计】
硬件使用行空板,使用讯飞语音识别,将识别结果送给讯飞星火认知大模型,反馈结果用讯飞语音合成进行语音输出。
【项目硬件】
行空板 星火认知大模型 桌面助手AI兔图4
因行空板无无音频输出,使用即插即用的免驱USB声卡,实现音频外置播放功能。
【注册申请】
讯飞开放平台注册,创建应用,申请星火认识大模型试用。获取APPID、APISecret、APIKey。
行空板 星火认知大模型 桌面助手AI兔图5
【语音识别】
  1. ''' 在线语音识别 '''
  2. import websocket
  3. import hashlib
  4. import base64
  5. import hmac
  6. import json
  7. from urllib.parse import urlencode
  8. import time
  9. import ssl
  10. from wsgiref.handlers import format_date_time
  11. from datetime import datetime
  12. from time import mktime
  13. import _thread as thread
  14. import pyaudio
  15. from unihiker import GUI
  16. u_gui=GUI()
  17. 显示内容=u_gui.draw_text(text="行空板chat",x=0,y=0,font_size=20, color="#0000FF")
  18. recording_results=""   # 识别结果
  19. STATUS_FIRST_FRAME = 0  # 第一帧的标识
  20. STATUS_CONTINUE_FRAME = 1  # 中间帧标识
  21. STATUS_LAST_FRAME = 2  # 最后一帧的标识
  22. class Ws_Param_record(object):
  23.     # 初始化接口对象
  24.     def __init__(self,APPID,APIKey,APISecret):
  25.         # 初始化讯飞接口的ID,Key,Secret
  26.         self.APPID=APPID
  27.         self.APIKey=APIKey
  28.         self.APISecret=APISecret
  29.         # 公共参数(common)
  30.         self.CommonArgs={"app_id":self.APPID}
  31.         # 业务参数(business)
  32.         self.BusinessArgs={"domain":"iat","language":"zh_cn",
  33.                            "accent":"mandarin","vinfo":1,"vad_eos":1000}
  34.     def create_url(self):
  35.         # 生成url
  36.         url='wss://ws-api.xfyun.cn/v2/iat'
  37.         now=datetime.now()
  38.         date=format_date_time(mktime(now.timetuple()))
  39.         # 生成RFC1123格式的时间戳
  40.         signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
  41.         signature_origin += "date: " + date + "\n"
  42.         signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
  43.         # 拼接字符串
  44.         signature_sha = hmac.new(self.APISecret.encode('utf-8'),
  45.                                  signature_origin.encode('utf-8'),
  46.                                  digestmod=hashlib.sha256).digest()
  47.         signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
  48.         # 进行hmac_sha256进行加密
  49.         authorization_origin = "api_key="%s", algorithm="%s", headers="%s", " \
  50.                                "signature="%s"" % (self.APIKey, "hmac-sha256",
  51.                                                      "host date request-line", signature_sha)
  52.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  53.         v={
  54.             "authorization": authorization,
  55.             "date": date,
  56.             "host": "ws-api.xfyun.cn"
  57.         }
  58.         # 将请求的鉴权参数组合为字典
  59.         url=url+'?'+urlencode(v)
  60.         # 拼接鉴权参数,生成url
  61.         return url
  62. def on_open_record(ws):
  63.     global bssb
  64.     # 收到websocket连接建立的处理
  65.     def run(*args):
  66.         global bssb
  67.       
  68.         # 在线音频处理并发送到讯飞
  69.         status=STATUS_FIRST_FRAME
  70.         # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
  71.         CHUNK = 520  # 定义数据流块
  72.         FORMAT = pyaudio.paInt16  # 16bit编码格式
  73.         CHANNELS = 1  # 单声道
  74.         RATE = 16000  # 16000采样频率
  75.         p=pyaudio.PyAudio()  # 录音
  76.         # 实例化pyaudio对象
  77.         stream = p.open(format=FORMAT,  # 音频流wav格式
  78.                         channels=CHANNELS,  # 单声道
  79.                         rate=RATE,  # 采样率16000
  80.                         input=True,
  81.                         frames_per_buffer=CHUNK)
  82.         # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
  83.         print("---------------开始录音-----------------")
  84.         显示内容.config(text="开始录音---"+str(status))
  85.         # 开始录音
  86.         global text
  87.         
  88.         for i in range(0,int(RATE/CHUNK*60)):
  89.             # 录制特定时间的音频
  90.             buf=stream.read(CHUNK)
  91.             # 读出声卡缓冲区的音频数据
  92.             if not buf:
  93.                 status=STATUS_LAST_FRAME
  94.             if status==STATUS_FIRST_FRAME:
  95.                 # 首帧处理
  96.                 d = {"common": wsParam_record.CommonArgs,
  97.                      "business": wsParam_record.BusinessArgs,
  98.                      "data": {"status": 0, "format": "audio/L16;rate=16000",
  99.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  100.                               "encoding": "raw"}}
  101.                 d = json.dumps(d)
  102.                 # 将拼接的字符串d数据结构转换为json
  103.                 if bssb==0:
  104.                   ws.send(d)
  105.                 status=STATUS_CONTINUE_FRAME
  106.             elif status==STATUS_CONTINUE_FRAME:
  107.                 # 中间帧处理
  108.                 d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
  109.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  110.                               "encoding": "raw"}}
  111.                 if bssb==0:
  112.                    ws.send(json.dumps(d))
  113.                   
  114.             elif status==STATUS_LAST_FRAME:
  115.                 # 最后一帧处理
  116.                 d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
  117.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  118.                               "encoding": "raw"}}
  119.                
  120.                 ws.send(json.dumps(d))
  121.                 time.sleep(1)
  122.                 break
  123.     thread.start_new_thread(run,())
  124. def on_message_record(ws,message):
  125.     global recording_results,bssb
  126.     # 收到websocket消息的正常处理
  127.     try:
  128.         print(json.loads(message))
  129.         code = json.loads(message)["code"]
  130.         # 解码返回的message的json数据中的code
  131.         sid = json.loads(message)["sid"]
  132.         if code != 0:
  133.             errMsg = json.loads(message)["message"]
  134.             # 解码message中错误信息
  135.             print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
  136.         else:
  137.             data = json.loads(message)["data"]["result"]["ws"]
  138.             status = json.loads(message)["data"]["status"]
  139.             # 解码message中ws数据
  140.             result = ""
  141.             for i in data:
  142.                 for w in i["cw"]:
  143.                     result += w["w"]
  144.             
  145.             if status==2 :
  146.                 if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
  147.                     recording_results=recording_results+'。'
  148.                 elif result == '?':
  149.                     recording_results=recording_results+'?'
  150.                 elif result == '!':
  151.                     recording_results=recording_results+'!'
  152.                 print("end")
  153.                 bssb=1
  154.                 ws.close()
  155.                
  156.             else:
  157.                 # t.insert(END, result)  # 把上边的标点插入到result的最后
  158.                 print("翻译结果: %s" % (result))
  159.                 显示内容.config(text=result)
  160.                
  161.                 recording_results=result
  162.     except Exception as e:
  163.         # 异常处理,参数异常
  164.         print("receive msg,but parse exception:", e)
  165. def on_error_record(ws,error):
  166.     # 收到websocket后错误的处理
  167.     print("### error ### : ",error)
  168.    
  169.     # 重新启动监听
  170. def on_close_record(ws,a,b):
  171.     # 收到websocket关闭的处理
  172.     print("close ok")
  173. def run_record():
  174.     global wsParam_record
  175.     wsParam_record=Ws_Param_record(APPID='d41ee990',
  176.                     APISecret='NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4',
  177.                     APIKey='d551eafc91a585a49beabe7ad3644ca4')
  178.     # 初始化讯飞接口编码
  179.     websocket.enableTrace(False)
  180.     # True表示默认在控制台打印连接和信息发送接收情况
  181.     wsUrl=wsParam_record.create_url()
  182.     # 生成讯飞的url进行连接
  183.     ws_record=websocket.WebSocketApp(wsUrl, on_message=on_message_record, on_error=on_error_record, on_close=on_close_record)
  184.     ws_record.on_open=on_open_record  # 进行websocket连接
  185.     ws_record.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
  186.     # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去
  187. if __name__ == '__main__':
  188.     bssb=0
  189.     run_record()
  190.     while True:
  191.         pass
复制代码
【星火认知大模型】
  1. import _thread as thread
  2. import base64
  3. import datetime
  4. import hashlib
  5. import hmac
  6. import json
  7. from urllib.parse import urlparse
  8. import ssl
  9. from datetime import datetime
  10. from time import mktime
  11. from urllib.parse import urlencode
  12. from wsgiref.handlers import format_date_time
  13. import websocket,math
  14. from unihiker import GUI
  15. u_gui=GUI()
  16. #讯飞星火大语言模型对话
  17. class Ws_Param(object):
  18.     # 初始化
  19.     def __init__(self, APPID, APIKey, APISecret, gpt_url):
  20.         self.APPID = APPID
  21.         self.APIKey = APIKey
  22.         self.APISecret = APISecret
  23.         self.host = urlparse(gpt_url).netloc
  24.         self.path = urlparse(gpt_url).path
  25.         self.gpt_url = gpt_url
  26.     # 生成url
  27.     def create_url(self):
  28.         # 生成RFC1123格式的时间戳
  29.         now = datetime.now()
  30.         date = format_date_time(mktime(now.timetuple()))
  31.         # 拼接字符串
  32.         signature_origin = "host: " + self.host + "\n"
  33.         signature_origin += "date: " + date + "\n"
  34.         signature_origin += "GET " + self.path + " HTTP/1.1"
  35.         # 进行hmac-sha256进行加密
  36.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  37.                                  digestmod=hashlib.sha256).digest()
  38.         signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
  39.         authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
  40.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  41.         # 将请求的鉴权参数组合为字典
  42.         v = {
  43.             "authorization": authorization,
  44.             "date": date,
  45.             "host": self.host
  46.         }
  47.         # 拼接鉴权参数,生成url
  48.         url = self.gpt_url + '?' + urlencode(v)
  49.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  50.         return url
  51. # 收到websocket错误的处理
  52. def on_error(ws, error):
  53.     print("### error:", error)
  54. # 收到websocket关闭的处理
  55. def on_close(ws,a,b):
  56.     print("### closed ###")
  57. # 收到websocket连接建立的处理
  58. def on_open(ws):
  59.     thread.start_new_thread(run, (ws,))
  60. def run(ws, *args):
  61.     data = json.dumps(gen_params(appid=ws.appid, question=ws.question))
  62.     ws.send(data)
  63. # 收到websocket消息的处理
  64. def on_message(ws, message):
  65.     global content
  66.     #print(message)
  67.     data = json.loads(message)
  68.     code = data['header']['code']
  69.     if code != 0:
  70.         print(f'请求错误: {code}, {data}')
  71.         ws.close()
  72.     else:
  73.         choices = data["payload"]["choices"]
  74.         status = choices["status"]
  75.         content =content+ choices["text"][0]["content"]
  76.         print(content, end='')
  77.         
  78.         if status == 2:
  79.             for i in range(0,math.ceil(len(content)/11)):
  80.              i=u_gui.draw_text(text=content[i*11:i*11+11],x=0,y=i*20,font_size=15, color="#0000FF")
  81.             ws.close()
  82. def gen_params(appid, question):
  83.     """
  84.     通过appid和用户的提问来生成请参数
  85.     """
  86.     data = {
  87.         "header": {
  88.             "app_id": appid,
  89.             "uid": "1234"
  90.         },
  91.         "parameter": {
  92.             "chat": {
  93.                 "domain": "general",
  94.                 "random_threshold": 0.5,
  95.                 "max_tokens": 2048,
  96.                 "auditing": "default"
  97.             }
  98.         },
  99.         "payload": {
  100.             "message": {
  101.                 "text": [
  102.                     {"role": "user", "content": question}
  103.                 ]
  104.             }
  105.         }
  106.     }
  107.     return data
  108. def run_chat(appid, api_key, api_secret, gpt_url, question):
  109.     wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
  110.     websocket.enableTrace(False)
  111.     wsUrl = wsParam.create_url()
  112.     ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
  113.     ws.appid = appid
  114.     ws.question = question
  115.     ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  116. """
  117.     1、配置好python、pip的环境变量
  118.     2、执行 pip install websocket 与 pip3 install websocket-client
  119.     3、去控制台https://console.xfyun.cn/services/cbm获取appid等信息填写即可
  120. """
  121. if __name__ == "__main__":
  122.     # 测试时候在此处正确填写相关信息即可运行
  123.     content=""
  124.     run_chat(appid="d41ee990",
  125.          api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4",
  126.          api_key="d551eafc91a585a49beabe7ad3644ca4",
  127.          gpt_url="ws://spark-api.xf-yun.com/v1.1/chat",
  128.          question="请一篇关于中国男足世界杯夺冠的新闻稿,100字。")
  129.     while 1:
  130.         pass
复制代码
“请一篇关于中国男足世界杯夺冠的新闻稿,100字。”,对话测试结果:    中国男足在2022年世界杯决赛中以3比2战胜巴西,成功夺得世界杯冠军!这是中国足球历史上最辉煌的时刻,也是全国亿万球迷共同欢庆的时刻。经过多年的努力和拼搏,中国男足终于实现了自己的梦想,为国家争得了荣誉和尊严。中国男足在2022年世界杯决赛中以3比2战胜巴西,成功夺得世界杯冠军!这是中国足球历史上最辉煌的时刻,也是全国亿万球迷共同欢庆的时刻。经过多年的努力和拼搏,中国男足终于实现了自己的梦想,为国家争得了荣誉和尊严。这次胜利不仅仅是体育成就,更是国家实力和民族自信的体现。我们相信,通过这次胜利,中国足球将有更加美好的未来!
【语音合成】
  1. # -*- coding:utf-8 -*-
  2. #
  3. #   author: iflytek
  4. #
  5. #  本demo测试时运行的环境为:Windows + Python3.7
  6. #  本demo测试成功运行时所安装的第三方库及其版本如下:
  7. #   cffi==1.12.3
  8. #   gevent==1.4.0
  9. #   greenlet==0.4.15
  10. #   pycparser==2.19
  11. #   six==1.12.0
  12. #   websocket==0.2.1
  13. #   websocket-client==0.56.0
  14. #   合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式
  15. #  错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
  16. # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
  17. import websocket
  18. import datetime
  19. import hashlib
  20. import base64
  21. import hmac
  22. import json
  23. from urllib.parse import urlencode
  24. import time
  25. import ssl
  26. from wsgiref.handlers import format_date_time
  27. from datetime import datetime
  28. from time import mktime
  29. import _thread as thread
  30. import os
  31. import pyaudio
  32. STATUS_FIRST_FRAME = 0  # 第一帧的标识
  33. STATUS_CONTINUE_FRAME = 1  # 中间帧标识
  34. STATUS_LAST_FRAME = 2  # 最后一帧的标识
  35. #语音合成
  36. class Ws_Param_audio(object):
  37.     # 初始化
  38.     def __init__(self, APPID, APIKey, APISecret, Text):
  39.         self.APPID = APPID
  40.         self.APIKey = APIKey
  41.         self.APISecret = APISecret
  42.         self.Text = Text
  43.         # 公共参数(common)
  44.         self.CommonArgs = {"app_id": self.APPID}
  45.         # 业务参数(business),更多个性化参数可在官网查看
  46.         self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"}
  47.         self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
  48.         #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
  49.         #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}
  50.     # 生成url
  51.     def create_url(self):
  52.         url = 'wss://tts-api.xfyun.cn/v2/tts'
  53.         # 生成RFC1123格式的时间戳
  54.         now = datetime.now()
  55.         date = format_date_time(mktime(now.timetuple()))
  56.         # 拼接字符串
  57.         signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
  58.         signature_origin += "date: " + date + "\n"
  59.         signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
  60.         # 进行hmac-sha256进行加密
  61.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  62.                                  digestmod=hashlib.sha256).digest()
  63.         signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
  64.         authorization_origin = "api_key="%s", algorithm="%s", headers="%s", signature="%s"" % (
  65.             self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
  66.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  67.         # 将请求的鉴权参数组合为字典
  68.         v = {
  69.             "authorization": authorization,
  70.             "date": date,
  71.             "host": "ws-api.xfyun.cn"
  72.         }
  73.         # 拼接鉴权参数,生成url
  74.         url = url + '?' + urlencode(v)
  75.         # print("date: ",date)
  76.         # print("v: ",v)
  77.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  78.         # print('websocket url :', url)
  79.         return url
  80. def on_message_audio(ws, message):
  81.    
  82.     try:
  83.         message =json.loads(message)
  84.         code = message["code"]
  85.         sid = message["sid"]
  86.         audio = message["data"]["audio"]
  87.         audio = base64.b64decode(audio)
  88.         status = message["data"]["status"]
  89.         #print(message)
  90.         
  91.         
  92.         if code != 0:
  93.             errMsg = message["message"]
  94.             print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
  95.         else:
  96.             with open('demo.pcm', 'ab') as f:
  97.                 f.write(audio)
  98.             print("OK")   
  99.         if status == 2:
  100.             global bs
  101.             bs=1
  102.             print("ws is closed")
  103.             ws.close()   
  104.     except Exception as e:
  105.         print("receive msg,but parse exception:", e)
  106. # 收到websocket错误的处理
  107. def on_error_audio(ws, error):
  108.     print("### error:", error)
  109. # 收到websocket关闭的处理
  110. def on_close_audio(ws,a,b):
  111.    
  112.     print("### closed ###")
  113.    
  114. # 收到websocket连接建立的处理
  115. def on_open_audio(ws):
  116.     def run(*args):
  117.         d = {"common": wsParam_audio.CommonArgs,
  118.              "business": wsParam_audio.BusinessArgs,
  119.              "data": wsParam_audio.Data,
  120.              }
  121.         d = json.dumps(d)
  122.         print("------>开始发送文本数据")
  123.         ws.send(d)
  124.         print("------>发送完成")
  125.         if os.path.exists('./demo.pcm'):
  126.             os.remove('./demo.pcm')
  127.     thread.start_new_thread(run, ())
  128. def run_audio(APPID,APISecret,APIKey,Text):
  129.     global wsParam_audio
  130.     wsParam_audio = Ws_Param_audio(APPID=APPID, APISecret=APISecret,
  131.                        APIKey=APIKey,
  132.                        Text=Text)
  133.     websocket.enableTrace(False)
  134.     wsUrl = wsParam_audio.create_url()
  135.     ws_audio = websocket.WebSocketApp(wsUrl, on_message=on_message_audio, on_error=on_error_audio, on_close=on_close_audio)
  136.     ws_audio.on_open = on_open_audio
  137.     ws_audio.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  138.     p = pyaudio.PyAudio()
  139.     stream = p.open(format=p.get_format_from_width(2), channels=1, rate=16000, output=True)
  140. # 将 pcm 数据直接写入 PyAudio 的数据流
  141.     with open("demo.pcm", "rb") as f:
  142.         stream.write(f.read())
  143.     time.sleep(0.2)
  144.            
  145.     stream.stop_stream()
  146.     stream.close()
  147.     p.terminate()
  148. if __name__ == "__main__":
  149.     bs=1
  150.     # 测试时候在此处正确填写相关信息即可运行
  151.     appid="d41ee990"
  152.     api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4"
  153.     api_key="d551eafc91a585a49beabe7ad3644ca4"
  154.    
  155.     while 1:
  156.         if bs==1:
  157.             run_audio(APPID=appid,APISecret=api_secret,APIKey=api_key,Text="你是谁?你能做些什么?")
  158.             bs=0
  159.             
复制代码
【舵机测试】使用了“线程”来控制舵机,使舵机运行与语音合成同步进行。
行空板 星火认知大模型 桌面助手AI兔图6


行空板 星火认知大模型 桌面助手AI兔图2


【完整程序】
将 语音识别、认知大模型、语音合成、舵机控制四段程序进行整合。
  1. ''' 在线语音识别 '''
  2. import websocket
  3. import hashlib
  4. import base64
  5. import hmac
  6. import json
  7. from urllib.parse import urlencode
  8. import time
  9. import ssl
  10. from wsgiref.handlers import format_date_time
  11. from datetime import datetime
  12. from time import mktime
  13. import _thread as thread
  14. import pyaudio
  15. from unihiker import GUI
  16. from pinpong.extension.unihiker import *
  17. from pinpong.board import Board,Pin
  18. ''' 讯飞星火大语言模型对话 '''
  19. from urllib.parse import urlparse
  20. import math
  21. ''' 语音合成 '''
  22. import os
  23. '''控制舵机'''
  24. from pinpong.board import Servo
  25. Board().begin()
  26. pin1 = Pin(Pin.D21, Pin.OUT)
  27. pin2 = Pin(Pin.D22, Pin.OUT)
  28. servo1 = Servo(pin1)
  29. servo1.write_angle(90)
  30. servo2 = Servo(pin2)
  31. servo2.write_angle(45)
  32. # 事件回调函数
  33. def u_thread1_function():
  34.     while True:
  35.         servo1.write_angle(90)
  36.         servo2.write_angle(45)
  37.         time.sleep(1)
  38.         servo1.write_angle(45)
  39.         servo2.write_angle(90)
  40.         time.sleep(1)
  41. #录制语音,在线语音识别
  42. u_gui=GUI()
  43. 显示内容=u_gui.draw_text(text="行空板chat",x=5,y=100,font_size=30, color="#FF0000")
  44. recording_results=""   # 识别结果
  45. STATUS_FIRST_FRAME = 0  # 第一帧的标识
  46. STATUS_CONTINUE_FRAME = 1  # 中间帧标识
  47. STATUS_LAST_FRAME = 2  # 最后一帧的标识
  48. class Ws_Param_record(object):
  49.     # 初始化接口对象
  50.     def __init__(self,APPID,APIKey,APISecret):
  51.         # 初始化讯飞接口的ID,Key,Secret
  52.         self.APPID=APPID
  53.         self.APIKey=APIKey
  54.         self.APISecret=APISecret
  55.         # 公共参数(common)
  56.         self.CommonArgs={"app_id":self.APPID}
  57.         # 业务参数(business)
  58.         self.BusinessArgs={"domain":"iat","language":"zh_cn",
  59.                            "accent":"mandarin","vinfo":1,"vad_eos":1000}
  60.     def create_url(self):
  61.         # 生成url
  62.         url='wss://ws-api.xfyun.cn/v2/iat'
  63.         now=datetime.now()
  64.         date=format_date_time(mktime(now.timetuple()))
  65.         # 生成RFC1123格式的时间戳
  66.         signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
  67.         signature_origin += "date: " + date + "\n"
  68.         signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
  69.         # 拼接字符串
  70.         signature_sha = hmac.new(self.APISecret.encode('utf-8'),
  71.                                  signature_origin.encode('utf-8'),
  72.                                  digestmod=hashlib.sha256).digest()
  73.         signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
  74.         # 进行hmac_sha256进行加密
  75.         authorization_origin = "api_key="%s", algorithm="%s", headers="%s", " \
  76.                                "signature="%s"" % (self.APIKey, "hmac-sha256",
  77.                                                      "host date request-line", signature_sha)
  78.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  79.         v={
  80.             "authorization": authorization,
  81.             "date": date,
  82.             "host": "ws-api.xfyun.cn"
  83.         }
  84.         # 将请求的鉴权参数组合为字典
  85.         url=url+'?'+urlencode(v)
  86.         # 拼接鉴权参数,生成url
  87.         return url
  88. def on_open_record(ws):
  89.     global bssb
  90.     # 收到websocket连接建立的处理
  91.     def run(*args):
  92.         global bssb
  93.       
  94.         # 在线音频处理并发送到讯飞
  95.         status=STATUS_FIRST_FRAME
  96.         # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
  97.         CHUNK = 520  # 定义数据流块
  98.         FORMAT = pyaudio.paInt16  # 16bit编码格式
  99.         CHANNELS = 1  # 单声道
  100.         RATE = 16000  # 16000采样频率
  101.         p=pyaudio.PyAudio()  # 录音
  102.         # 实例化pyaudio对象
  103.         stream = p.open(format=FORMAT,  # 音频流wav格式
  104.                         channels=CHANNELS,  # 单声道
  105.                         rate=RATE,  # 采样率16000
  106.                         input=True,
  107.                         frames_per_buffer=CHUNK)
  108.         # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
  109.         print("---------------开始录音-----------------")
  110.         buzzer.play(buzzer.JUMP_UP,buzzer.Once)
  111.         u_gui.clear()
  112.         显示内容=u_gui.draw_text(text="开始录音……",x=5,y=100,font_size=30, color="#0000FF")
  113.         # 开始录音
  114.         bssb=0
  115.         global text
  116.         
  117.         for i in range(0,int(RATE/CHUNK*60)):
  118.             # 录制特定时间的音频
  119.             buf=stream.read(CHUNK)
  120.             # 读出声卡缓冲区的音频数据
  121.             if not buf:
  122.                 status=STATUS_LAST_FRAME
  123.             if status==STATUS_FIRST_FRAME:
  124.                 # 首帧处理
  125.                 d = {"common": wsParam_record.CommonArgs,
  126.                      "business": wsParam_record.BusinessArgs,
  127.                      "data": {"status": 0, "format": "audio/L16;rate=16000",
  128.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  129.                               "encoding": "raw"}}
  130.                 d = json.dumps(d)
  131.                 # 将拼接的字符串d数据结构转换为json
  132.                 if bssb==0:
  133.                   ws.send(d)
  134.                 status=STATUS_CONTINUE_FRAME
  135.             elif status==STATUS_CONTINUE_FRAME:
  136.                 # 中间帧处理
  137.                 d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
  138.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  139.                               "encoding": "raw"}}
  140.                 if bssb==0:
  141.                    ws.send(json.dumps(d))
  142.                
  143.             elif status==STATUS_LAST_FRAME:
  144.                 # 最后一帧处理
  145.                 d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
  146.                               "audio": str(base64.b64encode(buf), 'utf-8'),
  147.                               "encoding": "raw"}}
  148.                
  149.                 ws.send(json.dumps(d))
  150.                 time.sleep(1)
  151.                 break
  152.     thread.start_new_thread(run,())
  153. def on_message_record(ws,message):
  154.     global recording_results,bssb
  155.     # 收到websocket消息的正常处理
  156.     try:
  157.         print(json.loads(message))
  158.         code = json.loads(message)["code"]
  159.         # 解码返回的message的json数据中的code
  160.         sid = json.loads(message)["sid"]
  161.         if code != 0:
  162.             errMsg = json.loads(message)["message"]
  163.             # 解码message中错误信息
  164.             print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
  165.         else:
  166.             data = json.loads(message)["data"]["result"]["ws"]
  167.             status = json.loads(message)["data"]["status"]
  168.             # 解码message中ws数据
  169.             result = ""
  170.             for i in data:
  171.                 for w in i["cw"]:
  172.                     result += w["w"]
  173.             
  174.             if status==2 :
  175.                 if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
  176.                     recording_results=recording_results+'。'
  177.                 elif result == '?':
  178.                     recording_results=recording_results+'?'
  179.                 elif result == '!':
  180.                     recording_results=recording_results+'!'
  181.                
  182.                 u_gui.clear()
  183.                 for i in range(0,math.ceil(len(content)/8)):
  184.                     i=u_gui.draw_text(text=content[i*8:i*8+8],x=0,y=i*30,font_size=20, color="#0000FF")
  185.                 bssb=1
  186.                 ws.close()
  187.                
  188.             else:
  189.                 # t.insert(END, result)  # 把上边的标点插入到result的最后
  190.                 print("翻译结果: %s" % (result))
  191.                
  192.                 #显示内容.config(text=result)
  193.                
  194.                 recording_results=result
  195.     except Exception as e:
  196.         # 异常处理,参数异常
  197.         print("receive msg,but parse exception:", e)
  198. def on_error_record(ws,error):
  199.     # 收到websocket后错误的处理
  200.     print("### error ### : ",error)
  201.    
  202.     # 重新启动监听
  203. def on_close_record(ws,a,b):
  204.     # 收到websocket关闭的处理
  205.     print("close ok")
  206. def run_record(APPID,APISecret,APIKey):
  207.     global wsParam_record
  208.     wsParam_record=Ws_Param_record(APPID=APPID,
  209.                     APISecret=APISecret,
  210.                     APIKey=APIKey)
  211.     # 初始化讯飞接口编码
  212.     websocket.enableTrace(False)
  213.     # True表示默认在控制台打印连接和信息发送接收情况
  214.     wsUrl=wsParam_record.create_url()
  215.     # 生成讯飞的url进行连接
  216.     ws_record=websocket.WebSocketApp(wsUrl, on_message=on_message_record, on_error=on_error_record, on_close=on_close_record)
  217.     ws_record.on_open=on_open_record  # 进行websocket连接
  218.     ws_record.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
  219.     # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去
  220. #讯飞星火大语言模型对话
  221. class Ws_Param(object):
  222.     # 初始化
  223.     def __init__(self, APPID, APIKey, APISecret, gpt_url):
  224.         self.APPID = APPID
  225.         self.APIKey = APIKey
  226.         self.APISecret = APISecret
  227.         self.host = urlparse(gpt_url).netloc
  228.         self.path = urlparse(gpt_url).path
  229.         self.gpt_url = gpt_url
  230.     # 生成url
  231.     def create_url(self):
  232.         # 生成RFC1123格式的时间戳
  233.         now = datetime.now()
  234.         date = format_date_time(mktime(now.timetuple()))
  235.         # 拼接字符串
  236.         signature_origin = "host: " + self.host + "\n"
  237.         signature_origin += "date: " + date + "\n"
  238.         signature_origin += "GET " + self.path + " HTTP/1.1"
  239.         # 进行hmac-sha256进行加密
  240.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  241.                                  digestmod=hashlib.sha256).digest()
  242.         signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
  243.         authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
  244.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  245.         # 将请求的鉴权参数组合为字典
  246.         v = {
  247.             "authorization": authorization,
  248.             "date": date,
  249.             "host": self.host
  250.         }
  251.         # 拼接鉴权参数,生成url
  252.         url = self.gpt_url + '?' + urlencode(v)
  253.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  254.         return url
  255. # 收到websocket错误的处理
  256. def on_error(ws, error):
  257.     print("### error:", error)
  258. # 收到websocket关闭的处理
  259. def on_close(ws,a,b):
  260.     print("### closed ###")
  261. # 收到websocket连接建立的处理
  262. def on_open(ws):
  263.     thread.start_new_thread(run, (ws,))
  264. def run(ws, *args):
  265.     data = json.dumps(gen_params(appid=ws.appid, question=ws.question))
  266.     ws.send(data)
  267. # 收到websocket消息的处理
  268. def on_message(ws, message):
  269.     global content
  270.     #print(message)
  271.     data = json.loads(message)
  272.     code = data['header']['code']
  273.     if code != 0:
  274.         print(f'请求错误: {code}, {data}')
  275.         ws.close()
  276.     else:
  277.         choices = data["payload"]["choices"]
  278.         status = choices["status"]
  279.         content =content+ choices["text"][0]["content"]
  280.         print(content, end='')
  281.         
  282.         if status == 2:
  283.             for i in range(0,math.ceil(len(content)/11)):
  284.              i=u_gui.draw_text(text=content[i*11:i*11+11],x=0,y=i*20,font_size=15, color="#0000FF")
  285.             ws.close()
  286. def gen_params(appid, question):
  287.     """
  288.     通过appid和用户的提问来生成请参数
  289.     """
  290.     data = {
  291.         "header": {
  292.             "app_id": appid,
  293.             "uid": "1234"
  294.         },
  295.         "parameter": {
  296.             "chat": {
  297.                 "domain": "general",
  298.                 "random_threshold": 0.5,
  299.                 "max_tokens": 2048,
  300.                 "auditing": "default"
  301.             }
  302.         },
  303.         "payload": {
  304.             "message": {
  305.                 "text": [
  306.                     {"role": "user", "content": question}
  307.                 ]
  308.             }
  309.         }
  310.     }
  311.     return data
  312. def run_chat(appid, api_key, api_secret, gpt_url, question):
  313.     wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
  314.     websocket.enableTrace(False)
  315.     wsUrl = wsParam.create_url()
  316.     ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
  317.     ws.appid = appid
  318.     ws.question = question
  319.     ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  320. #语音合成
  321. class Ws_Param_audio(object):
  322.     # 初始化
  323.     def __init__(self, APPID, APIKey, APISecret, Text):
  324.         self.APPID = APPID
  325.         self.APIKey = APIKey
  326.         self.APISecret = APISecret
  327.         self.Text = Text
  328.         # 公共参数(common)
  329.         self.CommonArgs = {"app_id": self.APPID}
  330.         # 业务参数(business),更多个性化参数可在官网查看
  331.         self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"}
  332.         self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
  333.         #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
  334.         #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}
  335.     # 生成url
  336.     def create_url(self):
  337.         url = 'wss://tts-api.xfyun.cn/v2/tts'
  338.         # 生成RFC1123格式的时间戳
  339.         now = datetime.now()
  340.         date = format_date_time(mktime(now.timetuple()))
  341.         # 拼接字符串
  342.         signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
  343.         signature_origin += "date: " + date + "\n"
  344.         signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
  345.         # 进行hmac-sha256进行加密
  346.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  347.                                  digestmod=hashlib.sha256).digest()
  348.         signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
  349.         authorization_origin = "api_key="%s", algorithm="%s", headers="%s", signature="%s"" % (
  350.             self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
  351.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  352.         # 将请求的鉴权参数组合为字典
  353.         v = {
  354.             "authorization": authorization,
  355.             "date": date,
  356.             "host": "ws-api.xfyun.cn"
  357.         }
  358.         # 拼接鉴权参数,生成url
  359.         url = url + '?' + urlencode(v)
  360.         # print("date: ",date)
  361.         # print("v: ",v)
  362.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  363.         # print('websocket url :', url)
  364.         return url
  365. def on_message_audio(ws, message):
  366.    
  367.     try:
  368.         message =json.loads(message)
  369.         code = message["code"]
  370.         sid = message["sid"]
  371.         audio = message["data"]["audio"]
  372.         audio = base64.b64decode(audio)
  373.         status = message["data"]["status"]
  374.         #print(message)
  375.         
  376.         
  377.         if code != 0:
  378.             errMsg = message["message"]
  379.             print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
  380.         else:
  381.             with open('demo.pcm', 'ab') as f:
  382.                 f.write(audio)
  383.             print("OK")   
  384.         if status == 2:
  385.             global bs
  386.             bs=1
  387.             print("ws is closed")
  388.             ws.close()   
  389.     except Exception as e:
  390.         print("receive msg,but parse exception:", e)
  391. # 收到websocket错误的处理
  392. def on_error_audio(ws, error):
  393.     print("### error:", error)
  394. # 收到websocket关闭的处理
  395. def on_close_audio(ws,a,b):
  396.    
  397.     print("### closed ###")
  398.    
  399. # 收到websocket连接建立的处理
  400. def on_open_audio(ws):
  401.     def run(*args):
  402.         d = {"common": wsParam_audio.CommonArgs,
  403.              "business": wsParam_audio.BusinessArgs,
  404.              "data": wsParam_audio.Data,
  405.              }
  406.         d = json.dumps(d)
  407.         print("------>开始发送文本数据")
  408.         ws.send(d)
  409.         print("------>发送完成")
  410.         if os.path.exists('./demo.pcm'):
  411.             os.remove('./demo.pcm')
  412.     thread.start_new_thread(run, ())
  413. def run_audio(APPID,APISecret,APIKey,Text):
  414.     global wsParam_audio
  415.     wsParam_audio = Ws_Param_audio(APPID=APPID, APISecret=APISecret,
  416.                        APIKey=APIKey,
  417.                        Text=Text)
  418.     websocket.enableTrace(False)
  419.     wsUrl = wsParam_audio.create_url()
  420.     ws_audio = websocket.WebSocketApp(wsUrl, on_message=on_message_audio, on_error=on_error_audio, on_close=on_close_audio)
  421.     ws_audio.on_open = on_open_audio
  422.     ws_audio.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  423.     p = pyaudio.PyAudio()
  424.     stream = p.open(format=p.get_format_from_width(2), channels=1, rate=16000, output=True)
  425. # 将 pcm 数据直接写入 PyAudio 的数据流
  426.     with open("demo.pcm", "rb") as f:
  427.         stream.write(f.read())
  428.     time.sleep(0.2)
  429.            
  430.     stream.stop_stream()
  431.     stream.close()
  432.     p.terminate()
  433. if __name__ == '__main__':
  434.     bssb=0
  435.     recording_results=""#语音识别结果
  436.     content=""#对话反馈结果
  437.     appid="d41ee990"
  438.     api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4"
  439.     api_key="d551eafc91a585a49beabe7ad3644ca4"
  440.    
  441.     while True:
  442.         recording_results=""
  443.         #在线语音识别
  444.         run_record(appid,api_secret,api_key)
  445.         content=""
  446.         u_gui.clear()
  447.         #讯飞星火大语言模型对话反馈
  448.         if recording_results!="":
  449.           run_chat(appid=appid,api_secret=api_secret,api_key=api_key,
  450.              gpt_url="ws://spark-api.xf-yun.com/v1.1/chat",
  451.              question=recording_results)
  452.         #语音合成
  453.         if content!="":
  454.           thread1=u_gui.start_thread(u_thread1_function)
  455.           run_audio(APPID=appid,APISecret=api_secret,APIKey=api_key,Text=content)
  456.           u_gui.stop_thread(thread1)
  457.           u_gui.clear()
复制代码
行空板 星火认知大模型 桌面助手AI兔图1

行空板 星火认知大模型 桌面助手AI兔图3


【演示视频】





木子呢  管理员

发表于 2023-8-7 11:26:40

这个兔耳朵,哈哈哈哈哈
回复

使用道具 举报

rzegkly  版主

发表于 2023-8-10 22:32:46

木子呢 发表于 2023-8-7 11:26
这个兔耳朵,哈哈哈哈哈

漂亮
回复

使用道具 举报

断墨寻径  学徒

发表于 2024-2-2 22:10:37

大神,在语音识别那里,下面代码报错了,错误信息是:“### error ### :  [Errno -3] Temporary failure in name resolution”。请问下怎么解决呀,谢谢。
def on_error_record(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

为本项目制作心愿单
购买心愿单
心愿单 编辑
[[wsData.name]]

硬件清单

  • [[d.name]]
btnicon
我也要做!
点击进入购买页面
上海智位机器人股份有限公司 沪ICP备09038501号-4

© 2013-2024 Comsenz Inc. Powered by Discuz! X3.4 Licensed

mail