聊聊machinery的TaskState

樊岐
• 阅读 1776

本文主要研究一下machinery的TaskState

TaskState

const (
    // StatePending - initial state of a task
    StatePending = "PENDING"
    // StateReceived - when task is received by a worker
    StateReceived = "RECEIVED"
    // StateStarted - when the worker starts processing the task
    StateStarted = "STARTED"
    // StateRetry - when failed task has been scheduled for retry
    StateRetry = "RETRY"
    // StateSuccess - when the task is processed successfully
    StateSuccess = "SUCCESS"
    // StateFailure - when processing of the task fails
    StateFailure = "FAILURE"
)

type TaskState struct {
    TaskUUID  string        `bson:"_id"`
    TaskName  string        `bson:"task_name"`
    State     string        `bson:"state"`
    Results   []*TaskResult `bson:"results"`
    Error     string        `bson:"error"`
    CreatedAt time.Time     `bson:"created_at"`
    TTL       int64         `bson:"ttl,omitempty"`
}

type TaskResult struct {
    Type  string      `bson:"type"`
    Value interface{} `bson:"value"`
}

// NewPendingTaskState ...
func NewPendingTaskState(signature *Signature) *TaskState {
    return &TaskState{
        TaskUUID:  signature.UUID,
        TaskName:  signature.Name,
        State:     StatePending,
        CreatedAt: time.Now().UTC(),
    }
}

// NewReceivedTaskState ...
func NewReceivedTaskState(signature *Signature) *TaskState {
    return &TaskState{
        TaskUUID: signature.UUID,
        State:    StateReceived,
    }
}

// NewStartedTaskState ...
func NewStartedTaskState(signature *Signature) *TaskState {
    return &TaskState{
        TaskUUID: signature.UUID,
        State:    StateStarted,
    }
}

// NewSuccessTaskState ...
func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState {
    return &TaskState{
        TaskUUID: signature.UUID,
        State:    StateSuccess,
        Results:  results,
    }
}

// NewFailureTaskState ...
func NewFailureTaskState(signature *Signature, err string) *TaskState {
    return &TaskState{
        TaskUUID: signature.UUID,
        State:    StateFailure,
        Error:    err,
    }
}

// NewRetryTaskState ...
func NewRetryTaskState(signature *Signature) *TaskState {
    return &TaskState{
        TaskUUID: signature.UUID,
        State:    StateRetry,
    }
}

// IsCompleted returns true if state is SUCCESS or FAILURE,
// i.e. the task has finished processing and either succeeded or failed.
func (taskState *TaskState) IsCompleted() bool {
    return taskState.IsSuccess() || taskState.IsFailure()
}

// IsSuccess returns true if state is SUCCESS
func (taskState *TaskState) IsSuccess() bool {
    return taskState.State == StateSuccess
}

// IsFailure returns true if state is FAILURE
func (taskState *TaskState) IsFailure() bool {
    return taskState.State == StateFailure
}
TaskState定义了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE状态;TaskState定义了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL属性;它提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState方法来根据Signature来创建不同state的TaskState;另外还提供了IsCompleted、IsSuccess、IsFailure方法

Signature

// Signature represents a single task invocation
type Signature struct {
    UUID           string
    Name           string
    RoutingKey     string
    ETA            *time.Time
    GroupUUID      string
    GroupTaskCount int
    Args           []Arg
    Headers        Headers
    Priority       uint8
    Immutable      bool
    RetryCount     int
    RetryTimeout   int
    OnSuccess      []*Signature
    OnError        []*Signature
    ChordCallback  *Signature
    //MessageGroupId for Broker, e.g. SQS
    BrokerMessageGroupId string
    //ReceiptHandle of SQS Message
    SQSReceiptHandle string
    // StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq,
    // and don't want machinery to delete from source queue
    StopTaskDeletionOnError bool
    // IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available
    // When this is true a task with no handler will be ignored and not placed back in the queue
    IgnoreWhenTaskNotRegistered bool
}

// Arg represents a single argument passed to invocation fo a task
type Arg struct {
    Name  string      `bson:"name"`
    Type  string      `bson:"type"`
    Value interface{} `bson:"value"`
}

// Headers represents the headers which should be used to direct the task
type Headers map[string]interface{}

