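Background {#背景}
Some of our company's micro-apps are still written in Python. For historical reasons the Python environment the product ships is 2.7.18, so the framework chosen at the time was Tornado, an asynchronous framework that supports Python 2.
Recently I needed to start a microservice exposing a few endpoints. To avoid introducing new dependencies, I decided to build it on the Tornado wrapper module the platform provides. I took the opportunity to study this old framework, and this post records an approach to writing asynchronous endpoints with Tornado under Python 2.
Note: the code in this post is a debugging demo, not production code, so don't read too much into its quality.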
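Making asynchronous requests in Tornado {#tornado-中进行异步请求}
Two points frame the discussion:
- First: the environment is Python 2.7. This matters because Python 2 has no native coroutine syntax (async/await arrived in Python 3.5), so implementing asynchronous behavior in a framework is not trivial.
- Second: the focus is on calling other services' APIs asynchronously from inside the handler logic of a Tornado endpoint.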
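Using a ThreadPoolExecutor thread pool {#使用线程池-threadpoolexecutor}
I got this approach from ChatGPT. I wanted to keep calling APIs through the requests library while making the calls asynchronous, and the suggestion was to submit each request to a ThreadPoolExecutor. Here is the wrapper: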
# -*- coding: utf-8 -*-
import logging

import concurrent.futures  # on Python 2.7 this comes from the "futures" backport package
import requests
import tornado.gen


class BaseSDK(object):
    def __init__(self, host, org, user='defaultUser', max_workers=8):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        self.api_host = "http://{host}".format(host=host)
        self.headers = {
            'org': str(org),
            'user': user,
            'Content-Type': 'application/json'
        }
        self.logger = logging.getLogger(__name__)

    @tornado.gen.coroutine
    def fetch_async(self, method, api, **kwargs):
        # Merge the default headers with any caller-supplied ones
        headers = self.headers.copy()
        headers.update(kwargs.get('headers', {}))
        kwargs['headers'] = headers
        kwargs['timeout'] = kwargs.get('timeout', 10)
        # Build the full URL
        url = self.api_host + api
        try:
            # Run the blocking request in a worker thread and yield its Future
            response = yield self.executor.submit(requests.request, method=method, url=url, **kwargs)
            response.raise_for_status()  # Raise on 4xx/5xx status codes
            data = response.json()
        except requests.exceptions.RequestException as e:
            # The request itself failed
            self.logger.error("Request failed: {url}, error: {error}".format(url=url, error=str(e)))
            data = {}
        except ValueError as e:
            # The response body is not valid JSON
            self.logger.error(
                "Invalid JSON response: {url}, error: {error}".format(url=url, error=str(e)))
            data = {}
        else:
            self.logger.info("Request succeeded: {url}".format(url=url))
        raise tornado.gen.Return(data)

    def close(self):
        """
        Shut down the thread pool and release its resources.
        """
        self.executor.shutdown(wait=True)
I find this approach fairly flexible: the code still uses requests as usual, and the only change is that each request is submitted to a thread pool.
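To make the thread-pool idea concrete, here is a minimal sketch that uses only the standard library (on Python 2.7, concurrent.futures comes from the futures backport). The function slow_call is a hypothetical stand-in for a blocking requests.request call, and fetch_all is an illustrative name, not part of the wrapper above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_call(name, delay=0.2):
    """Hypothetical stand-in for a blocking requests.request(...) call."""
    time.sleep(delay)
    return "response from {}".format(name)


def fetch_all(names, max_workers=4):
    """Submit every blocking call to the pool, then collect results in order."""
    executor = ThreadPoolExecutor(max_workers)
    try:
        # submit() returns a Future immediately; the call runs in a worker thread.
        futures = [executor.submit(slow_call, name) for name in names]
        # result() blocks the calling thread until that call has finished.
        return [future.result() for future in futures]
    finally:
        executor.shutdown(wait=True)


if __name__ == "__main__":
    started = time.time()
    print(fetch_all(["cmdb", "monitor", "deploy"]))
    print("elapsed: {:.2f}s".format(time.time() - started))
```

The sketch waits with result(), which blocks the caller. Inside a Tornado coroutine the Future is yielded instead, so the IOLoop stays free to serve other requests while the worker thread waits on the network.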
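Using the built-in AsyncHTTPClient {#使用内置的-asynchttpclient}
I found the other approach in the Tornado documentation, and it is also the one our company's framework uses: Tornado's built-in asynchronous HTTP client class, AsyncHTTPClient. Below is a wrapper class that exposes the same interface as the previous one, so the two are interchangeable: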
# -*- coding: utf-8 -*-
import json
import logging
from urllib import urlencode  # Python 2 location of urlencode

import tornado.gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPError
from tornado.httputil import url_concat

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class BaseSDK(object):
    def __init__(self, host, org, user='defaultUser'):
        self.api_host = "http://{}".format(host)
        self.headers = {
            'org': str(org),
            'user': user,
            'Content-Type': 'application/json'
        }

    @tornado.gen.coroutine
    def fetch_async(self, method, uri, **kwargs):
        """
        Asynchronous request method that accepts requests-style arguments.
        :param method: HTTP method ('GET', 'POST', etc.)
        :param uri: API path
        :param kwargs: request arguments in the requests style
            - params: query parameters (dict), appended to the URL
            - data: form data (dict or string), for POST/PUT and similar methods
            - json: JSON data (dict), serialized to a string automatically
            - headers: request headers (dict)
            - timeout: timeout in seconds
        :return: the parsed JSON response, or the raw body if it is not valid JSON;
                 an empty dict {} if the request fails or returns an error status
        """
        # Extract the arguments
        params = kwargs.get('params', None)      # query parameters
        data = kwargs.get('data', None)          # form data
        json_payload = kwargs.get('json', None)  # JSON data
        timeout = kwargs.get('timeout', 10)      # timeout in seconds
        # Copy the defaults so per-request changes never leak into self.headers
        headers = self.headers.copy()
        headers.update(kwargs.get('headers') or {})
        # Build the full URL; url_concat percent-encodes the query parameters
        url = self.api_host + uri
        if params:
            url = url_concat(url, params)
        # Build the request body
        body = None
        if json_payload is not None:
            body = json.dumps(json_payload)  # JSON request body
            headers['Content-Type'] = 'application/json'
        elif data is not None:
            if isinstance(data, dict):
                body = urlencode(data)  # percent-encoded form body
                headers['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                body = data  # a string is sent as-is
        # Build the HTTPRequest object
        request = HTTPRequest(
            url=url,
            method=method.upper(),
            headers=headers,
            body=body,
            request_timeout=timeout
        )
        # Issue the asynchronous request with AsyncHTTPClient
        http_client = AsyncHTTPClient()
        try:
            # Send the request
            response = yield http_client.fetch(request)
            # Successful response (2xx status code)
            try:
                result = json.loads(response.body)
            except ValueError:
                result = response.body
        except HTTPError as http_error:
            # Distinguish an error status code from a network failure
            if http_error.response:
                logger.error(
                    "HTTPError: Non-200 status code: %s, url: %s",
                    http_error.response.code, url
                )
                result = {}
            else:
                logger.error("HTTPError: Network error: %s, url: %s", http_error, url)
                result = {}
        except Exception as e:
            logger.error("Request failed: %s, url: %s", e, url)
            result = {}
        raise tornado.gen.Return(result)

    def close(self):
        pass
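This approach is straightforward: AsyncHTTPClient and HTTPRequest are all you need to make an asynchronous request. Most of the code above handles the request arguments so that they stay compatible with the ones requests accepts.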
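One detail worth care when mapping requests-style arguments by hand is percent-encoding: requests escapes query parameters and form fields automatically, so a compatible wrapper has to do the same. The sketch below shows that step with the standard library alone. The helper build_url and the example.com URL are hypothetical and chosen for illustration; Tornado also ships tornado.httputil.url_concat for the same purpose.

```python
try:
    from urllib import urlencode  # Python 2
except ImportError:
    from urllib.parse import urlencode  # Python 3


def build_url(base, params=None):
    """Append params to base as a percent-encoded query string."""
    if not params:
        return base
    # Sort the items so the output is deterministic across Python versions.
    query = urlencode(sorted(params.items()))
    separator = "&" if "?" in base else "?"
    return base + separator + query


if __name__ == "__main__":
    # The space, "&" and "=" inside the value are escaped instead of
    # being mistaken for query-string delimiters.
    print(build_url("http://example.com/object", {"q": "a b&c=d", "page": 2}))
    # prints http://example.com/object?page=2&q=a+b%26c%3Dd
```

Joining keys and values with "&" and "=" directly would send the value above as three separate fragments, which is why the encoding step matters.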
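Developing asynchronous endpoints {#开发异步接口}
The two classes above implement the same wrapper and can serve as a base class. Here is a subclass that implements a concrete request: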
# -*- coding: utf-8 -*-
import tornado.gen

from base_sdk import BaseSDK


class CMDBSDK(BaseSDK):
    def __init__(self, host, org, user='defaultUser'):
        super(CMDBSDK, self).__init__(host, org, user)

    @tornado.gen.coroutine
    def instance_api_import_instance(self, object_id, payload):
        api = '/object/{object_id}/instance/_import'.format(object_id=object_id)
        data = yield self.fetch_async('POST', api, json=payload)
        raise tornado.gen.Return(data)
Concrete requests can be wrapped in this class and used to call other platforms' APIs asynchronously.
Next is a handler class for one endpoint:
# -*- coding: utf-8 -*-
import json

import tornado.gen
import tornado.web

from sdk.cmdb import CMDBSDK


class CMDBImportHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, pk):
        cmdb_sdk = CMDBSDK('100.88.88.201:8079', '777777')
        try:
            object_id = pk
            payload = json.loads(self.request.body)
            # Make sure the request data is valid
            if not object_id or not payload:
                self.set_status(400)  # Bad request
                self.write({"message": "Invalid input data"})
                return
            # Call the import API through the CMDB SDK
            data = yield cmdb_sdk.instance_api_import_instance(object_id, payload)
            # Return a success response
            self.set_status(200)
            self.write({
                "message": "POST request successful",
                "data": data
            })
        except Exception as e:
            # Error handling: return an error response
            self.set_status(500)  # Internal server error
            self.write({
                "message": "POST request failed",
                "error": str(e)
            })
        finally:
            cmdb_sdk.close()
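In the handler above an empty payload yields a 400, while a body that is not valid JSON falls into the generic except branch and comes back as a 500. If the two cases should both count as client errors, the parsing step can be separated out. The following is a rough sketch using a hypothetical helper named parse_payload; it is not part of the demo code.

```python
import json


def parse_payload(body):
    """Return (status_code, payload_or_message) for a raw request body.

    Hypothetical helper: 400 for malformed or empty input, 200 otherwise.
    """
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        # json.loads raises ValueError on malformed JSON
        # and TypeError when the body is not a string at all.
        return 400, "Request body is not valid JSON"
    if not payload:
        return 400, "Invalid input data"
    return 200, payload


if __name__ == "__main__":
    print(parse_payload('{"name": "host-1"}'))
    print(parse_payload('not json'))
```

A handler could call such a helper first, write the 400 response and return early, and reserve the 500 branch for failures of the downstream call.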
Starting the service {#启动服务}
Here is a file that simply starts the microservice:
# -*- coding: utf-8 -*-
import tornado.httpserver
import tornado.ioloop
import tornado.web

from handlers.handlers import CMDBSearchHandler, CMDBImportHandler


def make_app():
    """
    Create the Tornado application instance and register the routes.
    """
    return tornado.web.Application([
        (r"/cmdb/search/(?P<pk>\w+)", CMDBSearchHandler),
        (r"/cmdb/import/(?P<pk>\w+)", CMDBImportHandler),
    ])


def start_server(port, num_processes):
    """
    Start the Tornado service with multiple worker processes.
    :param port: port to listen on
    :param num_processes: number of processes to fork (usually the number of CPU cores)
    """
    app = make_app()
    server = tornado.httpserver.HTTPServer(app)
    # Bind the listening port
    server.bind(port)
    # Fork the worker processes (these are processes, not threads)
    server.start(num_processes)  # 0 means one process per CPU core
    print("Server started on port {} with {} processes.".format(port, num_processes))
    # Start the Tornado I/O loop
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    try:
        # Start the service with the given number of processes
        start_server(8888, 4)
    except KeyboardInterrupt:
        print("\nServer stopped by user.")
    except Exception as e:
        print("Error starting server: {}".format(e))
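The named group (?P<pk>\w+) in each route pattern is what delivers pk to the handler method: Tornado passes named groups as keyword arguments, so post(self, pk) receives the matched path segment. The sketch below reproduces just the matching step with the standard re module. The resolve function is hypothetical, the handler names are plain strings standing in for the real classes, and anchoring the pattern at the end is an assumption about how Tornado matches.

```python
import re

# Same patterns as the routes above; strings stand in for the handler classes.
ROUTES = [
    (r"/cmdb/search/(?P<pk>\w+)", "CMDBSearchHandler"),
    (r"/cmdb/import/(?P<pk>\w+)", "CMDBImportHandler"),
]


def resolve(path):
    """Return (handler_name, kwargs) for the first route matching the whole path."""
    for pattern, handler_name in ROUTES:
        match = re.match(pattern + "$", path)
        if match:
            # groupdict() holds the named groups, e.g. {"pk": "HOST"}
            return handler_name, match.groupdict()
    return None, {}


if __name__ == "__main__":
    print(resolve("/cmdb/import/HOST"))
    print(resolve("/cmdb/import/a/b"))
```

Because \w+ only matches letters, digits and underscores, an object ID containing a slash or a hyphen would not match these routes.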
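Summary {#总结}
To perform an asynchronous operation with Tornado under Python 2, put the operation in a function decorated with @tornado.gen.coroutine and use yield at the call site to obtain the result. Inside such a coroutine you cannot return a value with return, because Python 2 generators do not allow it; raise tornado.gen.Return(data) instead.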
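The pattern of yielding for a result and raising an exception to return can look odd at first. The sketch below shows the underlying idea with plain generators. It is a deliberately simplified illustration and not Tornado's actual implementation: the Return class, the drive function and fetch_data are all hypothetical names, and yielded callables are resolved immediately where a real event loop would suspend the generator until a future completes.

```python
class Return(Exception):
    """Carries a coroutine's result, in the spirit of tornado.gen.Return."""

    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value


def drive(gen):
    """Run a generator to completion, feeding each result back into it."""
    result = None
    try:
        while True:
            # send(None) starts the generator; later sends resume it
            # and make `result` the value of the pending yield expression.
            yielded = gen.send(result)
            result = yielded() if callable(yielded) else yielded
    except Return as ret:
        # The generator "returned" a value by raising Return(value).
        return ret.value
    except StopIteration:
        # The generator finished without an explicit result.
        return None


def fetch_data():
    """Generator-style coroutine: each yield waits for one operation."""
    first = yield (lambda: {"code": 0})
    second = yield (lambda: {"items": [1, 2]})
    raise Return({"first": first, "second": second})


if __name__ == "__main__":
    print(drive(fetch_data()))
```

The driver plays the role of the decorator plus the event loop: it owns the generator, decides when to resume it, and turns the raised exception back into an ordinary return value for the caller.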
References: