云天 发表于 2022-9-18 20:57:25

行空板——视频车

【项目设计】
回传行空板摄像头实时图像,并能控制行空板小车。

【行空板与电脑端测试】


https://www.bilibili.com/video/BV1d14y1a7Lj?share_source=copy_web

行空板上server.py程序:

import socket
import cv2

import sys
import numpy as np
address =('192.168.31.8', 5006)# 服务端地址和端口
ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ser.bind(address)
ser.listen(5)
# 阻塞式
print('waiting。。。')
conn, addr = ser.accept()
print('建立连接...')
print('连接对象:', addr)

cap = cv2.VideoCapture(0)
frames_num=cap.get(7)
print('视频总帧数:',frames_num)
print('发送目标...')
count = 0
while cap.isOpened():
    try:
      data = conn.recv(1024)
      data = data.decode()
      if not data:
            break
      ret, frame = cap.read()
      if not ret:
            continue
      frame = cv2.resize(frame,(320,240))
      cv2.imshow('send', frame)
      cv2.waitKey(1)
      count += 1
      # 数据打包有很多方式,也可以用json打包
      img_encode = cv2.imencode('.jpg', frame)

      data_encode = np.array(img_encode)
      str_encode = data_encode.tostring()

      conn.sendall(str_encode)
    except KeyboardInterrupt:
      print('KeyboardInterrupt')
      sys.exit(0)
电脑端程序computer.py

import socket
import sys
import cv2
import numpy as np
import time
address = ('192.168.31.8', 5006)# 服务端地址和端口
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    cli.connect(address)# 尝试连接服务端
except Exception:
    print('[!] Server not found or not open')
    sys.exit()

frame_count = 1
while True:
    time1 = time.time() if frame_count == 1 else time1
    trigger = 'ok'
    cli.sendall(trigger.encode())
    data = cli.recv(1024*1024*20)
    image = np.frombuffer(data, np.uint8)
    image = cv2.imdecode(image,cv2.IMREAD_COLOR)
    cv2.imshow('video',image)
    cv2.waitKey(1)
    end_time = time.time()
    time2 = time.time()
    print(image.shape[:2], int(frame_count / (time2 - time1)))
    frame_count += 1
cli.close()

行空板屏幕显示实现全屏:


import socket
import cv2

import sys
import numpy as np
address =('192.168.31.8', 5006)# 服务端地址和端口
ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ser.bind(address)
ser.listen(5)
# 阻塞式
print('waiting。。。')
conn, addr = ser.accept()
print('建立连接...')
print('连接对象:', addr)
#cap = cv2.VideoCapture(r"D:\project\dataset\video\测试.mp4")
cap = cv2.VideoCapture(0)
frames_num=cap.get(7)
print('视频总帧数:',frames_num)
print('发送目标...')
count = 0
cv2.namedWindow("send",cv2.WINDOW_KEEPRATIO)
cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
while cap.isOpened():
    try:
      data = conn.recv(1024)
      data = data.decode()
      if not data:
            break
      ret, frame = cap.read()
      if not ret:
            continue
      frame = cv2.resize(frame,(480,360))
      frame=frame
      cv2.imshow('send', frame)
      cv2.waitKey(1)
      count += 1
      # 数据打包有很多方式,也可以用json打包
      img_encode = cv2.imencode('.jpg', frame)

      data_encode = np.array(img_encode)
      str_encode = data_encode.tostring()

      conn.sendall(str_encode)
    except KeyboardInterrupt:
      print('KeyboardInterrupt')
      sys.exit(0)


【电脑控制行空车】

电脑端连接UNO+按钮键盘。

import socket
import sys
import cv2
import numpy as np
import time
from pinpong.board import Board,Pin
Board("UNO") .begin()
p_p8_in=Pin(Pin.D5, Pin.IN)
p_p9_in=Pin(Pin.D6, Pin.IN)
p_p12_in=Pin(Pin.D7, Pin.IN)
p_p13_in=Pin(Pin.D8, Pin.IN)
p_p14_in=Pin(Pin.D9, Pin.IN)
bs1 = 0
bs2 = 0
bs3 = 0
bs4 = 0
bs5 = 0
bs6 = 0
bs7 = 0
address = ('192.168.31.8', 5006)# 服务端地址和端口
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    cli.connect(address)# 尝试连接服务端
except Exception:
    print('[!] Server not found or not open')
    sys.exit()


