A Look at CarreraProducer's sendDelay

协程潮汐

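This article walks through how CarreraProducer's sendDelay works, following the call from the public interface down to the method that actually sends the message.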

ProducerInterface

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/ProducerInterface.java

public interface ProducerInterface {

    void start() throws Exception;

    void shutdown();

    Result sendMessage(Message message);

    Result send(String topic, byte[] body);

    Result send(String topic, String body);

    Result sendByCharset(String topic, String body, String charsetName);

    Result send(String topic, String body, String key, String... tags);

    Result send(String topic, byte[] body, String key, String... tags);

    Result sendByCharset(String topic, String body, String charsetName, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, String body, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, byte[] body, String key, String... tags);

    Result sendWithHashIdByCharset(String topic, long hashId, String body, String charsetName, String key, String[] tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, byte[] body, String key, String... tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, String body, String key, String... tags);

    Result sendWithPartitionByCharset(String topic, int partitionId, long hashId, String body, String charsetName, String key, String[] tags);

    Result sendBatchConcurrently(List<Message> messages);

    Result sendBatchOrderly(List<Message> messages);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags);

    Result sendBatchSync(List<Message> messages);
}
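  • ProducerInterface declares six delayed-send methods (four sendDelay overloads and two sendDelayByCharset overloads) plus two cancelDelay overloads; all of them return a DelayResult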

CarreraProducer

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducer.java

public class CarreraProducer implements ProducerInterface {
    private ProducerInterface producer;
    private CarreraConfig config;

    public CarreraProducer(CarreraConfig config) {
        producer = new LocalCarreraProducer(config);
        this.config = config;
    }

    public static CarreraProducer newCarreraProducer(CarreraConfig config) throws Exception {
        return new CarreraProducer(config);
    }

    public MessageBuilder messageBuilder() {
        return new MessageBuilder(this);
    }

    public AddDelayMessageBuilder addDelayMessageBuilder() {
        return new AddDelayMessageBuilder(this);
    }

    public CancelDelayMessageBuilder cancelDelayMessageBuilder() {
        return new CancelDelayMessageBuilder(this);
    }

    public AddTxMonitorMessageBuilder addTxMonitorMessageBuilder(AddDelayMessageBuilder addDelayMessageBuilder) {
        return new AddTxMonitorMessageBuilder(addDelayMessageBuilder);
    }

    public CancelTxMonitorMessageBuilder cancelTxMonitorMessageBuilder(CancelDelayMessageBuilder cancelDelayMessageBuilder) {
        return new CancelTxMonitorMessageBuilder(cancelDelayMessageBuilder);
    }

    public TxBusinessMessageBuilder txBusinessMessageBuilder(MessageBuilder messageBuilder) {
        return new TxBusinessMessageBuilder(messageBuilder);
    }

    //......

    @Override
    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
        return producer.sendDelay(topic, body, delayMeta);
    }

    @Override
    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
        return producer.sendDelay(topic, body, delayMeta);
    }

    @Override
    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
        return producer.sendDelayByCharset(topic, body, delayMeta, charsetName);
    }

    @Override
    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
        return producer.sendDelay(topic, body, delayMeta, tags);
    }

    @Override
    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
        return producer.sendDelay(topic, body, delayMeta, tags);
    }

    @Override
    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
        return producer.sendDelayByCharset(topic, body, delayMeta, charsetName, tags);
    }

    @Override
    public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
        return producer.cancelDelay(topic, uniqDelayMsgId);
    }

    @Override
    public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
        return producer.cancelDelay(topic, uniqDelayMsgId, tags);
    }

    //......    
}
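  • CarreraProducer implements ProducerInterface; its constructor creates a LocalCarreraProducer, and every sendDelay, sendDelayByCharset, and cancelDelay call is delegated to that instance unchanged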
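
The delegation described above can be sketched in a few lines. This is a minimal illustration, not the DDMQ code: `DelaySender`, `RecordingSender`, and `FacadeSender` are hypothetical stand-ins for ProducerInterface, LocalCarreraProducer, and CarreraProducer, reduced to a single method that returns a String instead of a DelayResult. The sketch also injects the delegate through the constructor, whereas CarreraProducer creates its LocalCarreraProducer itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProducerInterface, reduced to one method.
interface DelaySender {
    String sendDelay(String topic, String body);
}

// Hypothetical stand-in for LocalCarreraProducer: the object that does the work.
class RecordingSender implements DelaySender {
    final List<String> sent = new ArrayList<>();

    @Override
    public String sendDelay(String topic, String body) {
        sent.add(topic + ":" + body); // record the call instead of talking to a proxy
        return "OK";
    }
}

// Hypothetical stand-in for CarreraProducer: same interface, forwards every call.
class FacadeSender implements DelaySender {
    private final DelaySender delegate;

    FacadeSender(DelaySender delegate) {
        this.delegate = delegate;
    }

    @Override
    public String sendDelay(String topic, String body) {
        return delegate.sendDelay(topic, body); // no extra logic, pure forwarding
    }
}

class DelegationSketch {
    public static void main(String[] args) {
        RecordingSender inner = new RecordingSender();
        DelaySender producer = new FacadeSender(inner);
        System.out.println(producer.sendDelay("demo_topic", "hello"));
        System.out.println(inner.sent);
    }
}
```

Because the facade and the delegate share one interface, callers depend only on that interface, and the object behind the facade could be replaced without touching call sites.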

LocalCarreraProducer

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/LocalCarreraProducer.java

public class LocalCarreraProducer extends CarreraProducerBase implements ProducerInterface {

    public LocalCarreraProducer(CarreraConfig config) {
        super(config);
    }

    @Override
    protected void initNodeMgr() throws Exception {
        nodeMgr = NodeManager.newLocalNodeManager(config, config.getCarreraProxyList());
        nodeMgr.initConnectionPool();
    }
}
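  • LocalCarreraProducer extends CarreraProducerBase and implements ProducerInterface; the only thing it adds is initNodeMgr, which builds a local NodeManager from config.getCarreraProxyList() and initializes its connection pool. All send logic is inherited from the base class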
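
The split between the base class and LocalCarreraProducer resembles the template-method pattern: the base class owns the shared logic and leaves one hook for subclasses. The sketch below shows that shape under a clearly stated assumption: the excerpt does not show where initNodeMgr is invoked, so calling it from `start()` here is a guess for illustration only. `ProducerBaseSketch`, `LocalProducerSketch`, and the String used in place of NodeManager are all hypothetical.

```java
// Hypothetical base class: owns the lifecycle, leaves node-manager setup to subclasses.
abstract class ProducerBaseSketch {
    protected String nodeMgr;        // stands in for the real NodeManager
    private boolean running = false;

    // Assumption: the hook runs during start(); the quoted source elides this part.
    public void start() {
        if (running) {
            return;                  // already started, nothing to do
        }
        initNodeMgr();
        running = true;
    }

    public boolean isRunning() {
        return running;
    }

    // The single extension point a concrete producer has to supply.
    protected abstract void initNodeMgr();
}

// Hypothetical subclass: only decides how the node manager is built.
class LocalProducerSketch extends ProducerBaseSketch {
    private final String proxyList;

    LocalProducerSketch(String proxyList) {
        this.proxyList = proxyList;
    }

    @Override
    protected void initNodeMgr() {
        nodeMgr = "local-node-manager[" + proxyList + "]";
    }

    String describe() {
        return nodeMgr;
    }
}

class TemplateHookSketch {
    public static void main(String[] args) {
        LocalProducerSketch producer = new LocalProducerSketch("127.0.0.1:9613"); // placeholder address
        producer.start();
        System.out.println(producer.describe());
    }
}
```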

CarreraProducerBase

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducerBase.java

public abstract class CarreraProducerBase implements ProducerInterface {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarreraProducerBase.class);
    private static final Logger DROP_LOGGER = LoggerFactory.getLogger("DROP_LOG");

    private static final int DELAY_ACTIONS_ADD = 1;
    private static final int DELAY_ACTIONS_CANCEL = 2;
    private static final String TAGS_SEPARATOR = "||";
    private volatile boolean isRunning = false;
    protected NodeManager nodeMgr;
    protected CarreraConfig config;
    private ExecutorService executor;

    public CarreraProducerBase(CarreraConfig config) {
        this.config = config;
    }

    //......

    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
        return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, randomKey()));
    }

    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
        return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, randomKey()));
    }

    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
        try {
            return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta));
        } catch (UnsupportedEncodingException e) {
            return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
        }
    }

    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
        return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, tags));
    }

    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
        return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, tags));
    }

    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
        try {
            return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta, tags));
        } catch (UnsupportedEncodingException e) {
            return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
        }
    }

    public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
        return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, randomKey()));
    }

    public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
        return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, tags));
    }

    private DelayMessage buildDelayMessage4Add(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
        DelayMessage delayMessage = new DelayMessage();
        delayMessage.setTopic(topic);
        delayMessage.setBody(body);
        delayMessage.setAction(DELAY_ACTIONS_ADD);
        delayMessage.setTimestamp(delayMeta.getTimestamp());
        delayMessage.setDmsgtype(delayMeta.getDmsgtype());
        delayMessage.setInterval(delayMeta.getInterval());
        delayMessage.setExpire(delayMeta.getExpire());
        delayMessage.setTimes(delayMeta.getTimes());
        delayMessage.setUuid(new UUID().toString());
        delayMessage.setVersion(VersionUtils.getVersion());
        if (null != delayMeta.getProperties() && delayMeta.getProperties().size() > 0) {
            delayMessage.setProperties(delayMeta.getProperties());
        }

        if (ArrayUtils.isNotEmpty(tags)) {
            delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR));
        }

        return delayMessage;
    }

    private DelayMessage buildDelayMessage4Cancel(String topic, String uniqDelayMsgId, String... tags) {
        DelayMessage delayMessage = new DelayMessage();
        delayMessage.setTopic(topic);
        delayMessage.setUniqDelayMsgId(uniqDelayMsgId);
        delayMessage.setAction(DELAY_ACTIONS_CANCEL);
        delayMessage.setVersion(VersionUtils.getVersion());
        delayMessage.setBody("c".getBytes()); // if body is null, new String(message.getBody()) will throw NullPointerException

        if (ArrayUtils.isNotEmpty(tags)) {
            delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR));
        }

        return delayMessage;
    }

    private DelayResult sendDelayMessage(DelayMessage message) {
        DelayResult result = new DelayResult(UNKNOWN_EXCEPTION, "unknown exception", "");
        if (!isRunning) {
            result.setCode(CLIENT_EXCEPTION);
            result.setMsg("please execute the start() method before sending the message");
            return result;
        }
        int retryCnt = 0;
        long start, used = 0;
        long begin = TimeUtils.getCurTime();
        String proxyAddress = null;
        do {
            CarreraConnection connection = null;
            try {
                connection = nodeMgr.borrowConnection(config.getCarreraClientTimeout());
                if (connection == null) {
                    if (result.getCode() == UNKNOWN_EXCEPTION) {
                        result.setCode(NO_MORE_HEALTHY_NODE);
                        result.setMsg("no more healthy node");
                    }
                    delay(config.getCarreraClientTimeout());
                    continue;
                }
                proxyAddress = connection.getNode().toString();
                start = TimeUtils.getCurTime();
                result = connection.sendDelay(message, this.config.getCarreraProxyTimeout());
                used = TimeUtils.getElapseTime(start);

                if (result.getCode() > OK) {
                    switch (result.getCode()) {
                        case FAIL_ILLEGAL_MSG:
                        case FAIL_TOPIC_NOT_ALLOWED:
                        case FAIL_TOPIC_NOT_EXIST:
                        case FAIL_TIMEOUT:
                        case FAIL_REFUSED_BY_RATE_LIMITER:
                            delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));
                            break;
                        default:
                            nodeMgr.unhealthyNode(connection.getNode());
                            delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));
                            break; //break switch
                    }
                } else {
                    break; //break loop
                }
            } catch (Exception e) {
                LOGGER.warn("sendMessage failed, retry count:" + retryCnt + ", topic:" + message.topic + ", key:" + message.uniqDelayMsgId, e);
                result.setCode(CLIENT_EXCEPTION);
                result.setMsg(e.toString());
            } finally {
                if (connection != null) {
                    nodeMgr.returnConnection(connection);
                }
            }
        } while (retryCnt++ < this.config.getCarreraClientRetry());

        if (result.getCode() > OK) {
            LOGGER.error("send delay msg result:{}; msg[ip:{},topic:{},uuid:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{},ret.Code:{},ret.Msg:{}]",
                    resultToString(result), proxyAddress, message.getTopic(), message.getUuid(), message.getUniqDelayMsgId(),
                    StringUtils.length(new String(message.getBody())), TimeUtils.getElapseTime(begin), retryCnt, result.getCode(), result.getMsg());
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("send delay msg result:{}; msg[ip:{},topic:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{}]",
                        resultToString(result), proxyAddress, message.getTopic(), result.getUniqDelayMsgId(),
                        StringUtils.length(new String(message.getBody())), used, retryCnt);
            }
        }
        return result;
    }

}
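  • The sendDelay and sendDelayByCharset overloads build a DelayMessage with buildDelayMessage4Add (action DELAY_ACTIONS_ADD, fields copied from DelayMeta, tags joined with "||"); cancelDelay builds one with buildDelayMessage4Cancel (action DELAY_ACTIONS_CANCEL, a placeholder body of "c" so that a later new String(message.getBody()) cannot hit a null body). Both paths end in sendDelayMessage
  • sendDelayMessage returns CLIENT_EXCEPTION right away if the producer is not running. Otherwise it borrows a connection from nodeMgr, calls connection.sendDelay, and returns the connection in a finally block. A result code that is not greater than OK ends the loop; any other outcome (a failure code, a null connection, or an exception) is retried until retryCnt reaches config.getCarreraClientRetry(). Failure codes outside the five listed cases also mark the node as unhealthy before the next attempt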
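
The retry loop in sendDelayMessage is a do/while with a post-incremented counter, so the body runs at most retry + 1 times. The sketch below isolates just that control flow. `RetryLoopSketch`, `Outcome`, and `sendWithRetry` are hypothetical names; connection handling, the sleep between attempts, and the unhealthy-node marking are left out on purpose.

```java
import java.util.function.IntSupplier;

class RetryLoopSketch {
    static final int OK = 0; // mirrors the "code > OK means failure" check in the source

    // What the loop produced: the last result code and the number of attempts made.
    static final class Outcome {
        final int code;
        final int attempts;

        Outcome(int code, int attempts) {
            this.code = code;
            this.attempts = attempts;
        }
    }

    // Same shape as the loop in sendDelayMessage: break on success,
    // otherwise keep going while retryCnt++ < maxRetry.
    static Outcome sendWithRetry(IntSupplier attempt, int maxRetry) {
        int retryCnt = 0;
        int attempts = 0;
        int code = Integer.MAX_VALUE; // placeholder, overwritten by the first attempt
        do {
            attempts++;
            code = attempt.getAsInt();
            if (code <= OK) {
                break; // success: leave the loop immediately
            }
            // The original waits here before retrying; omitted in this sketch.
        } while (retryCnt++ < maxRetry);
        return new Outcome(code, attempts);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice and then succeeds.
        Outcome outcome = sendWithRetry(() -> ++calls[0] < 3 ? 1 : OK, 5);
        System.out.println("code=" + outcome.code + ", attempts=" + outcome.attempts);
    }
}
```

One consequence of this shape is that a retry setting of 0 still sends once, and a setting of N allows N + 1 sends in total.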

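Summary

ProducerInterface declares the sendDelay, sendDelayByCharset, and cancelDelay methods. CarreraProducer implements the interface and delegates those calls to a LocalCarreraProducer. LocalCarreraProducer extends CarreraProducerBase and contributes only the node-manager setup. The real work happens in CarreraProducerBase: sendDelay builds a DelayMessage via buildDelayMessage4Add, cancelDelay builds one via buildDelayMessage4Cancel, and both send it through sendDelayMessage, which retries on failure up to the configured retry count.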
