聊聊rocketmq-client-go的defaultProducer

逻辑踏云人
• 阅读 1069

本文主要研究一下rocketmq-client-go的defaultProducer

defaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type defaultProducer struct {
    group       string
    client      internal.RMQClient
    state       int32
    options     producerOptions
    publishInfo sync.Map
    callbackCh  chan interface{}

    interceptor primitive.Interceptor
}
  • defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor

NewDefaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
    defaultOpts := defaultProducerOptions()
    for _, apply := range opts {
        apply(&defaultOpts)
    }
    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
    if err != nil {
        return nil, errors.Wrap(err, "new Namesrv failed.")
    }
    if !defaultOpts.Credentials.IsEmpty() {
        srvs.SetCredentials(defaultOpts.Credentials)
    }
    defaultOpts.Namesrv = srvs

    producer := &defaultProducer{
        group:      defaultOpts.GroupName,
        callbackCh: make(chan interface{}),
        options:    defaultOpts,
    }
    producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)

    producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)

    return producer, nil
}
  • NewDefaultProducer方法通过internal.NewNamesrv创建NameServerAddrs,之后实例化defaultProducer,然后实例化internal.GetOrNewRocketMQClient及primitive.ChainInterceptors

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) Start() error {
    atomic.StoreInt32(&p.state, int32(internal.StateRunning))
    if len(p.options.NameServerAddrs) == 0 {
        p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)
    }

    p.client.RegisterProducer(p.group, p)
    p.client.Start()
    return nil
}
  • Start方法之执行p.client.RegisterProducer及p.client.Start()

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) Shutdown() error {
    atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
    p.client.UnregisterProducer(p.group)
    p.client.Shutdown()
    return nil
}
  • Shutdown方法执行p.client.UnregisterProducer及p.client.Shutdown()

SendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) {
    if err := p.checkMsg(msgs...); err != nil {
        return nil, err
    }

    msg := p.encodeBatch(msgs...)

    resp := new(primitive.SendResult)
    if p.interceptor != nil {
        primitive.WithMethod(ctx, primitive.SendSync)
        producerCtx := &primitive.ProducerCtx{
            ProducerGroup:     p.group,
            CommunicationMode: primitive.SendSync,
            BornHost:          utils.LocalIP,
            Message:           *msg,
            SendResult:        resp,
        }
        ctx = primitive.WithProducerCtx(ctx, producerCtx)

        err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
            var err error
            realReq := req.(*primitive.Message)
            realReply := reply.(*primitive.SendResult)
            err = p.sendSync(ctx, realReq, realReply)
            return err
        })
        return resp, err
    }

    err := p.sendSync(ctx, msg, resp)
    return resp, err
}
  • SendSync方法首先通过p.checkMsg校验消息,然后通过p.encodeBatch编码,之后对于p.interceptor不为null的执行p.interceptor,最后执行p.sendSync(ctx, msg, resp)

sendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {

    retryTime := 1 + p.options.RetryTimes

    var (
        err error
    )

    if p.options.Namespace != "" {
        msg.Topic = p.options.Namespace + "%" + msg.Topic
    }

    var producerCtx *primitive.ProducerCtx
    for retryCount := 0; retryCount < retryTime; retryCount++ {
        mq := p.selectMessageQueue(msg)
        if mq == nil {
            err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
            continue
        }

        addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
        if addr == "" {
            return fmt.Errorf("topic=%s route info not found", mq.Topic)
        }

        if p.interceptor != nil {
            producerCtx = primitive.GetProducerCtx(ctx)
            producerCtx.BrokerAddr = addr
            producerCtx.MQ = *mq
        }

        res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
        if _err != nil {
            err = _err
            continue
        }
        return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
    }
    return err
}
  • sendSync会重试retryCount,每次是先通过p.selectMessageQueue(msg)选择mq,然后通过p.options.Namesrv.FindBrokerAddrByName寻找addr,最后执行p.client.InvokeSync(ctx, addr, p.buildSendRequest

SendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ...*primitive.Message) error {
    if err := p.checkMsg(msgs...); err != nil {
        return err
    }

    msg := p.encodeBatch(msgs...)

    if p.interceptor != nil {
        primitive.WithMethod(ctx, primitive.SendAsync)

        return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
            return p.sendAsync(ctx, msg, f)
        })
    }
    return p.sendAsync(ctx, msg, f)
}
  • SendAsync方法主要是执行p.sendAsync(ctx, msg, f)

sendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
    if p.options.Namespace != "" {
        msg.Topic = p.options.Namespace + "%" + msg.Topic
    }
    mq := p.selectMessageQueue(msg)
    if mq == nil {
        return errors.Errorf("the topic=%s route info not found", msg.Topic)
    }

    addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
    if addr == "" {
        return errors.Errorf("topic=%s route info not found", mq.Topic)
    }

    ctx, _ = context.WithTimeout(ctx, 3*time.Second)
    return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
        resp := new(primitive.SendResult)
        if err != nil {
            h(ctx, nil, err)
        } else {
            p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
            h(ctx, resp, nil)
        }
    })
}
  • sendAsync主要是执行p.client.InvokeAsync

SendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Message) error {
    if err := p.checkMsg(msgs...); err != nil {
        return err
    }

    msg := p.encodeBatch(msgs...)

    if p.interceptor != nil {
        primitive.WithMethod(ctx, primitive.SendOneway)
        return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
            return p.SendOneWay(ctx, msg)
        })
    }

    return p.sendOneWay(ctx, msg)
}
  • SendOneWay主要是执行p.sendOneWay(ctx, msg)

sendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
    retryTime := 1 + p.options.RetryTimes

    if p.options.Namespace != "" {
        msg.Topic = p.options.Namespace + "%" + msg.Topic
    }

    var err error
    for retryCount := 0; retryCount < retryTime; retryCount++ {
        mq := p.selectMessageQueue(msg)
        if mq == nil {
            err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
            continue
        }

        addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
        if addr == "" {
            return fmt.Errorf("topic=%s route info not found", mq.Topic)
        }

        _err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
        if _err != nil {
            err = _err
            continue
        }
        return nil
    }
    return err
}
  • sendOneWay主要是重试执行p.client.InvokeOneWay

小结

defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptor;它提供了NewDefaultProducer、Start、Shutdown、SendSync、SendAsync、SendOneWay方法

doc

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Easter79 Easter79
3年前
vue+element 表格formatter数据格式化并且插入html标签
前言   vue中element框架,其中表格组件,我既要行内数据格式化,又要插入html标签一贯思维,二者不可兼得也一、element表格数据格式化  !(https://oscimg.oschina.net/oscnet/3c43a1cb3cbdeb5b5ad58acb45a42612b00.p
待兔 待兔
1年前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
科工人 科工人
4年前
聊聊golang的DDD项目结构
序本文主要研究一下golang的DDD项目结构interfacesfoodappserver/interfacesinterfacesgit:(master)tree.|____fileupload||____fileformat.go||____fileupload.go|____food_handler.go|__
Wesley13 Wesley13
3年前
jmxtrans+influxdb+grafana监控zookeeper实战
序本文主要研究一下如何使用jmxtransinfluxdbgranfa监控zookeeper配置zookeeperjmx在conf目录下新增zookeeperenv.sh,并使用chmodx赋予执行权限,内容如下JMXLOCALONLYfalseJMXDISABLEfals
Easter79 Easter79
3年前
storm drpc实例
序本文主要演示一下stormdrpc实例配置version:'2'services:supervisor:image:stormcontainer_name:supervisorcommand:stormsupe
Wesley13 Wesley13
3年前
redis的HyperLogLog实战
序本文主要研究一下redis的HyperLogLog的用场相关命令pfadd每添加一个元素的复杂度为O(1)127.0.0.1:6379pfadduv0907uid1uid2uid3(integer)1添加元素到HyperLogLog中,如果内部有变动返回1,没有
Wesley13 Wesley13
3年前
4cast
4castpackageloadcsv.KumarAwanish发布:2020122117:43:04.501348作者:KumarAwanish作者邮箱:awanish00@gmail.com首页:
Stella981 Stella981
3年前
RedisTemplate读取slowlog
序本文主要研究一下如何使用RedisTemplate(lettuce类库)读取slowlogmaven<dependency<groupIdorg.springframework.boot</groupId<artifactIdspringbootstarterdata