2024-11-15 09:54:17 [显示全部楼层]
11607浏览
查看: 11607|回复: 1

[M10项目] 行空板——“AI手相师”

[复制链接]

【项目背景】

        在当今这个科技迅猛发展的时代,人工智能(AI)已经渗透到了我们生活的方方面面,从智能家居到自动驾驶,从数据分析到娱乐休闲。而“AI手相师”项目正是将AI技术与传统手相学相结合,为用户提供一种全新的娱乐体验。

        手相学,作为一种古老的占卜艺术,一直吸引着对人类命运和性格的好奇。然而,传统手相学需要专业的手相大师进行解读,这不仅限制了其普及性,也增加了获取手相解读的难度。随着深度学习和图像识别技术的进步,我们有机会将这一传统艺术数字化,使其更加易于接触和理解。

【项目设计】

        “AI手相师”项目利用行空板和摄像头捕捉用户的手掌图像,通过Python的OpenCV库进行图像处理和采集。结合讯飞星火认知大模型的图像理解功能,AI能够分析手掌的线条、形状和纹理,从而提供个性化的手相解读。这种解读不仅基于传统手相学的理论,还融入了现代心理学和行为学的研究成果,使得解读更加科学和有趣。

        为了提升用户体验,项目还集成了语音合成功能,将AI生成的手相分析文本转换为自然流畅的语音。通过pyaudio库和wave库,这些语音信息可以通过蓝牙音箱播放,为用户提供一种更加沉浸式的体验。整个过程无需人工干预,用户只需简单操作即可获得专业的手相解读,极大地提高了娱乐性和便捷性。

        “AI手相师”项目不仅是一种娱乐工具,它还可能激发人们对自我认知和个性探索的兴趣,为用户提供了一种全新的自我了解和娱乐方式。通过这种方式,我们希望能够将传统手相学的魅力以现代科技的形式传承下去,让更多人享受到科技带来的乐趣。

【项目硬件】

        项目硬件:行空板、摄像头、蓝牙音箱、按钮、充电宝(电源)、手势提示板。

行空板——“AI手相师”图1

行空板——“AI手相师”图2

手势提示板分左右手

行空板——“AI手相师”图3

充电宝(电源)在底座里

【图像理解】

        用户输入一张图片和问题,从而识别出图片中的对象、场景等信息回答用户的问题。https://console.xfyun.cn/services/image

行空板——“AI手相师”图4


