select模块提供了等待数据流的事件功能,使用这个模块需要建立wifi网络连接。下面的示例需要先执行以下连接语句(这些语句将会创建一个UDP服务器,我们可以使用手机创建热点并使用下面的代码连接,之后使用手机网路调试助手创建UDP客户端进行测试)。
import socket
import network
import time
import select
SSID = "dfrobotYanfa" #修改为你WiFi的名称
PASSWORD = "hidfrobot" #修改为你WiFi的密码
port = 10000 #端口号
wlan = None #wlan
s = None #套接字
wlan = network.WLAN(network.STA_IF)
wlan.active(True) #激活网络
wlan.disconnect()
wlan.connect(SSID, PASSWORD)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建套接字
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置套接字
ip = wlan.ifconfig()[0]
s.bind((ip, port)) #绑定套接字
宏
- select.POLLIN = 1 — 有数据可读取
- select.POLLOUT = 4 — 可以写入数据
- select.POLLERR = 8 — 发生错误
- select.POLLHUP = 16 — 数据流结束/连接中断
函数
1. select.select(rlist, wlist, xlist[, timeout])
函数说明:监控对象何时可读或可写,一旦监控的对象状态改变,返回结果(阻塞线程)。这个函数是为了兼容,效率不高,推荐用 poll 函数。
rlist:等待读就绪的文件描述符数组
wlist:等待写就绪的文件描述符数组
xlist:等待异常的数组
timeout:等待时间(单位:秒)
示例:
def selectTest():
global s
rs, ws, es = select.select([s,], [], [])
#程序会在此等待直到对象s可读
print(rs)
for i in rs:
if i == s:
print("s can read now")
data,addr=s.recvfrom(1024)
print('received:',data,'from',addr)
2. select.poll()
函数说明: 创建轮询实例。
示例:
>>>poller = select.poll()
>>>print(poller)
<poll>
2.1. select.poll.register(obj, flag)
函数说明:注册一个用以监控的对象,并设置被监控对象的监控标志位flag。
obj:被监控的对象
flag:被监控的标志
select.POLLIN — 可读
select.POLLHUP — 已挂断
select.POLLERR — 出错
select.POLLOUT — 可写
示例:
>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.unregister(s)
2.2. select.poll.unregister(obj)
函数说明:解除监控的对象的注册。
obj:注册过的对象
示例:
>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.unregister(s)
2.3. select.poll.modify(obj, flag)
函数说明:修改已注册的对象监控标志。
obj:已注册的被监控对象
flag:修改为的监控标志
示例:
>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.modify(s, READ_ONLY)
综合示例
保存下面代码为.py文件并下载运行。
import socket
import network
import time
import select
SSID = "dfrobotYanfa" #修改为你WiFi的名称
PASSWORD = "hidfrobot" #修改为你WiFi的密码
port = 10000 #端口号
wlan = None #WLAN
s = None #套接字
wlan = network.WLAN(network.STA_IF)
wlan.active(True) #激活网络
wlan.disconnect() #断开上次连接
wlan.connect(SSID, PASSWORD) #连接WiFi
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建套接字
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置套接字
ip = wlan.ifconfig()[0]
s.bind((ip, port)) #绑定套接字
def selectTest():
global s
rs, ws, es = select.select([s,], [], [])
#程序会在此等待直到对象s可读
print(rs)
for i in rs:
if i == s:
print("s can read now")
data,addr=s.recvfrom(1024) #接收数据(1024字节)
print('received:',data,'from',addr)
try:
while True:
selectTest()
except:
if (s):
s.close()
wlan.disconnect() #断开WiFi连接
wlan.active(False) #冻结网络