本帖最后由 1784450870 于 2021-3-9 16:02 编辑
首先你需要装maixpy ide开发环境、ch340驱动、kflash gui Maixpy ide开发环境如图: Ide的下载位置: Kflash gui如图:
你需要下载一个固件:我是下载的0.5.1固件maixpy_v0.5.1_124_ga3f708c.bin 如果你要找可以在sipeed的下载站里找
有余力可以再装一个 uPyLoader-win(我插了 SD 卡,但 Dock 开发板不像 OpenMV 那样可以直接打开查看 SD 卡内容,这个软件可以帮助你查看 SD 卡里的文件) 如图: 如何确认自己有没有装好 CH340 驱动:右键“计算机”->属性->设备管理器->端口。 这些软件在哪下载?网址:https://dl.sipeed.com/shareURL/MAIX/tools (这个就是 Sipeed 的下载站)
首先,K210 Dock 开发板自带的 WiFi 模块是 ESP8285,出厂时厂家已经把 ESP8285 的 AT 固件烧录进去了;如果你要更新固件,可以参考文档:https://maixpy.sipeed.com/zh/get_started/upgrade_esp8285_firmware.html 。 第一步,验证 ESP8285 是否能够正常工作。最好使用 XCOM 串口助手:我之前使用友善串口助手没有测试成功,原因可能和串口助手内部的协议处理有关,所以推荐使用 XCOM。 下面是我的运行结果:ESP8285 没有问题。 接下来要注意 2.4G 和 5G 的区别:这里的两个名称指的都是 WiFi 频段,5G 不是移动网络里的 5G。现在家用的 WiFi 路由器通常会同时提供 2.4G 和 5G 两个频段,可以看到我这里连接的就是 5G。ESP8285 只支持 2.4G 频段,所以本程序基于 2.4G 网络,可以用手机开一个 2.4G 热点。 Dock 开发板代码(client 端代码):
- import time, network
- from Maix import GPIO
- from machine import UART
- from fpioa_manager import fm
- from board import board_info
# Credentials of the access point the board joins.
# The hotspot must be on the 2.4 GHz band; replace both values with your own.
WIFI_SSID = "wxw"
WIFI_PASSWD = "12345678"
class wifi():
    """Class-level helper that drives the on-board ESP8285 through AT commands.

    The class is used as a namespace: every method is called on the class
    itself (``wifi.reset()``, ``wifi.connect(...)``) and takes no ``self``.
    Shared state lives in class attributes and is reached via ``__class__``.
    """

    __is_m1w__ = True
    uart = None   # UART object talking to the ESP8285, created by init()
    en = None     # GPIO driving the WIFI_EN pin, created by init()
    eb = None     # unused; kept only for backward compatibility (was a typo for `en`)
    nic = None    # network.ESP8285 interface, created by reset()

    def init():
        """Map the WiFi pins and open the AT UART at 115200 baud."""
        if __class__.__is_m1w__:
            fm.register(0, fm.fpioa.GPIOHS1, force=True)
            M1wPower = GPIO(GPIO.GPIOHS1, GPIO.OUT)
            M1wPower.value(0)  # b'\r\n ets Jan 8 2013,rst cause:1, boot mode:(7,6)\r\n\r\nwaiting for host\r\n'

        fm.register(board_info.WIFI_EN, fm.fpioa.GPIOHS0)  # board_info.WIFI_EN == IO 8
        __class__.en = GPIO(GPIO.GPIOHS0, GPIO.OUT)

        fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX)  # board_info.WIFI_RX == IO 7
        fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX)  # board_info.WIFI_TX == IO 6
        __class__.uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=8192)

    def enable(en):
        """Drive the enable pin with ``en``; init() must have been called first."""
        __class__.en.value(en)

    def _at_cmd(cmd="AT\r\n", resp="OK\r\n", timeout=20):
        """Send ``cmd``, wait ``timeout`` ms, and report whether the reply ends with ``resp``."""
        __class__.uart.write(cmd)  # "AT+GMR\r\n"
        time.sleep_ms(timeout)
        tmp = __class__.uart.read()
        # print(tmp)
        if tmp and tmp.endswith(resp):
            return True
        return False

    def at_cmd(cmd="AT\r\n", timeout=20):
        """Send ``cmd``, wait ``timeout`` ms, and return whatever the UART read gives back."""
        __class__.uart.write(cmd)  # "AT+GMR\r\n"
        time.sleep_ms(timeout)
        tmp = __class__.uart.read()
        return tmp

    def reset(force=False, reply=5):
        """Re-initialise the module and create the network interface.

        Returns True immediately when already connected and ``force`` is
        false. Otherwise the enable pin is toggled up to ``reply`` times
        until the module answers ``AT``, the link is switched to 921600
        baud, and ``network.ESP8285`` is created on it. Returns False if
        creating the interface raises, True otherwise.
        """
        if not force and __class__.isconnected():
            return True
        __class__.init()
        for i in range(reply):
            print('reset...')
            __class__.enable(False)
            time.sleep_ms(50)
            __class__.enable(True)
            time.sleep_ms(500)  # at start > 500ms
            if __class__._at_cmd(timeout=500):
                break
        __class__._at_cmd()
        __class__._at_cmd('AT+UART_CUR=921600,8,1,0,0\r\n', "OK\r\n")
        __class__.uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=10240)
        # important! baudrate too low or read_buf_len too small will lose data
        # print(__class__._at_cmd())
        try:
            __class__.nic = network.ESP8285(__class__.uart)
            time.sleep_ms(500)  # wait at ready to connect
        except Exception as e:
            print(e)
            return False
        return True

    def connect(ssid="wifi_name", pasw="pass_word"):
        """Join the access point; returns None when reset() has not created the interface."""
        if __class__.nic is not None:
            return __class__.nic.connect(ssid, pasw)

    def ifconfig():  # should check ip != 0.0.0.0
        """Return the interface's ifconfig() result, or None without an interface."""
        if __class__.nic is not None:
            return __class__.nic.ifconfig()

    def isconnected():
        """Return the interface's connection state, or False without an interface."""
        if __class__.nic is not None:
            return __class__.nic.isconnected()
        return False

    def check_wifi_net(reply=5):
        """Try up to ``reply`` times to reset the module and join WIFI_SSID.

        Uses the module-level WIFI_SSID / WIFI_PASSWD constants. Errors from
        a single attempt are printed and the next attempt is made. Returns
        the final connection state.
        """
        if not wifi.isconnected():
            for i in range(reply):
                try:
                    wifi.reset()
                    print('try AT connect wifi...', wifi._at_cmd())
                    wifi.connect(WIFI_SSID, WIFI_PASSWD)
                    if wifi.isconnected():
                        break
                except Exception as e:
                    print(e)
        return wifi.isconnected()