cv2.namedWindow('video',cv2.WINDOW_KEEPRATIO)
cv2.resizeWindow('video', 240, 320);
#cv2.setWindowProperty('video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
currtime=time.time()
while True:
    bs6=0
   
    if(p_p8_in.read_digital()==True):
          if(bs1 == 0):   
             cli.send(b'L0')
             bs1 = 1
             bs6=1
    else:
          if(bs1 == 1):   
             cli.send(b'L1')
             bs1 = 0
             bs6=1
    if(p_p9_in.read_digital()==True):
          if(bs2 == 0):   
             cli.send(b'B0')
             bs2 = 1
             bs6=1
    else:
            if(bs2 == 1):   
               cli.send(b'B1')
               bs2 = 0
               bs6=1
    if(p_p12_in.read_digital()==True):
            if(bs3 == 0):   
               cli.send(b'R0')
               bs3 = 1
               bs6=1
    else:
            if(bs3 == 1):   
               cli.send(b'R1')
               bs3 = 0
               bs6=1
    if(p_p13_in.read_digital()==True):
            if(bs4 == 0):   
               cli.send(b'F0')
               bs4 = 1
               bs6=1
    else:
            if(bs4 == 1):   
               cli.send(b'F1')
               bs4 = 0
               bs6=1
    if(p_p14_in.read_digital()==True):
            if(bs5 == 0):   
            cli.send(b'S0')
            bs5 = 1
            bs6=1
    else:
            if(bs5 == 1):   
            cli.send(b'S1')
            bs5 = 0
            bs6=1
            
    if bs6==0:
       cli.send(b'OK')
    if 1:
      data = cli.recv(1024*40)
      image = np.frombuffer(data, np.uint8)
      image = cv2.imdecode(image,cv2.IMREAD_COLOR)
      if image is None:
            continue
      cv2.imshow('video',image)
      cv2.waitKey(1)
      

cli.close()
行空板程序:

import socket
import cv2
import time
import sys
import numpy as np

from pinpong.board import Board
from dfrobot_motor import MOTOR

Board().begin()
M = MOTOR()

address =('192.168.31.8', 5006)# 服务端地址和端口
ser = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ser.bind(address)
ser.listen(5)
# 阻塞式
print('waiting。。。')
conn, addr = ser.accept()
print('建立连接...')
print('连接对象:', addr)
#cap = cv2.VideoCapture(r"D:\project\dataset\video\测试.mp4")
cap = cv2.VideoCapture(0)
frames_num=cap.get(7)
print('视频总帧数:',frames_num)
print('发送目标...')

cv2.namedWindow("send",cv2.WINDOW_KEEPRATIO)
cv2.setWindowProperty("send", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
currtime=time.time()
while cap.isOpened():

      data = conn.recv(2)
      data = data.decode()
      if data:
                  
         if data=="R0":
         M.motor_run(M.M1,M.CW,200)
         M.motor_run(M.M2,M.CW,0)
         elif data=="L0":
         M.motor_run(M.M1,M.CW,0)
         M.motor_run(M.M2,M.CW,200)
         elif data=="B0":
         M.motor_run(M.ALL,M.CCW,200)
         elif data=="F0":
         M.motor_run(M.ALL,M.CW,200)
         elif data=="S0":
         M.motor_stop(M.ALL)
         elif data=="R1" or data=="L1" or data=="B1" or data=="F1" or data=="S1":
         M.motor_stop(M.ALL)
      #if time.time()-currtime>0.039:
         #currtime=time.time()
      if 1:   
         ret, frame = cap.read()
         if not ret:
             continue
         frame = cv2.resize(frame,(480,360))
         frame=frame
         cv2.imshow('send', frame)
         cv2.waitKey(1)
      
      # 数据打包有很多方式,也可以用json打包
         img_encode = cv2.imencode('.jpg', frame)

         data_encode = np.array(img_encode)
         str_encode = data_encode.tostring()

         conn.sendall(str_encode)

conn.close()      


【演示视频】



https://www.bilibili.com/video/BV19D4y1v7Lh?share_source=copy_web

使用另一个行空板作为控制端代替电脑,正在努力中,处理有延迟,代码需优化。

Mr-k 发表于 2022-9-29 15:00:45

厉害厉害

34603471 发表于 2023-1-24 16:26:14

为大神的作品点赞

花生编程 发表于 2023-1-30 08:25:28

厉害厉害

花生编程 发表于 2023-1-30 08:28:05

赞!!!

手机连接万物 发表于 2023-12-11 14:39:16

非常不错

easy猿 发表于 2023-12-19 20:17:10

老师厉害
页: [1]
查看完整版本: 行空板——视频车