Python:从头创建 Asyncio (2)

算法灯塔说
• 阅读 144

引言

现在,asyncio 已成为 Python 社区中的热门话题,并且名副其实——它提供了一种非常出色的处理 I/O 密集型程序的方法!在我探索 asyncio 的过程中,我起初并不太明白它的工作原理。但随着深入学习,我意识到 asyncio 实际上是在 Python 生成器的基础上增加了一层非常便利的封装。

本文中,我将展示如何仅用 Python 生成器来构建一个 asyncio 的简化模型。接着,我会演示如何利用 await 魔法方法,将示例代码改写为使用 async 和 await 关键字。最终,我会将我的简化版本替换为官方的 asyncio 库。通过这个过程,我相信你将对 asyncio 的神奇之处有一个更深入的理解。

Sleeping

如果我们沿用之前示例中的代码,我们可以通过 yield from 的应用,为我们的任务嵌入子生成器。例如,我在这里引入了一个休眠生成器,它会在指定的时间到达之前暂停任务的执行。这种机制之所以有效,是因为 sleep 函数会连续产生 yield,直到经过了设定的秒数,然后它将跳出 while 循环。

由于 sleep 函数中没有其他 yield 语句,这将引发一个 StopIteration 异常,这个异常告诉 yield from 语句在任务函数中跳过当前的生成器,继续执行下一行代码。

import time

def sleep(seconds):
   start_time = time.time()
   while time.time() - start_time < seconds:
       yield

def task1():
   while True:
       print('Task 1')
       yield from sleep(1)

def task2():
   while True:
       print('Task 2')
       yield from sleep(5)

event_loop = [task1(), task2()]

while True:
   for task in event_loop:
       next(task)

输出:

Task 1
Task 2
Task 1
Task 1
Task 1
Task 1
Task 2
Task 1
…

Yield to Await

我们现在可以将之前的代码示例,通过应用 _await__ 魔术方法和 async 关键字,从使用 yield 转变为使用 await。如果一个类定义了 _await__ 方法,我们就可以在该类的实例前加上 await 关键字来调用这个方法。在 asyncio 框架中,你通常通过调用如 asyncio.create_task 这样的函数来处理 Task 对象。这些 Task 对象是从 asyncio 的 Future 对象派生而来的,而 Future 对象定义了 _await__ 方法。我们还可以在协程前使用 await,协程是在函数定义时加上 async 关键字生成的对象。协程和生成器函数类似,它们的执行都能够被挂起和恢复。

你可以将 await 关键字理解为 yield from 的一个变体,它附带了一些额外的验证规则。因此,当你在代码中写 await object 时,你实际上是在指示从 "object" 类的实例中调用 _await__ 方法,或者 "object" 本身可能就是另一个协程(类似于子生成器)。

实际上,你甚至可以查看 Asyncio 的源代码,发现 Future 对象中的 _await__ 方法在调用时,如果未来(或任务)尚未完成,它基本上只是执行了 yield 操作。

Python:从头创建 Asyncio (2)

要将我们在上一节中编写的代码转移到使用 async 和 await,我们首先需要创建自己的 Task 类,因为函数不能具有 await dunder 方法。下面是我想出的一个简单版本:

from queue import Queue


event_loop = Queue()

class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task

这一次,我们改用队列而非 Python 列表来构建事件循环,这样做更合理,因为我们希望添加或移除任务的操作能够快速完成,即在常数时间内完成。

在我们的 Task 类中,我们将生成器对象保存在 self.iter 属性中,并设置 self.finished 属性为 False,用以跟踪生成器是否已经运行完毕(当生成器引发 StopIteration 异常时,表示其运行结束)。Task 对象还定义了一个 await 魔术方法,这个方法将持续地将控制权交还给事件循环,直到任务完成。完成 Task 对象的创建后,我们使用 create_task 辅助函数将它加入到事件循环中,这将安排它按计划执行。

接下来,我们将构建事件循环管理器,它负责驱动任务的执行。

def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