【程序代码】
        1.图像理解这段Python程序,它利用WebSocket协议与讯飞星火认知大模型的图像理解服务进行通信。脚本的主要功能是读取本地存储的手掌图像,将其编码为Base64格式,并通过WebSocket发送到服务器进行手相分析。服务器处理后返回的分析结果以文本形式输出。此外,脚本还具备错误处理和连接管理功能,确保了与服务器通信的稳定性和安全性。
  1. import _thread as thread
  2. import base64
  3. import datetime
  4. import hashlib
  5. import hmac
  6. import json
  7. from urllib.parse import urlparse
  8. import ssl
  9. from datetime import datetime
  10. from time import mktime
  11. from urllib.parse import urlencode
  12. from wsgiref.handlers import format_date_time
  13. import websocket  # 使用websocket_client
  14. appid = "**************"    #填写控制台中获取的 APPID 信息
  15. api_secret = "***************"   #填写控制台中获取的 APISecret 信息
  16. api_key ="*****************"    #填写控制台中获取的 APIKey 信息
  17. imagedata = open("hand.jpg",'rb').read()
  18. imageunderstanding_url = "wss://spark-api.cn-huabei-1.xf-yun.com/v2.1/image"#云端环境的服务地址
  19. text =[{"role": "user", "content": str(base64.b64encode(imagedata), 'utf-8'), "content_type":"image"}]
  20. class Ws_Param(object):
  21.     # 初始化
  22.     def __init__(self, APPID, APIKey, APISecret, imageunderstanding_url):
  23.         self.APPID = APPID
  24.         self.APIKey = APIKey
  25.         self.APISecret = APISecret
  26.         self.host = urlparse(imageunderstanding_url).netloc
  27.         self.path = urlparse(imageunderstanding_url).path
  28.         self.ImageUnderstanding_url = imageunderstanding_url
  29.     # 生成url
  30.     def create_url(self):
  31.         # 生成RFC1123格式的时间戳
  32.         now = datetime.now()
  33.         date = format_date_time(mktime(now.timetuple()))
  34.         # 拼接字符串
  35.         signature_origin = "host: " + self.host + "\n"
  36.         signature_origin += "date: " + date + "\n"
  37.         signature_origin += "GET " + self.path + " HTTP/1.1"
  38.         # 进行hmac-sha256进行加密
  39.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  40.                                  digestmod=hashlib.sha256).digest()
  41.         signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
  42.         authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
  43.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  44.         # 将请求的鉴权参数组合为字典
  45.         v = {
  46.             "authorization": authorization,
  47.             "date": date,
  48.             "host": self.host
  49.         }
  50.         # 拼接鉴权参数,生成url
  51.         url = self.ImageUnderstanding_url + '?' + urlencode(v)
  52.         #print(url)
  53.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  54.         return url
  55. # 收到websocket错误的处理
  56. def on_error(ws, error):
  57.     print("### error:", error)
  58. # 收到websocket关闭的处理
  59. def on_close(ws,one,two):
  60.     print(" ")
  61. # 收到websocket连接建立的处理
  62. def on_open(ws):
  63.     thread.start_new_thread(run, (ws,))
  64. def run(ws, *args):
  65.     data = json.dumps(gen_params(appid=ws.appid, question= ws.question ))
  66.     ws.send(data)
  67. # 收到websocket消息的处理
  68. def on_message(ws, message):
  69.     #print(message)
  70.     data = json.loads(message)
  71.     code = data['header']['code']
  72.     if code != 0:
  73.         print(f'请求错误: {code}, {data}')
  74.         ws.close()
  75.     else:
  76.         choices = data["payload"]["choices"]
  77.         status = choices["status"]
  78.         content = choices["text"][0]["content"]
  79.         print(content,end ="")
  80.         global answer
  81.         answer += content
  82.         # print(1)
  83.         if status == 2:
  84.             ws.close()
  85. def gen_params(appid, question):
  86.     """
  87.     通过appid和用户的提问来生成请参数
  88.     """
  89.     data = {
  90.         "header": {
  91.             "app_id": appid
  92.         },
  93.         "parameter": {
  94.             "chat": {
  95.                 "domain": "image",
  96.                 "temperature": 0.5,
  97.                 "top_k": 4,
  98.                 "max_tokens": 2028,
  99.                 "auditing": "default"
  100.             }
  101.         },
  102.         "payload": {
  103.             "message": {
  104.                 "text": question
  105.             }
  106.         }
  107. }
  108.     return data
  109. def main(appid, api_key, api_secret, imageunderstanding_url,question):
  110.     wsParam = Ws_Param(appid, api_key, api_secret, imageunderstanding_url)
  111.     websocket.enableTrace(False)
  112.     wsUrl = wsParam.create_url()
  113.     ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
  114.     ws.appid = appid
  115.     #ws.imagedata = imagedata
  116.     ws.question = question
  117.     ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  118. def getText(role, content):
  119.     jsoncon = {}
  120.     jsoncon["role"] = role
  121.     jsoncon["content"] = content
  122.     text.append(jsoncon)
  123.     return text
  124. def getlength(text):
  125.     length = 0
  126.     for content in text:
  127.         temp = content["content"]
  128.         leng = len(temp)
  129.         length += leng
  130.     return length
  131. def checklen(text):
  132.     #print("text-content-tokens:", getlength(text[1:]))
  133.     while (getlength(text[1:])> 8000):
  134.         del text[1]
  135.     return text
  136. if __name__ == '__main__':
  137.     #text.clear
  138.     if 1:
  139.         Input = "你是一位传统手相学大师,下面请分析这张手相图片。手相解读形成一段话。开头要这样说:从你的手相上可看出你的……"
  140.         question = checklen(getText("user",Input))
  141.         answer = ""
  142.         print("答:",end = "")
  143.         main(appid, api_key, api_secret, imageunderstanding_url, question)
  144.         getText("assistant", answer)
  145.         # print(str(text))
复制代码
      2.图像采集       这段Python脚本利用OpenCV库和Pinpong框架,通过连接到行空板的摄像头实时捕获视频流,并在一个名为“Mind+'s Windows”的窗口中显示。用户可以通过按下ESC键来终止视频捕获。此外,脚本还监测连接到Pin P21的数字输入信号,一旦检测到高电平信号,它会将当前的视频帧保存为“pic.png”图片文件。这个程序可以用于简单的图像捕获任务,例如在特定触发条件下自动保存摄像头的实时画面。
  1. #  -*- coding: UTF-8 -*-
  2. # MindPlus
  3. # Python
  4. import cv2
  5. from pinpong.board import Board,Pin
  6. from pinpong.extension.unihiker import *
  7. Board().begin()
  8. p_p21_in=Pin(Pin.P21, Pin.IN)
  9. vd = cv2.VideoCapture()
  10. vd.open(0)
  11. while not (vd.isOpened()):
  12.     pass
  13. cv2.namedWindow("Mind+'s Windows", cv2.WINDOW_NORMAL)
  14. BiaoShi = 0
  15. while True:
  16.     if vd.grab():
  17.         ret, img = vd.read()
  18.         cv2.imshow("Mind+'s Windows", img)
  19.         if cv2.waitKey(20) & 0xff== 27:
  20.             break
  21.         if (p_p21_in.read_digital()==True):
  22.             BiaoShi = 0
  23.             cv2.imwrite("pic.png", img)
复制代码

3.音频播放
        这段Python脚本通过pyaudiowave库实现了WAV音频文件的播放功能。脚本定义了一个playwav函数,该函数接受一个WAV文件名作为参数。它首先使用wave库打开指定的WAV文件,然后创建一个pyaudio对象来处理音频流。接着,脚本根据WAV文件的属性(如采样宽度、通道数和采样率)配置音频流,并开始从文件中读取音频数据。数据以1024字节为单位读取,并通过音频流实时播放。当文件中的所有数据都被读取和播放后,脚本会停止音频流,关闭流,并终止pyaudio对象,从而完成整个播放过程。这个脚本为简单的音频播放提供了一个基础的实现框架。
  1. import pyaudio
  2. import wave
  3. # 打开WAV文件
  4. def playwav(wavname):
  5.   wf = wave.open(wavname, 'rb')
  6. # 创建PyAudio对象
  7.   p = pyaudio.PyAudio()
  8. # 打开流
  9.   stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
  10.                 channels=wf.getnchannels(),
  11.                 rate=wf.getframerate(),
  12.                 output=True)
  13. # 读取数据
  14.   data = wf.readframes(1024)
  15. # 播放
  16.   while len(data) > 0:
  17.     stream.write(data)
  18.     data = wf.readframes(1024)
  19. # 停止流
  20.   stream.stop_stream()
  21.   stream.close()
  22. # 关闭PyAudio
  23.   p.terminate()
