零基础k210 dock实现esp8285wifi图传
本帖最后由 1784450870 于 2021-3-9 16:02 编辑首先你需要装maixpy ide开发环境、ch340驱动、kflash guiMaixpy ide开发环境如图: https://mc.dfrobot.com.cn/forum.php?mod=image&aid=120196&size=300x300&key=b95118e5256ef581&nocache=yes&type=fixnoneIde的下载位置: Kflash gui如图:
https://mc.dfrobot.com.cn/forum.php?mod=image&aid=120199&size=300x300&key=7127071bb4be706f&nocache=yes&type=fixnone
你需要下载一个固件:我下载的是0.5.1版固件 maixpy_v0.5.1_124_ga3f708c.bin,如果你要找,可以在sipeed的下载站里找。
有余力可以装一个uPyLoader-win(由于我插了sd,不像openmv那样可以直接打开查看sd卡内容,这个软件可以帮助你查看sd卡内容)如图:
如何看自己装没装ch340驱动:计算机右键属性到设备管理器->端口
这些软件在哪下载网址:https://dl.sipeed.com/shareURL/MAIX/tools(这个就是sipeed的下载站)
首先,k210 dock开发板自带的wifi模块是esp8285,出厂时厂家已经把esp8285的AT固件烧录进去了。如果你要更新固件,可以参考文档的这个网址:https://maixpy.sipeed.com/zh/get_started/upgrade_esp8285_firmware.html 。
第一步:验证 ESP8285 是否能够正常工作。最好使用xcom串口助手;我之前使用友善串口助手没有测试成功,原因可能和串口助手内部的协议有关,所以推荐使用xcom。
下面是我的运行结果:esp8285没有问题
现在要注意2.4g和5g的区别:这里两个信号指的都是WiFi频段,5g不是移动网络里的5G。现在家里的WiFi路由器有的会同时产生2.4g和5g两个频段,可以看到我这里连的就是5g。该程序是基于2.4g的,可以用手机开热点。
Dock开发板代码(client端代码):
import time, network
from Maix import GPIO
from machine import UART
from fpioa_manager import fm
from board import board_info
# Name and password of the access point to join; check_wifi_net() hands
# both to wifi.connect(). Replace them with your own hotspot's values.
WIFI_SSID, WIFI_PASSWD = "wxw", "12345678"
class wifi():
    """Helper namespace for the on-board ESP8285, driven with AT commands over UART2.

    Every function is called on the class itself (``wifi.reset()``,
    ``wifi.connect(...)``); no instance is created and all state lives in
    class attributes.
    """

    __is_m1w__ = True  # when True, init() also drives IO 0 low through GPIOHS1
    uart = None        # UART object wired to the ESP8285 (set by init()/reset())
    eb = None          # never read here; NOTE(review): looks like a typo for `en`
    en = None          # GPIO object for the WIFI_EN pin (set by init())
    nic = None         # network.ESP8285 interface (set by reset())

    def init():
        """Map the WiFi pins and open UART2 at 115200 baud."""
        if __class__.__is_m1w__:
            fm.register(0, fm.fpioa.GPIOHS1, force=True)
            M1wPower = GPIO(GPIO.GPIOHS1, GPIO.OUT)
            M1wPower.value(0)  # b'\r\n ets Jan8 2013,rst cause:1, boot mode:(7,6)\r\n\r\nwaiting for host\r\n'

        fm.register(board_info.WIFI_EN, fm.fpioa.GPIOHS0)  # board_info.WIFI_EN == IO 8
        __class__.en = GPIO(GPIO.GPIOHS0, GPIO.OUT)

        fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX)  # board_info.WIFI_RX == IO 7
        fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX)  # board_info.WIFI_TX == IO 6
        __class__.uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=8192)

    def enable(en):
        """Write ``en`` to the WIFI_EN pin (requires a prior init())."""
        __class__.en.value(en)

    def _at_cmd(cmd="AT\r\n", resp="OK\r\n", timeout=20):
        """Send ``cmd``, wait ``timeout`` ms, return True if the reply ends with ``resp``."""
        __class__.uart.write(cmd)  # "AT+GMR\r\n"
        time.sleep_ms(timeout)
        tmp = __class__.uart.read()
        # print(tmp)
        if tmp and tmp.endswith(resp):
            return True
        return False

    def at_cmd(cmd="AT\r\n", timeout=20):
        """Send ``cmd``, wait ``timeout`` ms and return whatever the UART read back."""
        __class__.uart.write(cmd)  # "AT+GMR\r\n"
        time.sleep_ms(timeout)
        tmp = __class__.uart.read()
        return tmp

    def reset(force=False, reply=5):
        """Power-cycle the module, raise the UART speed and create the network interface.

        ``force``: when false and the interface already reports a connection,
        nothing is done. ``reply``: maximum number of power-cycle attempts while
        waiting for the module to answer ``AT``.

        Returns True when already connected or when network.ESP8285 was
        created, False when creating it raised.
        """
        if not force and __class__.isconnected():
            return True
        __class__.init()
        for _ in range(reply):
            print('reset...')
            __class__.enable(False)
            time.sleep_ms(50)
            __class__.enable(True)
            time.sleep_ms(500)  # at start > 500ms
            if __class__._at_cmd(timeout=500):
                break
        __class__._at_cmd()
        __class__._at_cmd('AT+UART_CUR=921600,8,1,0,0\r\n', "OK\r\n")
        __class__.uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=10240)
        # important! baudrate too low or read_buf_len too small will loose data
        # print(__class__._at_cmd())
        try:
            __class__.nic = network.ESP8285(__class__.uart)
            time.sleep_ms(500)  # wait at ready to connect
        except Exception as e:
            print(e)
            return False
        return True

    def connect(ssid="wifi_name", pasw="pass_word"):
        """Join the access point; returns nic.connect()'s result, or None before reset()."""
        if __class__.nic is not None:
            return __class__.nic.connect(ssid, pasw)

    def ifconfig():  # should check ip != 0.0.0.0
        """Return nic.ifconfig(), or None when no interface has been created yet."""
        if __class__.nic is not None:
            return __class__.nic.ifconfig()

    def isconnected():
        """Return nic.isconnected(), or False when no interface has been created yet."""
        if __class__.nic is not None:
            return __class__.nic.isconnected()
        return False

    def check_wifi_net(reply=5):
        """Try up to ``reply`` times to reset the module and join WIFI_SSID.

        Exceptions raised during an attempt are printed and the next attempt
        is made. Returns the final wifi.isconnected() state.
        """
        if wifi.isconnected() != True:
            for _ in range(reply):
                try:
                    wifi.reset()
                    print('try AT connect wifi...', wifi._at_cmd())
                    wifi.connect(WIFI_SSID, WIFI_PASSWD)
                    if wifi.isconnected():
                        break
                except Exception as e:
                    print(e)
        return wifi.isconnected()
