云天 发表于 2023-8-6 19:49:49

行空板 星火认知大模型 桌面助手AI兔

本帖最后由 云天 于 2023-8-6 19:57 编辑

【项目背景】
7月17日,我带学生到浙江诸暨参加第二十四届全国学生信息素养提升实践活动,在科技之夜,我们聆听了科大讯飞对星火认知大模型的介绍。讯飞星火认知大模型是以中文为核心的新一代认知智能大模型,拥有跨领域的知识和语言理解能力,能够基于自然对话方式理解与执行任务。
【项目设计】
硬件使用行空板,使用讯飞语音识别,将识别结果送给讯飞星火认知大模型,反馈结果用讯飞语音合成进行语音输出。
【项目硬件】

因行空板无无音频输出,使用即插即用的免驱USB声卡,实现音频外置播放功能。
【注册申请】
讯飞开放平台注册,创建应用,申请星火认识大模型试用。获取APPID、APISecret、APIKey。

【语音识别】

''' 在线语音识别 '''
import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import pyaudio

from unihiker import GUI


u_gui=GUI()
显示内容=u_gui.draw_text(text="行空板chat",x=0,y=0,font_size=20, color="#0000FF")
recording_results=""   # 识别结果
STATUS_FIRST_FRAME = 0# 第一帧的标识
STATUS_CONTINUE_FRAME = 1# 中间帧标识
STATUS_LAST_FRAME = 2# 最后一帧的标识

class Ws_Param_record(object):
    # 初始化接口对象
    def __init__(self,APPID,APIKey,APISecret):
      # 初始化讯飞接口的ID,Key,Secret
      self.APPID=APPID
      self.APIKey=APIKey
      self.APISecret=APISecret
      # 公共参数(common)
      self.CommonArgs={"app_id":self.APPID}
      # 业务参数(business)
      self.BusinessArgs={"domain":"iat","language":"zh_cn",
                           "accent":"mandarin","vinfo":1,"vad_eos":1000}

    def create_url(self):
      # 生成url
      url='wss://ws-api.xfyun.cn/v2/iat'
      now=datetime.now()
      date=format_date_time(mktime(now.timetuple()))
      # 生成RFC1123格式的时间戳
      signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
      # 拼接字符串
      signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
      signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
      # 进行hmac_sha256进行加密
      authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", " \
                               "signature=\"%s\"" % (self.APIKey, "hmac-sha256",
                                                   "host date request-line", signature_sha)
      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
      v={
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
      }
      # 将请求的鉴权参数组合为字典
      url=url+'?'+urlencode(v)
      # 拼接鉴权参数,生成url
      return url


