行空板——视频车
【项目设计】将行空板摄像头的实时图像回传到电脑端，并能通过电脑端控制行空板小车。
【行空板与电脑端测试】
https://www.bilibili.com/video/BV1d14y1a7Lj?share_source=copy_web
行空板上server.py程序:
import socket
import cv2
import sys
import numpy as np
# Board-side video server: waits for one client, then answers every request
# message from that client with one JPEG-compressed camera frame.

# Address and port the server listens on.
address = ('192.168.31.8', 5006)

# Consecutive failed camera reads tolerated before the stream is abandoned.
MAX_READ_ATTEMPTS = 5


def read_frame(cap, attempts=MAX_READ_ATTEMPTS):
    """Return the next frame from `cap`, or None after `attempts` failed reads.

    `cap` only needs a `read()` method returning a `(success, frame)` pair,
    which is how the script uses `cv2.VideoCapture`.
    """
    for _ in range(attempts):
        ret, frame = cap.read()
        if ret:
            return frame
    return None


def encode_jpeg(frame):
    """Return `frame` compressed to JPEG as a bytes object.

    `cv2.imencode` returns a `(success, buffer)` pair; only the buffer holds
    the image data, so the pair is unpacked instead of being serialized whole.

    Raises:
        ValueError: if OpenCV reports that encoding failed.
    """
    ok, buf = cv2.imencode('.jpg', frame)
    if not ok:
        raise ValueError('JPEG encoding failed')
    return buf.tobytes()


def main():
    """Accept one client and stream resized camera frames to it on request."""
    ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn = None
    cap = None
    try:
        ser.bind(address)
        ser.listen(5)
        # accept() blocks until a client connects.
        print('waiting。。。')
        conn, addr = ser.accept()
        print('建立连接...')
        print('连接对象:', addr)
        cap = cv2.VideoCapture(0)
        frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print('视频总帧数:', frames_num)
        print('发送目标...')
        while cap.isOpened():
            # Each message from the client is a request for one frame; an
            # empty read means the client closed the connection.
            data = conn.recv(1024)
            if not data:
                break
            # The client is now waiting for a reply, so a failed read must be
            # retried here; going back to recv() would block both sides.
            frame = read_frame(cap)
            if frame is None:
                print('[!] Camera read failed')
                break
            frame = cv2.resize(frame, (320, 240))
            cv2.imshow('send', frame)
            cv2.waitKey(1)
            # Frames are sent as raw JPEG bytes; other packaging (e.g. JSON)
            # would work as well.
            conn.sendall(encode_jpeg(frame))
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
        sys.exit(0)
    finally:
        # Release the camera, the preview window and both sockets on any exit.
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        if conn is not None:
            conn.close()
        ser.close()


if __name__ == '__main__':
    main()
电脑端程序computer.py
import socket
import sys
import cv2
import numpy as np
import time
# PC-side viewer: repeatedly asks the server for a frame, decodes the JPEG it
# gets back, shows it and prints the running average frame rate.

# Address and port of the server.
address = ('192.168.31.8', 5006)

# Every JPEG stream ends with this end-of-image marker.
JPEG_EOI = b'\xff\xd9'


def recv_jpeg(sock, chunk_size=64 * 1024):
    """Read one complete JPEG image from `sock`.

    TCP is a byte stream, so a single recv() may deliver only part of a
    frame. Data is accumulated until it ends with the JPEG end-of-image
    marker. The server sends exactly one image per request, so nothing can
    follow the marker.

    Returns:
        The image bytes, or b'' if the peer closed the connection before a
        complete image arrived.
    """
    received = bytearray()
    while not received.endswith(JPEG_EOI):
        chunk = sock.recv(chunk_size)
        if not chunk:
            return b''
        received += chunk
    return bytes(received)