# Join the access point before any socket is opened.
wifi.check_wifi_net()

# IP address and TCP port of the machine running the receiving server script.
addr = ("192.168.43.6", 8000)

##################################
import socket, time, sensor, image
import lcd

# Number of bytes handed to sock.send() per call.
CHUNK_SIZE = 2048

clock = time.clock()
lcd.init()
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time = 2000)

while True:
    # (Re)connect to the server, retrying until it succeeds.
    while True:
        sock = None
        try:
            sock = socket.socket()
            print(sock)
            sock.connect(addr)
            break
        except Exception as e:
            print("connect error:", e)
            # socket.socket() itself may have failed, leaving nothing to close.
            if sock is not None:
                sock.close()
            continue
    sock.settimeout(5)

    # count: frames sent on this connection; err: failed sends on this connection.
    count, err = 0, 0
    while True:
        clock.tick()
        if err >= 10:
            print("socket broken")
            break
        img = sensor.snapshot()
        lcd.display(img)
        img = img.compress(quality=60)
        img_bytes = img.to_bytes()
        print("send len: ", len(img_bytes))
        try:
            # Send the JPEG once, CHUNK_SIZE bytes at a time, advancing by the
            # count send() reports so that no byte is repeated or skipped.
            offset = 0
            total = len(img_bytes)
            while offset < total:
                send_len = sock.send(img_bytes[offset:offset + CHUNK_SIZE])
                if send_len == 0:
                    raise Exception("send fail")
                offset += send_len
        except OSError as e:
            # e.args is a tuple, so the errno value is its first element.
            if e.args and e.args[0] == 128:
                print("connection closed")
                break
            # Other OSError values fall through to the counters below.
        except Exception as e:
            print("send fail:", e)
            time.sleep(1)
            err += 1
            continue
        count += 1
        print("send:", count)
        print("fps:", clock.fps())
        #time.sleep_ms(500)
    print("close now")
    sock.close()