// NewSignature creates a new task signature
func NewSignature(name string, args []Arg) (*Signature, error) {
    signatureID := uuid.New().String()
    return &Signature{
        UUID: fmt.Sprintf("task_%v", signatureID),
        Name: name,
        Args: args,
    }, nil
}
Signature代表对task的调用,它定义了UUID、Name、RoutingKey、ETA、GroupUUID、GroupTaskCount、Args、Headers、Priority、Immutable、RetryCount、RetryTimeout、OnSuccess、OnError、ChordCallback、BrokerMessageGroupId、SQSReceiptHandle、StopTaskDeletionOnError、IgnoreWhenTaskNotRegistered属性

Backend

// Backend represents a Redis result backend
type Backend struct {
    common.Backend
    host     string
    password string
    db       int
    pool     *redis.Pool
    // If set, path to a socket file overrides hostname
    socketPath string
    redsync    *redsync.Redsync
    redisOnce  sync.Once
    common.RedisConnector
}

// SetStatePending updates task state to PENDING
func (b *Backend) SetStatePending(signature *tasks.Signature) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewPendingTaskState(signature)
    return b.updateState(conn, taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *Backend) SetStateReceived(signature *tasks.Signature) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewReceivedTaskState(signature)
    b.mergeNewTaskState(conn, taskState)
    return b.updateState(conn, taskState)
}

// SetStateStarted updates task state to STARTED
func (b *Backend) SetStateStarted(signature *tasks.Signature) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewStartedTaskState(signature)
    b.mergeNewTaskState(conn, taskState)
    return b.updateState(conn, taskState)
}

// SetStateRetry updates task state to RETRY
func (b *Backend) SetStateRetry(signature *tasks.Signature) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewRetryTaskState(signature)
    b.mergeNewTaskState(conn, taskState)
    return b.updateState(conn, taskState)
}

// SetStateSuccess updates task state to SUCCESS
func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewSuccessTaskState(signature, results)
    b.mergeNewTaskState(conn, taskState)
    return b.updateState(conn, taskState)
}

// SetStateFailure updates task state to FAILURE
func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error {
    conn := b.open()
    defer conn.Close()

    taskState := tasks.NewFailureTaskState(signature, err)
    b.mergeNewTaskState(conn, taskState)
    return b.updateState(conn, taskState)
}

// GetState returns the latest task state
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
    conn := b.open()
    defer conn.Close()

    return b.getState(conn, taskUUID)
}
此Backend是基于Redis实现的,它提供了SetStatePending、SetStateReceived、SetStateStarted、SetStateRetry、SetStateSuccess、SetStateFailure、GetState方法

小结

machinery的TaskState定义了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE状态;TaskState定义了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL属性;它提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState方法来根据Signature来创建不同state的TaskState;另外还提供了IsCompleted、IsSuccess、IsFailure方法。

doc

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
科工人 科工人
4年前
聊聊golang的DDD项目结构
序本文主要研究一下golang的DDD项目结构interfacesfoodappserver/interfacesinterfacesgit:(master)tree.|____fileupload||____fileformat.go||____fileupload.go|____food_handler.go|__
Wesley13 Wesley13
4年前
jmxtrans+influxdb+grafana监控zookeeper实战
序本文主要研究一下如何使用jmxtransinfluxdbgranfa监控zookeeper配置zookeeperjmx在conf目录下新增zookeeperenv.sh,并使用chmodx赋予执行权限,内容如下JMXLOCALONLYfalseJMXDISABLEfals
Stella981 Stella981
4年前
Python+Selenium自动化篇
本篇文字主要学习selenium定位页面元素的集中方法,以百度首页为例子。0.元素定位方法主要有:id定位:find\_element\_by\_id('')name定位:find\_element\_by\_name('')class定位:find\_element\_by\_class\_name(''
Wesley13 Wesley13
4年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Stella981 Stella981
4年前
RedisTemplate读取slowlog
序本文主要研究一下如何使用RedisTemplate(lettuce类库)读取slowlogmaven<dependency<groupIdorg.springframework.boot</groupId<artifactIdspringbootstarterdata
Stella981 Stella981
4年前
Linux日志安全分析技巧
0x00前言我正在整理一个项目,收集和汇总了一些应急响应案例(不断更新中)。GitHub地址:https://github.com/Bypass007/EmergencyResponseNotes本文主要介绍Linux日志分析的技巧,更多详细信息请访问Github地址,欢迎Star。0x01日志简介Lin