支持异步的Generator“递归”

数字码影先锋
• 阅读 3603

Python的generator最常用的方式就是作为迭代器使用,在Python中,可迭代对象是非常的实用。但是generator远比迭代器来得强大,从某版本开始,generator就拥有send方法了,这使得generator具有了在执行过程中接收外部输入的值后继续执行的能力。

当我们持有一个generator之后,我们可以做什么呢?

我们可以获得它的输出,并且给它传值。这里,隐含了一种特殊的模式,generator输出任务,而我们传入任务的执行结果。这个过程看起来就像一次函数调用一样:)

这里最关键的一点是,我们持有了任务的处理过程,而generator并不关心任务是如何被处理的。实际上,我们可以直接执行,或者丢给异步引擎,或者丢到线程池,或者干脆提交到远程机器上执行。我们把这个分配任务的部分叫做调度器吧:)


递归

如果generator生成的任务本身也是generator呢?我们应当如何让generator完成递归呢?

如何返回值?

其实我们直接用yield返回值即可。但是我们需要把任务和值区分开来,因为python并没有提供什么有效的方式能把这两者分离。所以我们通过yield出的值的类型判断即可,我们可以定义一个接口,所有持有某个方法的对象都是任务。

调度器的困境

这里有一个重要的问题是,也就是generator的下一次操作,取决于任务是否完成。而这个操作是由任务决定的,调度器无法做到这一点。这导致一个问题,调度器不能直接控制generator的执行,它需要把控制的操作下传给任务,让任务在结束后自动完成这个操作。只有这样,调度器才不需要独立的线程或者额外的方式进行控制,因为它的触发是被动的。

执行任务

任务的执行需要接受一个callback和一个error_callback函数,当任务完成的时候,它执callback,出现错误则执行error_callback

我们需要把对于generator的控制封装到一个callback中,使得任务可以调用这个函数,完成它的功能。我们可以把重试的函数作为error_callback传入任务,这使得任务在失败之后可以被重试。

实现

下面是一个实现。包含了如下的一些特性。

  • 调度器可以作为装饰器使用(如果忽略错误和返回值)
  • 对于正常的函数,它能够直接返回结果(传递给callback)
  • 对于普通的generator输出,它能把generator的输出作为参数传递给callback
  • 所有的任务对象都包含一个dojob方法,它接受callbackerr_callback,用于完成任务
  • 任务包含了重试机制,当任务失败次数达到限额之后,整个任务会直接失败(整个递归过程)
RecTask任务
  • 默认的RecTask类型,是常规的python函数调用,包含函数和它的参数。
  • RecTask方法包含一个transform方法,用于对任务函数进行变换(完全是一个约定)。这个方法不会修改原始的任务函数,因为一个可变对象在重复调用的过程中会出现难以预计的问题。
  • RecTaskrun方法,用于接受新的任务函数(如果是None则直接执行原始的函数)
  • 通过继承RecTask,可以构造其它的任务执行方式。比如通过异步引擎来执行任务。这也使得异步架构能够完成一些递归任务:)

import sys def rec_gen(func, callback=None, err_callback=None): ''' callback: run after func finish ''' def trans_func(*args, **kwargs): def error_do(e): print('@rec_func_error:', e, file=sys.stderr) if err_callback is not None: err_callback() try: g = func(*args, **kwargs) except Exception as e: error_do(e) return if not isinstance(g, types.GeneratorType): #return if g is not generator if callback is not None: callback(g) return ans = [] def go_through(it=None): try: em = g.send(it) if not hasattr(em, 'dojob'): ans.append(em) go_through(None) else: try_count = 0 def todo_next(try_limit=10): ''' child proc with retry ''' nonlocal try_count if try_count < try_limit: try_count += 1 if try_count > 1: print('@retry:# try_count: %s' % try_count, file=sys.stderr) em.dojob(callback=go_through, err_callback=todo_next) else: #raise Exception('can not recover error after %s retrys' % try_limit) print('@rec_recover_failed: after try %s time at one node' % try_limit, file=sys.stderr) print('@task is complete failed:(') if err_callback is not None: #let the task tree fail g.close() err_callback(0) todo_next(10) except StopIteration as st: if callback is not None: callback(*ans) return except Exception as e: g.close() error_do(e) return go_through() return trans_func from functools import partial class RecTask(object): def __init__(self, func, *args, **kwargs): self.func = func self.args = args self.kwargs = kwargs def dojob(self, callback=None, err_callback=None): self.run(self.transform(partial(rec_gen, callback=callback, err_callback=err_callback))) def transform(self, f): return f(self.func) def run(self, func=None): if func is None: func = self.func return func(*self.args, **self.kwargs) if __name__ == '__main__': sys.setrecursionlimit(10000000) def fib(n): if n <= 1: yield n else: yield (yield RecTask(fib, n-1)) + (yield RecTask(fib, n-2)) pfib = rec_gen(fib, lambda x: print(x)) for i in range(15): pfib(i)

奉上一段fibonacci序列的代码。你可以单纯的把yield RecTask看成apply。这个程序的一个问题是,它对栈空间消耗特别大:)

sys.setrecursionlimit(10000000)

def fib(n):
    if n <= 1:
        yield n
    else:
        yield (yield RecTask(fib, n-1)) + (yield RecTask(fib, n-2))

pfib = rec_gen(fib, lambda x: print(x))
for i in range(15):
    pfib(i)
一个典型的异步HTTPTask

我们假定sender(request, callback)是一个异步接口。那么我们的异步Task任务如下:)

class HTTPTask(RecTask):
    def __init__(self, sender, req, callback):
        self.sender = sender
        self.req = req
        self.callback = callback

    def transform(self, f):
        return f(self.callback)

    def run(self, callback=None):
        if callback is None:
            callback = self.callback
        self.sender(self.req, callback)


点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Ruby on Rails 学习笔记(四)
当页面需要保持风格一致时,最简单的方法是采用模板。详见如下代码:<!doctype html<html <head  <meta charset"UTF8"  <meta name"Generator" content"EditPlus®"  <meta name"Author
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
3年前
Python 迭代器与生成器
python迭代器与生成器说到python迭代器,首先要明确两个概念:Iterable和Iterator,这两个概念还有Generator都是定义在collections模块里的。Iterable意为“可迭代的(对象)”,包括如下两种:1、实现了__getitem__(self,
Stella981 Stella981
3年前
OpenCV访问像素点
三种方法迭代器创建一个Mat::Iterator对象it,通过itMat::begin()来的到迭代首地址,递增迭代器知道itMat::end()结束迭代;while(it!Scr.end<Vec3b()){//(it)00;//蓝色通道置零;
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这