复制代码
4.完整程序
  1. import _thread as thread
  2. import base64
  3. import datetime
  4. import hashlib
  5. import hmac
  6. import json
  7. from urllib.parse import urlparse
  8. import ssl
  9. from datetime import datetime
  10. from time import mktime
  11. from urllib.parse import urlencode
  12. from wsgiref.handlers import format_date_time
  13. import websocket  # 使用websocket_client
  14. import sys
  15. sys.path.append("/root/mindplus/.lib/thirdExtension/nick-base64-thirdex")
  16. from df_xfyun_speech import XfTts
  17. import play
  18. import cv2
  19. from pinpong.board import Board,Pin
  20. from pinpong.extension.unihiker import *
  21. Board().begin()
  22. p_p21_in=Pin(Pin.P21, Pin.IN)
  23. appId = "****************"
  24. apiKey ="****************"
  25. apiSecret = "****************"
  26. options = {}
  27. business_args = {"aue":"raw","vcn":"x4_lingbosong","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}
  28. options["business_args"] = business_args
  29. tts = XfTts(appId, apiKey, apiSecret, options)
  30. appid = "****************"    #填写控制台中获取的 APPID 信息
  31. api_secret = "****************"   #填写控制台中获取的 APISecret 信息
  32. api_key ="****************"    #填写控制台中获取的 APIKey 信息
  33. imageunderstanding_url = "wss://spark-api.cn-huabei-1.xf-yun.com/v2.1/image"#云端环境的服务地址
  34. text =[{"role": "user", "content": "", "content_type":"image"}]
  35. bs=0
  36. class Ws_Param(object):
  37.     # 初始化
  38.     def __init__(self, APPID, APIKey, APISecret, imageunderstanding_url):
  39.         self.APPID = APPID
  40.         self.APIKey = APIKey
  41.         self.APISecret = APISecret
  42.         self.host = urlparse(imageunderstanding_url).netloc
  43.         self.path = urlparse(imageunderstanding_url).path
  44.         self.ImageUnderstanding_url = imageunderstanding_url
  45.     # 生成url
  46.     def create_url(self):
  47.         # 生成RFC1123格式的时间戳
  48.         now = datetime.now()
  49.         date = format_date_time(mktime(now.timetuple()))
  50.         # 拼接字符串
  51.         signature_origin = "host: " + self.host + "\n"
  52.         signature_origin += "date: " + date + "\n"
  53.         signature_origin += "GET " + self.path + " HTTP/1.1"
  54.         # 进行hmac-sha256进行加密
  55.         signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  56.                                  digestmod=hashlib.sha256).digest()
  57.         signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
  58.         authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
  59.         authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  60.         # 将请求的鉴权参数组合为字典
  61.         v = {
  62.             "authorization": authorization,
  63.             "date": date,
  64.             "host": self.host
  65.         }
  66.         # 拼接鉴权参数,生成url
  67.         url = self.ImageUnderstanding_url + '?' + urlencode(v)
  68.         #print(url)
  69.         # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  70.         return url
  71. # 收到websocket错误的处理
  72. def on_error(ws, error):
  73.     print("### error:", error)
  74. # 收到websocket关闭的处理
  75. def on_close(ws,one,two):
  76.     print(" ")
  77. # 收到websocket连接建立的处理
  78. def on_open(ws):
  79.     thread.start_new_thread(run, (ws,))
  80. def run(ws, *args):
  81.     data = json.dumps(gen_params(appid=ws.appid, question= ws.question ))
  82.     ws.send(data)
  83. # 收到websocket消息的处理
  84. def on_message(ws, message):
  85.     #print(message)
  86.     data = json.loads(message)
  87.     code = data['header']['code']
  88.     if code != 0:
  89.         print(f'请求错误: {code}, {data}')
  90.         ws.close()
  91.     else:
  92.         choices = data["payload"]["choices"]
  93.         status = choices["status"]
  94.         content = choices["text"][0]["content"]
  95.         #print(content,end ="")
  96.         global answer
  97.         answer += content
  98.         # print(1)
  99.         if status == 2:
  100.             bs=1
  101.             ws.close()
  102. def gen_params(appid, question):
  103.     """
  104.     通过appid和用户的提问来生成请参数
  105.     """
  106.     data = {
  107.         "header": {
  108.             "app_id": appid
  109.         },
  110.         "parameter": {
  111.             "chat": {
  112.                 "domain": "image",
  113.                 "temperature": 0.5,
  114.                 "top_k": 4,
  115.                 "max_tokens": 2028,
  116.                 "auditing": "default"
  117.             }
  118.         },
  119.         "payload": {
  120.             "message": {
  121.                 "text": question
  122.             }
  123.         }
  124. }
  125.     return data
  126. def main(appid, api_key, api_secret, imageunderstanding_url,question):
  127.     wsParam = Ws_Param(appid, api_key, api_secret, imageunderstanding_url)
  128.     websocket.enableTrace(False)
  129.     wsUrl = wsParam.create_url()
  130.     ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
  131.     ws.appid = appid
  132.     #ws.imagedata = imagedata
  133.     ws.question = question
  134.     ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  135. def getText(role, content):
  136.     jsoncon = {}
  137.     jsoncon["role"] = role
  138.     jsoncon["content"] = content
  139.     text.append(jsoncon)
  140.     return text
  141. def getlength(text):
  142.     length = 0
  143.     for content in text:
  144.         temp = content["content"]
  145.         leng = len(temp)
  146.         length += leng
  147.     return length
  148. def checklen(text):
  149.     #print("text-content-tokens:", getlength(text[1:]))
  150.     while (getlength(text[1:])> 8000):
  151.         del text[1]
  152.     return text
  153. if __name__ == '__main__':
  154.     vd = cv2.VideoCapture()
  155.     vd.open(0)
  156.     while not (vd.isOpened()):
  157.         pass
  158.     cv2.namedWindow("Mind+'s Windows", cv2.WINDOW_NORMAL)
  159.     cv2.setWindowProperty("Mind+'s Windows", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
  160.     img = cv2.imread("back.png", cv2.IMREAD_UNCHANGED)
  161.     cv2.waitKey(1)            
  162.     Input = "你是一位传统手相学大师,下面请分析这张手相图片。手相解读形成一段话。开头要这样说:从你的手相上可看出你的……"
  163.     #tts.synthesis("初始化完成", "chushihua.wav")
  164.     #tts.synthesis("请将手平稳放好,正在拍照", "paizhao.wav")
  165.     #tts.synthesis("拍照完成,正在分析中", "fenxi.wav")
  166.     play.playwav("chushihua.wav")
  167.     #text.clear
  168.     while True:
  169.      if vd.grab():
  170.         ret, img = vd.read()
  171.         cp_img = img.copy()
  172.         cp_img = cv2.rotate(cp_img,cv2.ROTATE_180)
  173.         cp_img = cv2.resize(cp_img,( 240, 320))
  174.         cv2.imshow("Mind+'s Windows", cp_img)
  175.         if cv2.waitKey(1) & 0xff== 27:
  176.             break
  177.         if (p_p21_in.read_digital()==True):
  178.             
  179.             play.playwav("paizhao.wav")
  180.             ret, img = vd.read()
  181.             cv2.imwrite("hand.jpg", img)
  182.             imagedata = open("hand.jpg",'rb').read()
  183.             play.playwav("fenxi.wav")
  184.             bs=0
  185.             text =[{"role": "user", "content": str(base64.b64encode(imagedata), 'utf-8'), "content_type":"image"}]
  186.             question = checklen(getText("user",Input))
  187.             answer = ""
  188.             print("答:",end = "")
  189.             main(appid, api_key, api_secret, imageunderstanding_url, question)
  190.             getText("assistant", answer)
  191.             while bs:
  192.                 pass
  193.             print(answer)
  194.             # print(str(text))
  195.             tts.synthesis(answer, "speech.wav")
  196.             play.playwav("speech.wav")
复制代码
行空板——“AI手相师”图5

【演示视频】





rzegkly  版主

发表于 2024-11-22 08:23:28

很好的创意,漂亮
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

为本项目制作心愿单
购买心愿单
心愿单 编辑
[[wsData.name]]

硬件清单

  • [[d.name]]
btnicon
我也要做!
点击进入购买页面
上海智位机器人股份有限公司 沪ICP备09038501号-4

© 2013-2024 Comsenz Inc. Powered by Discuz! X3.4 Licensed

mail