6586| 1
|
[ESP32系列教程] ESP32 Picoweb教程:修改返回的HTTP代码 |
引 言 本文主要说明如何在MicroPython Picoweb应用程序中返回特定的HTTP代码。 我们将使用一个叫做http_error的函数。这个函数其实也可以返回其他类型的HTTP代码,比如在2xx范围内的代码(成功代码)。可用的HTTP响应代码可参见此处(https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html)。 测试使用的是一个集成在ESP32开发板中的DFRobot的sp - wroom -32设备。代码开发是在MicroPython IDE uPyCraft上完成的。你可以在前一篇文章:ESP32 MicroPython教程:uPyCraft IDE入门 中查看如何使用uPyCraft。 尽管本文将使用一个特殊的函数来返回状态代码,但是上一篇教程中用来将第一部分HTTP响应发送给客户端的start_response函数也有一个可选的状态参数可用于返回特殊的HTTP代码。默认情况下,其返回值是200,对应于OK [1]。 start_response函数代码如下: [mw_shl_code=applescript,true] # Picoweb web pico-framework for MicroPython# Copyright (c) 2014-2018 Paul Sokolovsky # SPDX-License-Identifier: MIT import sys import gc import micropython import utime import uio import ure as re import uerrno import uasyncio as asyncio import pkg_resources from .utils import parse_qs def get_mime_type(fname): # Provide minimal detection of important file # types to keep browsers happy if fname.endswith(".html"): return "text/html" if fname.endswith(".css"): return "text/css" if fname.endswith(".png") or fname.endswith(".jpg"): return "image" return "text/plain" def sendstream(writer, f): buf = bytearray(64) while True: l = f.readinto(buf) if not l: break yield from writer.awrite(buf, 0, l) def jsonify(writer, dict): import ujson yield from start_response(writer, "application/json") yield from writer.awrite(ujson.dumps(dict)) def start_response(writer, content_type="text/html", status="200", headers=None): yield from writer.awrite("HTTP/1.0 %s NA\r\n" % status) yield from writer.awrite("Content-Type: ") yield from writer.awrite(content_type) if not headers: yield from writer.awrite("\r\n\r\n") return yield from writer.awrite("\r\n") if isinstance(headers, bytes) or isinstance(headers, str): yield from writer.awrite(headers) else: for k, v in headers.items(): yield from writer.awrite(k) yield from writer.awrite(": ") yield from writer.awrite(v) yield from writer.awrite("\r\n") yield from writer.awrite("\r\n") def http_error(writer, status): yield from start_response(writer, status=status) yield from writer.awrite(status) class HTTPRequest: def __init__(self): pass def read_form_data(self): size = int(self.headers[b"Content-Length"]) data = yield from self.reader.read(size) form = parse_qs(data.decode()) self.form = form def parse_qs(self): form = parse_qs(self.qs) self.form = form class WebApp: def __init__(self, pkg, routes=None, serve_static=True): if routes: self.url_map = routes else: self.url_map = [] if pkg and pkg != "__main__": self.pkg = pkg.split(".", 1)[0] else: self.pkg = None if serve_static: self.url_map.append((re.compile("^/(static/.+)"), self.handle_static)) self.mounts = [] self.inited = False # Instantiated lazily self.template_loader = None self.headers_mode = "parse" def parse_headers(self, reader): headers = {} while True: l = yield from reader.readline() if l == b"\r\n": break k, v = l.split(b":", 1) headers[k] = v.strip() return headers def _handle(self, reader, writer): if self.debug > 1: micropython.mem_info() close = True try: request_line = yield from reader.readline() if request_line == b"": if self.debug >= 0: self.log.error("%s: EOF on request start" % reader) yield from writer.aclose() return req = HTTPRequest() # TODO: bytes vs str request_line = request_line.decode() method, path, proto = request_line.split() if self.debug >= 0: self.log.info('%.3f %s %s "%s %s"' % (utime.time(), req, writer, method, path)) path = path.split("?", 1) qs = "" if len(path) > 1: qs = path[1] path = path[0] #print("================") #print(req, writer) #print(req, (method, path, qs, proto), req.headers) # Find which mounted subapp (if any) should handle this request app = self while True: found = False for subapp in app.mounts: root = subapp.url #print(path, "vs", root) if path[:len(root)] == root: app = subapp found = True path = path[len(root):] if not path.startswith("/"): path = "/" + path break if not found: break # We initialize apps on demand, when they really get requests if not app.inited: app.init() # Find handler to serve this request in app's url_map found = False for e in app.url_map: pattern = e[0] handler = e[1] extra = {} if len(e) > 2: extra = e[2] if path == pattern: found = True break elif not isinstance(pattern, str): # Anything which is non-string assumed to be a ducktype # pattern matcher, whose .match() method is called. (Note: # Django uses .search() instead, but .match() is more # efficient and we're not exactly compatible with Django # URL matching anyway.) m = pattern.match(path) if m: req.url_match = m found = True break if not found: headers_mode = "skip" else: headers_mode = extra.get("headers", self.headers_mode) if headers_mode == "skip": while True: l = yield from reader.readline() if l == b"\r\n": break elif headers_mode == "parse": req.headers = yield from self.parse_headers(reader) else: assert headers_mode == "leave" if found: req.method = method req.path = path req.qs = qs req.reader = reader close = yield from handler(req, writer) else: yield from start_response(writer, status="404") yield from writer.awrite("404\r\n") #print(req, "After response write") except Exception as e: if self.debug >= 0: self.log.exc(e, "%.3f %s %s %r" % (utime.time(), req, writer, e)) if close is not False: yield from writer.aclose() if __debug__ and self.debug > 1: self.log.debug("%.3f %s Finished processing request", utime.time(), req) def mount(self, url, app): "Mount a sub-app at the url of current app." # Inspired by Bottle. It might seem that dispatching to # subapps would rather be handled by normal routes, but # arguably, that's less efficient. Taking into account # that paradigmatically there's difference between handing # an action and delegating responisibilities to another # app, Bottle's way was followed. app.url = url self.mounts.append(app) def route(self, url, **kwargs): def _route(f): self.url_map.append((url, f, kwargs)) return f return _route def add_url_rule(self, url, func, **kwargs): # Note: this method skips Flask's "endpoint" argument, # because it's alleged bloat. self.url_map.append((url, func, kwargs)) def _load_template(self, tmpl_name): if self.template_loader is None: import utemplate.source self.template_loader = utemplate.source.Loader(self.pkg, "templates") return self.template_loader.load(tmpl_name) def render_template(self, writer, tmpl_name, args=()): tmpl = self._load_template(tmpl_name) for s in tmpl(*args): yield from writer.awrite(s) def render_str(self, tmpl_name, args=()): #TODO: bloat tmpl = self._load_template(tmpl_name) return ''.join(tmpl(*args)) def sendfile(self, writer, fname, content_type=None, headers=None): if not content_type: content_type = get_mime_type(fname) try: with pkg_resources.resource_stream(self.pkg, fname) as f: yield from start_response(writer, content_type, "200", headers) yield from sendstream(writer, f) except OSError as e: if e.args[0] == uerrno.ENOENT: yield from http_error(writer, "404") else: raise def handle_static(self, req, resp): path = req.url_match.group(1) print(path) if ".." in path: yield from http_error(resp, "403") return yield from self.sendfile(resp, path) def init(self): """Initialize a web application. This is for overriding by subclasses. This is good place to connect to/initialize a database, for example.""" self.inited = True def run(self, host="127.0.0.1", port=8081, debug=False, lazy_init=False, log=None): if log is None and debug >= 0: import ulogging log = ulogging.getLogger("picoweb") if debug > 0: log.setLevel(ulogging.DEBUG) self.log = log gc.collect() self.debug = int(debug) self.init() if not lazy_init: for app in self.mounts: app.init() loop = asyncio.get_event_loop() if debug > 0: print("* Running on http://%s:%s/" % (host, port)) loop.create_task(asyncio.start_server(self._handle, host, port)) loop.run_forever() loop.close() [/mw_shl_code] 代 码 首先,我们要导入HTTP服务器设置所需要的picoweb模块,以及建立ESP32 WiFi网络连接所需要的网络模块。 [mw_shl_code=applescript,true] import picoweb import network[/mw_shl_code] 然后,连接到WiFi网络。这一段代码是通用代码,在上一篇教程:ESP32 Picoweb教程:提供JSON内容 中有详细说明。您只需要把ssid和密码变量换成您的WiFi网络信息,ESP32就能连接到WiFi网络。 请注意,我们将在程序末尾把分配给ESP32的IP地址存储到一个变量中,以便稍后将其作为参数传递给app(应用程序)实例的run(运行)方法。 [mw_shl_code=applescript,true]ssid = "yourNetworkName" password = "yourPassword" station = network.WLAN(network.STA_IF) station.active(True) station.connect(ssid, password) while station.isconnected() == False: pass ip = station.ifconfig()[/mw_shl_code] 连接到WiFi网络之后,我们就需要创建app(应用程序)实例,并声明它要监听的路由。在本例中,我们只使用一个路由对服务器返回的HTTP内部服务器错误(https://www.lifewire.com/500-internal-server-error-explained-2622938)进行测试。我们将监听“/internalerror”端点。 [mw_shl_code=applescript,true]app = picoweb.WebApp(__name__) @app.route("/internalerror") def internalError(req, resp): ##Handling function code[/mw_shl_code] 为简单起见,我们的路由处理函数会立即返回HTTP错误。当然,在实际的应用中,在向某个端点返回一条错误之前,肯定会有一些相关的控制逻辑。 只需调用picoweb模块的http_error函数(https://github.com/pfalcon/picoweb/blob/master/picoweb/__init__.py#L42)即可返回错误代码,函数参数是以字符串格式表示的客户端数据流写入器对象和错误代码。内部服务器错误对应的代码是500 [1]。您也可以对其他状态代码进行测试。 由于该函数内部使用了数据流写入器的awrite方法,所以我们需要使用yield from关键字。 [mw_shl_code=applescript,true]yield from picoweb.http_error(resp, "500")[/mw_shl_code] 最后,只需调用app(应用程序)实例的run(运行)方法启动服务器即可。最终的源代码如下所示,其中包含了函数调用(使用分配给ESP32的IP地址)。 [mw_shl_code=applescript,true]import picoweb import network ssid = "yourNetworkName" password = "yourPassword" station = network.WLAN(network.STA_IF) station.active(True) station.connect(ssid, password) while station.isconnected() == False: pass ip = station.ifconfig() app = picoweb.WebApp(__name__) @app.route("/internalerror") def internalError(req, resp): yield from picoweb.http_error(resp, "500") app.run(debug=True, host =ip[0]) [/mw_shl_code] 测试代码 要对代码进行测试,只需将脚本上传到ESP32开发板运行即可。脚本执行时,控制台上会显示一条消息,表示服务器正在监听的根路径。 您只需将URL复制到一个网页浏览器中并在后面加上internalerror字样(对应于应用程序正在监听的路由)。所得到的输出结果如图1所示。请注意,我打开了Chaome浏览器的开发者工具选项,以查看更详细的请求信息。 图1 - 返回的内部服务器错误HTTP代码。 如引言部分所述,尽管该函数名为http_error,但是实际上我们也可以用它来返回非错误代码(比如对应于新建资源的代码201 [1])。被返回的HTTP代码如图2所示。注意,我仅仅在之前的代码中把传递给http_error的状态代码从500改成了201,所以路由仍将其称为“internalerror”。 图2 - 修改的HTTP代码。 |
© 2013-2025 Comsenz Inc. Powered by Discuz! X3.4 Licensed