你或许已经发现,我们的实现开始接近 asyncio 的实际 API。要启动事件循环,我们需要通过一个初始函数来调用 run。这个函数首先将主函数封装进 Task 对象,并加入到事件循环中。随后,while 循环会启动,并且在每次迭代中,通过队列来获取下一个待执行的任务。现在我们使用 task.iter.send(None) 替代了 next(task.iter),这在使用 async/await 关键字时显得有些奇特,但功能上是一致的。我们还需要将这个调用放在 try-except 块中,以便在抛出 StopIteration 异常时,可以将 task.finished 设置为 True;如果没有异常抛出,代码将执行 else 语句,这会把任务重新放回事件循环中,以便再次执行。

接下来,我们需要对 sleep 函数进行异步兼容改造。之前,我们通过一个带有 while 循环和单个 yield 的生成器函数来实现休眠功能。尽管我偏爱这种方法,但 await 关键字不能与生成器函数一起使用——它需要是一个定义了 await 魔术方法的对象或是一个协程函数。因此,为了解决这个问题,我将代码迁移到了另一个函数中,现在实际的 sleep 函数会创建一个任务对象并等待它完成。这个 await 调用将触发 Task 对象内的 await 方法,随后执行 yield,允许事件循环转向其他任务。当事件循环处理到新的 _sleep 任务时,它会检查时间,如果时间未到,同样会执行 yield,将控制权交还给事件循环。如果休眠的任务再次被事件循环调用,就像生成器保存其状态一样,协程仍在等待 sleep 函数返回。由于 sleep 函数还在等待 _sleep 任务完成,任务的 await 魔术方法将再次被调用,由于任务尚未结束,魔术方法中的 yield 将再次被执行。

import time

def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task

以下是所有代码的汇总:

from queue import Queue
import time


event_loop = Queue()


def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task


class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task


def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

既然我们已经成功构建了事件循环、任务创建机制和 sleep 函数,接下来我们可以引入名为 "jacobio.py" 的文件,并把之前使用 yield 语句的部分替换成 await 调用。同时,我们需要在那些使用了 await 的函数前加上 async 关键字,以表明这些函数是异步的,并且可以被其他代码等待执行。最后,我们还需要像在 asyncio 库中那样编写一个主函数,用于将任务排入事件循环的执行队列中。

import jacobio

async def task1():
    for _ in range(2):
        print('Task 1')
        await jacobio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await jacobio.sleep(0)

async def main():
    one = jacobio.create_task(task1())
    two = jacobio.create_task(task2())

    await one
    await two
    
    print('done')


if __name__ == '__main__':
    jacobio.run(main())

输出:

Task 1
Task 2
Task 2
Task 2
Task 1
done

Await with AsyncIO

现在,我们可以从上面获取代码,并将所有出现的“jacobio”替换为“asyncio”,我们现在完全使用 asyncio 包!

import asyncio

async def task1():
    for _ in range(2):
        print('Task 1')
        await asyncio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await asyncio.sleep(0)

async def main():
    one = asyncio.create_task(task1())
    two = asyncio.create_task(task2())

    await one
    await two
    
    print('done')


if __name__ == '__main__':
    asyncio.run(main())

Asyncio 在后台执行了许多复杂的操作,但我们成功地从基础的生成器出发,一步步重建了 asyncio 的核心功能!我努力使事件循环管理器的设计尽可能简洁,尽管这仅是 asyncio 工作理念的简化版,与实际的库相比,我的实现在细节上与官方源代码的执行流程有所不同。此外,既然我们现在拥有了完整的 asyncio 库的功能,就无需为了同时等待两个任务而分别创建它们;我们完全可以使用 asyncio.gather() 这样的函数来同时管理多个任务。

本文由mdnice多平台发布

