RocketMQ学习二-Producer发送消息流程

码海银月狩
• 阅读 1294

源码使用是版本是4.5.0
这章主要有

  1. MQProducer API介绍
  2. DefaultMQProducer#start()方法分析
  3. Producer发送同步消息的过程。

MQProducer API

    //启动消息发送者,在进行消息发送之前须调用此方法
    void start() throws MQClientException;

    //关闭消息发送者,不再需要此发送者时进行关闭
    void shutdown();

    //根据 Topic 查询所有的消息消费队列列表
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

    //发送消息,同步的方式,未指定超时时间默认为3S
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

    //发送消息,异步的方式
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;

    //发送消息,oneway的方式
    void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;

    //指定消息队列进行消息发送,这里为同步,重载的方法有异步,指定超时时间,oneway
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    //消息发送时使用自定义队列负载机制,由 MessageQueueSelector 实现,Object arg 为传入 selector 中的参数。重载的方法有异步,指定超时,oneway
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

    //发送事务消息,事务消息只有同步发送方式
TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException;

    //批量发送消息
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

主要的API有:

  1. 启动与关闭Producer
  2. 发送同步消息,异步与oneway消息
  3. 发送指定超时时间消息
  4. 发送批量消息

DefaultMQProducer#start()方法分析


public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();
                // 注释3.3.2:改变名称为PID@host
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                //初始化mQClientFactory为MQClientInstance,并将该实例加入factoryTable
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //将producer注册到MQClientInstance.producerTable
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                //保存topic对应的路由信息
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    //启动MQClientInstance
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    // 注释5.4.1:pull线程池启动
                    //执行PullMessageService#pullMessage方法,以后台线程运行
                    this.pullMessageService.start();
                    // Start rebalance service
                    //给consumer分配队列,以后台线程运行
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

逻辑流程:

  1. 初始化mQClientFactory为MQClientInstance,并将该实例加入factoryTable
  2. 将producer注册到MQClientInstance.producerTable
  3. 保存topic对应的路由信息
  4. 启动MQClientInstance

关于启动MQClientInstance有如下逻辑:

  1. 启动Netty客户端,注意这里并没有创建连接,在producer发送消息的时候创建连接
  2. 启动一系列定时任务
  3. 在while循环里不间断拉取消息,以后台线程方式运行
  4. 给consumer分配队列,以后台线程运行

关于启动一系列定时任务有:

  1. 2分钟获取一次NameServer地址
  2. 默认30S更新一次topic的路由信息,频率可配置
  3. 30秒对Broker发送一次心跳检测,并将下线的broker删除
  4. 5秒持久化一次consumer的offset
  5. 每分钟调整线程池大小,不过里面代码注释掉了

定时任务使用的是scheduleAtFixedRate,如果上一次任务超时则一下次任务会立即执行。

发送同步消息

这里以发送同步消息为例,接口定义在MQProducer,实现是在DefaultMQProducer类里,而DefaultMQProducer又将真正实现细节委托给DefaultMQProducerImpl了:

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //这里会使用默认的超时时间3s
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //这里进一步明确指定了使用同步的方式发送
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //判断服务状态是否是RUNNING,默认值是CREATE_JUST,在调用start()方法时会修改成RUNNING
        this.makeSureStateOK();
        //这里会校验topic的合法性,msg body是否为空,是否有超过最大4M
        Validators.checkMessage(msg, this.defaultMQProducer);
        //使用Random生成一个随机值(Random使用CAS+AtomicLong实现,在高并发场景下性能不会特别好,可以考虑使用ThreadLocalRandom)
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 注释3.4.2:查找Topic的路由信息,先从本地找,本地没有会去NameServer寻找,如果还会找到抛异常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //如果是同步方式会重试3次,异步不重试
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                /**
                 * 根据每个broker处理的时间选择发送队列
                 * 每个队列MessageQueue里会有topic,brokerName及队列ID
                 */
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        //获取topic与选择队列整体超时了,需要抛出异常
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        //真正发送消息逻辑,这里的超时时间需要减去获取topic与选择队列的时间
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        //发送完消息后,维护此broker与发送消息所花时间的对应关系,这是上面selectOneMessageQueue选择broker的一个参考
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                //如果是同步,发送失败则根据条件尝试重试
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

同步发送消息的逻辑还是比较简单的:

  1. 如果没指定超时时间,则使用默认的3S超时,需要注意下这里的超时时间指是超时时间而非单次重试的时间(4.3.0版本为单次超时时间)
  2. 校验Producer服务状态是否正确,topic是否有效,消息体长度是否合法
  3. 获取topic路由信息,先在本地查找,本地没有查寻NameServer
  4. 根据每个broker处理的时间选择发送队列,每次发送完消息后会记录broker与每次发送消息所花时间
  5. 发送消息。如果是同步发送最多会重试3次,也可通过配置进行调整
  6. 记录此broker与发送消息所花时间的对应关系

总结一下Producer发送消息流程:

  1. 启动Producer客户端
  2. 获取topic路由信息
  3. 选择队列发送消息
点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和区别
Kafka、RabbitMQ、RocketMQ等消息中间件的对比——消息发送性能和区别那么,消息中间件性能究竟哪家强?带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka、RabbitMQ、RocketMQ)做了
Stella981 Stella981
3年前
RocketMQ消息轨迹
RocketMQ消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹设计相关。RocketMQ消息轨迹,主要跟踪消息发送、消息消费的轨迹,即详细记录消息各个处理环节的日志,从设计上至少需要解决如下三个核心问题:消费轨迹数据格式记录消息轨迹(消息日志)消息轨迹数据存储在哪?1、消息轨迹数
Stella981 Stella981
3年前
RabbitMQ 基础概念介绍
AMQP消息模型RabbitMQ是基于AMQP(高级消息队列协议)的一个开源实现,其内部实际也是AMQP的基本概念。AMQP的消息发送流程有如下几个步骤:1.消息生产者(producer)将消息发布到Exchange中;2.Exchange根据队列的绑定关系将消息分发到不同的队列(Queue
Wesley13 Wesley13
3年前
JMS介绍
JMS消息传送模型    点对点消息传送模型  在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:每个消息只有一个接收者;消息发送者和接收者并没有时间依
Stella981 Stella981
3年前
Kafka中produer发送消息回调超时错误
Kafka版本0.10.1.1producer发送消息后出现如下错误消息: Theproducerhasaerror:Expiring1record(s)fortesttopic0dueto30039mshaspassedsincelastappendTheproducerhasaerror:Expi
Stella981 Stella981
3年前
Kafka 生产者与可靠性保证ACK(2)
生产者消息发送流程消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。在Kafka(2.6.0版本)源码中,可以看到。源码地址:kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProdu
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Stella981 Stella981
3年前
RocketMQ消息发送【源码笔记】(一)
1.消息发送代码需要设置produerGroup需要设置NameServer地址DefaultMQProducerproducernewDefaultMQProducer("melontst");producer.setNamesrvAddr("localhost:9876");producer.s
Stella981 Stella981
3年前
RocketMq系列之Producer普通消息发送(三)
往昔源码Eureka精品源码(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzU2MTYyMDY1NA%3D%3D%26mid%3D2247483828%26idx%3D1%26sn%3Daf3ba8d12973
消息丢失排查方法?
遇到丢消息问题,如果是单聊,群聊,聊天室,系统消息可以在开发者后台北极星自助查询一下消息是否发送成功。根据您实际发送的相关信息(发送者、接收者、时间、消息ID……)看是否可以查到消息如果消息查不到一般有几种可能:信息有误(获取token的用户id跟您系统中