本帖最后由 云天 于 2024-2-6 11:38 编辑
“AI的普及不是在替代人类,而是在解放人类” 【项目背景】 作为推动新一轮科技革命和产业变革的战略性技术,AI具有很强的“头雁效应”。人机交互的新模式、新通路、新应用不断涌现,人工智能衍生的系列工具正逐渐在各个社会场景中释放潜力。在代码研发领域,随着机器学习和深度学习技术的高速发展,大语言模型解决了自然语言的歧义性问题,正在大幅提高开发效率以及代码质量。AI不仅改变了编程的方式,更在很大程度上推动了软件开发和创新的进步。1月30日,科大讯飞举行星火认知大模型V3.5升级发布会。此次首个基于全国产算力训练的全民开放大模型讯飞星火V3.5在语言理解、文本生成、知识问答、逻辑推理、数学能力、代码能力和多模态能力七个方面进行了全面升级。
【项目设计】
使用行空板结合星火认知大模型3.5,根据指令编写程序函数,并保存成py文件,再通过Function Call进行调用。实现如打印1至100或画心形图等。
【项目亮点】
1.体验了星火认知大模型的编写代码能力。
2.使用星火认知大模型的system指令明确规定模型的行为。(系统指令以自然语言编写,并通过JSON请求中的“system”角色标识。系统指令可以规定模型的口吻、任务、格式和响应的限制,从而确保模型的行为符合特定要求。)
3.使用星火认知大模型的Function Call功能。( Function Call更加规范, 它通过规范传入的Function描述及输入输出参数, 让大模型能更准确理解用户意图并与函数调用相匹配。)
4.将大模型编写的程序保存在本地,通过多轮对话启动本地函数,或通过Function Call功能命中函数,实现相应功能。
【硬件行空板】
使用两节3.7V锂电池降压5V给行空板供电。
【自动化生成外部功能函数】
1.外部功能函数 用来激活大模型中的Function Calling功能的,它能自动识别这些函数并根据预定义的逻辑生成相应的输出。主要包括三个步骤: 1.1.根据功能逻辑编写功能函数。 1.2.为每一个功能函数生成相应的JSON Schema对象描述,以启用大模型的Function Calling功能。 1.3.等待模型返回相应的执行结果。 2.首先完成第一项,编写功能函数,确定应用的整体业务逻辑,利用大模型自动拆解这些逻辑并生成相应的功能函数。 大模型可以通过精心设计的提示工程成功地自动完成这一流程。具体来说有两个细节需要注意: 2.1.如何设计合适的输入提示来引导模型 2.2.如何从模型输出的字符串中提取出有效的代码 简单来说,除了需要精心构造的提示以引导模型之外,因为大模型的输入和输出都是字符串格式,还需要开发一种机制,用于解析模型输出字符串中的代码,并将其转化为实际可运行的功能函数。 3.如何设计合适的提示词首先需要验证模型是否具备相应的能力,即它是否可以生成完整的函数,一个简单的测试示例如下:
-
- data = {
- "header": {
- "app_id": appid,
- "uid": "sxs111234"
- },
- "parameter": {
- "chat": {
- "domain": domain,
- "temperature": 0.5,
- "max_tokens": 2048,
- "auditing": "default"
- }
- },
- "payload": {
- "message": {
- "text": question
- }
- }
-
- }
复制代码
-
- question="[{
- 'role': 'system',
- 'content': '你是一个专门的Python代码编辑器。你的任务是生成准确、高质量的Python代码。请仅输出纯Python代码,不要包含任何其他类型的文本或注释,不要出现符号#,不要出现“测试语句”,不要出现“调用函数”。'
- }, {
- 'role': 'user',
- 'content': '请为我创建一个Python函数,该函数的作用是输出“Hello, World!”字符串。'
- }
- ]"
复制代码
输出:-
- 以下是一个简单的Python函数,它输出“Hello, World!”字符串:
- ```python
- def hello_world():
- print("Hello, World!")
- ```
复制代码
4.存储可执行代码
Step 1:提取代码片段想实现直接在本地调用,经过多次尝试,一种比较高效的解决问题的方法是直接在上述字符串中通过正则表达式提取出只包含Python代码的字符串,代码如下: -
- def extract_executable_python_code(markdown_str):
- """
- 提取包含在Markdown格式字符串中的Python代码。
-
- 若输入字符串是Markdown格式且包含Python代码块,此函数将提取出代码块。
- 否则,返回原字符串。
-
- 参数:
- markdown_str: 待检查的Markdown格式字符串。
-
- 返回:
- 提取出的Python代码块或None。
- """
-
- import re
-
- # 判断字符串是否是Markdown格式
- if re.search(r'```(?:python|Python|PYTHON)', markdown_str):
- # 找到代码块的开始和结束位置
- code_start = markdown_str.find('def')
- code_end = markdown_str.find('```\n', code_start)
- # 提取代码部分
- code = markdown_str[code_start:code_end]
- else:
- # 如果字符串不是Markdown格式,返回原字符串
- code = markdown_str
-
- return code
复制代码
测试一下执行结果:
- python_code = extract_executable_python_code(data["payload"]["choices"]["text"][0]["content"])
复制代码
- 'def hello_world():\n print("Hello, World!")'
复制代码
Step 2:本地存储
本地存储不仅是应用的一个关键功能,还充当着代码仓库的角色,便于大模型进行选择和调用。当使用自然语言指导大模型自动生成函数后,下一步是将这些函数保存到本地存储中。这样做的目的是确保大模型在执行Function Calling操作时能够无缝地访问并调用这些预先存储的函数。让大模型在执行Chat模型输出的字符串是一个markdown格式对象,将其保存为md格式,看看格式化的代码是什么样的,代码如下: - with open('helloworld.py', 'w', encoding='utf-8') as f:
- f.write(python_code)
复制代码
看下print_hello_world.py中的内容:
Step 3:封装函数
所以综上一个流程化的封装类是这样的:
-
- import re
- import os
- class PythonCodeExtractor:
- """
- PythonCodeExtractor 类用于从 Markdown 格式的字符串中提取 Python 代码,
- 并将其保存到指定的目录。
-
- 属性:
- working_directory: 用于存储 Python 代码文件的目录路径。
-
- 方法:
- extract_executable_python_code: 从 Markdown 字符串中提取可执行的 Python 代码。
- extract_and_save: 提取代码并保存到本地。
- """
- deffunction=0
- def __init__(self, working_directory="./"):
- """
- 初始化 PythonCodeExtractor 类。
-
- 参数:
- working_directory (str): 存储 Python 代码文件的目录路径。
- """
- self.working_directory = working_directory
- def extract_executable_python_code(self, markdown_str):
- """
- 从 Markdown 格式的字符串中提取可执行的 Python 代码。
-
- 参数:
- markdown_str (str): 包含 Python 代码的 Markdown 字符串。
-
- 返回:
- str: 提取出的 Python 代码或原字符串。
- """
- global deffunction
-
- # 使用正则表达式查找 Python 代码块
- if re.search(r'```(?:python|Python|PYTHON)', markdown_str):
- if markdown_str.find('def')!=-1:
- code_start = markdown_str.find('def')
- else:
- code_start = markdown_str.find('python')+6
- code_end = markdown_str.find('```', code_start)
-
- code = markdown_str[code_start:code_end]
- deffunction=1
- else:
- code = markdown_str
- return code
- def extract_and_save(self, markdown_input, verbosity_level=0):
- global deffunction
- global codecontent
- code = self.extract_executable_python_code(markdown_input)
- if deffunction==1:
- match = re.search(r'def (\w+)', code)
-
- if match is None:
- #print("没有找到相关的函数")
- codecontent=code
- return
- else:
- deffunction=2
- global function_name
- function_name = match.group(1)
-
- # 确保工作目录存在
- folder_path = self.working_directory
- if not os.path.exists(folder_path):
- os.makedirs(folder_path)
-
- # 完整的文件路径
- file_path = os.path.join(folder_path, f"{function_name}.py")
-
- # 将代码写入文件
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write(code)
-
- # 如果 verbosity_level 设置为 1,则输出函数的详细信息
- if verbosity_level == 1:
- with open(file_path, 'r', encoding='utf-8') as f:
- content = f.read()
- #print("函数详细信息如下:")
- #print(content)
复制代码
-
- SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question,functions)
- if SparkApi.answer !="":
- getText("assistant",SparkApi.answer)
- extractor = codeEXT.PythonCodeExtractor(working_directory="./")
- extractor.extract_and_save(SparkApi.answer, verbosity_level=1)
复制代码
使用该类,即可便捷的将大模型输出结果一键进行函数提取、保存和运行。
【实践测试】
星火 API 需要通过 WebSocket 进行连接,下面代码是讯飞配置示例 SparkApi.py 文件代码,做了部分修改。
- import _thread as thread
- import base64
- import datetime
- import hashlib
- import hmac
- import json
- from urllib.parse import urlparse
- import ssl
- from datetime import datetime
- from time import mktime
- from urllib.parse import urlencode
- from wsgiref.handlers import format_date_time
-
- import websocket # 使用websocket_client
- answer = ""
- function_name=""
- class Ws_Param(object):
- # 初始化
- def __init__(self, APPID, APIKey, APISecret, Spark_url):
- self.APPID = APPID
- self.APIKey = APIKey
- self.APISecret = APISecret
- self.host = urlparse(Spark_url).netloc
- self.path = urlparse(Spark_url).path
- self.Spark_url = Spark_url
-
- # 生成url
- def create_url(self):
- # 生成RFC1123格式的时间戳
- now = datetime.now()
- date = format_date_time(mktime(now.timetuple()))
-
- # 拼接字符串
- signature_origin = "host: " + self.host + "\n"
- signature_origin += "date: " + date + "\n"
- signature_origin += "GET " + self.path + " HTTP/1.1"
-
- # 进行hmac-sha256进行加密
- signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
- digestmod=hashlib.sha256).digest()
-
- signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
-
- authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
-
- authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
-
- # 将请求的鉴权参数组合为字典
- v = {
- "authorization": authorization,
- "date": date,
- "host": self.host
- }
- # 拼接鉴权参数,生成url
- url = self.Spark_url + '?' + urlencode(v)
- # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
- return url
-
-
- # 收到websocket错误的处理
- def on_error(ws, error):
- print("### error:", error)
-
-
- # 收到websocket关闭的处理
- def on_close(ws,one,two):
- print(" ")
-
-
- # 收到websocket连接建立的处理
- def on_open(ws):
- thread.start_new_thread(run, (ws,))
-
-
- def run(ws, *args):
- data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question,functions=ws.functions))
- #print(data)
- ws.send(data)
-
-
- # 收到websocket消息的处理
- def on_message(ws, message):
- #print(message)
- data = json.loads(message)
- code = data['header']['code']
- if code != 0:
- print(f'请求错误: {code}, {data}')
- ws.close()
- else:
- choices = data["payload"]["choices"]
- status = choices["status"]
- content = choices["text"][0]["content"]
- if content=="":
- #if choices["text"][0]["function_call"] is not None:
- global function_name
- function_name = choices["text"][0]["function_call"]["name"]
-
- else:
- print(content,end ="")
- global answer
- answer += content
- # print(1)
- if status == 2:
- #print(answer)
- ws.close()
-
-
- def gen_params(appid, domain,question,functions=0):
- """
- 通过appid和用户的提问来生成请参数
- """
- if functions==0:
- data = {
- "header": {
- "app_id": appid,
- "uid": "sxs1111234"
- },
- "parameter": {
- "chat": {
- "domain": domain,
- "temperature": 0.5,
- "max_tokens": 2048
- }
- },
- "payload": {
- "message": {
- "text": question
- }
- }
- }
- else:
- data = {
- "header": {
- "app_id": appid,
- "uid": "sxs1111234"
- },
- "parameter": {
- "chat": {
- "domain": domain,
- "temperature": 0.5,
- "max_tokens": 2048,
- "auditing": "default"
- }
- },
- "payload": {
- "message": {
- "text": question
- },
- "functions": {
- "text": [
- {
- "name": "输出Hello world",
- "description": "这个Python程序自定义函数,能输出Hello World"
-
-
- }
- ]
- }
- }
-
- }
-
- return data
-
-
- def main(appid, api_key, api_secret, Spark_url,domain, question,functions):
- # print("星火:")
- wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
- websocket.enableTrace(False)
- wsUrl = wsParam.create_url()
- ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
- ws.appid = appid
- ws.question = question
- ws.domain = domain
- ws.functions = functions
- ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
-
-
复制代码
增加“Functions”-
- "functions": {
- "text": [
- {
- "name": "输出Hello world",
- "description": "这个Python程序自定义函数,能输出Hello World"
-
-
- }
- ]
- }
复制代码
【主程序代码】-
- # -*- coding: utf-8 -*-
- import SparkApi
- import codeEXT
- import importlib
- #以下密钥信息从控制台获取
- appid = "******************" #填写控制台中获取的 APPID 信息
- api_secret = "******************" #填写控制台中获取的 APISecret 信息
- api_key ="******************" #填写控制台中获取的 APIKey 信息
-
- #用于配置大模型版本,默认“general/generalv2”
- #domain = "general" # v1.5版本
- domain = "generalv3.5" # v3.5版本
- #云端环境的服务地址
- #Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
- Spark_url = "ws://spark-api.xf-yun.com/v3.5/chat" # v3.5环境的地址
-
-
- text =[]
-
- # length = 0
-
- def getText(role,content):
- jsoncon = {}
- jsoncon["role"] = role
- jsoncon["content"] = content
- text.append(jsoncon)
- return text
-
- def getlength(text):
- length = 0
- for content in text:
- temp = content["content"]
- leng = len(temp)
- length += leng
- return length
-
- def checklen(text):
- while (getlength(text) > 8000):
- del text[0]
- return text
-
- if __name__=='__main__':
- text.clear
-
- while(1):
- functions=0
- Input=input("\n"+"我:")
- if (Input)=="sys":
- Input = input("\n" +"System:")
- question = checklen(getText("system",Input))
- Input = input("\n" +"我:")
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- else:
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- SparkApi.answer =""
- print("星火:",end = "")
-
- SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question,functions)
- if SparkApi.answer !="":
- getText("assistant",SparkApi.answer)
- extractor = codeEXT.PythonCodeExtractor(working_directory="./")
- extractor.extract_and_save(SparkApi.answer, verbosity_level=1)
- if codeEXT.deffunction==1:
-
- codeEXT.codecontent=codeEXT.codecontent.replace("\n", "")
-
- params=importlib.import_module(codeEXT.function_name)
- c = compile("params."+codeEXT.codecontent,'','exec') # 编译为字节代码对象
- exec(c)
- elif SparkApi.function_name!="":
- print("以下内容是,命中函数后输出")
- importlib.import_module(codeEXT.function_name)
复制代码
1.区分系统指令和用户指令及是否使用Function Call功能-
- functions=0
- Input=input("\n"+"我:")
- if (Input)=="sys":
- Input = input("\n" +"System:")
- question = checklen(getText("system",Input))
- Input = input("\n" +"我:")
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- else:
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- SparkApi.answer =""
- print("星火:",end = "")
复制代码
使用系统指令,可再使用指令精减掉其它非程序代码文本及不需要的程序代码。
2.通过对话执行函数或通过Function Call执行函数
- SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question,functions)
- if SparkApi.answer !="":
- getText("assistant",SparkApi.answer)
- extractor = codeEXT.PythonCodeExtractor(working_directory="./")
- extractor.extract_and_save(SparkApi.answer, verbosity_level=1)
- if codeEXT.deffunction==1:
-
- codeEXT.codecontent=codeEXT.codecontent.replace("\n", "")
-
- params=importlib.import_module(codeEXT.function_name)
- c = compile("params."+codeEXT.codecontent,'','exec') # 编译为字节代码对象
- exec(c)
- elif SparkApi.function_name!="":
- print("以下内容是,命中函数后输出")
- importlib.import_module(codeEXT.function_name)
复制代码
3.测试1,通过对话执行函数,打印Hello,World!
4.测试2,使用定义的带参函数,输出1到100
5.测试3,使用Function Call功能
6.测试4,我让大模型编写一个画心形图案,它给了一个画五角星(星形)图案(大模型还xin、xing不分?)
-
- ```python
- import turtle
-
- def draw_heart(n):
- # 设置窗口大小
- turtle.setup(320, 240)
- # 设置画笔速度
- turtle.speed(1)
- # 将画笔移动到窗口中心
- turtle.penup()
- turtle.goto(-n / 2, -n / 2)
- turtle.pendown()
- # 画心形图案
- for i in range(n):
- turtle.forward(n)
- turtle.right(144)
-
- # 调用函数并传入整数参数n
- draw_heart(50)
-
- # 结束绘制,显示窗口
- turtle.done()
- ```
复制代码
再来一次,这次应该是对了???。
执行一下,还是不对
再来一次,(请为我创建一个Python函数,该函数的作用是使用海龟库画一个填充色为红色的“心形”图案,图案大小由整数参数n决定,窗口大小为320*240,图案位于窗口中心。)
-
- import turtle
-
- def draw_heart(n):
- # 设置窗口大小
- turtle.setup(320, 240)
-
- # 将画笔移动到窗口中心
- turtle.penup()
- turtle.goto(0, -n/2)
- turtle.pendown()
-
- # 设置填充颜色为红色
- turtle.fillcolor("red")
-
- # 开始填充
- turtle.begin_fill()
-
- # 画心形图案
- for i in range(1, n+1, 2):
- turtle.circle(i, 180)
- turtle.right(180)
-
- # 结束填充
- turtle.end_fill()
-
- # 隐藏画笔
- turtle.hideturtle()
-
- # 保持窗口打开
- turtle.done()
复制代码
看来心形对于大模型来说有点难。
6.测试5,画一个正多边形
-
-
- 我:请为我创建一个Python函数,该函数的作用是使用海龟库画一个填充色为红色的“正多边形”图案,正多边形的边数由整数参数n决定,窗口大小为320*240,图案位于窗口中心
- 星火:首先,我们需要导入turtle库,然后定义一个函数,接受一个整数参数n,表示正多边形的边数。接下来,我们需要设置窗口大小为320*240,并将画笔移动到窗口中心。最后,我们使用循环绘制正多
- 以下是代码:红色。
-
- ```python
- import turtle
-
- def draw_filled_polygon(n):
- # 设置窗口大小
- turtle.setup(320, 240)
-
- # 将画笔移动到窗口中心
- turtle.penup()
- turtle.goto(0, -100)
- turtle.pendown()
-
-
- 我:sys
-
- System:你是可以是一个专门的Python代码编辑器。你的任务是生成准确、高质量的Python代码。我只要纯Python代码,不要包含任何其他类型的文本或注释,不要出现符号“#”,不要出现“测试语句”,不要出现“调用语句”。
-
- 我:请为我创建一个Python函数,该函数的作用是使用海龟库画一个填充色为红色的“正多边形”图案,正多边形的边数由整数参数n决定,窗口大小为320*240,正多边形的中心位于窗口中心。
- 星火:以下是使用Python的turtle库创建的一个函数,该函数可以画出一个填充色为红色的正多边形。
- ```python
- import turtle
- def draw_polygon(n):
- # 设置窗口大小
- turtle.setup(320, 240)
- # 设置画笔颜色
- turtle.color("red")
- # 开始填充颜色
- turtle.begin_fill()
- # 计算内角
- angle = 360 / n
- # 画出正多边形
- for _ in range(n):
- turtle.forward(100)
- turtle.right(angle)
- # 结束填充颜色
- turtle.end_fill()
- # 隐藏海龟
- turtle.hideturtle()
- # 结束画图
- turtle.done()
-
- draw_polygon(5)
- ```
- 在这段代码中,我们首先导入了turtle模块,然后定义了一个名为draw_polygon的函数,该函数接受一个参数n,表示多边形的边数。然后我们设置了窗口的大小,并指定了画笔的颜色为红色。接着我们开
- 始了填充颜色,并计算出每个内角的角度。然后我们用一个for循环来画出正多边形,每条边的长度为100,每次转向的角度为计算出的内角。最后我们结束了填充颜色,并隐藏了海龟,然后结束了画图。
-
- 我:请将上面的代码去掉”调用语句“,再输出一下
- 星火:以下是去掉调用语句后的代码:
- ```python
- import turtle
-
- def draw_polygon(n):
- turtle.setup(320, 240)
- turtle.color("red")
- turtle.begin_fill()
- angle = 360 / n
- for _ in range(n):
- turtle.forward(100)
- turtle.right(angle)
- turtle.end_fill()
- turtle.hideturtle()
- turtle.done()
复制代码
直接在指令中加入“代码最后不要出现“调用函数”相关代码”
-
- 我:请为我创建一个Python函数,该函数的作用是使用海龟库画一个填充色为红色的“正多边形”图案,正多边形的边数由整数参数n决定,窗口大小为320*240,正多边形的中心位于窗口中心。代码最后不要出现“调用函数”相关代码。
- 星火:首先,我们需要导入turtle库,然后定义一个函数,该函数接受一个整数参数n,表示正多边形的边数。接下来,我们需要设置窗口大小为320*240,并将海龟移动到窗口中心。最后,我们使用循环绘制正多边形,并填充红色。
-
- 以下是代码:
-
- ```python
- import turtle
-
- def draw_filled_polygon(n):
- turtle.setup(320, 240)
- turtle.penup()
- turtle.goto(-160, -120)
- turtle.pendown()
- turtle.color("red")
- turtle.begin_fill()
- for _ in range(n):
- turtle.forward(100)
- turtle.left(360 / n)
- turtle.end_fill()
- turtle.done()
- ```
-
- 这段代码将创建一个窗口,并在其中绘制一个填充色为红色的正五边形。你可以通过更改`draw_filled_polygon`函数的参数来改变正多边形的边数。
复制代码
中心这件事,代码弄错了。有待加强!
【行空板】
1."functions"增加“画五角星图案”。
-
-
- { "name": "画五角星图案",
- "description": "这个Python程序自定义函数,该函数的作用是使用海龟库画一个填充色为红色的“五角星”图案,边长是100。"
- }
复制代码
2.测试指令
-
- 我:请为我创建一个Python函数,该函数的作用是使用海龟库画一个填充色为红色的“五角星”图案,边长是100。窗口大小为320*240,五角星的中心位于窗口中心。
- 星火:首先,我们需要导入turtle库。然后,创建一个名为draw_star的函数,该函数接受一个参数side_length,表示五角星的边长。在函数中,我们将设置窗口大小、画笔颜色和填充颜色。接下来,我们以下是代码:绘制五角星的每一条边。最后,我们调用这个函数并传入边长100作为参数。
-
- ```python
- import turtle
-
- def draw_star(side_length):
- # 设置窗口大小
- turtle.setup(320, 240)
-
- # 设置画笔颜色和填充颜色
- turtle.pencolor("black")
- turtle.fillcolor("red")
-
- # 开始填充
- turtle.begin_fill()
-
- # 绘制五角星
- for _ in range(5):
- turtle.forward(side_length)
- turtle.right(144)
-
- # 结束填充
- turtle.end_fill()
-
- # 隐藏海龟
- turtle.hideturtle()
-
- # 保持窗口打开
- turtle.done()
-
- # 调用函数,传入边长100
- draw_star(100)
- ```
-
- 我:请使用自定义函数,画一个五角星。
- 星火:
- 以下内容是,命中函数后输出
复制代码
3.文本输入使用讯飞语音识别。
监听程序文件linstening.py代码-
-
- import pyaudio,wave
- import numpy as np
- def listen():
-
- temp = 20
- CHUNK = 1024
- FORMAT = pyaudio.paInt16
- CHANNELS = 1
- RATE = 16000
- RECORD_SECONDS = 2
- WAVE_OUTPUT_FILENAME = 'record.wav'
-
- mindb=2000 #最小声音,大于则开始录音,否则结束
- delayTime=1.3 #小声1.3秒后自动终止
- p = pyaudio.PyAudio()
- stream = p.open(format=FORMAT,
- channels=CHANNELS,
- rate=RATE,
- input=True,
- frames_per_buffer=CHUNK)
- #snowboydecoder.play_audio_file()
- print("开始!计时")
-
- frames = []
- flag = False # 开始录音节点
- stat = True #判断是否继续录音
- stat2 = False #判断声音小了
-
- tempnum = 0 #tempnum、tempnum2、tempnum3为时间
- tempnum2 = 0
-
- while stat:
- data = stream.read(CHUNK,exception_on_overflow = False)
-
- audio_data = np.frombuffer(data, dtype=np.short)
- temp = np.max(audio_data)
- if temp > mindb and flag==False:
- flag =True
- print("开始录音")
- tempnum2=tempnum
-
- if flag:
- frames.append(data)
- if(temp < mindb and stat2==False):
- stat2 = True
- tempnum2 = tempnum
- print("声音小,且之前是是大的或刚开始,记录当前点")
- if(temp > mindb):
- stat2 =False
- tempnum2 = tempnum
- #刷新
-
- if(tempnum > tempnum2 + delayTime*15 and stat2==True):
- print("间隔%.2lfs后开始检测是否还是小声"%delayTime)
- if(stat2 and temp < mindb):
- stat = False
- #还是小声,则stat=True
- print("小声!")
- else:
- stat2 = False
- print("大声!")
-
-
- print(str(temp) + " " + str(tempnum))
- tempnum = tempnum + 1
- if tempnum > 3600:
- tempnum=0 #超时直接退出
- #stat = False
- print("录音结束")
-
- stream.stop_stream()
- stream.close()
- p.terminate()
- wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
- wf.setnchannels(CHANNELS)
- wf.setsampwidth(p.get_sample_size(FORMAT))
- wf.setframerate(RATE)
- wf.writeframes(b''.join(frames))
- wf.close()
-
-
复制代码
4.行空板主程序使引脚22上的按钮,启动语音识别。
-
- # -*- coding: utf-8 -*-
- import SparkApi
- import codeEXT
- import importlib
-
- import xunfeiasr
- from pinpong.extension.unihiker import *
- from pinpong.board import Board,Pin
- import listening
- #以下密钥信息从控制台获取
- appid = "**************" #填写控制台中获取的 APPID 信息
- api_secret = "*************" #填写控制台中获取的 APISecret 信息
- api_key ="***************" #填写控制台中获取的 APIKey 信息
-
- #用于配置大模型版本,默认“general/generalv2”
- #domain = "general" # v1.5版本
- domain = "generalv3.5" # v3.5版本
- #云端环境的服务地址
- #Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
- Spark_url = "ws://spark-api.xf-yun.com/v3.5/chat" # v3.5环境的地址
- xunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)
- Board().begin()
- p_p22_in=Pin(Pin.P22, Pin.IN)
- text =[]
-
- # length = 0
-
- def getText(role,content):
- jsoncon = {}
- jsoncon["role"] = role
- jsoncon["content"] = content
- text.append(jsoncon)
- return text
-
- def getlength(text):
- length = 0
- for content in text:
- temp = content["content"]
- leng = len(temp)
- length += leng
- return length
-
- def checklen(text):
- while (getlength(text) > 8000):
- del text[0]
- return text
-
- if __name__=='__main__':
- text.clear
-
- while(1):
- functions=0
- if (p_p22_in.read_digital()==True):
- listening.listen()
- Input=xunfeiasr.xunfeiasr(r"record.wav")
- if (Input)=="sys":
- listening.listen()
- Input=xunfeiasr.xunfeiasr(r"record.wav")
- question = checklen(getText("system",Input))
- listening.listen()
- Input=xunfeiasr.xunfeiasr(r"record.wav")
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- else:
- if Input.find('自定义')!=-1:
- functions=1
- question = checklen(getText("user",Input))
- SparkApi.answer =""
- print("星火:",end = "")
-
- SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question,functions)
- if SparkApi.answer !="":
- getText("assistant",SparkApi.answer)
- extractor = codeEXT.PythonCodeExtractor(working_directory="./")
- extractor.extract_and_save(SparkApi.answer, verbosity_level=1)
- if codeEXT.deffunction==1:
-
- codeEXT.codecontent=codeEXT.codecontent.replace("\n", "")
-
- params=importlib.import_module(codeEXT.function_name)
- c = compile("params."+codeEXT.codecontent,'','exec') # 编译为字节代码对象
- exec(c)
- elif SparkApi.function_name!="":
- print("以下内容是,命中函数后输出")
- importlib.import_module(codeEXT.function_name)
复制代码
【演示视频】
|