def main():
    """Connect to the server and display frames until the stream ends."""
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        cli.connect(address)  # try to reach the server
    except OSError:
        print('[!] Server not found or not open')
        cli.close()
        sys.exit()
    frame_count = 0
    start_time = time.time()
    try:
        while True:
            # Any message acts as a request for the next frame.
            cli.sendall(b'ok')
            data = recv_jpeg(cli)
            if not data:
                print('[!] Connection closed by server')
                break
            image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
            if image is None:
                # Undecodable payload: skip it rather than crash in imshow().
                continue
            cv2.imshow('video', image)
            cv2.waitKey(1)
            frame_count += 1
            elapsed = time.time() - start_time
            fps = int(frame_count / elapsed) if elapsed > 0 else 0
            print(image.shape[:2], fps)
    finally:
        cli.close()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
行空板端程序（在行空板屏幕上全屏显示摄像头画面）：
import socket
import cv2
import sys
import numpy as np
# Board-side video server with a full-screen local preview: waits for one
# client, then answers every request message with one JPEG camera frame.

# Address and port the server listens on.
address = ('192.168.31.8', 5006)

# Consecutive failed camera reads tolerated before the stream is abandoned.
MAX_READ_ATTEMPTS = 5


def read_frame(cap, attempts=MAX_READ_ATTEMPTS):
    """Return the next frame from `cap`, or None after `attempts` failed reads.

    `cap` only needs a `read()` method returning a `(success, frame)` pair,
    which is how the script uses `cv2.VideoCapture`.
    """
    for _ in range(attempts):
        ret, frame = cap.read()
        if ret:
            return frame
    return None


def encode_jpeg(frame):
    """Return `frame` compressed to JPEG as a bytes object.

    `cv2.imencode` returns a `(success, buffer)` pair; only the buffer holds
    the image data, so the pair is unpacked instead of being serialized whole.

    Raises:
        ValueError: if OpenCV reports that encoding failed.
    """
    ok, buf = cv2.imencode('.jpg', frame)
    if not ok:
        raise ValueError('JPEG encoding failed')
    return buf.tobytes()


def main():
    """Accept one client, show the camera full screen and stream it on request."""
    ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn = None
    cap = None
    try:
        ser.bind(address)
        ser.listen(5)
        # accept() blocks until a client connects.
        print('waiting。。。')
        conn, addr = ser.accept()
        print('建立连接...')
        print('连接对象:', addr)
        # A video file path can be passed instead of 0 to stream a recording.
        cap = cv2.VideoCapture(0)
        frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print('视频总帧数:', frames_num)
        print('发送目标...')
        # Create the preview window up front so it can be switched to
        # full-screen mode before the first frame is shown.
        cv2.namedWindow("send", cv2.WINDOW_KEEPRATIO)
        cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
        while cap.isOpened():
            # Each message from the client is a request for one frame; an
            # empty read means the client closed the connection.
            data = conn.recv(1024)
            if not data:
                break
            # The client is now waiting for a reply, so a failed read must be
            # retried here; going back to recv() would block both sides.
            frame = read_frame(cap)
            if frame is None:
                print('[!] Camera read failed')
                break
            frame = cv2.resize(frame, (480, 360))
            cv2.imshow('send', frame)
            cv2.waitKey(1)
            # Frames are sent as raw JPEG bytes; other packaging (e.g. JSON)
            # would work as well.
            conn.sendall(encode_jpeg(frame))
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
        sys.exit(0)
    finally:
        # Release the camera, the preview window and both sockets on any exit.
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        if conn is not None:
            conn.close()
        ser.close()


if __name__ == '__main__':
    main()
【电脑控制行空车】
电脑端通过 pinpong 库连接 Arduino UNO，UNO 的 D5–D9 引脚接按钮键盘，用按钮发送控制指令。
import socket
import sys
import cv2
import numpy as np
import time
from pinpong.board import Board,Pin
# PC-side control client: polls five buttons wired to an Arduino UNO, sends a
# 2-byte command on every press/release edge and displays the video frames
# the car sends back.

