【行空板】
行空板带有一个 USB Type-A 接口,可以外接 USB 设备。这里用它连接 USB 摄像头,在程序中通过 OpenCV(cv2)读取画面,再通过 socket 把视频传给电脑。
【行空板服务端】
-
- #!/usr/bin/env python
- # -*- coding=utf-8 -*-
-
- import socket
- import numpy as np
- import urllib
- import cv2 as cv
- import threading
- import time
-
-
print('this is Server')

# Open camera index 0 and request a 320x240 capture size.
cap = cv.VideoCapture(0)
for _prop, _value in (
    (cv.CAP_PROP_FRAME_WIDTH, 320),
    (cv.CAP_PROP_FRAME_HEIGHT, 240),
):
    cap.set(_prop, _value)
-
def socket_service():
    """Listen on TCP port 6666 and hand every client to ``deal_data``.

    Binds a listening socket, then accepts connections forever; each
    accepted connection is served by ``deal_data`` on its own thread.
    Exits the process with status 1 if the socket cannot be set up.
    """
    # Imported locally because this script never imports ``sys`` at the top;
    # without it the error path below raised NameError instead of exiting.
    import sys

    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Avoid "[Errno 98] Address already in use" when the server is
        # restarted shortly after it was stopped.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # IP address of the server machine (the original listing also shows
        # '127.0.0.1' as a commented-out alternative).
        s.bind(('192.168.31.25', 6666))
        s.listen(10)
    except socket.error as msg:
        print(msg)
        sys.exit(1)
    print('Waiting connection...')

    while True:
        conn, addr = s.accept()
        t = threading.Thread(target=deal_data, args=(conn, addr))
        t.start()
-
def deal_data(conn, addr):
    """Send JPEG-encoded camera frames to one client until sending fails.

    Args:
        conn: Connected socket returned by ``accept()``.
        addr: Peer address, used only for the log line.
    """
    print('Accept new connection from {0}'.format(addr))
    try:
        while True:
            # Grab a frame; skip this round when the camera delivered
            # nothing, because encoding an empty frame raises in OpenCV.
            ret, frame = cap.read()
            if ret:
                # '.jpg' selects JPEG encoding; other extensions produce
                # other formats.
                ok, img_encode = cv.imencode('.jpg', frame)
            else:
                ok = False
            if not ok:
                time.sleep(0.1)
                continue
            # tobytes() replaces the deprecated ndarray.tostring() alias.
            str_encode = img_encode.tobytes()
            print('img size : %s' % len(str_encode))
            try:
                # sendall() keeps writing until the whole image is sent;
                # plain send() may stop after a partial write.
                conn.sendall(str_encode)
            except socket.error as e:
                # The client is gone: stop instead of looping forever on a
                # dead socket.
                print(e)
                break
            time.sleep(0.1)
    finally:
        conn.close()
-
# Start the server only when this file is run as a script, so importing it
# does not block in the accept loop.
if __name__ == '__main__':
    socket_service()
-
复制代码
【电脑接收端程序】
-
- #!/usr/bin/env python
- # -*- coding=utf-8 -*-
-
- import socket
- import numpy as np
- import urllib
- import cv2
- import threading
- import time,sys
-
def _split_jpeg_frames(buffer):
    """Cut every complete JPEG image off the front of ``buffer``.

    Returns ``(frames, remainder)``: a list of complete images (each from the
    0xFFD8 start marker through the 0xFFD9 end marker) and the bytes that
    must be kept until more data arrives.
    """
    frames = []
    while True:
        start = buffer.find(b'\xff\xd8')
        if start < 0:
            # No image start yet; keep a trailing 0xFF because the marker
            # may be split across two reads.
            return frames, (buffer[-1:] if buffer.endswith(b'\xff') else b'')
        end = buffer.find(b'\xff\xd9', start + 2)
        if end < 0:
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]


