聊聊artemis的maxDeliveryAttempts

公孙胜
• 阅读 986

本文主要研究一下artemis的maxDeliveryAttempts

maxDeliveryAttempts

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java

public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport {

   //......

   public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   private Integer maxDeliveryAttempts = null;

   private SimpleString deadLetterAddress = null;

   //......

   public int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   public AddressSettings setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public SimpleString getDeadLetterAddress() {
      return deadLetterAddress;
   }

   public AddressSettings setDeadLetterAddress(final SimpleString deadLetterAddress) {
      this.deadLetterAddress = deadLetterAddress;
      return this;
   }

   //......
}   
  • AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10

checkRedelivery

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
                                  final long timeBase,
                                  final boolean ignoreRedeliveryDelay) throws Exception {
      Message message = reference.getMessage();

      if (internalQueue) {
         if (logger.isTraceEnabled()) {
            logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
         }
         // no DLQ check on internal queues
         return new Pair<>(true, false);
      }

      if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
         storageManager.updateDeliveryCount(reference);
      }

      AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());

      int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
      int deliveryCount = reference.getDeliveryCount();

      // First check DLA
      if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
         if (logger.isTraceEnabled()) {
            logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
         }
         boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());

         return new Pair<>(false, dlaResult);
      } else {
         // Second check Redelivery Delay
         long redeliveryDelay = addressSettings.getRedeliveryDelay();
         if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
            redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);

            if (logger.isTraceEnabled()) {
               logger.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
            }

            reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);

            if (!reference.isPaged() && reference.isDurable() && isDurable()) {
               storageManager.updateScheduledDeliveryTime(reference);
            }
         }

         decDelivering(reference);

         return new Pair<>(true, false);
      }
   }

   //......
}
  • QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress

sendToDeadLetterAddress

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
   
   //......

   private boolean sendToDeadLetterAddress(final Transaction tx,
                                        final MessageReference ref,
                                        final SimpleString deadLetterAddress) throws Exception {
      if (deadLetterAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
            ref.acknowledge(tx, AckReason.KILLED, null);
         } else {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
            return true;
         }
      } else {
         ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);

         ref.acknowledge(tx, AckReason.KILLED, null);
      }

      return false;
   }

   private void move(final Transaction originalTX,
                     final SimpleString address,
                     final Binding binding,
                     final MessageReference ref,
                     final boolean rejectDuplicate,
                     final AckReason reason,
                     final ServerConsumer consumer) throws Exception {
      Transaction tx;

      if (originalTX != null) {
         tx = originalTX;
      } else {
         // if no TX we create a new one to commit at the end
         tx = new TransactionImpl(storageManager);
      }

      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);

      copyMessage.setAddress(address);

      postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);

      acknowledge(tx, ref, reason, consumer);

      if (originalTX == null) {
         tx.commit();
      }
   }

   //......
}
  • sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

incrementDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public HandleStatus handle(final MessageReference ref) throws Exception {
      // available credits can be set back to null with a flow control option.
      AtomicInteger checkInteger = availableCredits;
      if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + " is busy for the lack of credits. Current credits = " +
                            availableCredits +
                            " Can't receive reference " +
                            ref);
         }

         return HandleStatus.BUSY;
      }

      synchronized (lock) {
         // If the consumer is stopped then we don't accept the message, it
         // should go back into the
         // queue for delivery later.
         // TCP-flow control has to be done first than everything else otherwise we may lose notifications
         if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {
            return HandleStatus.BUSY;
         }

         // If there is a pendingLargeMessage we can't take another message
         // This has to be checked inside the lock as the set to null is done inside the lock
         if (largeMessageDeliverer != null) {
            if (logger.isDebugEnabled()) {
               logger.debug(this + " is busy delivering large message " +
                               largeMessageDeliverer +
                               ", can't deliver reference " +
                               ref);
            }
            return HandleStatus.BUSY;
         }
         final Message message = ref.getMessage();

         if (!message.acceptsConsumer(sequentialID())) {
            return HandleStatus.NO_MATCH;
         }

         if (filter != null && !filter.match(message)) {
            if (logger.isTraceEnabled()) {
               logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
            }
            return HandleStatus.NO_MATCH;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);
         }
         if (!browseOnly) {
            if (!preAcknowledge) {
               deliveringRefs.add(ref);
            }

            ref.handled();

            ref.setConsumerId(this.id);

            ref.incrementDeliveryCount();

            // If updateDeliveries = false (set by strict-update),
            // the updateDeliveryCountAfterCancel would still be updated after c
            if (strictUpdateDeliveryCount && !ref.isPaged()) {
               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                  !ref.getQueue().isInternalQueue() &&
                  !ref.isPaged()) {
                  storageManager.updateDeliveryCount(ref);
               }
            }

            if (preAcknowledge) {
               if (message.isLargeMessage()) {
                  // we must hold one reference, or the file will be deleted before it could be delivered
                  ((LargeServerMessage) message).incrementDelayDeletionCount();
               }

               // With pre-ack, we ack *before* sending to the client
               ref.getQueue().acknowledge(ref, this);
               acks++;
            }

            if (message.isLargeMessage() && this.supportLargeMessage) {
               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, ref);
            }

         }

         pendingDelivery.countUp();

         return HandleStatus.HANDLED;
      }
   }

   //......
}
  • ServerConsumerImpl的handle方法会在非browseOnly的情况下会调用ref.incrementDeliveryCount()来增加deliveryCount;必要的时候会执行storageManager.updateDeliveryCount(ref)

updateDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

   //......

   public void updateDeliveryCount(final MessageReference ref) throws Exception {
      // no need to store if it's the same value
      // otherwise the journal will get OME in case of lots of redeliveries
      if (ref.getDeliveryCount() == ref.getPersistedCount()) {
         return;
      }

      ref.setPersistedCount(ref.getDeliveryCount());
      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

      readLock();
      try {
         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
      } finally {
         readUnLock();
      }
   }

   //......
}
  • AbstractJournalStorageManager的updateDeliveryCount方法会更新persistedCount到storage

小结

AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10;QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress;sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

doc

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Stella981 Stella981
3年前
MacOS VSCode 安装 GO 插件失败问题解决
0x00问题重现Installinggolang.org/x/tools/cmd/guruFAILEDInstallinggolang.org/x/tools/cmd/gorenameFAILEDInstallinggolang.org/x/lint/golintFAILEDInst
Wesley13 Wesley13
3年前
activemq artemis broker.xml 可选配置
activemqartemisbroker.xml可选配置https://my.oschina.net/zengfr(https://my.oschina.net/zengfr)"urn:activemq:core":systempropertyprefix,"urn:activemq:core":internal
Wesley13 Wesley13
3年前
activemq artemis address 详细配置
activemqartemis address详细配置https://my.oschina.net/zengfr(https://my.oschina.net/zengfr)'{"urn:activemq:core":autocreatedeadletterresources, "urn:activemq:core":dead
Wesley13 Wesley13
3年前
activemq artemis address 详细配置实例
activemqartemisaddress详细配置实例https://my.oschina.net/zengfr(https://my.oschina.net/zengfr)默认配置:<addresssettingmatch""<deadletteraddressDLQ</d
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
HBase启动失败
如果在hbase的shell中输入了status报错,hbase(main):001:0statusERROR:org.apache.hadoop.hbase.ipc.ServerNotRunningYetException:Serverisnotrunningyetatorg.apache.ha
Wesley13 Wesley13
3年前
ActiveMQ:初见&安装试运行
官网:http://activemq.apache.org/!(https://oscimg.oschina.net/oscnet/1922332e779bf1240345d5396fb0bb138d0.png)ActiveMQ是一个消息中间件,在大型互联网应用中有广泛的使用。当前最新版本:5.15.4,发布于20180522,开源、Ap
Stella981 Stella981
3年前
Kafka 生产者与可靠性保证ACK(2)
生产者消息发送流程消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。在Kafka(2.6.0版本)源码中,可以看到。源码地址:kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProdu