【项目背景】
前两天收到了”鳄霸王“,第一感觉它是一个带屏幕的遥控器,只是这个屏幕有点太小了,128*64,黑白屏。
既然像遥控器,那就把它做成遥控器,屏幕虽小,但也可以显示视频车的图传实时环境图像。
【演示视频】
先上演示视频
【准备工作】
1、获取摇杆信息
2、”鳄霸王“获取图传图像
行空板python程序
-
- def main():
- import cv2
- import numpy as np
- import socket
- import time
- import binascii
- import socket
- import time
- cap = cv2.VideoCapture(0)
- cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
- cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
- cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- cv2.namedWindow('car',cv2.WND_PROP_FULLSCREEN) #Set the windows to be full screen.
- cv2.setWindowProperty('car', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) #Set the windows to be full screen.
- W=128
- H=64
- size = (W,H)
- val=0
- str1=''
- while(1): # get a frame and show
- ret, frame1 = cap.read()
- if(ret):
- frame2=frame1.copy()
- frame1 = cv2.resize(frame1, size)
-
- gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
-
- BIN1= cv2.adaptiveThreshold(gray1,1,cv2.ADAPTIVE_THRESH_MEAN_C,cv2.THRESH_BINARY,7,3)
- frame2 = cv2.rotate(frame2, cv2.ROTATE_90_COUNTERCLOCKWISE)
- cv2.imshow('car', frame2)
-
- str1=""
- for h in range(H):
- for w in range(0,W,16):
- for i in range(4):
- val=(int(BIN1[h][w+i*4+0])*8 + int(BIN1[h][w+i*4+1])*4 + int(BIN1[h][w+i*4+2])*2 + int(BIN1[h][w+i*4+3])*1)
- if val>=10:
- if val == 10:
- str1=str1+'A'
- if val==11:
- str1=str1+'B'
- if val==12:
- str1=str1+'C'
- if val==13:
- str1=str1+'D'
- if val==14:
- str1=str1+'E'
- if val==15:
- str1=str1+'F'
- else:
- str1=str1+str(val)
- str1=str1+" "
- #print(str1)
- s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('192.168.31.216',10000))
- s.send(bytearray.fromhex(str1))
- time.sleep(0.19)
- if cv2.waitKey(1) & 0xFF == ord('q'):
- break
- cv2.destroyAllWindows()
- if __name__=='__main__':
- main()
复制代码
掌控板程序 -
-
- from mpython import *
- import usocket
- import network
-
- my_wifi = wifi()
- my_wifi.connectWiFi("yuntian","yuntian123456")
- while not my_wifi.sta.isconnected():
- pass
-
-
- oled.fill(0)
- oled.DispChar(my_wifi.sta.ifconfig()[0], 0, 0, 1)
- oled.show()
-
-
- #socket.socket() 创建了一个 socket 对象,并且支持 context manager type,你可以使用 with 语句,这样你就不用再手动调用 s.close() 来关闭 socket 了
- s=usocket.socket(usocket.AF_INET,usocket.SOCK_STREAM)
- #bind() 方法的入参取决于 socket 的地址族,在这个例子中我们使用了 socket.AF_INET (IPv4),它接收一个二元组(host, port)入参
- #端口号应该是 1-65535 之间的整数(0是保留的),这个整数就是用来接受客户端链接的 TCP 端口号,如果端口号小于 1024,有的操作系统会要求管理员权限
- s.bind((my_wifi.sta.ifconfig()[0],10000))
- #listen() 方法调用使服务器可以接受连接请求,这使它成为一个「监听中」的 socket
- #listen() 方法有一个 backlog 参数。它指定在拒绝新的连接之前系统将允许使用的 未接受的连接 数量。从 Python 3.5 开始,这是可选参数。如果不指定,Python 将取一个默认值
- s.listen(5)
-
- while True:
- #accept() 方法阻塞并等待传入连接。当一个客户端连接时,它将返回一个新的 socket 对象,对象中有表示当前连接的 conn 和一个由主机、端口号组成的 IPv4/v6 连接的元组
- con=s.accept()[0]
- data=con.recv(1024)
- #framebuf.FrameBuffer(buffer, width, height, format) 可以构建帧缓存对象, buffer 为缓存区数据,width 为图片宽度,height 为图片高度,format 为FrameBuffer的格式,即对应图片取模时数据输出的扫描模式:framebuf.MONO_HLSB 为水平方向;framebuf.MONO_VLSB 为垂直方向。
- fbuf=framebuf.FrameBuffer(bytearray(data),128,64,framebuf.MONO_HLSB)
- oled.fill(0)
- #oled.blit(fbuf, x, y) 使用OLED显示图片帧,fbuf 为FrameBuffer对象,x 、y 为起始点的坐标x、y。
- oled.blit(fbuf,0,0)
- oled.show()
- con.close()
-
复制代码
利用电脑先进行测试
行空板视频车
3、控制行空板视频车运动
开启行空板siot
行空板视频车Python程序
-
- from pinpong.board import Board
- from microbit_motor import Microbit_Motor #导入Microbit_Motor库
- import siot
- from pinpong.extension.unihiker import *
-
- from unihiker import GUI
- u_gui=GUI()
- xianshi=u_gui.draw_text(text="开始",x=50,y=100,font_size=50, color="#0000FF")
- # 事件回调函数
- def on_message_callback(client, userdata, msg):
-
- if (msg.payload.decode("utf-8") == 'G'):
- forward(200)
- xianshi.config(text="前进")
- if (msg.payload.decode("utf-8")== 'S'):
- stop()
- xianshi.config(text="停止")
-
- if (msg.payload.decode("utf-8") == 'B'):
- back(200)
- xianshi.config(text="后退")
-
- if (msg.payload.decode("utf-8") == 'L'):
- left(200)
- xianshi.config(text="向左")
-
- if (msg.payload.decode("utf-8") == 'R'):
- right(200)
- xianshi.config(text="向右")
-
- siot.init(client_id="siot_192",server="192.168.31.8",port=1883,user="siot",password="dfrobot")
- siot.connect()
- siot.loop()
- siot.set_callback(on_message_callback)
- siot.getsubscribe(topic="car/control")
-
- Board("microbit").begin()
- motorbit = Microbit_Motor()
- def forward(speed):
- #前进
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
- def back(speed):
- #后退
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
- def left_turn(speed):
- #向左转
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
- def right_turn(speed):
- #向右转
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
- def left(speed):
- #向左
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CCW, speed)
- def right(speed):
- #向右
- #电机有M1,M2,M3,M4, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- motorbit.motor_run(motorbit.M1, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M2, motorbit.CW, speed)
- motorbit.motor_run(motorbit.M3, motorbit.CCW, speed)
- motorbit.motor_run(motorbit.M4, motorbit.CW, speed)
- def stop():
- motorbit.motor_stop(motorbit.M1)
- motorbit.motor_stop(motorbit.M2)
- motorbit.motor_stop(motorbit.M3)
- motorbit.motor_stop(motorbit.M4)
复制代码
”鳄霸王“掌控板控制程序
同micropython程序
-
- # MindPlus
- # mpython
- from umqtt.simple import MQTTClient
- from mpython import *
- import network
-
-
- my_wifi = wifi()
- p0=MPythonPin(0,PinMode.ANALOG)
- p1=MPythonPin(1,PinMode.ANALOG)
- my_wifi.connectWiFi("yuntian","yuntian123456")
- while not (my_wifi.sta.isconnected()):
- pass
- mqtt = MQTTClient("", "192.168.1.101", 1883, "siot", "dfrobot")
- try:
- mqtt.connect()
- print('MQTT Connected Successful')
- except:
- print('MQTT Connection Failed')
- oled.DispChar(my_wifi.sta.ifconfig()[0], 42, 22, 1)
- oled.show()
- BiaoShi = 0
- while True:
- if (p0.read_analog() < 10):
- if (not (BiaoShi == 1)):
- BiaoShi = 1
- mqtt.publish(str("car/control"), str("G").encode('utf-8'))
- elif (p0.read_analog() > 4000):
- if (not (BiaoShi == 2)):
- BiaoShi = 2
- mqtt.publish(str("car/control"), str("B").encode('utf-8'))
- elif (p1.read_analog() < 10):
- if (not (BiaoShi == 3)):
- BiaoShi = 3
- mqtt.publish(str("car/control"), str("R").encode('utf-8'))
- elif (p1.read_analog() > 4000):
- if (not (BiaoShi == 4)):
- BiaoShi = 4
- mqtt.publish(str("car/control"), str("L").encode('utf-8'))
- else:
- if (not (BiaoShi == 5)):
- mqtt.publish(str("car/control"), str("S").encode('utf-8'))
- BiaoShi = 5
-
复制代码
【完整程序】
行空板视频车完整程序
-
-
- import cv2
- import numpy as np
- import socket
- import time
- import binascii
- import socket
- import time
- from pinpong.board import Board
- from dfrobot_motor import MOTOR
- import siot
- from pinpong.extension.unihiker import *
-
- from unihiker import GUI
- u_gui=GUI()
- xianshi=u_gui.draw_text(text="开始",x=50,y=100,font_size=50, color="#0000FF")
-
- def forward(speed):
-
- #前进
- #电机有M1,M2, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- M.motor_run(M.ALL,M.CW,speed)
-
- def back(speed):
- #后退
- #电机有M1,M2, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- M.motor_run(M.ALL,M.CCW,speed)
- def left_turn(speed):
- #向左转
- #电机有M1,M2, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- M.motor_run(M.M1,M.CW,speed)
- M.motor_run(M.M2,M.CCW,speed)
- def right_turn(speed):
- #向右转
- #电机有M1,M2, CW代表正转,CCW代表反转,255是速度,范围0-255
- if speed>255:
- speed=255
- M.motor_run(M.M1,M.CCW,speed)
- M.motor_run(M.M2,M.CW,speed)
-
- def stop():
- M.motor_stop(M.ALL)
- # 事件回调函数
- def on_message_callback(client, userdata, msg):
-
- if (msg.payload.decode("utf-8") == 'G'):
- forward(200)
- xianshi.config(text="前进")
-
- if (msg.payload.decode("utf-8")== 'S'):
- stop()
- xianshi.config(text="停止")
-
- if (msg.payload.decode("utf-8") == 'B'):
- back(200)
- xianshi.config(text="后退")
-
- if (msg.payload.decode("utf-8") == 'L'):
- left_turn(200)
- xianshi.config(text="向左")
-
- if (msg.payload.decode("utf-8") == 'R'):
- right_turn(200)
- xianshi.config(text="向右")
-
- siot.init(client_id="siot_192",server="192.168.1.101",port=1883,user="siot",password="dfrobot")
- siot.connect()
- siot.loop()
- siot.set_callback(on_message_callback)
- siot.getsubscribe(topic="car/control")
- Board().begin()
- M = MOTOR()
-
-
- cap = cv2.VideoCapture(0)
- cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
- cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
- cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
-
- W=128
- H=64
- size = (W,H)
- val=0
- str1=''
- while(1): # get a frame and show
- ret, frame1 = cap.read()
- if(ret):
-
- frame1 = cv2.resize(frame1, size)
-
- gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
-
- BIN1= cv2.adaptiveThreshold(gray1,1,cv2.ADAPTIVE_THRESH_MEAN_C,cv2.THRESH_BINARY,7,3)
-
-
-
- str1=""
- for h in range(H):
- for w in range(0,W,16):
- for i in range(4):
- val=(int(BIN1[h][w+i*4+0])*8 + int(BIN1[h][w+i*4+1])*4 + int(BIN1[h][w+i*4+2])*2 + int(BIN1[h][w+i*4+3])*1)
- if val>=10:
- if val == 10:
- str1=str1+'A'
- if val==11:
- str1=str1+'B'
- if val==12:
- str1=str1+'C'
- if val==13:
- str1=str1+'D'
- if val==14:
- str1=str1+'E'
- if val==15:
- str1=str1+'F'
- else:
- str1=str1+str(val)
- str1=str1+" "
- #print(str1)
- s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('192.168.1.100',10000))
- s.send(bytearray.fromhex(str1))
- time.sleep(0.19)
-
- cv2.destroyAllWindows()
-
复制代码
”鳄霸王“掌控板完整程序
-
- from mpython import *
- import usocket
- import network
- from umqtt.simple import MQTTClient
- my_wifi = wifi()
- my_wifi.connectWiFi("yuntian","yuntian123456")
- while not my_wifi.sta.isconnected():
- pass
-
- p0=MPythonPin(0,PinMode.ANALOG)
- p1=MPythonPin(1,PinMode.ANALOG)
- mqtt = MQTTClient("", "192.168.1.101", 1883, "siot", "dfrobot")
- try:
- mqtt.connect()
- print('MQTT Connected Successful')
- except:
- print('MQTT Connection Failed')
- oled.fill(0)
- oled.DispChar(my_wifi.sta.ifconfig()[0], 0, 0, 1)
- oled.show()
- BiaoShi = 0
-
- #socket.socket() 创建了一个 socket 对象,并且支持 context manager type,你可以使用 with 语句,这样你就不用再手动调用 s.close() 来关闭 socket 了
- s=usocket.socket(usocket.AF_INET,usocket.SOCK_STREAM)
- #bind() 方法的入参取决于 socket 的地址族,在这个例子中我们使用了 socket.AF_INET (IPv4),它接收一个二元组(host, port)入参
- #端口号应该是 1-65535 之间的整数(0是保留的),这个整数就是用来接受客户端链接的 TCP 端口号,如果端口号小于 1024,有的操作系统会要求管理员权限
- s.bind((my_wifi.sta.ifconfig()[0],10000))
- #listen() 方法调用使服务器可以接受连接请求,这使它成为一个「监听中」的 socket
- #listen() 方法有一个 backlog 参数。它指定在拒绝新的连接之前系统将允许使用的 未接受的连接 数量。从 Python 3.5 开始,这是可选参数。如果不指定,Python 将取一个默认值
- s.listen(5)
-
- while True:
- #accept() 方法阻塞并等待传入连接。当一个客户端连接时,它将返回一个新的 socket 对象,对象中有表示当前连接的 conn 和一个由主机、端口号组成的 IPv4/v6 连接的元组
- con=s.accept()[0]
- data=con.recv(1024)
- #framebuf.FrameBuffer(buffer, width, height, format) 可以构建帧缓存对象, buffer 为缓存区数据,width 为图片宽度,height 为图片高度,format 为FrameBuffer的格式,即对应图片取模时数据输出的扫描模式:framebuf.MONO_HLSB 为水平方向;framebuf.MONO_VLSB 为垂直方向。
- fbuf=framebuf.FrameBuffer(bytearray(data),128,64,framebuf.MONO_HLSB)
- oled.fill(0)
- #oled.blit(fbuf, x, y) 使用OLED显示图片帧,fbuf 为FrameBuffer对象,x 、y 为起始点的坐标x、y。
- oled.blit(fbuf,0,0)
- oled.show()
- con.close()
- if (p0.read_analog() < 10):
- if (not (BiaoShi == 1)):
- BiaoShi = 1
- mqtt.publish(str("car/control"), str("G").encode('utf-8'))
- elif (p0.read_analog() > 4000):
- if (not (BiaoShi == 2)):
- BiaoShi = 2
- mqtt.publish(str("car/control"), str("B").encode('utf-8'))
- elif (p1.read_analog() < 10):
- if (not (BiaoShi == 3)):
- BiaoShi = 3
- mqtt.publish(str("car/control"), str("R").encode('utf-8'))
- elif (p1.read_analog() > 4000):
- if (not (BiaoShi == 4)):
- BiaoShi = 4
- mqtt.publish(str("car/control"), str("L").encode('utf-8'))
- else:
- if (not (BiaoShi == 5)):
- mqtt.publish(str("car/control"), str("S").encode('utf-8'))
- BiaoShi = 5
-
复制代码
附件: dfrobot_motor.py需拷贝到行空板,与程序在同一目录
dfrobot_motor.zip
|