服务端代码:
import socket
import time
import threading
import datetime
import pygame
from pygame.locals import QUIT, KEYDOWN, K_f, K_F11, FULLSCREEN
# Address and TCP port the server listens on.
local_ip, local_port = "192.168.43.6", 8000
# Size in pixels of the pygame preview window.
width, height = 320, 240
# jpeg 20 fps
# esp32 spi dma temp buffer MAX Len: 4k
# JPEG start-of-image / end-of-image markers used to cut frames out of the TCP stream.
_JPEG_SOI = b'\xFF\xD8'
_JPEG_EOI = b'\xFF\xD9'
# Arbitrary cap on buffered bytes that do not yet form a frame; past it they are dropped.
_MAX_PENDING_BYTES = 1024 * 1024


def _split_jpeg_frame(buffer):
    """Return ``(frame, rest)``: the first complete JPEG in ``buffer`` and what follows it.

    ``frame`` is None when ``buffer`` holds no complete SOI..EOI span yet; ``rest``
    is then the part of ``buffer`` worth keeping for the next call.
    """
    start = buffer.find(_JPEG_SOI)
    if start < 0:
        # A trailing 0xFF may be the first half of a marker split across two recv() calls.
        return None, (buffer[-1:] if buffer.endswith(b'\xFF') else b'')
    end = buffer.find(_JPEG_EOI, start + 2)
    if end < 0:
        return None, buffer[start:]
    end += 2
    return buffer[start:end], buffer[end:]


def _show_jpeg(img):
    """Write ``img`` to tmp.jpg and draw it on the pygame window; errors are printed."""
    with open("tmp.jpg", "wb") as f:
        f.write(img)
    try:
        surface = pygame.image.load("tmp.jpg").convert()
        screen.blit(surface, (0, 0))
        pygame.display.update()
        print("recieve ok")
    except Exception as e:
        print(e)


def receiveThread(conn):
    """Receive JPEG frames from ``conn`` and display each one until the client goes away.

    The loop ends when the peer closes the connection, when no data arrives
    for 10 seconds, or on a socket error; ``conn`` is always closed on exit.
    """
    conn.settimeout(10)
    pending = b''
    try:
        while True:
            try:
                client_data = conn.recv(4096)
            except socket.timeout:
                break
            except OSError as e:
                print(e)
                break
            if not client_data:
                # recv() returns b'' once the peer has closed the connection.
                break
            pending += client_data
            # One recv() may complete several frames, or end one and start the next.
            while True:
                img, pending = _split_jpeg_frame(pending)
                if img is None:
                    break
                print("recive end, pic len:", len(img))
                _show_jpeg(img)
            if len(pending) > _MAX_PENDING_BYTES:
                print("image error")
                pending = b''
    finally:
        conn.close()
        print("receive thread end")
# Open the preview window; `screen` is the surface receiveThread() blits frames onto.
pygame.init()
screen = pygame.display.set_mode((width, height), 0, 32)
pygame.display.set_caption("pic from client")
# Create the listening TCP socket. SO_REUSEADDR is set before bind() so the
# address can be bound again without waiting (e.g. after a quick restart).
ip_port = (local_ip, local_port)
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(ip_port)
sk.listen(50)
print("accept now,wait for client")
def server():
    """Accept clients forever, giving each connection its own receiveThread.

    Never returns; it is meant to be run in a background thread.
    """
    while True:
        conn, addr = sk.accept()
        print("hello client,ip:")
        print(addr)
        # daemon=True replaces the deprecated Thread.setDaemon(True): the worker
        # does not keep the process alive once the main thread exits.
        t = threading.Thread(target=receiveThread, args=(conn,), daemon=True)
        t.start()
# Run the accept loop in the background so the main thread can service the window.
# daemon=True replaces the deprecated Thread.setDaemon(True).
tmp = threading.Thread(target=server, args=(), daemon=True)
tmp.start()

# Main thread: pump window events until the window is closed.
while True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            # exit() is a helper injected by the `site` module and is not always
            # available; raising SystemExit ends the program the same way.
            raise SystemExit
    # Sleep briefly between polls instead of spinning at full CPU.
    pygame.time.wait(10)