def on_open_record(ws):
    global bssb
    # 收到websocket连接建立的处理
    def run(*args):
      global bssb
      
      # 在线音频处理并发送到讯飞
      status=STATUS_FIRST_FRAME
      # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
      CHUNK = 520# 定义数据流块
      FORMAT = pyaudio.paInt16# 16bit编码格式
      CHANNELS = 1# 单声道
      RATE = 16000# 16000采样频率
      p=pyaudio.PyAudio()# 录音
      # 实例化pyaudio对象
      stream = p.open(format=FORMAT,# 音频流wav格式
                        channels=CHANNELS,# 单声道
                        rate=RATE,# 采样率16000
                        input=True,
                        frames_per_buffer=CHUNK)
      # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
      print("---------------开始录音-----------------")
      显示内容.config(text="开始录音---"+str(status))
      # 开始录音
      global text
      
      for i in range(0,int(RATE/CHUNK*60)):
            # 录制特定时间的音频
            buf=stream.read(CHUNK)
            # 读出声卡缓冲区的音频数据
            if not buf:
                status=STATUS_LAST_FRAME
            if status==STATUS_FIRST_FRAME:
                # 首帧处理
                d = {"common": wsParam_record.CommonArgs,
                     "business": wsParam_record.BusinessArgs,
                     "data": {"status": 0, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                d = json.dumps(d)
                # 将拼接的字符串d数据结构转换为json
                if bssb==0:
                  ws.send(d)
                status=STATUS_CONTINUE_FRAME
            elif status==STATUS_CONTINUE_FRAME:
                # 中间帧处理
                d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                if bssb==0:
                   ws.send(json.dumps(d))
                  
            elif status==STATUS_LAST_FRAME:
                # 最后一帧处理
                d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
               
                ws.send(json.dumps(d))
                time.sleep(1)
                break
    thread.start_new_thread(run,())


def on_message_record(ws,message):
    global recording_results,bssb
    # 收到websocket消息的正常处理
    try:
      print(json.loads(message))
      code = json.loads(message)["code"]
      # 解码返回的message的json数据中的code
      sid = json.loads(message)["sid"]
      if code != 0:
            errMsg = json.loads(message)["message"]
            # 解码message中错误信息
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
      else:
            data = json.loads(message)["data"]["result"]["ws"]
            status = json.loads(message)["data"]["status"]
            # 解码message中ws数据
            result = ""
            for i in data:
                for w in i["cw"]:
                  result += w["w"]
            
            if status==2 :
                if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
                  recording_results=recording_results+'。'
                elif result == '?':
                  recording_results=recording_results+'?'
                elif result == '!':
                  recording_results=recording_results+'!'
                print("end")
                bssb=1
                ws.close()
               
            else:
                # t.insert(END, result)# 把上边的标点插入到result的最后
                print("翻译结果: %s" % (result))
                显示内容.config(text=result)
               
                recording_results=result
    except Exception as e:
      # 异常处理,参数异常
      print("receive msg,but parse exception:", e)

def on_error_record(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
   
    # 重新启动监听

def on_close_record(ws,a,b):
    # 收到websocket关闭的处理
    print("close ok")

def run_record():
    global wsParam_record
    wsParam_record=Ws_Param_record(APPID='d41ee990',
                  APISecret='NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4',
                  APIKey='d551eafc91a585a49beabe7ad3644ca4')
    # 初始化讯飞接口编码
    websocket.enableTrace(False)
    # True表示默认在控制台打印连接和信息发送接收情况
    wsUrl=wsParam_record.create_url()
    # 生成讯飞的url进行连接
    ws_record=websocket.WebSocketApp(wsUrl, on_message=on_message_record, on_error=on_error_record, on_close=on_close_record)
    ws_record.on_open=on_open_record# 进行websocket连接
    ws_record.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
    # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去

if __name__ == '__main__':
    bssb=0
    run_record()
    while True:
      pass
【星火认知大模型】
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket,math
from unihiker import GUI


u_gui=GUI()
#讯飞星火大语言模型对话
class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, gpt_url):
      self.APPID = APPID
      self.APIKey = APIKey
      self.APISecret = APISecret
      self.host = urlparse(gpt_url).netloc
      self.path = urlparse(gpt_url).path
      self.gpt_url = gpt_url

    # 生成url
    def create_url(self):
      # 生成RFC1123格式的时间戳
      now = datetime.now()
      date = format_date_time(mktime(now.timetuple()))

      # 拼接字符串
      signature_origin = "host: " + self.host + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + self.path + " HTTP/1.1"

      # 进行hmac-sha256进行加密
      signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

      signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

      authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

      # 将请求的鉴权参数组合为字典
      v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
      }
      # 拼接鉴权参数,生成url
      url = self.gpt_url + '?' + urlencode(v)
      # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
      return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,a,b):
    print("### closed ###")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    global content
    #print(message)
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
      print(f'请求错误: {code}, {data}')
      ws.close()
    else:
      choices = data["payload"]["choices"]
      status = choices["status"]
      content =content+ choices["text"]["content"]
      print(content, end='')
      
      if status == 2:
            for i in range(0,math.ceil(len(content)/11)):
             i=u_gui.draw_text(text=content,x=0,y=i*20,font_size=15, color="#0000FF")
            ws.close()


