云天 发表于 2024-1-9 18:58:03

用大语言模型控制行空板机器人

本帖最后由 云天 于 2024-1-9 18:58 编辑

【项目背景】
       百模大战愈演愈烈,大模型在各个社会场景中释放出的潜力已肉眼可见,不断推动着生产力的变革。技术的迭代升级,让开发门槛进一步降低,为开发者带来了新机遇。
       2023科大讯飞全球1024开发者节上,讯飞星火认知大模型V3.0如约而至,七大能力持续提升,整体超越ChatGPT,医疗六大核心能力超越GPT-4。
       现在讯飞星火V3.0已向开发者开放调用,多项能力提升的同时,新增FunctionCall 、System指令和大模型定制训练平台,满足开发者更多场景调用需求,带来优质人机交互体验。

       讯飞星火FunctionCall:
       让大模型实现与外部物理世界的交互,其中的关键就是对外部功能的调用,而其底层就是大模型FunctionCall的支持能力。讯飞星火新推出的FunctionCall 可调用自有函数生成内容,让大模型能更准确理解用户意图并与函数调用相匹配。对于开发者来说,Functioncall 功能的推出不仅使研发成本大大降低,还有助于打造更懂用户的应用。
【项目设计】
       本项目使用行空板结合星火大语言模型,让机器人理解我说的话(话的内容为控制机器人做动作的指令:抬起手、放下手,但话的内容不固定),做出相应的动作。同时配合讯飞语音识别和语音合成功能,用户可以与行空板机器人进行语音对话。
【演示视频】
https://www.bilibili.com/video/BV1qC4y1Y76L/?share_source=copy_web&vd_source=98855d5b99ff76982639c5ca6ff6f528
【Function Call分析】
       从人机交互上来说, Function Call 本质上只做了一件事,那就是实现了「准确识别用户的语义,将其转为结构化的指令」。而这,非常了不起。
       它实现的最大的价值,就是让机器轻易地理解了用户模糊化的输入,将其转换为机器可以理解的技术指令。这对于人机交互的范式来说,完全是质的改变。
       官方文档里给出了使用模型进行Function Call 工具调用的流程(星火认知大模型Web API文档 | 讯飞开放平台文档中心 (xfyun.cn)):
