【项目设计】
将行空板摄像头的实时图像回传到电脑端显示,并由电脑端远程控制行空板小车。
【行空板与电脑端测试】
行空板上server.py程序:
-
import socket
import cv2

import sys
import numpy as np

# Address and port the server binds to.
ADDRESS = ('192.168.31.8', 5006)
# (width, height) of every frame sent to the client.
FRAME_SIZE = (320, 240)


def stream_frames(conn, cap):
    """Send one JPEG-encoded camera frame for every trigger the client sends.

    Returns when the camera closes or the client disconnects (an empty
    read on ``conn``).
    """
    while cap.isOpened():
        # Only the arrival of a trigger matters, not its content, so the
        # payload is not decoded.
        if not conn.recv(1024):
            break
        ret, frame = cap.read()
        if not ret:
            # No frame available; no reply is sent for this trigger.
            continue
        frame = cv2.resize(frame, FRAME_SIZE)
        cv2.imshow('send', frame)
        cv2.waitKey(1)
        # The payload is the raw JPEG byte string (other packings, e.g.
        # JSON, would work too). tobytes() replaces the deprecated
        # ndarray.tostring().
        img_encode = cv2.imencode('.jpg', frame)[1]
        conn.sendall(img_encode.tobytes())


def main():
    """Accept a single client and stream camera frames to it on request."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ser:
        # Allow the script to be restarted right away on the same port.
        ser.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ser.bind(ADDRESS)
        ser.listen(5)
        print('waiting。。。')
        conn, addr = ser.accept()  # blocks until a client connects
    print('建立连接...')
    print('连接对象:', addr)

    cap = cv2.VideoCapture(0)
    frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    print('视频总帧数:', frames_num)
    print('发送目标...')
    try:
        stream_frames(conn, cap)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
    finally:
        # Release the socket, camera and preview window on every exit path.
        conn.close()
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
    sys.exit(0)
复制代码
电脑端程序computer.py:
-
import socket
import sys
import cv2
import numpy as np
import time

# Address and port of the frame server.
ADDRESS = ('192.168.31.8', 5006)
# Marker that terminates every JPEG image.
JPEG_EOI = b'\xff\xd9'


def recv_frame(sock, pending):
    """Return the next complete JPEG read from ``sock``.

    TCP is a byte stream, so one ``recv()`` may hold only part of a frame
    (or the tail of one frame plus the head of the next). Bytes are
    accumulated in ``pending`` (a ``bytearray`` owned by the caller) until
    an end-of-image marker is seen; anything after it stays in ``pending``
    for the next call.

    Returns ``None`` if the server closes the connection first.

    NOTE(review): assumes the end-of-image byte pair does not occur inside
    a frame before its real end (e.g. no embedded thumbnail) - confirm for
    the encoder in use.
    """
    while True:
        end = pending.find(JPEG_EOI)
        if end != -1:
            end += len(JPEG_EOI)
            frame = bytes(pending[:end])
            del pending[:end]
            return frame
        chunk = sock.recv(65536)
        if not chunk:
            return None
        pending += chunk


def main():
    """Request frames from the server in lock-step and display them."""
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        cli.connect(ADDRESS)  # try to reach the server
    except OSError:
        print('[!] Server not found or not open')
        sys.exit()

    pending = bytearray()
    frame_count = 0
    start = time.time()
    with cli:
        while True:
            # One trigger message buys exactly one frame from the server.
            cli.sendall('ok'.encode())
            data = recv_frame(cli, pending)
            if data is None:
                break  # server closed the connection
            image = cv2.imdecode(np.frombuffer(data, np.uint8),
                                 cv2.IMREAD_COLOR)
            if image is None:
                continue  # undecodable frame: skip it instead of crashing
            cv2.imshow('video', image)
            cv2.waitKey(1)
            frame_count += 1
            elapsed = time.time() - start
            # Average frames per second since the first request.
            fps = int(frame_count / elapsed) if elapsed > 0 else 0
            print(image.shape[:2], fps)
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
复制代码
行空板屏幕显示实现全屏:
-
import socket
import cv2

import sys
import numpy as np

# Address and port the server binds to.
ADDRESS = ('192.168.31.8', 5006)
# Frames are first scaled to this (width, height) ...
SCALED_SIZE = (480, 360)
# ... and then centre-cropped to 240x320 (width x height); presumably this
# matches the board's portrait screen - confirm against the hardware.
CROP_ROWS = slice(20, 340)
CROP_COLS = slice(120, 360)


def stream_frames(conn, cap):
    """Send one JPEG-encoded camera frame for every trigger the client sends.

    Each frame is also shown in the local full-screen 'send' window.
    Returns when the camera closes or the client disconnects (an empty
    read on ``conn``).
    """
    while cap.isOpened():
        # Only the arrival of a trigger matters, not its content, so the
        # payload is not decoded.
        if not conn.recv(1024):
            break
        ret, frame = cap.read()
        if not ret:
            # No frame available; no reply is sent for this trigger.
            continue
        frame = cv2.resize(frame, SCALED_SIZE)
        frame = frame[CROP_ROWS, CROP_COLS]
        cv2.imshow('send', frame)
        cv2.waitKey(1)
        # The payload is the raw JPEG byte string (other packings, e.g.
        # JSON, would work too). tobytes() replaces the deprecated
        # ndarray.tostring().
        img_encode = cv2.imencode('.jpg', frame)[1]
        conn.sendall(img_encode.tobytes())


def main():
    """Accept a single client, then stream and locally display the camera."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ser:
        # Allow the script to be restarted right away on the same port.
        ser.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ser.bind(ADDRESS)
        ser.listen(5)
        print('waiting。。。')
        conn, addr = ser.accept()  # blocks until a client connects
    print('建立连接...')
    print('连接对象:', addr)

    cap = cv2.VideoCapture(0)
    frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    print('视频总帧数:', frames_num)
    print('发送目标...')
    # Show the local preview full screen.
    cv2.namedWindow("send", cv2.WINDOW_KEEPRATIO)
    cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN,
                          cv2.WINDOW_FULLSCREEN)
    try:
        stream_frames(conn, cap)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
    finally:
        # Release the socket, camera and preview window on every exit path.
        conn.close()
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
    sys.exit(0)