def gen_params(appid, question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
      "header": {
            "app_id": appid,
            "uid": "1234"
      },
      "parameter": {
            "chat": {
                "domain": "general",
                "random_threshold": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
      },
      "payload": {
            "message": {
                "text": [
                  {"role": "user", "content": question}
                ]
            }
      }
    }
    return data


def run_chat(appid, api_key, api_secret, gpt_url, question):
    wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})



"""
    1、配置好python、pip的环境变量
    2、执行 pip install websocket 与 pip3 install websocket-client
    3、去控制台https://console.xfyun.cn/services/cbm获取appid等信息填写即可
"""
if __name__ == "__main__":
    # 测试时候在此处正确填写相关信息即可运行
    content=""
    run_chat(appid="d41ee990",
         api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4",
         api_key="d551eafc91a585a49beabe7ad3644ca4",
         gpt_url="ws://spark-api.xf-yun.com/v1.1/chat",
         question="请一篇关于中国男足世界杯夺冠的新闻稿,100字。")
    while 1:
      pass
“请一篇关于中国男足世界杯夺冠的新闻稿,100字。”,对话测试结果:    中国男足在2022年世界杯决赛中以3比2战胜巴西,成功夺得世界杯冠军!这是中国足球历史上最辉煌的时刻,也是全国亿万球迷共同欢庆的时刻。经过多年的努力和拼搏,中国男足终于实现了自己的梦想,为国家争得了荣誉和尊严。中国男足在2022年世界杯决赛中以3比2战胜巴西,成功夺得世界杯冠军!这是中国足球历史上最辉煌的时刻,也是全国亿万球迷共同欢庆的时刻。经过多年的努力和拼搏,中国男足终于实现了自己的梦想,为国家争得了荣誉和尊严。这次胜利不仅仅是体育成就,更是国家实力和民族自信的体现。我们相信,通过这次胜利,中国足球将有更加美好的未来!
【语音合成】

# -*- coding:utf-8 -*-
#
#   author: iflytek
#
#本demo测试时运行的环境为:Windows + Python3.7
#本demo测试成功运行时所安装的第三方库及其版本如下:
#   cffi==1.12.3
#   gevent==1.4.0
#   greenlet==0.4.15
#   pycparser==2.19
#   six==1.12.0
#   websocket==0.2.1
#   websocket-client==0.56.0
#   合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式
#错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import os
import pyaudio




STATUS_FIRST_FRAME = 0# 第一帧的标识
STATUS_CONTINUE_FRAME = 1# 中间帧标识
STATUS_LAST_FRAME = 2# 最后一帧的标识

#语音合成
class Ws_Param_audio(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Text):
      self.APPID = APPID
      self.APIKey = APIKey
      self.APISecret = APISecret
      self.Text = Text

      # 公共参数(common)
      self.CommonArgs = {"app_id": self.APPID}
      # 业务参数(business),更多个性化参数可在官网查看
      self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"}
      self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
      #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
      #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}

    # 生成url
    def create_url(self):
      url = 'wss://tts-api.xfyun.cn/v2/tts'
      # 生成RFC1123格式的时间戳
      now = datetime.now()
      date = format_date_time(mktime(now.timetuple()))

      # 拼接字符串
      signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
      # 进行hmac-sha256进行加密
      signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
      signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

      authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
      # 将请求的鉴权参数组合为字典
      v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
      }
      # 拼接鉴权参数,生成url
      url = url + '?' + urlencode(v)
      # print("date: ",date)
      # print("v: ",v)
      # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
      # print('websocket url :', url)
      return url