点赞
收藏
评论区
推荐文章
亚瑟 亚瑟
4年前
Python Sanic 高并发服务开发指南
技术基础AsyncIOPython3.4开始引入AsyncIO(https://docs.python.org/3/library/asyncio.html)模块,使得Python也支持异步IO。3.5版本里添加了async/await关键字,使得异步IO代码编写更加方便。3.6和3.7版本继续进行了完善
Karen110 Karen110
3年前
建议收藏,22个Python迷你项目(附源码)
在使用Python的过程中,我最喜欢的就是Python的各种第三方库,能够完成很多操作。下面就给大家介绍22个通过Python构建的项目,以此来学习Python编程。大家也可根据项目的目的及提示,自己构建解决方法,提高编程水平。①骰子模拟器目的:创建一个程序来模拟掷骰子。提示:当用户询问时,使用random模块生成一个1到6之间
隔壁老王 隔壁老王
4年前
我的python总结
建议刚开始学习直接使用ANACONDAhttps://www.anaconda.com/download/配置python环境变量https://docs.python.org/3.6/using/cmdline.htmlenvvarPYTHONMALLOCPythonPEP8导包顺序1.Python标准
Karen110 Karen110
3年前
25条很棒的Python一行代码,建议收藏!
自从我用Python编写第一行代码以来,就被它的简单性、出色的可读性和特别流行的一行代码所吸引。在下面,我将给大家介绍并解释一些Python一行程序。可能有些你还不知道,但对你未来的Python项目很有用。▍1、交换两个变量 a  4 b  5a,b  b,a print(a,b)  5,4让我们通过交换两个变量作为一个简
Stella981 Stella981
3年前
25条很棒的Python一行代码,建议收藏!
点击上方“Python爬虫与数据挖掘”,进行关注回复“书籍”即可获赠Python从入门到进阶共10本电子书今日鸡汤中岁颇好道,晚家南山陲。自从我用Python编写第一行代码以来,就被它的简单性、出色的可读性和特别流行的一行代码所吸引。在下面,我将给大家介绍并解释一些Python一行程序。
Stella981 Stella981
3年前
Python快速入门到精通开发知识体系图
课程简介:Python语言已经在大数据、机器学习等领域胜出,这是毋庸置疑的事实了。它以简洁、优雅的语言风格征服了程序员和非专业程序员。并且,它还是一种非常容易学的语言。所以,学习Python性价比最高。本套课程适合零基础学员学习,是入门Python的必须课程。学完此课,你讲达到Python初级程序员的水平,具备开发基础,掌握面向对象编程思
Stella981 Stella981
3年前
Python 安装pyad库方法
问题:现在需要在Windows电脑上安装python,然后需要用到pyad这个库,安装这个库,我折腾了一下午,真是醉了自己了方法:1\.先到这个地址现在pyad的安装包:pyad下载地址2\.先不着急通过python安装它,应为安装它之前有个前提条件,是需要安装pywin32这个工具,这两个是需要配合使用的,pyad在git
Stella981 Stella981
3年前
Python 异步编程再添一利器
GINO填补了国内外asyncioORM领域的空白随着Tornado和asyncio等框架的陆续涌现,Python异步编程这个话题也在逐渐升温。在这个烧脑的异步世界里,有没有办法可以既方便快捷、又简单明了地访问数据库呢?GitHub千星项目GINO(https://www.oschina.net/action/GoToLin
Stella981 Stella981
3年前
Python asyncio 与 aiohttp 使用简单记录
asyncio的基本概念asyncio是在python3.4中被引进的异步IO库。你也可以通过python3.3的pypi来安装它。它相当的复杂,而且我不会介绍太多的细节。相反,我将会解释你需要知道些什么,以利用它来写异步的代码。简而言之,有两件事情你需要知道:协同程序和事件循环。协同程序像是方法,但是它们可以在代码中的特定点暂停和继
使用asyncio库和多线程实现高并发的异步IO操作的爬虫
摘要:本文介绍了如何使用Python的asyncio库和多线程实现高并发的异步IO操作,以提升爬虫的效率和性能。通过使用asyncio的协程和事件循环,结合多线程,我们可以同时处理多个IO任务,并实现对腾讯新闻网站的高并发访问。正文:在网络爬虫中,IO操作
异步爬虫实战:实际应用asyncio和aiohttp库构建异步爬虫
在网络爬虫的开发中,异步爬虫已经成为一种非常流行的技术。它能够充分利用计算机的资源,提高爬虫效率,并且能够处理大量的运算请求。Python中的asyncio和aiohttp库提供了强大的异步爬虫支持,使得开发者能够轻松构建高效的异步爬虫。什么是异动爬虫?为