复制代码
【电脑控制行空车】
电脑端连接UNO+按钮键盘。
-
import socket
import sys
import cv2
import numpy as np
import time
from pinpong.board import Board,Pin

# Address and port of the robot's server.
ADDRESS = ('192.168.31.8', 5006)
# Marker that terminates every JPEG image.
JPEG_EOI = b'\xff\xd9'
# Message sent when no button changed, just to request the next frame.
KEEPALIVE = b'OK'


def recv_frame(sock, pending):
    """Return the next complete JPEG read from ``sock``.

    TCP is a byte stream, so one ``recv()`` may hold only part of a frame
    (or the tail of one frame plus the head of the next). Bytes are
    accumulated in ``pending`` (a ``bytearray`` owned by the caller) until
    an end-of-image marker is seen; anything after it stays in ``pending``
    for the next call.

    Returns ``None`` if the server closes the connection first.

    NOTE(review): assumes the end-of-image byte pair does not occur inside
    a frame before its real end (e.g. no embedded thumbnail) - confirm for
    the encoder in use.
    """
    while True:
        end = pending.find(JPEG_EOI)
        if end != -1:
            end += len(JPEG_EOI)
            frame = bytes(pending[:end])
            del pending[:end]
            return frame
        chunk = sock.recv(65536)
        if not chunk:
            return None
        pending += chunk


def poll_buttons(buttons):
    """Return one command for every button whose state changed.

    ``buttons`` is a list of ``[pin, prefix, pressed]`` entries; the
    ``pressed`` flag is updated in place. A press yields ``prefix + b'0'``
    and a release yields ``prefix + b'1'``.
    """
    commands = []
    for button in buttons:
        pin, prefix, was_pressed = button
        pressed = bool(pin.read_digital())
        if pressed != was_pressed:
            button[2] = pressed
            commands.append(prefix + (b'0' if pressed else b'1'))
    return commands


def main():
    """Forward button presses to the robot and display its camera stream."""
    Board("UNO").begin()
    # [input pin, command prefix, currently pressed?] for each button.
    buttons = [
        [Pin(Pin.D5, Pin.IN), b'L', False],
        [Pin(Pin.D6, Pin.IN), b'B', False],
        [Pin(Pin.D7, Pin.IN), b'R', False],
        [Pin(Pin.D8, Pin.IN), b'F', False],
        [Pin(Pin.D9, Pin.IN), b'S', False],
    ]

    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        cli.connect(ADDRESS)  # try to reach the server
    except OSError:
        print('[!] Server not found or not open')
        sys.exit()

    cv2.namedWindow('video', cv2.WINDOW_KEEPRATIO)
    cv2.resizeWindow('video', 240, 320)

    pending = bytearray()
    with cli:
        while True:
            # With no button change, send a keep-alive to request a frame.
            commands = poll_buttons(buttons) or [KEEPALIVE]
            for command in commands:
                cli.sendall(command)

            # The server answers every 2-byte message with one frame, so
            # read one reply per message sent and keep only the newest;
            # otherwise unread frames pile up and the video lags behind.
            image = None
            for _ in commands:
                data = recv_frame(cli, pending)
                if data is None:
                    cv2.destroyAllWindows()
                    return  # server closed the connection
                decoded = cv2.imdecode(np.frombuffer(data, np.uint8),
                                       cv2.IMREAD_COLOR)
                if decoded is not None:
                    image = decoded
            if image is None:
                continue
            cv2.imshow('video', image)
            cv2.waitKey(1)