def on_message_audio(ws, message):
   
    try:
      message =json.loads(message)
      code = message["code"]
      sid = message["sid"]
      audio = message["data"]["audio"]
      audio = base64.b64decode(audio)
      status = message["data"]["status"]
      #print(message)
      
      
      if code != 0:
            errMsg = message["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
      else:

            with open('demo.pcm', 'ab') as f:
                f.write(audio)
            print("OK")   
      if status == 2:
            global bs
            bs=1
            print("ws is closed")
            ws.close()   

    except Exception as e:
      print("receive msg,but parse exception:", e)



# 收到websocket错误的处理
def on_error_audio(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close_audio(ws,a,b):
   
    print("### closed ###")
   

# 收到websocket连接建立的处理
def on_open_audio(ws):
    def run(*args):
      d = {"common": wsParam_audio.CommonArgs,
             "business": wsParam_audio.BusinessArgs,
             "data": wsParam_audio.Data,
             }
      d = json.dumps(d)
      print("------>开始发送文本数据")
      ws.send(d)
      print("------>发送完成")
      if os.path.exists('./demo.pcm'):
            os.remove('./demo.pcm')

    thread.start_new_thread(run, ())
def run_audio(APPID,APISecret,APIKey,Text):
    global wsParam_audio
    wsParam_audio = Ws_Param_audio(APPID=APPID, APISecret=APISecret,
                     APIKey=APIKey,
                     Text=Text)
    websocket.enableTrace(False)
    wsUrl = wsParam_audio.create_url()
    ws_audio = websocket.WebSocketApp(wsUrl, on_message=on_message_audio, on_error=on_error_audio, on_close=on_close_audio)
    ws_audio.on_open = on_open_audio
    ws_audio.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    p = pyaudio.PyAudio()
    stream = p.open(format=p.get_format_from_width(2), channels=1, rate=16000, output=True)

# 将 pcm 数据直接写入 PyAudio 的数据流
    with open("demo.pcm", "rb") as f:
      stream.write(f.read())
    time.sleep(0.2)
         
    stream.stop_stream()
    stream.close()
    p.terminate()
if __name__ == "__main__":
    bs=1
    # 测试时候在此处正确填写相关信息即可运行
    appid="d41ee990"
    api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4"
    api_key="d551eafc91a585a49beabe7ad3644ca4"
   
    while 1:
      if bs==1:
            run_audio(APPID=appid,APISecret=api_secret,APIKey=api_key,Text="你是谁?你能做些什么?")
            bs=0
            

【舵机测试】使用了“线程”来控制舵机,使舵机运行与语音合成同步进行。




【完整程序】
将 语音识别、认知大模型、语音合成、舵机控制四段程序进行整合。

''' 在线语音识别 '''
import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import pyaudio
from unihiker import GUI
from pinpong.extension.unihiker import *
from pinpong.board import Board,Pin
''' 讯飞星火大语言模型对话 '''
from urllib.parse import urlparse
import math
''' 语音合成 '''
import os
'''控制舵机'''

from pinpong.board import Servo
Board().begin()
pin1 = Pin(Pin.D21, Pin.OUT)
pin2 = Pin(Pin.D22, Pin.OUT)
servo1 = Servo(pin1)
servo1.write_angle(90)
servo2 = Servo(pin2)
servo2.write_angle(45)
# 事件回调函数
def u_thread1_function():
    while True:
      servo1.write_angle(90)
      servo2.write_angle(45)
      time.sleep(1)
      servo1.write_angle(45)
      servo2.write_angle(90)
      time.sleep(1)
#录制语音,在线语音识别
u_gui=GUI()


显示内容=u_gui.draw_text(text="行空板chat",x=5,y=100,font_size=30, color="#FF0000")
recording_results=""   # 识别结果
STATUS_FIRST_FRAME = 0# 第一帧的标识
STATUS_CONTINUE_FRAME = 1# 中间帧标识
STATUS_LAST_FRAME = 2# 最后一帧的标识

class Ws_Param_record(object):
    # 初始化接口对象
    def __init__(self,APPID,APIKey,APISecret):
      # 初始化讯飞接口的ID,Key,Secret
      self.APPID=APPID
      self.APIKey=APIKey
      self.APISecret=APISecret
      # 公共参数(common)
      self.CommonArgs={"app_id":self.APPID}
      # 业务参数(business)
      self.BusinessArgs={"domain":"iat","language":"zh_cn",
                           "accent":"mandarin","vinfo":1,"vad_eos":1000}

    def create_url(self):
      # 生成url
      url='wss://ws-api.xfyun.cn/v2/iat'
      now=datetime.now()
      date=format_date_time(mktime(now.timetuple()))
      # 生成RFC1123格式的时间戳
      signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
      # 拼接字符串
      signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
      signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
      # 进行hmac_sha256进行加密
      authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", " \
                               "signature=\"%s\"" % (self.APIKey, "hmac-sha256",
                                                   "host date request-line", signature_sha)
      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
      v={
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
      }
      # 将请求的鉴权参数组合为字典
      url=url+'?'+urlencode(v)
      # 拼接鉴权参数,生成url
      return url


def on_open_record(ws):
    global bssb
    # 收到websocket连接建立的处理
    def run(*args):
      global bssb
      
      # 在线音频处理并发送到讯飞
      status=STATUS_FIRST_FRAME
      # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
      CHUNK = 520# 定义数据流块
      FORMAT = pyaudio.paInt16# 16bit编码格式
      CHANNELS = 1# 单声道
      RATE = 16000# 16000采样频率
      p=pyaudio.PyAudio()# 录音
      # 实例化pyaudio对象
      stream = p.open(format=FORMAT,# 音频流wav格式
                        channels=CHANNELS,# 单声道
                        rate=RATE,# 采样率16000
                        input=True,
                        frames_per_buffer=CHUNK)
      # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
      print("---------------开始录音-----------------")
      buzzer.play(buzzer.JUMP_UP,buzzer.Once)
      u_gui.clear()
      显示内容=u_gui.draw_text(text="开始录音……",x=5,y=100,font_size=30, color="#0000FF")
      # 开始录音
      bssb=0
      global text
      
      for i in range(0,int(RATE/CHUNK*60)):
            # 录制特定时间的音频
            buf=stream.read(CHUNK)
            # 读出声卡缓冲区的音频数据
            if not buf:
                status=STATUS_LAST_FRAME
            if status==STATUS_FIRST_FRAME:
                # 首帧处理
                d = {"common": wsParam_record.CommonArgs,
                     "business": wsParam_record.BusinessArgs,
                     "data": {"status": 0, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                d = json.dumps(d)
                # 将拼接的字符串d数据结构转换为json
                if bssb==0:
                  ws.send(d)
                status=STATUS_CONTINUE_FRAME
            elif status==STATUS_CONTINUE_FRAME:
                # 中间帧处理
                d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                if bssb==0:
                   ws.send(json.dumps(d))
               
            elif status==STATUS_LAST_FRAME:
                # 最后一帧处理
                d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
               
                ws.send(json.dumps(d))
                time.sleep(1)
                break
    thread.start_new_thread(run,())


def on_message_record(ws,message):
    global recording_results,bssb
    # 收到websocket消息的正常处理
    try:
      print(json.loads(message))
      code = json.loads(message)["code"]
      # 解码返回的message的json数据中的code
      sid = json.loads(message)["sid"]
      if code != 0:
            errMsg = json.loads(message)["message"]
            # 解码message中错误信息
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
      else:
            data = json.loads(message)["data"]["result"]["ws"]
            status = json.loads(message)["data"]["status"]
            # 解码message中ws数据
            result = ""
            for i in data:
                for w in i["cw"]:
                  result += w["w"]
            
            if status==2 :
                if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
                  recording_results=recording_results+'。'
                elif result == '?':
                  recording_results=recording_results+'?'
                elif result == '!':
                  recording_results=recording_results+'!'
               
                u_gui.clear()
                for i in range(0,math.ceil(len(content)/8)):
                  i=u_gui.draw_text(text=content,x=0,y=i*30,font_size=20, color="#0000FF")
                bssb=1
                ws.close()
               
            else:
                # t.insert(END, result)# 把上边的标点插入到result的最后
                print("翻译结果: %s" % (result))
               
                #显示内容.config(text=result)
               
                recording_results=result
    except Exception as e:
      # 异常处理,参数异常
      print("receive msg,but parse exception:", e)

def on_error_record(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
   
    # 重新启动监听

def on_close_record(ws,a,b):
    # 收到websocket关闭的处理
    print("close ok")

def run_record(APPID,APISecret,APIKey):
    global wsParam_record
    wsParam_record=Ws_Param_record(APPID=APPID,
                  APISecret=APISecret,
                  APIKey=APIKey)
    # 初始化讯飞接口编码
    websocket.enableTrace(False)
    # True表示默认在控制台打印连接和信息发送接收情况
    wsUrl=wsParam_record.create_url()
    # 生成讯飞的url进行连接
    ws_record=websocket.WebSocketApp(wsUrl, on_message=on_message_record, on_error=on_error_record, on_close=on_close_record)
    ws_record.on_open=on_open_record# 进行websocket连接
    ws_record.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
    # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去
#讯飞星火大语言模型对话
class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, gpt_url):
      self.APPID = APPID
      self.APIKey = APIKey
      self.APISecret = APISecret
      self.host = urlparse(gpt_url).netloc
      self.path = urlparse(gpt_url).path
      self.gpt_url = gpt_url

    # 生成url
    def create_url(self):
      # 生成RFC1123格式的时间戳
      now = datetime.now()
      date = format_date_time(mktime(now.timetuple()))

      # 拼接字符串
      signature_origin = "host: " + self.host + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + self.path + " HTTP/1.1"

      # 进行hmac-sha256进行加密
      signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

      signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

      authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

      # 将请求的鉴权参数组合为字典
      v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
      }
      # 拼接鉴权参数,生成url
      url = self.gpt_url + '?' + urlencode(v)
      # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
      return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,a,b):
    print("### closed ###")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    global content
    #print(message)
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
      print(f'请求错误: {code}, {data}')
      ws.close()
    else:
      choices = data["payload"]["choices"]
      status = choices["status"]
      content =content+ choices["text"]["content"]
      print(content, end='')
      
      if status == 2:
            for i in range(0,math.ceil(len(content)/11)):
             i=u_gui.draw_text(text=content,x=0,y=i*20,font_size=15, color="#0000FF")
            ws.close()


def gen_params(appid, question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
      "header": {
            "app_id": appid,
            "uid": "1234"
      },
      "parameter": {
            "chat": {
                "domain": "general",
                "random_threshold": 0.5,
                "max_tokens": 2048,
                "auditing": "default"
            }
      },
      "payload": {
            "message": {
                "text": [
                  {"role": "user", "content": question}
                ]
            }
      }
    }
    return data


def run_chat(appid, api_key, api_secret, gpt_url, question):
    wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

#语音合成
class Ws_Param_audio(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Text):
      self.APPID = APPID
      self.APIKey = APIKey
      self.APISecret = APISecret
      self.Text = Text

      # 公共参数(common)
      self.CommonArgs = {"app_id": self.APPID}
      # 业务参数(business),更多个性化参数可在官网查看
      self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"}
      self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
      #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
      #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}

    # 生成url
    def create_url(self):
      url = 'wss://tts-api.xfyun.cn/v2/tts'
      # 生成RFC1123格式的时间戳
      now = datetime.now()
      date = format_date_time(mktime(now.timetuple()))

      # 拼接字符串
      signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
      # 进行hmac-sha256进行加密
      signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
      signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

      authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
      # 将请求的鉴权参数组合为字典
      v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
      }
      # 拼接鉴权参数,生成url
      url = url + '?' + urlencode(v)
      # print("date: ",date)
      # print("v: ",v)
      # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
      # print('websocket url :', url)
      return url

def on_message_audio(ws, message):
   
    try:
      message =json.loads(message)
      code = message["code"]
      sid = message["sid"]
      audio = message["data"]["audio"]
      audio = base64.b64decode(audio)
      status = message["data"]["status"]
      #print(message)
      
      
      if code != 0:
            errMsg = message["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
      else:

            with open('demo.pcm', 'ab') as f:
                f.write(audio)
            print("OK")   
      if status == 2:
            global bs
            bs=1
            print("ws is closed")
            ws.close()   

    except Exception as e:
      print("receive msg,but parse exception:", e)



# 收到websocket错误的处理
def on_error_audio(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close_audio(ws,a,b):
   
    print("### closed ###")
   

# 收到websocket连接建立的处理
def on_open_audio(ws):
    def run(*args):
      d = {"common": wsParam_audio.CommonArgs,
             "business": wsParam_audio.BusinessArgs,
             "data": wsParam_audio.Data,
             }
      d = json.dumps(d)
      print("------>开始发送文本数据")
      ws.send(d)
      print("------>发送完成")
      if os.path.exists('./demo.pcm'):
            os.remove('./demo.pcm')

    thread.start_new_thread(run, ())
def run_audio(APPID,APISecret,APIKey,Text):
    global wsParam_audio
    wsParam_audio = Ws_Param_audio(APPID=APPID, APISecret=APISecret,
                     APIKey=APIKey,
                     Text=Text)
    websocket.enableTrace(False)
    wsUrl = wsParam_audio.create_url()
    ws_audio = websocket.WebSocketApp(wsUrl, on_message=on_message_audio, on_error=on_error_audio, on_close=on_close_audio)
    ws_audio.on_open = on_open_audio
    ws_audio.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    p = pyaudio.PyAudio()
    stream = p.open(format=p.get_format_from_width(2), channels=1, rate=16000, output=True)

# 将 pcm 数据直接写入 PyAudio 的数据流
    with open("demo.pcm", "rb") as f:
      stream.write(f.read())
    time.sleep(0.2)
         
    stream.stop_stream()
    stream.close()
    p.terminate()

if __name__ == '__main__':
    bssb=0
    recording_results=""#语音识别结果
    content=""#对话反馈结果
    appid="d41ee990"
    api_secret="NmFkZjdhM2U3ZmU5ZmNlN2FjOGJmYjY4"
    api_key="d551eafc91a585a49beabe7ad3644ca4"
   
    while True:
      recording_results=""
      #在线语音识别
      run_record(appid,api_secret,api_key)
      content=""
      u_gui.clear()
      #讯飞星火大语言模型对话反馈
      if recording_results!="":
          run_chat(appid=appid,api_secret=api_secret,api_key=api_key,
             gpt_url="ws://spark-api.xf-yun.com/v1.1/chat",
             question=recording_results)
      #语音合成
      if content!="":
          thread1=u_gui.start_thread(u_thread1_function)
          run_audio(APPID=appid,APISecret=api_secret,APIKey=api_key,Text=content)
          u_gui.stop_thread(thread1)
          u_gui.clear()



【演示视频】
https://www.bilibili.com/video/BV1yz4y1p73C/?share_source=copy_web&vd_source=98855d5b99ff76982639c5ca6ff6f528




木子呢 发表于 2023-8-7 11:26:40

这个兔耳朵,哈哈哈哈哈

rzegkly 发表于 2023-8-10 22:32:46

木子呢 发表于 2023-8-7 11:26
这个兔耳朵,哈哈哈哈哈

漂亮

断墨寻径 发表于 2024-2-2 22:10:37

大神,在语音识别那里,下面代码报错了,错误信息是:“### error ### : Temporary failure in name resolution”。请问下怎么解决呀,谢谢。
def on_error_record(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
页: [1]
查看完整版本: 行空板 星火认知大模型 桌面助手AI兔