我:明天我应该穿什么衣服?
星火:{"header":{"code":0,"message":"Success","sid":"cht000b8e77@dx18ce887dd74b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"datetime\":\"明天\",\"location\":\"CURRENT_CITY\"}","name":"天气查询"},"index":0}]},"usage":{"text":{"question_tokens":6,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}       假设我们正在开发一个聊天类的AI应用,它需要根据用户输入的问题来生成相应的回答。我们在提问上并没有直接问明天的天气如何,而是问「我明天应该穿什么衣服?」此时 语言完全理解了我的问题,而且知道穿衣建议是需要了解相关天气情况的。那么机器人应该返回一个关于今天天气的字符串。为了实现这个功能,我们可以定义一个名为“天气查询”的函数,它接受两个参数:一个表示日期的字符串、一个表示所在城市的字符串。然后,我们可以将这个函数的描述传递给大模型,让它生成一个包含调用该函数所需参数的JSON对象。最后,当用户输入一个问题时,我们可以让大模型分析问题并确定是否需要调用“天气查询”函数。如果需要,我们可以从JSON对象中提取所需的参数,并将它们传递给"天气查"询函数。这样,我们就可以根据用户输入的问题来生成相应的回答了。       为了让机器理解人类的意图,过去我们想方设法去「约束用户行为」或者「猜测用户意图』。但时代已经开始变了,通过 Function Call, 我们只需要在发送给 GPT 请求时加一个functions的参数,告知 AI 可以调用的外部方法有什么,然后 AI 就能够自动分析问题的上下文,并通过多轮对话来收集必要的调用参数,最后拼合返回调用方法的 JSON。【修改functions】       定制了一个"机器人控制"functions,描述为:“机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手。”
       "payload": {
            "message": {
                "text": question
            },
            "functions": {
                "text": [
                  {
                        "name": "机器人控制",
                        "description": "机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手",
                        "parameters": {
                            "type": "object",
                            "properties": {
                              "hand": {
                                    "type": "string",
                                    "description": "控制对象,左手或右手或左右手"
                              },
                              "action": {
                                    "type": "string",
                                    "description": "动作,抬起或放下"
                              }
                            },
                            "required": [
                              "hand","action"
                            ]
                        }
                  }
                ]
            }   
      }
       经过对话发现星火对"让两只手都抬起",给出的反馈“hand”为“双手”,此处给星火点赞!


我:让机器人抬起左手
星火:{"header":{"code":0,"message":"Success","sid":"cht000bf06b@dx18ce8ce92c5b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"action\":\"抬起\",\"hand\":\"左手\"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}
我:放下右手
星火:{"header":{"code":0,"message":"Success","sid":"cht000bf6ba@dx18ce8cee8eab8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"action\":\"放下\",\"hand\":\"右手\"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}

我:抬起右手
星火:{"header":{"code":0,"message":"Success","sid":"cht000b09e3@dx18ce8d1b7e69a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"action\":\"抬起\",\"hand\":\"右手\"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":2,"completion_tokens":0,"total_tokens":2}}}}

我:让两只手都抬起
星火:{"header":{"code":0,"message":"Success","sid":"cht000b0bea@dx18ce8d202b79a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"action\":\"抬起\",\"hand\":\"双手\"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":5,"prompt_tokens":7,"completion_tokens":0,"total_tokens":7}}}}【定义行空板控制舵机函数】
       定义控制舵机函数“RobotControl”,根据用户输入参数的值。"hand"指控制操作的对象:左手、右手、双手。“action”指控制动作:抬起、放下。

def RobotControl(hand,action):
    if hand=="左手":
      if action=="抬起":
            servoL.write_angle(90)
      else:
            servoL.write_angle(178)
    elif hand=="右手":
      if action=="抬起":
         servoR.write_angle(90)
      else:
         servoR.write_angle(2)
    elif hand=="双手":
      if action=="抬起":
         servoL.write_angle(90)
         servoR.write_angle(90)
      else:
         servoR.write_angle(2)
         servoL.write_angle(178)

      另我兴奋的是,我输入“把右手举起来”,星火也能反馈回来action:抬起。(当然此时,我已看到我的简易机器人,把它的“右手”“抬起”来了。
我:把右手举起来
星火:{"header":{"code":0,"message":"Success","sid":"cht000b8d3e@dx18ce96fcae5b8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{\"action\":\"抬起\",\"hand\":\"右手\"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}【简易机器人制作】

【SparkApi模块文件完整代码】

import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket# 使用websocket_client
from pinpong.extension.unihiker import *
from pinpong.board import Servo
from pinpong.board import Board
from pinpong.board import Pin
Board().begin()
pin1 = Pin(Pin.D23)
pin2 = Pin(Pin.D22)
servoL = Servo(pin1)#左手舵机
servoR = Servo(pin2)#右手舵机
#左右手放平
servoL.write_angle(178)
servoR.write_angle(2)
hand=""
action=""

answer = ""

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
      self.APPID = APPID
      self.APIKey = APIKey
      self.APISecret = APISecret
      self.host = urlparse(Spark_url).netloc
      self.path = urlparse(Spark_url).path
      self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
      # 生成RFC1123格式的时间戳
      now = datetime.now()
      date = format_date_time(mktime(now.timetuple()))

      # 拼接字符串
      signature_origin = "host: " + self.host + "\n"
      signature_origin += "date: " + date + "\n"
      signature_origin += "GET " + self.path + " HTTP/1.1"

      # 进行hmac-sha256进行加密
      signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

      signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

      authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

      authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

      # 将请求的鉴权参数组合为字典
      v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
      }
      # 拼接鉴权参数,生成url
      url = self.Spark_url + '?' + urlencode(v)
      # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
      return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    global hand,action
    #print(message)
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
      print(f'请求错误: {code}, {data}')
      ws.close()
    else:
      choices = data["payload"]["choices"]
      status = choices["status"]
      content = choices["text"]["content"]
      print(content,end ="")
      global answer
      answer += content
      # print(1)
      if choices["text"]["function_call"] is not None:
            function_call=choices["text"]["function_call"]
            if function_call["name"]=="机器人控制":
               arguments=json.loads(function_call["arguments"])
               action=arguments["action"]
               hand=arguments["hand"]

               RobotControl(hand,action)
      if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
      "header": {
            "app_id": appid,
            "uid": "1234"
      },
      "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.5,
                "max_tokens": 2048
            }
      },
      "payload": {
            "message": {
                "text": question
            },
            "functions": {
                "text": [
                  {
                        "name": "机器人控制",
                        "description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手",
                        "parameters": {
                            "type": "object",
                            "properties": {
                              "hand": {
                                    "type": "string",
                                    "description": "控制对象,左手或右手或左右手"
                              },
                              "action": {
                                    "type": "string",
                                    "description": "动作,抬起或放下"
                              }
                            },
                            "required": [
                              "hand","action"
                            ]
                        }
                  }
                ]
            }   
      }
    }
    return data