def socket_client():
    """Connect to the camera server and display the frames it sends.

    The server writes JPEG images back to back on a TCP stream, and one
    ``recv()`` may return part of an image or pieces of several, so received
    bytes are buffered and only complete images are decoded and shown.
    Returns when the connection cannot be made, the server closes it, or a
    socket/OpenCV error occurs.
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(('192.168.31.25', 6666))  # connect to the server
    except socket.error as msg:
        # Without a connection every recv() would fail, so stop here
        # instead of looping on errors.
        print(msg)
        return
    print('this is Client')

    pending = b''
    try:
        while True:
            chunk = s.recv(77777)  # upper bound on bytes returned per call
            if not chunk:
                # An empty read means the server closed the connection.
                print('Server closed the connection')
                break
            frames, pending = _split_jpeg_frames(pending + chunk)
            for jpeg in frames:
                # frombuffer() replaces the deprecated binary mode of
                # np.fromstring().
                nparr = np.frombuffer(jpeg, dtype='uint8')
                img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img_decode is None:
                    # Corrupt image: skip it rather than crash imshow().
                    continue
                cv2.imshow("Client_show", img_decode)
                cv2.waitKey(1)
    except (socket.error, cv2.error) as e:
        print(e)
    finally:
        s.close()
-
-
# Start the client only when this file is run as a script, not on import.
if __name__ == '__main__':
    socket_client()
-
复制代码
【物联网——行空板】
-
-
- # -*- coding: utf-8 -*-
- import time
-
- import numpy as np
- from pinpong.board import Board
- from microbit_motor import Microbit_Motor #导入Microbit_Motor库
- import siot
- from pinpong.extension.unihiker import *
-
- from unihiker import GUI
-
- import socket
-
- import urllib
- import cv2 as cv
- import threading
-
-
-
print('this is Server')

# Open camera index 0 and request a 320x240 capture size.
cap = cv.VideoCapture(0)
for _prop, _value in (
    (cv.CAP_PROP_FRAME_WIDTH, 320),
    (cv.CAP_PROP_FRAME_HEIGHT, 240),
):
    cap.set(_prop, _value)

# On-screen text item; on_message_callback rewrites its text.
u_gui = GUI()
xianshi = u_gui.draw_text(
    text="开始", x=50, y=100, font_size=50, color="#0000FF"
)
- # 事件回调函数
def on_message_callback(client, userdata, msg):
    """Message callback: run the drive command named by the payload.

    The payload is decoded as UTF-8 and compared with the one-letter
    commands 'G', 'S', 'B', 'L' and 'R'. A match calls the corresponding
    drive function and then updates the on-screen text; any other payload
    is ignored.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` is read.
    """
    command = msg.payload.decode("utf-8")
    if command == 'G':
        forward(200)
        label = "前进"
    elif command == 'S':
        stop()
        label = "停止"
    elif command == 'B':
        back(200)
        label = "后退"
    elif command == 'L':
        left(200)
        label = "向左"
    elif command == 'R':
        right(200)
        label = "向右"
    else:
        return
    xianshi.config(text=label)
-
-
# Connect to the siot server and subscribe to the car-control topic.
_siot_settings = {
    "client_id": "siot_192",
    "server": "192.168.31.25",
    "port": 1883,
    "user": "siot",
    "password": "dfrobot",
}
siot.init(**_siot_settings)
siot.connect()
siot.loop()
siot.set_callback(on_message_callback)
siot.getsubscribe(topic="car/control")

# NOTE(review): the subscription is active before motorbit and the drive
# functions below are defined; if the callback can fire this early it would
# fail with NameError - confirm and consider moving the siot setup lower.
Board("microbit").begin()
motorbit = Microbit_Motor()
def forward(speed):
    """Forward: run M1 and M2 clockwise, M3 and M4 counter-clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    wiring = (
        (motorbit.M1, motorbit.CW),
        (motorbit.M2, motorbit.CW),
        (motorbit.M3, motorbit.CCW),
        (motorbit.M4, motorbit.CCW),
    )
    for motor, direction in wiring:
        motorbit.motor_run(motor, direction, speed)
def back(speed):
    """Backward: run M1 and M2 counter-clockwise, M3 and M4 clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    wiring = (
        (motorbit.M1, motorbit.CCW),
        (motorbit.M2, motorbit.CCW),
        (motorbit.M3, motorbit.CW),
        (motorbit.M4, motorbit.CW),
    )
    for motor, direction in wiring:
        motorbit.motor_run(motor, direction, speed)
def left_turn(speed):
    """Turn left: run all four motors clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    motors = (motorbit.M1, motorbit.M2, motorbit.M3, motorbit.M4)
    for motor in motors:
        motorbit.motor_run(motor, motorbit.CW, speed)
def right_turn(speed):
    """Turn right: run all four motors counter-clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    motors = (motorbit.M1, motorbit.M2, motorbit.M3, motorbit.M4)
    for motor in motors:
        motorbit.motor_run(motor, motorbit.CCW, speed)
def left(speed):
    """Move left: run M1 and M3 clockwise, M2 and M4 counter-clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    wiring = (
        (motorbit.M1, motorbit.CW),
        (motorbit.M2, motorbit.CCW),
        (motorbit.M3, motorbit.CW),
        (motorbit.M4, motorbit.CCW),
    )
    for motor, direction in wiring:
        motorbit.motor_run(motor, direction, speed)
def right(speed):
    """Move right: run M1 and M3 counter-clockwise, M2 and M4 clockwise.

    Args:
        speed: Motor speed; values above 255 are capped at 255.
    """
    speed = min(speed, 255)
    wiring = (
        (motorbit.M1, motorbit.CCW),
        (motorbit.M2, motorbit.CW),
        (motorbit.M3, motorbit.CCW),
        (motorbit.M4, motorbit.CW),
    )
    for motor, direction in wiring:
        motorbit.motor_run(motor, direction, speed)
def stop():
    """Stop motors M1 through M4, in that order."""
    motors = (motorbit.M1, motorbit.M2, motorbit.M3, motorbit.M4)
    for motor in motors:
        motorbit.motor_stop(motor)
def socket_service():
    """Listen on TCP port 6666 and hand every client to ``deal_data``.

    Binds a listening socket, then accepts connections forever; each
    accepted connection is served by ``deal_data`` on its own thread.
    Exits the process with status 1 if the socket cannot be set up.
    """
    # Imported locally because this script never imports ``sys`` at the top;
    # without it the error path below raised NameError instead of exiting.
    import sys

    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Avoid "[Errno 98] Address already in use" when the server is
        # restarted shortly after it was stopped.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # IP address of the server machine (the original listing also shows
        # '127.0.0.1' as a commented-out alternative).
        s.bind(('192.168.31.25', 6666))
        s.listen(10)
    except socket.error as msg:
        print(msg)
        sys.exit(1)
    print('Waiting connection...')

    while True:
        conn, addr = s.accept()
        t = threading.Thread(target=deal_data, args=(conn, addr))
        t.start()
-
def deal_data(conn, addr):
    """Send JPEG-encoded camera frames to one client until sending fails.

    Args:
        conn: Connected socket returned by ``accept()``.
        addr: Peer address, used only for the log line.
    """
    print('Accept new connection from {0}'.format(addr))
    try:
        while True:
            # Grab a frame; skip this round when the camera delivered
            # nothing, because encoding an empty frame raises in OpenCV.
            ret, frame = cap.read()
            if ret:
                # '.jpg' selects JPEG encoding; other extensions produce
                # other formats.
                ok, img_encode = cv.imencode('.jpg', frame)
            else:
                ok = False
            if not ok:
                time.sleep(0.1)
                continue
            # tobytes() replaces the deprecated ndarray.tostring() alias.
            str_encode = img_encode.tobytes()
            print('img size : %s' % len(str_encode))
            try:
                # sendall() keeps writing until the whole image is sent;
                # plain send() may stop after a partial write.
                conn.sendall(str_encode)
            except socket.error as e:
                # The client is gone: stop instead of looping forever on a
                # dead socket.
                print(e)
                break
            time.sleep(0.1)
    finally:
        conn.close()
def trace_fun():
    """Call ``socket_service`` in an endless loop, again whenever it returns."""
    while True:
        socket_service()
-
-
-
# Script entry point: start serving when this file is executed directly.
if __name__ == "__main__":
    trace_fun()
复制代码
【物联网——电脑端】
-
- #!/usr/bin/env python
- # -*- coding=utf-8 -*-
-
- import socket
- import numpy as np
- import urllib
- import cv2
- import threading
- import time,sys
- import tkinter as tk
- from tkinter import *
- from PIL import Image, ImageTk
- import siot
# Connect to the siot server used for sending drive commands.
# The client id differs from the board script's "siot_192": an MQTT broker
# drops the existing session when a second client connects with the same id,
# so sharing one id would make the two programs disconnect each other.
siot.init(
    client_id="siot_192_pc",
    server="192.168.31.25",
    port=1883,
    user="siot",
    password="dfrobot",
)
siot.connect()
siot.loop()
-
- #生成窗体,布置相应文本框和按钮
def forward():
    """Publish command 'G' on topic 'car/control'."""
    topic, command = 'car/control', 'G'
    siot.publish(topic, command)
def left():
    """Publish command 'L' on topic 'car/control'."""
    topic, command = 'car/control', 'L'
    siot.publish(topic, command)
def right():
    """Publish command 'R' on topic 'car/control'."""
    topic, command = 'car/control', 'R'
    siot.publish(topic, command)
def stop():
    """Publish command 'S' on topic 'car/control'."""
    topic, command = 'car/control', 'S'
    siot.publish(topic, command)
def back():
    """Publish command 'B' on topic 'car/control'."""
    topic, command = 'car/control', 'B'
    siot.publish(topic, command)
# Build the control window.
top = tk.Tk()
top.title('控制窗口')
top.geometry('420x300')
image_width = 425
image_height = 300

# Canvas carrying the background picture.
canvas = Canvas(top, bg='white', width=image_width, height=image_height)
canvas.pack()
img = Image.open('back.jpg')
# Keep ``bg`` referenced at module level: Tk shows nothing once the
# PhotoImage object is garbage-collected.
bg = ImageTk.PhotoImage(img)
bgid = canvas.create_image(0, 0, image=bg, anchor='nw')
canvas.place(x=0, y=0)
# Draw the caption as canvas text so the picture stays visible behind it.
txtid = canvas.create_text(5, 20, fill='red', font=("黑体", 15), anchor="nw")
canvas.insert(txtid, 1, "行空板四驱车")
wx = 5
hy = 20

# Create the buttons and bind each to its command function. The widgets get
# their own *_button names; the original reused left/forward/right/back/stop
# and thereby replaced the command functions with Button objects.
left_button = tk.Button(top, text='左转', height=2, width=15, command=left)
left_button.place(x=wx, y=hy + 100)
forward_button = tk.Button(top, text='前进', height=2, width=15, command=forward)
forward_button.place(x=wx + 150, y=hy)
right_button = tk.Button(top, text='右转', height=2, width=15, command=right)
right_button.place(x=wx + 300, y=hy + 100)
back_button = tk.Button(top, text='后退', height=2, width=15, command=back)
back_button.place(x=wx + 150, y=hy + 200)
stop_button = tk.Button(top, text='停止', height=2, width=15, command=stop)
stop_button.place(x=wx + 150, y=hy + 100)
def _split_jpeg_frames(buffer):
    """Cut every complete JPEG image off the front of ``buffer``.

    Returns ``(frames, remainder)``: a list of complete images (each from the
    0xFFD8 start marker through the 0xFFD9 end marker) and the bytes that
    must be kept until more data arrives.
    """
    frames = []
    while True:
        start = buffer.find(b'\xff\xd8')
        if start < 0:
            # No image start yet; keep a trailing 0xFF because the marker
            # may be split across two reads.
            return frames, (buffer[-1:] if buffer.endswith(b'\xff') else b'')
        end = buffer.find(b'\xff\xd9', start + 2)
        if end < 0:
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]


def socket_client():
    """Show the camera stream while keeping the Tk control window alive.

    The server writes JPEG images back to back on a TCP stream, and one
    ``recv()`` may return part of an image or pieces of several, so received
    bytes are buffered and only complete images are decoded and shown. After
    each read the Tk window is updated. Returns when the connection cannot be
    made, the server closes it, the window is closed, or a socket/OpenCV
    error occurs.
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(('192.168.31.25', 6666))  # connect to the server
    except socket.error as msg:
        # Without a connection every recv() would fail, so stop here
        # instead of looping on errors.
        print(msg)
        return
    print('this is Client')

    pending = b''
    try:
        while True:
            chunk = s.recv(77777)  # upper bound on bytes returned per call
            if not chunk:
                # An empty read means the server closed the connection.
                print('Server closed the connection')
                break
            frames, pending = _split_jpeg_frames(pending + chunk)
            for jpeg in frames:
                # frombuffer() replaces the deprecated binary mode of
                # np.fromstring().
                nparr = np.frombuffer(jpeg, dtype='uint8')
                img_decode = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img_decode is None:
                    # Corrupt image: skip it rather than crash imshow().
                    continue
                cv2.imshow("Client_show", img_decode)
                cv2.waitKey(1)
            # Process pending Tk events so the buttons respond, then wait
            # 100 ms as the original loop did.
            top.update()
            top.after(100)
    except tk.TclError:
        # The control window was closed; end the loop instead of failing on
        # every later update().
        print('Control window closed')
    except (socket.error, cv2.error) as e:
        print(e)
    finally:
        s.close()
-
-
# Start the video loop only when this file is run as a script, not on import.
if __name__ == '__main__':
    socket_client()
-
复制代码
【演示视频】
|