if __name__ == '__main__':
    main()
复制代码
行空板程序:
-
import socket
import cv2
import time
import sys
import numpy as np

from pinpong.board import Board
from dfrobot_motor import MOTOR

# Address and port the server binds to.
ADDRESS = ('192.168.31.8', 5006)
# Every client message is exactly this many bytes (e.g. b'L0', b'OK').
COMMAND_SIZE = 2
# Speed value passed to motor_run for a driven wheel.
SPEED = 200
# Button releases and the stop button all halt both motors.
STOP_COMMANDS = frozenset((b'S0', b'R1', b'L1', b'B1', b'F1', b'S1'))
# Frames are scaled to this (width, height), then centre-cropped to 240x320.
SCALED_SIZE = (480, 360)
CROP_ROWS = slice(20, 340)
CROP_COLS = slice(120, 360)


def recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``.

    A single ``recv(size)`` may return fewer bytes than requested, which
    would split a command in two. Returns ``b''`` if the client closes the
    connection before ``size`` bytes arrive.
    """
    buf = b''
    while len(buf) < size:
        chunk = conn.recv(size - len(buf))
        if not chunk:
            return b''
        buf += chunk
    return buf


def drive(motor, command):
    """Apply one 2-byte client command to the motors.

    Unknown messages (including the b'OK' keep-alive) leave the motors
    unchanged.
    """
    if command == b'R0':
        motor.motor_run(motor.M1, motor.CW, SPEED)
        motor.motor_run(motor.M2, motor.CW, 0)
    elif command == b'L0':
        motor.motor_run(motor.M1, motor.CW, 0)
        motor.motor_run(motor.M2, motor.CW, SPEED)
    elif command == b'B0':
        motor.motor_run(motor.ALL, motor.CCW, SPEED)
    elif command == b'F0':
        motor.motor_run(motor.ALL, motor.CW, SPEED)
    elif command in STOP_COMMANDS:
        motor.motor_stop(motor.ALL)


def serve(conn, cap, motor):
    """Handle client commands, replying to each one with a camera frame.

    Returns when the camera closes or the client disconnects.
    """
    while cap.isOpened():
        command = recv_exact(conn, COMMAND_SIZE)
        if not command:
            break  # client disconnected
        drive(motor, command)

        ret, frame = cap.read()
        if not ret:
            # No frame available; no reply is sent for this message.
            continue
        frame = cv2.resize(frame, SCALED_SIZE)
        frame = frame[CROP_ROWS, CROP_COLS]
        cv2.imshow('send', frame)
        cv2.waitKey(1)

        # The payload is the raw JPEG byte string (other packings, e.g.
        # JSON, would work too). tobytes() replaces the deprecated
        # ndarray.tostring().
        img_encode = cv2.imencode('.jpg', frame)[1]
        conn.sendall(img_encode.tobytes())


def main():
    """Accept one controller and let it drive the car while streaming video."""
    Board().begin()
    motor = MOTOR()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ser:
        # Allow the script to be restarted right away on the same port.
        ser.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ser.bind(ADDRESS)
        ser.listen(5)
        print('waiting。。。')
        conn, addr = ser.accept()  # blocks until a client connects
    print('建立连接...')
    print('连接对象:', addr)

    cap = cv2.VideoCapture(0)
    frames_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    print('视频总帧数:', frames_num)
    print('发送目标...')

    # Show the local preview full screen.
    cv2.namedWindow("send", cv2.WINDOW_KEEPRATIO)
    cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN,
                          cv2.WINDOW_FULLSCREEN)
    try:
        serve(conn, cap, motor)
    finally:
        # Never leave the car driving once the controller is gone or the
        # script is interrupted.
        motor.motor_stop(motor.ALL)
        conn.close()
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
复制代码
【演示视频】
使用另一个行空板作为控制端代替电脑,正在努力中,处理有延迟,代码需优化。
|