def RobotControl(hand,action):
    if hand=="左手":
      if action=="抬起":
            servoL.write_angle(90)
      else:
            servoL.write_angle(178)
    elif hand=="右手":
      if action=="抬起":
         servoR.write_angle(90)
      else:
         servoR.write_angle(2)
    elif hand=="双手":
      if action=="抬起":
         servoL.write_angle(90)
         servoR.write_angle(90)
      else:
         servoR.write_angle(2)
         servoL.write_angle(178)
def main(appid, api_key, api_secret, Spark_url,domain, question):
    # print("星火:")
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})



【使用语音交互】
1.语音输入
(1)音量监听#主程序
import listening       音量监听listening.py模块文件代码
import pyaudio,wave
import numpy as np
def listen():

    temp = 20
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    RECORD_SECONDS = 2
    WAVE_OUTPUT_FILENAME = 'record.wav'

    mindb=1500    #最小声音,大于则开始录音,否则结束
    delayTime=1.3#小声1.3秒后自动终止
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                  channels=CHANNELS,
                  rate=RATE,
                  input=True,
                  frames_per_buffer=CHUNK)
    #snowboydecoder.play_audio_file()
    print("开始!计时")

    frames = []
    flag = False            # 开始录音节点
    stat = True                              #判断是否继续录音
    stat2 = False                        #判断声音小了

    tempnum = 0                              #tempnum、tempnum2、tempnum3为时间
    tempnum2 = 0

    while stat:
      data = stream.read(CHUNK,exception_on_overflow = False)
      
      audio_data = np.frombuffer(data, dtype=np.short)
      temp = np.max(audio_data)
      if temp > mindb and flag==False:
            flag =True
            print("开始录音")
            tempnum2=tempnum

      if flag:
            frames.append(data)
            if(temp < mindb and stat2==False):
                stat2 = True
                tempnum2 = tempnum
                print("声音小,且之前是是大的或刚开始,记录当前点")
            if(temp > mindb):
                stat2 =False
                tempnum2 = tempnum
                #刷新

            if(tempnum > tempnum2 + delayTime*15 and stat2==True):
                print("间隔%.2lfs后开始检测是否还是小声"%delayTime)
                if(stat2 and temp < mindb):
                  stat = False
                  #还是小声,则stat=True
                  print("小声!")
                else:
                  stat2 = False
                  print("大声!")


      print(str(temp)+"      " +str(tempnum))
      tempnum = tempnum + 1
      if tempnum > 3600:
            tempnum=0                        #超时直接退出
            #stat = False
    print("录音结束")

    stream.stop_stream()
    stream.close()
    p.terminate()
    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
(2)讯飞语音识别
主程序 导入讯飞语音识别库
import xunfeiasr
xunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)
将主程序文字输入修改为讯飞语音识别文本输入。
修改:
Input = input("\n" +"我:")
为:listening.listen()
ShiBieNaRong=xunfeiasr.xunfeiasr(r"record.wav")
       经测试星火有时会反馈其它action,所以在描述description中增加对动作指令的限制。
"description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手",       使用语音输入如:把右手向上,也能实现机器人抬起右手。nice!

       主程序代码:

import SparkApi
import listening
#以下密钥信息从控制台获取
appid = "*********"   #填写控制台中获取的 APPID 信息
api_secret = "***********"   #填写控制台中获取的 APISecret 信息
api_key ="**************"    #填写控制台中获取的 APIKey 信息
import xunfeiasr
xunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)
#用于配置大模型版本,默认“general/generalv2”
#domain = "general"   # v1.5版本
# domain = "generalv2"    # v2.0版本
domain = "generalv3"    # v3.0版本
#云端环境的服务地址
#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"# v1.5环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"# v2.0环境的地址
Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"

text =[]

# length = 0

def getText(role,content):
    jsoncon = {}
    jsoncon["role"] = role
    jsoncon["content"] = content
    text.append(jsoncon)
    return text

def getlength(text):
    length = 0
    for content in text:
      temp = content["content"]
      leng = len(temp)
      length += leng
    return length

def checklen(text):
    while (getlength(text) > 8000):
      del text
    return text
   