如果以上不够仔细的话,大家可以参考sipeed文档里的内容。
测试视频:https://www.bilibili.com/video/BV1py4y1a7S8
下面视频是大佬鼠的测试视频:https://www.bilibili.com/video/BV1ff4y1B7oT
前来回复,祝maixpy越来越牛逼 厉害厉害 empty 发表于 2021-3-1 12:33
厉害厉害
{:6_202:}好耶好耶 那个,client端的代码乱码了,可以重新发一遍吗 我朝女王扔石头 发表于 2021-3-7 15:27
那个,client端的代码乱码了,可以重新发一遍吗
import socket
import time
import threading
import datetime
import pygame
from pygame.locals import QUIT, KEYDOWN, K_f, K_F11, FULLSCREEN
# Address and TCP port the server listens on.
local_ip, local_port = "192.168.43.6", 8000
# Size in pixels of the pygame preview window.
width, height = 320, 240
# jpeg 20 fps
# esp32 spi dma temp buffer MAX Len: 4k
# JPEG start-of-image / end-of-image markers used to cut frames out of the TCP stream.
_JPEG_SOI = b'\xFF\xD8'
_JPEG_EOI = b'\xFF\xD9'
# Arbitrary cap on buffered bytes that do not yet form a frame; past it they are dropped.
_MAX_PENDING_BYTES = 1024 * 1024


def _split_jpeg_frame(buffer):
    """Return ``(frame, rest)``: the first complete JPEG in ``buffer`` and what follows it.

    ``frame`` is None when ``buffer`` holds no complete SOI..EOI span yet; ``rest``
    is then the part of ``buffer`` worth keeping for the next call.
    """
    start = buffer.find(_JPEG_SOI)
    if start < 0:
        # A trailing 0xFF may be the first half of a marker split across two recv() calls.
        return None, (buffer[-1:] if buffer.endswith(b'\xFF') else b'')
    end = buffer.find(_JPEG_EOI, start + 2)
    if end < 0:
        return None, buffer[start:]
    end += 2
    return buffer[start:end], buffer[end:]


def _show_jpeg(img):
    """Write ``img`` to tmp.jpg and draw it on the pygame window; errors are printed."""
    with open("tmp.jpg", "wb") as f:
        f.write(img)
    try:
        surface = pygame.image.load("tmp.jpg").convert()
        screen.blit(surface, (0, 0))
        pygame.display.update()
        print("recieve ok")
    except Exception as e:
        print(e)


def receiveThread(conn):
    """Receive JPEG frames from ``conn`` and display each one until the client goes away.

    The loop ends when the peer closes the connection, when no data arrives
    for 10 seconds, or on a socket error; ``conn`` is always closed on exit.
    """
    conn.settimeout(10)
    pending = b''
    try:
        while True:
            try:
                client_data = conn.recv(4096)
            except socket.timeout:
                break
            except OSError as e:
                print(e)
                break
            if not client_data:
                # recv() returns b'' once the peer has closed the connection.
                break
            pending += client_data
            # One recv() may complete several frames, or end one and start the next.
            while True:
                img, pending = _split_jpeg_frame(pending)
                if img is None:
                    break
                print("recive end, pic len:", len(img))
                _show_jpeg(img)
            if len(pending) > _MAX_PENDING_BYTES:
                print("image error")
                pending = b''
    finally:
        conn.close()
        print("receive thread end")
# Open the preview window; `screen` is the surface receiveThread() blits frames onto.
pygame.init()
screen = pygame.display.set_mode((width, height), 0, 32)
pygame.display.set_caption("pic from client")
# Create the listening TCP socket. SO_REUSEADDR is set before bind() so the
# address can be bound again without waiting (e.g. after a quick restart).
ip_port = (local_ip, local_port)
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(ip_port)
sk.listen(50)
print("accept now,wait for client")
def server():
    """Accept clients forever, giving each connection its own receiveThread.

    Never returns; it is meant to be run in a background thread.
    """
    while True:
        conn, addr = sk.accept()
        print("hello client,ip:")
        print(addr)
        # daemon=True replaces the deprecated Thread.setDaemon(True): the worker
        # does not keep the process alive once the main thread exits.
        t = threading.Thread(target=receiveThread, args=(conn,), daemon=True)
        t.start()
# Run the accept loop in the background so the main thread can service the window.
# daemon=True replaces the deprecated Thread.setDaemon(True).
tmp = threading.Thread(target=server, args=(), daemon=True)
tmp.start()

# Main thread: pump window events until the window is closed.
while True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            # exit() is a helper injected by the `site` module and is not always
            # available; raising SystemExit ends the program the same way.
            raise SystemExit
    # Sleep briefly between polls instead of spinning at full CPU.
    pygame.time.wait(10)

# --- Forum replies that were fused onto the last code line, kept verbatim ---
# 博主,这个服务端和客服端都是用K210做的吗?
# 太牛了。。。。
# 老哥你这个wifi的速率可以用来传输视频流吗?
页:
[1]