# Address and port of the car's server.
address = ('192.168.31.8', 5006)

# Every JPEG stream ends with this end-of-image marker.
JPEG_EOI = b'\xff\xd9'


def button_message(pressed, was_pressed, code):
    """Return the command for a button edge, or None if the state is unchanged.

    A press edge yields `code + b'0'` and a release edge `code + b'1'`.
    """
    if pressed and not was_pressed:
        return code + b'0'
    if was_pressed and not pressed:
        return code + b'1'
    return None


def recv_jpeg(sock, pending, chunk_size=64 * 1024):
    """Pop one complete JPEG image from the stream.

    TCP is a byte stream, so one recv() may hold part of a frame or several
    frames. Bytes are collected in `pending` (a bytearray owned by the
    caller) until an end-of-image marker is seen; anything after the marker
    stays in `pending` for the next call.

    Returns:
        The image bytes, or b'' if the peer closed the connection before a
        complete image arrived.
    """
    while True:
        end = pending.find(JPEG_EOI)
        if end != -1:
            cut = end + len(JPEG_EOI)
            frame = bytes(pending[:cut])
            del pending[:cut]
            return frame
        chunk = sock.recv(chunk_size)
        if not chunk:
            return b''
        pending += chunk


def main():
    """Forward button events to the car and show its video stream."""
    Board("UNO").begin()
    # One (input pin, command letter) pair per button. The letters are
    # interpreted by the car program (presumably Left/Back/Right/Forward/Stop).
    buttons = [
        (Pin(Pin.D5, Pin.IN), b'L'),
        (Pin(Pin.D6, Pin.IN), b'B'),
        (Pin(Pin.D7, Pin.IN), b'R'),
        (Pin(Pin.D8, Pin.IN), b'F'),
        (Pin(Pin.D9, Pin.IN), b'S'),
    ]
    held = [False] * len(buttons)  # last known state of each button

    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        cli.connect(address)  # try to reach the server
    except OSError:
        print('[!] Server not found or not open')
        cli.close()
        sys.exit()

    cv2.namedWindow('video', cv2.WINDOW_KEEPRATIO)
    cv2.resizeWindow('video', 240, 320)
    pending = bytearray()
    try:
        while True:
            sent = 0
            for index, (pin, code) in enumerate(buttons):
                pressed = pin.read_digital() == 1
                message = button_message(pressed, held[index], code)
                held[index] = pressed
                if message is not None:
                    cli.sendall(message)
                    sent += 1
            if sent == 0:
                # No button changed: send a keep-alive so a frame still comes.
                cli.sendall(b'OK')
                sent = 1

            # The car program replies with one frame per 2-byte command, so
            # read exactly that many frames to stay in step; show the newest.
            data = b''
            for _ in range(sent):
                data = recv_jpeg(cli, pending)
                if not data:
                    break
            if not data:
                print('[!] Connection closed by server')
                break

            image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
            if image is None:
                continue
            cv2.imshow('video', image)
            cv2.waitKey(1)
    finally:
        cli.close()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
行空板端程序（接收控制指令并驱动电机，同时回传摄像头画面）：
import socket
import cv2
import time
import sys
import numpy as np
from pinpong.board import Board
from dfrobot_motor import MOTOR
# Board-side car program: receives 2-byte commands from one client, drives
# the motors accordingly and answers every command with one JPEG camera frame.

# Address and port the server listens on.
address = ('192.168.31.8', 5006)

# Every command on the wire is exactly this many bytes.
COMMAND_SIZE = 2

# Speed value passed to motor_run() for a running motor.
RUN_SPEED = 200

# Consecutive failed camera reads tolerated before the stream is abandoned.
MAX_READ_ATTEMPTS = 5

# Button-release commands plus the explicit stop button all stop the car.
STOP_COMMANDS = (b'S0', b'R1', b'L1', b'B1', b'F1', b'S1')