if __name__ == '__main__':
    text.clear
    while(1):
      listening.listen()
      Input=xunfeiasr.xunfeiasr(r"record.wav")
      question = checklen(getText("user",Input))
      SparkApi.answer =""
      print("星火:",end = "")
      SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)
      getText("assistant",SparkApi.answer)
      # print(str(text))


2.行空板屏幕表情
from unihiker import GUI
u_gui=GUI()
表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)
3.测试视频:

https://www.bilibili.com/video/BV1eT4y1p7DH/?share_source=copy_web&vd_source=98855d5b99ff76982639c5ca6ff6f528
4.机器人语音回答
    (1)设置行空板蓝牙连接蓝牙音箱。       在行空板终端窗口下使用bluetoothctl配置蓝牙连接蓝牙音箱。

    (2)讯飞语音合成
       初始化
from df_xfyun_speech import XfTts
from unihiker import Audio
u_audio = Audio()
options = {}
business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}
options["business_args"] = business_args
tts = XfTts(appid, api_key, api_secret, options)       主程序循环体while(1)中增加,当星火有回复时:       if SparkApi.answer!="":
             tts.synthesis(answer+"……", "speech.wav")
             u_audio.play("speech.wav")

当命中机器人控制时:

      if SparkApi.hand!="":
            tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav")
            u_audio.play("speech.wav")
【主程序完整代码】

import SparkApi
import listening
import time
#以下密钥信息从控制台获取
appid = "*******************"   #填写控制台中获取的 APPID 信息
api_secret = "***************************"   #填写控制台中获取的 APISecret 信息
api_key ="*******************************"    #填写控制台中获取的 APIKey 信息
import xunfeiasr
xunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)
from df_xfyun_speech import XfTts
from unihiker import Audio
u_audio = Audio()
options = {}
business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}
options["business_args"] = business_args
global tts
tts = XfTts(appid, api_key, api_secret, options)

#用于配置大模型版本,默认“general/generalv2”
#domain = "general"   # v1.5版本
# domain = "generalv2"    # v2.0版本
domain = "generalv3"    # v3.0版本
#云端环境的服务地址
#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"# v1.5环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat"# v2.0环境的地址
Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"

text =[]
from unihiker import GUI
u_gui=GUI()
表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)

# length = 0

def getText(role,content):
    jsoncon = {}
    jsoncon["role"] = role
    jsoncon["content"] = content
    text.append(jsoncon)
    return text

def getlength(text):
    length = 0
    for content in text:
      temp = content["content"]
      leng = len(temp)
      length += leng
    return length

def checklen(text):
    while (getlength(text) > 8000):
      del text
    return text
   


if __name__ == '__main__':
    text.clear
    while(1):
      listening.listen()
      表情.config(emoji="Wink")
      Input=xunfeiasr.xunfeiasr(r"record.wav")
      print(Input)
      question = checklen(getText("user",Input))
      SparkApi.answer =""
      SparkApi.hand=""
      print("星火:",end = "")
      SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)
      if SparkApi.answer!="":
             tts.synthesis(SparkApi.answer+"……", "speech.wav")
             u_audio.play("speech.wav")
             time.sleep(3)
      if SparkApi.hand!="":
            tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav")
            u_audio.play("speech.wav")
      getText("assistant",SparkApi.answer)
      表情.config(emoji="Peace")
      # print(str(text))



【注:System指令】
       System指令可对模型在多轮对话中的角色信息进行快捷约束,避免模型定制,即要求模型按照约定的方式进行后续响应。相比prompt约束,可以解决prompt拼接不连贯、不支持多轮的问题。













木子呢 发表于 2024-1-10 09:58:29

wow!越来越丰富了,期待更多行空板+大语言模型的项目!!!

云天 发表于 2024-1-10 11:20:31

木子呢 发表于 2024-1-10 09:58
wow!越来越丰富了,期待更多行空板+大语言模型的项目!!!

最近一直在研究这方面内容。

许培享 发表于 2024-1-13 10:21:46

仰慕仰慕

zhouzhuofeng 发表于 2024-1-13 15:05:42

期待更多行空板+大语言模型的项目!!!

海川 发表于 2024-1-19 19:56:32

期待更多行空板+大语言模型的项目!!!

zhangzb61759 发表于 2024-1-26 22:34:31

云天老师太厉害了,学习了学习了

罗罗罗 发表于 2024-4-14 10:59:04

厉害了

刘睿鹏 发表于 2024-5-6 21:24:00

说方言它能懂吗
页: [1]
查看完整版本: 用大语言模型控制行空板机器人