聊聊artemis的persistenceEnabled
序
本文主要研究一下artemis的persistenceEnabled
persistenceEnabled
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
public class ConfigurationImpl implements Configuration, Serializable {? ? //......? ? private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();? ? public boolean isPersistenceEnabled() { ? ? return persistenceEnabled; ? }? ? public ConfigurationImpl setPersistenceEnabled(final boolean enable) { ? ? persistenceEnabled = enable; ? ? return this; ? }? ? //......} ?
- ConfigurationImpl定义了persistenceEnabled属性,默认为ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled()(true)
createStorageManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
public class ActiveMQServerImpl implements ActiveMQServer {? ? //......? ? protected StorageManager createStorageManager() { ? ? if (configuration.isPersistenceEnabled()) { ? ? ? ? if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { ? ? ? ? ? JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO); ? ? ? ? ? this.getCriticalAnalyzer().add(journal); ? ? ? ? ? return journal; ? ? ? ? } else { ? ? ? ? ? // Default to File Based Storage Manager, (Legacy default configuration). ? ? ? ? ? JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO); ? ? ? ? ? this.getCriticalAnalyzer().add(journal); ? ? ? ? ? return journal; ? ? ? ? } ? ? } ? ? return new NullStorageManager(); ? }? ? //......}
- ActiveMQServerImpl的createStorageManager方法在configuration.isPersistenceEnabled()为true时创建的StorageManager是JDBCJournalStorageManager或者JournalStorageManager;否则创建的StorageManager是NullStorageManager
processRoute
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {? ? //......? ? public void processRoute(final Message message, ? ? ? ? ? ? ? ? ? ? ? ? ? final RoutingContext context, ? ? ? ? ? ? ? ? ? ? ? ? ? final boolean direct) throws Exception { ? ? final List<MessageReference> refs = new ArrayList<>();? ? ? Transaction tx = context.getTransaction();? ? ? Long deliveryTime = message.getScheduledDeliveryTime();? ? ? for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { ? ? ? ? PagingStore store = pagingManager.getPageStore(entry.getKey());? ? ? ? ? if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { ? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ? ? ? confirmLargeMessageSend(tx, message); ? ? ? ? ? }? ? ? ? ? ? // We need to kick delivery so the Queues may check for the cursors case they are empty ? ? ? ? ? schedulePageDelivery(tx, entry); ? ? ? ? ? continue; ? ? ? ? }? ? ? ? ? for (Queue queue : entry.getValue().getNonDurableQueues()) { ? ? ? ? ? MessageReference reference = MessageReference.Factory.createReference(message, queue);? ? ? ? ? ? if (deliveryTime != null) { ? ? ? ? ? ? ? reference.setScheduledDeliveryTime(deliveryTime); ? ? ? ? ? } ? ? ? ? ? refs.add(reference);? ? ? ? ? ? message.incrementRefCount(); ? ? ? ? }? ? ? ? ? Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();? ? ? ? ? while (iter.hasNext()) { ? ? ? ? ? Queue queue = iter.next();? ? ? ? ? ? MessageReference reference = MessageReference.Factory.createReference(message, queue);? ? ? ? ? ? if (context.isAlreadyAcked(context.getAddress(message), queue)) { ? ? ? ? ? ? ? reference.setAlreadyAcked(); ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ? queue.acknowledge(tx, reference); ? ? ? ? ? ? ? } ? ? ? ? ? }? ? ? ? ? ? if (deliveryTime != null) { ? ? ? ? ? ? ? reference.setScheduledDeliveryTime(deliveryTime); ? ? ? ? ? } ? ? ? ? ? refs.add(reference);? ? ? ? ? ? if (message.isDurable()) { ? ? ? ? ? ? ? int durableRefCount = message.incrementDurableRefCount();? ? ? ? ? ? ? ? if (durableRefCount == 1) { ? ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ? ? ? storageManager.storeMessageTransactional(tx.getID(), message); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? storageManager.storeMessage(message); ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ? ? ? ? ? ? confirmLargeMessageSend(tx, message); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ? storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());? ? ? ? ? ? ? ? ? tx.setContainsPersistent(); ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (deliveryTime != null && deliveryTime > 0) { ? ? ? ? ? ? ? ? if (tx != null) { ? ? ? ? ? ? ? ? ? ? storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? storageManager.updateScheduledDeliveryTime(reference); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } ? ? ? ? ? }? ? ? ? ? ? message.incrementRefCount(); ? ? ? ? } ? ? }? ? ? if (tx != null) { ? ? ? ? tx.addOperation(new AddOperation(refs)); ? ? } else { ? ? ? ? // This will use the same thread if there are no pending operations ? ? ? ? // avoiding a context switch on this case ? ? ? ? storageManager.afterCompleteOperations(new IOCallback() { ? ? ? ? ? @Override ? ? ? ? ? public void onError(final int errorCode, final String errorMessage) { ? ? ? ? ? ? ? ActiveMQServerLogger.LOGGER.ioErrorAddingReferences(errorCode, errorMessage); ? ? ? ? ? }? ? ? ? ? ? @Override ? ? ? ? ? public void done() { ? ? ? ? ? ? ? context.processReferences(refs, direct); ? ? ? ? ? } ? ? ? ? }); ? ? } ? }? ? //......}
- PostOfficeImpl的processRoute方法会判断message.isDurable(),若为true且durableRefCount为1则会执行storageManager.storeMessage或者storageManager.storeMessageTransactional方法
storeMessage
AbstractJournalStorageManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {? ? //......? ? protected Journal messageJournal;? ? //......? ? public void storeMessage(final Message message) throws Exception { ? ? if (message.getMessageID() <= 0) { ? ? ? ? // Sanity check only... this shouldn't happen unless there is a bug ? ? ? ? throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); ? ? }? ? ? readLock(); ? ? try { ? ? ? ? // Note that we don't sync, the add reference that comes immediately after will sync if ? ? ? ? // appropriate? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ? messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false)); ? ? ? ? } else { ? ? ? ? ? messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false)); ? ? ? ? } ? ? } finally { ? ? ? ? readUnLock(); ? ? } ? }? ? //......}
- JDBCJournalStorageManager及JournalStorageManager都继承了AbstractJournalStorageManager,其storeMessage方法会调用messageJournal.appendAddRecord方法;二者的messageJournal实现不同,一个是JDBCJournalImpl,一个是JournalImpl
NullStorageManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
public class NullStorageManager implements StorageManager {? ? //......? ? public void storeMessage(final Message message) throws Exception { ? }? ? //......}
- NullStorageManager实现了StorageManager接口,其storeMessage为空方法
小结
ConfigurationImpl定义了persistenceEnabled属性,默认为ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled()(true);ActiveMQServerImpl的createStorageManager方法在configuration.isPersistenceEnabled()为true时创建的StorageManager是JDBCJournalStorageManager或者JournalStorageManager;否则创建的StorageManager是NullStorageManager;PostOfficeImpl的processRoute方法会判断message.isDurable(),若为true且durableRefCount为1则会执行storageManager.storeMessage或者storageManager.storeMessageTransactional方法
doc
- ConfigurationImpl
版权声明:
作者: freeclashnode
链接: https://www.freeclashnode.com/news/article-2165.htm
来源: FreeClashNode
文章版权归作者所有,未经允许请勿转载。
热门文章
- 10月12日|20.2M/S,Shadowrocket/V2ray/SSR/Clash免费节点订阅链接每天更新
- 10月11日|22.7M/S,Shadowrocket/Clash/V2ray/SSR免费节点订阅链接每天更新
- 10月9日|19M/S,Clash/V2ray/SSR/Shadowrocket免费节点订阅链接每天更新
- 10月13日|18.6M/S,V2ray/SSR/Shadowrocket/Clash免费节点订阅链接每天更新
- 10月10日|21.9M/S,V2ray/Clash/SSR/Shadowrocket免费节点订阅链接每天更新
- 10月15日|18.5M/S,V2ray/Shadowrocket/SSR/Clash免费节点订阅链接每天更新
- 10月16日|22.5M/S,Clash/V2ray/SSR/Shadowrocket免费节点订阅链接每天更新
- 10月14日|22.5M/S,V2ray/Clash/Shadowrocket/SSR免费节点订阅链接每天更新
- 10月8日|18.9M/S,Clash/SSR/V2ray/Shadowrocket免费节点订阅链接每天更新
- 10月17日|18.2M/S,Clash/SSR/V2ray/Shadowrocket免费节点订阅链接每天更新
最新文章
- 11月3日|18.7M/S,V2ray/Shadowrocket/Clash/SSR免费节点订阅链接每天更新
- 11月2日|20.2M/S,Clash/Shadowrocket/V2ray/SSR免费节点订阅链接每天更新
- 11月1日|19.1M/S,SSR/Clash/Shadowrocket/V2ray免费节点订阅链接每天更新
- 10月31日|22.2M/S,V2ray/SSR/Clash/Shadowrocket免费节点订阅链接每天更新
- 10月30日|22.7M/S,Clash/V2ray/Shadowrocket/SSR免费节点订阅链接每天更新
- 10月29日|21.8M/S,SSR/Shadowrocket/V2ray/Clash免费节点订阅链接每天更新
- 10月28日|20.4M/S,SSR/Clash/Shadowrocket/V2ray免费节点订阅链接每天更新
- 10月27日|19.3M/S,SSR/Clash/V2ray/Shadowrocket免费节点订阅链接每天更新
- 10月26日|19.1M/S,SSR/V2ray/Clash/Shadowrocket免费节点订阅链接每天更新
- 10月25日|18.2M/S,Shadowrocket/SSR/V2ray/Clash免费节点订阅链接每天更新