聊聊artemis的persistenceEnabled

本文主要研究一下artemis的persistenceEnabled



persistenceEnabled

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java

public class ConfigurationImpl implements Configuration, Serializable {? ? //......? ? private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();? ? public boolean isPersistenceEnabled() { ? ?  return persistenceEnabled; ? }? ? public ConfigurationImpl setPersistenceEnabled(final boolean enable) { ? ?  persistenceEnabled = enable; ? ?  return this; ? }? ? //......} ? 
  • ConfigurationImpl定义了persistenceEnabled属性,默认为ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled()(true)

createStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {? ? //......? ? protected StorageManager createStorageManager() { ? ?  if (configuration.isPersistenceEnabled()) { ? ? ? ? if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { ? ? ? ? ?  JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO); ? ? ? ? ?  this.getCriticalAnalyzer().add(journal); ? ? ? ? ?  return journal; ? ? ? ? } else { ? ? ? ? ?  // Default to File Based Storage Manager, (Legacy default configuration). ? ? ? ? ?  JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO); ? ? ? ? ?  this.getCriticalAnalyzer().add(journal); ? ? ? ? ?  return journal; ? ? ? ? } ? ?  } ? ?  return new NullStorageManager(); ? }? ? //......}
  • ActiveMQServerImpl的createStorageManager方法在configuration.isPersistenceEnabled()为true时创建的StorageManager是JDBCJournalStorageManager或者JournalStorageManager;否则创建的StorageManager是NullStorageManager

processRoute

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {? ? //......? ? public void processRoute(final Message message, ? ? ? ? ? ? ? ? ? ? ? ? ?  final RoutingContext context, ? ? ? ? ? ? ? ? ? ? ? ? ?  final boolean direct) throws Exception { ? ?  final List<MessageReference> refs = new ArrayList<>();? ? ?  Transaction tx = context.getTransaction();? ? ?  Long deliveryTime = message.getScheduledDeliveryTime();? ? ?  for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { ? ? ? ? PagingStore store = pagingManager.getPageStore(entry.getKey());? ? ? ? ? if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { ? ? ? ? ?  if (message.isLargeMessage()) { ? ? ? ? ? ? ? confirmLargeMessageSend(tx, message); ? ? ? ? ?  }? ? ? ? ? ?  // We need to kick delivery so the Queues may check for the cursors case they are empty ? ? ? ? ?  schedulePageDelivery(tx, entry); ? ? ? ? ?  continue; ? ? ? ? }? ? ? ? ? for (Queue queue : entry.getValue().getNonDurableQueues()) { ? ? ? ? ?  MessageReference reference = MessageReference.Factory.createReference(message, queue);? ? ? ? ? ?  if (deliveryTime != null) { ? ? ? ? ? ? ? reference.setScheduledDeliveryTime(deliveryTime); ? ? ? ? ?  } ? ? ? ? ?  refs.add(reference);? ? ? ? ? ?  message.incrementRefCount(); ? ? ? ? }? ? ? ? ? Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();? ? ? ? ? while (iter.hasNext()) { ? ? ? ? ?  Queue queue = iter.next();? ? ? ? ? ?  MessageReference reference = MessageReference.Factory.createReference(message, queue);? ? ? ? ? ?  if (context.isAlreadyAcked(context.getAddress(message), queue)) { ? ? ? ? ? ? ? reference.setAlreadyAcked(); ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ?  queue.acknowledge(tx, reference); ? ? ? ? ? ? ? } ? ? ? ? ?  }? ? ? ? ? ?  if (deliveryTime != null) { ? ? ? ? ? ? ? reference.setScheduledDeliveryTime(deliveryTime); ? ? ? ? ?  } ? ? ? ? ?  refs.add(reference);? ? ? ? ? ?  if (message.isDurable()) { ? ? ? ? ? ? ? int durableRefCount = message.incrementDurableRefCount();? ? ? ? ? ? ? ? if (durableRefCount == 1) { ? ? ? ? ? ? ? ?  if (tx != null) { ? ? ? ? ? ? ? ? ? ? storageManager.storeMessageTransactional(tx.getID(), message); ? ? ? ? ? ? ? ?  } else { ? ? ? ? ? ? ? ? ? ? storageManager.storeMessage(message); ? ? ? ? ? ? ? ?  }? ? ? ? ? ? ? ? ?  if (message.isLargeMessage()) { ? ? ? ? ? ? ? ? ? ? confirmLargeMessageSend(tx, message); ? ? ? ? ? ? ? ?  } ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ?  storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());? ? ? ? ? ? ? ? ?  tx.setContainsPersistent(); ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ?  storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (deliveryTime != null && deliveryTime > 0) { ? ? ? ? ? ? ? ?  if (tx != null) { ? ? ? ? ? ? ? ? ? ? storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); ? ? ? ? ? ? ? ?  } else { ? ? ? ? ? ? ? ? ? ? storageManager.updateScheduledDeliveryTime(reference); ? ? ? ? ? ? ? ?  } ? ? ? ? ? ? ? } ? ? ? ? ?  }? ? ? ? ? ?  message.incrementRefCount(); ? ? ? ? } ? ?  }? ? ?  if (tx != null) { ? ? ? ? tx.addOperation(new AddOperation(refs)); ? ?  } else { ? ? ? ? // This will use the same thread if there are no pending operations ? ? ? ? // avoiding a context switch on this case ? ? ? ? storageManager.afterCompleteOperations(new IOCallback() { ? ? ? ? ?  @Override ? ? ? ? ?  public void onError(final int errorCode, final String errorMessage) { ? ? ? ? ? ? ? ActiveMQServerLogger.LOGGER.ioErrorAddingReferences(errorCode, errorMessage); ? ? ? ? ?  }? ? ? ? ? ?  @Override ? ? ? ? ?  public void done() { ? ? ? ? ? ? ? context.processReferences(refs, direct); ? ? ? ? ?  } ? ? ? ? }); ? ?  } ? }? ? //......}
  • PostOfficeImpl的processRoute方法会判断message.isDurable(),若为true且durableRefCount为1则会执行storageManager.storeMessage或者storageManager.storeMessageTransactional方法

storeMessage

AbstractJournalStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {? ? //......? ? protected Journal messageJournal;? ? //......? ? public void storeMessage(final Message message) throws Exception { ? ?  if (message.getMessageID() <= 0) { ? ? ? ? // Sanity check only... this shouldn't happen unless there is a bug ? ? ? ? throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); ? ?  }? ? ?  readLock(); ? ?  try { ? ? ? ? // Note that we don't sync, the add reference that comes immediately after will sync if ? ? ? ? // appropriate? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ?  messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false)); ? ? ? ? } else { ? ? ? ? ?  messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false)); ? ? ? ? } ? ?  } finally { ? ? ? ? readUnLock(); ? ?  } ? }? ? //......}
  • JDBCJournalStorageManager及JournalStorageManager都继承了AbstractJournalStorageManager,其storeMessage方法会调用messageJournal.appendAddRecord方法;二者的messageJournal实现不同,一个是JDBCJournalImpl,一个是JournalImpl

NullStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java

public class NullStorageManager implements StorageManager {? ? //......? ? public void storeMessage(final Message message) throws Exception { ? }? ? //......}
  • NullStorageManager实现了StorageManager接口,其storeMessage为空方法

小结

ConfigurationImpl定义了persistenceEnabled属性,默认为ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled()(true);ActiveMQServerImpl的createStorageManager方法在configuration.isPersistenceEnabled()为true时创建的StorageManager是JDBCJournalStorageManager或者JournalStorageManager;否则创建的StorageManager是NullStorageManager;PostOfficeImpl的processRoute方法会判断message.isDurable(),若为true且durableRefCount为1则会执行storageManager.storeMessage或者storageManager.storeMessageTransactional方法

doc

  • ConfigurationImpl

版权声明:

作者: freeclashnode

链接: https://www.freeclashnode.com/news/article-2165.htm

来源: FreeClashNode

文章版权归作者所有,未经允许请勿转载。

免费节点实时更新

热门文章

最新文章

归档