def recv_exact(sock, size):
    """Read exactly `size` bytes from `sock`.

    recv() may return fewer bytes than requested, so keep reading until the
    whole message has arrived.

    Returns:
        The bytes read, or b'' if the peer closed the connection first.
    """
    received = bytearray()
    while len(received) < size:
        chunk = sock.recv(size - len(received))
        if not chunk:
            return b''
        received += chunk
    return bytes(received)


def apply_command(motor, command):
    """Drive `motor` according to a 2-byte `command`.

    Unrecognized commands (such as the b'OK' keep-alive) leave the motors
    unchanged; they only request a video frame.
    """
    if command == b'R0':
        motor.motor_run(motor.M1, motor.CW, RUN_SPEED)
        motor.motor_run(motor.M2, motor.CW, 0)
    elif command == b'L0':
        motor.motor_run(motor.M1, motor.CW, 0)
        motor.motor_run(motor.M2, motor.CW, RUN_SPEED)
    elif command == b'B0':
        motor.motor_run(motor.ALL, motor.CCW, RUN_SPEED)
    elif command == b'F0':
        motor.motor_run(motor.ALL, motor.CW, RUN_SPEED)
    elif command in STOP_COMMANDS:
        motor.motor_stop(motor.ALL)


def read_frame(cap, attempts=MAX_READ_ATTEMPTS):
    """Return the next frame from `cap`, or None after `attempts` failed reads."""
    for _ in range(attempts):
        ret, frame = cap.read()
        if ret:
            return frame
    return None


def encode_jpeg(frame):
    """Return `frame` compressed to JPEG as a bytes object.

    `cv2.imencode` returns a `(success, buffer)` pair; only the buffer holds
    the image data, so the pair is unpacked instead of being serialized whole.

    Raises:
        ValueError: if OpenCV reports that encoding failed.
    """
    ok, buf = cv2.imencode('.jpg', frame)
    if not ok:
        raise ValueError('JPEG encoding failed')
    return buf.tobytes()


def main():
    """Serve one client: execute its commands and stream camera frames back."""
    Board().begin()
    M = MOTOR()
    ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn = None
    cap = None
    try:
        ser.bind(address)
        ser.listen(5)
        # accept() blocks until a client connects.
        print('waiting。。。')
        conn, addr = ser.accept()
        print('建立连接...')
        print('连接对象:', addr)
        # A video file path can be passed instead of 0 to stream a recording.
        cap = cv2.VideoCapture(0)
        frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print('视频总帧数:', frames_num)
        print('发送目标...')
        cv2.namedWindow("send", cv2.WINDOW_KEEPRATIO)
        cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
        while cap.isOpened():
            command = recv_exact(conn, COMMAND_SIZE)
            if not command:
                # Client disconnected: stop serving (motors are stopped below).
                break
            apply_command(M, command)
            # The client is now waiting for a frame, so a failed read must be
            # retried here; going back to recv() would block both sides.
            frame = read_frame(cap)
            if frame is None:
                print('[!] Camera read failed')
                break
            frame = cv2.resize(frame, (480, 360))
            cv2.imshow('send', frame)
            cv2.waitKey(1)
            # Frames are sent as raw JPEG bytes; other packaging (e.g. JSON)
            # would work as well.
            conn.sendall(encode_jpeg(frame))
    finally:
        # Never leave the car driving once control is lost or the program ends.
        M.motor_stop(M.ALL)
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        if conn is not None:
            conn.close()
        ser.close()


if __name__ == '__main__':
    main()
【演示视频】
https://www.bilibili.com/video/BV19D4y1v7Lh?share_source=copy_web
使用另一个行空板作为控制端代替电脑,正在努力中,处理有延迟,代码需优化。
厉害厉害 为大神的作品点赞 厉害厉害 赞!!! 非常不错 老师厉害
页:
[1]