wifi.check_wifi_net()

# Address of the PC running the server script; must match its local_ip/local_port.
addr = ("192.168.43.6", 8000)
##################################

import socket, time, sensor, image
import lcd

# Each JPEG is pushed to the socket in slices of this many bytes.
SEND_CHUNK = 2048

clock = time.clock()
lcd.init()
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time = 2000)

while True:
    # (Re)connect to the server, retrying until it succeeds.
    while True:
        sock = None
        try:
            sock = socket.socket()
            print(sock)
            sock.connect(addr)
            break
        except Exception as e:
            print("connect error:", e)
            # socket() itself may have failed, in which case there is nothing to close
            if sock is not None:
                sock.close()
            continue
    sock.settimeout(5)

    # send pic
    count, err = 0, 0
    while True:
        clock.tick()
        if err >= 10:
            print("socket broken")
            break
        img = sensor.snapshot()
        lcd.display(img)
        img = img.compress(quality=60)
        img_bytes = img.to_bytes()
        print("send len: ", len(img_bytes))
        try:
            # Check the result of every slice, including the last partial one,
            # so a short frame is never misreported and a dead link is noticed.
            for start in range(0, len(img_bytes), SEND_CHUNK):
                if sock.send(img_bytes[start:start + SEND_CHUNK]) == 0:
                    raise Exception("send fail")
        except Exception as e:
            if isinstance(e, OSError) and e.args and e.args[0] == 128:
                print("connection closed")
                break
            # Any other failure counts towards the error budget instead of
            # being reported as a successfully sent frame.
            print("send fail:", e)
            time.sleep(1)
            err += 1
            continue
        count += 1
        print("send:", count)
        print("fps:", clock.fps())
    print("close now")
    sock.close()
复制代码
服务端代码: - import socket
- import time
- import threading
- import datetime
- import pygame
- from pygame.locals import QUIT, KEYDOWN, K_f, K_F11, FULLSCREEN
-
# Address the server listens on; the client's `addr` must point at the same ip/port.
local_ip = "192.168.43.6"
local_port = 8000
# Size of the preview window in pixels.
width = 320
height = 240

# jpeg 20 fps
# esp32 spi dma temp buffer MAX Len: 4k
-
-
def receiveThread(conn):
    """Receive JPEG frames from ``conn`` and show each one in the pygame window.

    For every frame the stream is scanned byte by byte for the JPEG start
    marker (FF D8), then read in 4096-byte chunks until the buffer ends with
    the end marker (FF D9). A complete frame is written to ``tmp.jpg`` and
    blitted to the module-level ``screen``. The loop ends when the peer
    closes the connection, a socket error occurs, or no data arrives for
    10 seconds; ``conn`` is closed before returning.
    """
    soi = b'\xFF\xD8'
    eoi = b'\xFF\xD9'
    conn.settimeout(10)
    conn_end = False
    while not conn_end:
        # Phase 1: hunt for the start-of-image marker.
        img = b""
        tmp = b''
        while True:
            try:
                client_data = conn.recv(1)
            except (socket.timeout, OSError):
                client_data = b''
            if not client_data:
                # timeout, error, or peer closed: recv() returns b'' forever
                # after a close, so stop instead of spinning
                conn_end = True
                break
            if tmp == b'\xFF' and client_data == b'\xD8':
                img = soi
                break
            tmp = client_data
        if conn_end:
            break

        # Phase 2: collect the body until the buffer ends with the end marker.
        while True:
            try:
                client_data = conn.recv(4096)
            except (socket.timeout, OSError):
                client_data = None
            if not client_data:
                conn_end = True
                break
            # print("received data,len:",len(client_data) )
            img += client_data
            if img[-2:] == eoi:
                break
        print("recive end, pic len:", len(img))

        if not img.startswith(soi) or not img.endswith(eoi):
            print("image error")
            continue
        with open("tmp.jpg", "wb") as f:
            f.write(img)
        try:
            surface = pygame.image.load("tmp.jpg").convert()
            screen.blit(surface, (0, 0))
            pygame.display.update()
            print("recieve ok")
        except Exception as e:
            # a frame that cannot be decoded/shown is reported and skipped
            print(e)
    conn.close()
    print("receive thread end")
-
-
# Preview window that the receive threads draw into.
pygame.init()
screen = pygame.display.set_mode((width, height), 0, 32)
pygame.display.set_caption("pic from client")

# Listening socket; SO_REUSEADDR is set before bind() so the script can be
# restarted on the same address.
ip_port = (local_ip, local_port)
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(ip_port)
sk.listen(50)
print("accept now,wait for client")
-
-
def server():
    """Accept clients forever, handing each connection to its own receive thread."""
    while True:
        conn, addr = sk.accept()
        print("hello client,ip:")
        print(addr)
        # daemon thread: it must not keep the process alive once the window is closed
        t = threading.Thread(target=receiveThread, args=(conn,), daemon=True)
        t.start()
-
-
# Run the accept loop in the background so the main thread can service
# the pygame event queue.
tmp = threading.Thread(target=server, args=(), daemon=True)
tmp.start()

while True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            # the worker threads are daemons, so leaving the main thread ends the process
            raise SystemExit
    # short pause so polling the event queue does not spin a CPU core
    pygame.time.wait(10)
复制代码
如果以上内容不够详细,大家可以参考 Sipeed 官方文档里的内容。 测试视频
|