RocketMq系列之Producer普通消息发送(三)

Stella981
• 阅读 340

往昔源码

Eureka精品源码

Quartz系列全集

xxl-job系列

sharding-jdbc精品源码合集

构建sleuth+zipkin全链路监控系统(完结篇)

普通消息发送

下面是普通消息发送的示例

public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        producer.start();        for (int i = 0; i < 1000; i++) {            try {                Message msg = new Message("TopicTest","TagA",                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */                );                  // 发送结果                SendResult sendResult = producer.send(msg);                System.out.printf("%s%n", sendResult);            } catch (Exception e) {                e.printStackTrace();                Thread.sleep(1000);            }        }        producer.shutdown();    }

下面主要看一下producer.send(msg) 方法

@Overridepublic SendResult send(  Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  Validators.checkMessage(msg, this);  // 设置topic  msg.setTopic(withNamespace(msg.getTopic()));  // 发送  return this.defaultMQProducerImpl.send(msg);}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send方法

/** * Timeout for sending messages. */private int sendMsgTimeout = 3000;/** * DEFAULT SYNC ------------------------------------------------------- , 默认同步发送 */public SendResult send(  Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  // 超时时间3秒  return send(msg, this.defaultMQProducer.getSendMsgTimeout());}

上面获取了this.defaultMQProducer.getSendMsgTimeout() 该参数为发送超时时间,默认 sendMsgTimeout 为3秒 。下面看一下send方法

private SendResult sendDefaultImpl(            Message msg,            final CommunicationMode communicationMode,            final SendCallback sendCallback,            final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK();        Validators.checkMessage(msg, this.defaultMQProducer);              //         final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis(); // 当前时间        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst; //        //1. 获取topic信息        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            MessageQueue mq = null;            Exception exception = null;            SendResult sendResult = null;            // 2. 计算重试次数            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            // 在重试次数内进行重试            for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                // 3. 获取消息队列 , 默认轮询                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) { // 队列不为空                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName(); //                     try {                            // 开始发送时间                        beginTimestampPrev = System.currentTimeMillis();                        if (times > 0) {                            //Reset topic with namespace during resend.                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                        }                        // 查看是否超时(包含了获取topic,队列等操作的耗时)                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                                                // 4. 发送消息                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        // 5.发送消息模式                        switch (communicationMode) {                            case ASYNC: // 异步                                return null;                            case ONEWAY: // 仅发一次                                return null;                            case SYNC:  // 同步发送,默认方式                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                        // 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 ,默认false                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }                                return sendResult;                            default:                                break;                        }                    } catch (异常){                      // 异常信息处理                    }                } else {                    break;                }            }            if (sendResult != null) {                return sendResult;            }            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",                    times,                    System.currentTimeMillis() - beginTimestampFirst,                    msg.getTopic(),                    Arrays.toString(brokersSent));            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);            MQClientException mqClientException = new MQClientException(info, exception);            if (callTimeout) {                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");            }            if (exception instanceof MQBrokerException) {                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());            } else if (exception instanceof RemotingConnectException) {                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);            } else if (exception instanceof RemotingTimeoutException) {                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);            } else if (exception instanceof MQClientException) {                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);            }            throw mqClientException;        }                // 判断nameServerAddressList是否为空,为空报错        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();        if (null == nsList || nsList.isEmpty()) {            throw new MQClientException(                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);        }        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }

普通消息的发送用一张流程图来表示

RocketMq系列之Producer普通消息发送(三)

上面的源码其实已经涵盖了普通同步消息的大部分流程,剩下的就是调用MQClientApiImpl进行发送消息了, 下面讲一下异步消息

网上有一个观点: 异步消息没有重试?

这个观点是否正确?,下面我们看一下具体的源码,事实胜于雄辩

public SendResult sendMessage(        final String addr,        final String brokerName,        final Message msg,        final SendMessageRequestHeader requestHeader,        final long timeoutMillis,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final MQClientInstance instance,        final int retryTimesWhenSendFailed,        final SendMessageContext context,        final DefaultMQProducerImpl producer    ) throws RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        RemotingCommand request = null;        if (sendSmartMsg || msg instanceof MessageBatch) {            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);        } else {            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);        }        request.setBody(msg.getBody());        switch (communicationMode) {            case ONEWAY:                this.remotingClient.invokeOneway(addr, request, timeoutMillis);                return null;            case ASYNC:                    // 异步消息。                final AtomicInteger times = new AtomicInteger();                long costTimeAsync = System.currentTimeMillis() - beginStartTime;                if (timeoutMillis < costTimeAsync) {                    throw new RemotingTooMuchRequestException("sendMessage call timeout");                }                // 发送异步消息                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,                    retryTimesWhenSendFailed, times, context, producer);                return null;            case SYNC:                long costTimeSync = System.currentTimeMillis() - beginStartTime;                if (timeoutMillis < costTimeSync) {                    throw new RemotingTooMuchRequestException("sendMessage call timeout");                }                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);            default:                assert false;                break;        }        return null;    }

sendMessageAsync

private void sendMessageAsync(        final String addr,        final String brokerName,        final Message msg,        final long timeoutMillis,        final RemotingCommand request,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final MQClientInstance instance,        final int retryTimesWhenSendFailed,        final AtomicInteger times,        final SendMessageContext context,        final DefaultMQProducerImpl producer    ) throws InterruptedException, RemotingException {        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {            @Override            public void operationComplete(ResponseFuture responseFuture) {                RemotingCommand response = responseFuture.getResponseCommand();                if (null == sendCallback && response != null) {                    try {                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);                        if (context != null && sendResult != null) {                            context.setSendResult(sendResult);                            context.getProducer().executeSendMessageHookAfter(context);                        }                    } catch (Throwable e) {                    }                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);                    return;                }                if (response != null) {                    try {                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);                        assert sendResult != null;                        if (context != null) {                            context.setSendResult(sendResult);                            context.getProducer().executeSendMessageHookAfter(context);                        }                        try {                            sendCallback.onSuccess(sendResult);                        } catch (Throwable e) {                        }                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);                    } catch (Exception e) {                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);                        // 重点//// 重点                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,                            retryTimesWhenSendFailed, times, e, context, false, producer);                    }                } else {                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);                    if (!responseFuture.isSendRequestOK()) {                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,                            retryTimesWhenSendFailed, times, ex, context, true, producer);                    } else if (responseFuture.isTimeout()) {                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",                            responseFuture.getCause());                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,                            retryTimesWhenSendFailed, times, ex, context, true, producer);                    } else {                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,                            retryTimesWhenSendFailed, times, ex, context, true, producer);                    }                }            }        });    }

在异步消息发送出现问题,依旧会调用onExceptionImpl 方法,该方法里面进行了重试

private void onExceptionImpl(final String brokerName,        final Message msg,        final long timeoutMillis,        final RemotingCommand request,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final MQClientInstance instance,        final int timesTotal,        final AtomicInteger curTimes,        final Exception e,        final SendMessageContext context,        final boolean needRetry,        final DefaultMQProducerImpl producer    ) {        int tmp = curTimes.incrementAndGet();        if (needRetry && tmp <= timesTotal) {            String retryBrokerName = brokerName;//by default, it will send to the same broker            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);                retryBrokerName = mqChosen.getBrokerName();            }            String addr = instance.findBrokerAddressInPublish(retryBrokerName);            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,                retryBrokerName);            try {                request.setOpaque(RemotingCommand.createNewRequestId());                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,                    timesTotal, curTimes, context, producer);            } catch (InterruptedException e1) {                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,                    context, false, producer);            } catch (RemotingConnectException e1) {                producer.updateFaultItem(brokerName, 3000, true);                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,                    context, true, producer);            } catch (RemotingTooMuchRequestException e1) {                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,                    context, false, producer);            } catch (RemotingException e1) {                producer.updateFaultItem(brokerName, 3000, true);                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,                    context, true, producer);            }        } else {            if (context != null) {                context.setException(e);                context.getProducer().executeSendMessageHookAfter(context);            }            try {                sendCallback.onException(e);            } catch (Exception ignored) {            }        }    }

所以结论是,异步消息一样会进行重试

关于发送消息的一些配置,做如下说明:

retryTimesWhenSendFailed

配置说明:同步发送失败的话,rocketmq内部重试多少次

默认值:2

retryTimesWhenSendAsyncFailed

配置说明:异步发送失败的话,rocketmq内部重试多少次

默认值:2

retryAnotherBrokerWhenNotStoreOK

配置说明:发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发

默认值:false

发送结果总共有4钟:

SEND_OK, //状态成功,无论同步还是存储FLUSH_DISK_TIMEOUT, // broker刷盘策略为同步刷盘(SYNC_FLUSH)的话时候,等待刷盘的时候超时FLUSH_SLAVE_TIMEOUT, // master role采取同步复制策略(SYNC_MASTER)的时候,消息尝试同步到slave超时SLAVE_NOT_AVAILABLE, //slave不可用

注:从源码上看,此配置项只对同步发送有效,异步、oneway(由于无法获取结果,肯定无效)均无效


欢迎关注我的微信公众号:【sharedCode】

RocketMq系列之Producer普通消息发送(三)

回复:“资源”、“架构”等关键词获取海量免费学习资料。

本文分享自微信公众号 - sharedCode(sharedCode)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
JDK源码分析
概述前文「JDK源码分析PriorityQueue(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzU4NzYyMDE4MQ%3D%3D%26mid%3D2247483966%26idx%3D1%26sn%3D
Stella981 Stella981
2年前
RocketMQ系列之pull(拉)消息模式(七)
Eureka精品源码(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzU2MTYyMDY1NA%3D%3D%26mid%3D2247483828%26idx%3D1%26sn%3Daf3ba8d129735fa3c5bec5
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这