package org.apache.activemq.store.rapid;

import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
import org.apache.activemq.store.rapid.RapidTransactionStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-4.1.2.jar:org/apache/activemq/store/rapid/RapidPersistenceAdapter.class */
public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
    /** Logger for this adapter; assigned in the static initializer at the end of the class. */
    private static final Log log;
    /** Journal that commands are written to, read back from, and marked on during checkpoints. */
    private final Journal journal;
    /** Pool that executes the per-store checkpoint tasks; created in {@link #start()}. */
    private ThreadPoolExecutor checkpointExecutor;
    /** Task runner whose iterations invoke {@link #doCheckpoint()}; woken up by {@link #checkpoint(boolean, boolean)}. */
    private TaskRunner checkpointTask;
    /** True when a full checkpoint has been requested; set and cleared under {@code synchronized (this)}. */
    private boolean fullCheckPoint;
    /** Kaha store opened by the constructor ("kaha.db" under the journal's log directory). */
    Store store;
    /** Selects the value marshaller in getMapContainer; never assigned anywhere in this class. */
    private boolean useExternalMessageReferences;
    /** Cache for this class's Class object, filled by the static initializer (looks compiler-generated). */
    static Class class$org$apache$activemq$store$rapid$RapidPersistenceAdapter;
    /** Wire format used to marshal and unmarshal journal records. */
    private final WireFormat wireFormat = new OpenWireFormat();
    /** Queue message stores created so far, keyed by destination. */
    private final ConcurrentHashMap queues = new ConcurrentHashMap();
    /** Topic message stores created so far, keyed by destination. */
    private final ConcurrentHashMap topics = new ConcurrentHashMap();
    /** Milliseconds without a checkpoint request after which the periodic task forces one (300000 ms = 5 minutes). */
    private long checkpointInterval = 300000;
    /** Timestamp (System.currentTimeMillis) of the most recent checkpoint request. */
    private long lastCheckpointRequest = System.currentTimeMillis();
    /** Used as both the core and the maximum size of the checkpoint thread pool. */
    private int maxCheckpointWorkers = 10;
    /** Only exposed through its getter/setter in this class; not read by any logic here. */
    private int maxCheckpointMessageAddSize = 5000;
    /** Transaction store handed out by createTransactionStore() and fed during recovery. */
    private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
    /** Latch that callers of checkpoint(true, ...) wait on; swapped for a fresh one at the start of each doCheckpoint(). */
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    /** Lifecycle flag flipped by start() and stop(); writeCommand() refuses to write while it is false. */
    private AtomicBoolean started = new AtomicBoolean(false);
    /** Runnable registered with the Scheduler in start() and cancelled in stop(). */
    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();

    /**
     * Builds the runnable that is scheduled periodically by {@link #start()}.
     *
     * <p>Each run requests a full, non-blocking checkpoint, but only when more than
     * {@code checkpointInterval} milliseconds have elapsed since the last checkpoint request.
     *
     * <p>The decompiled form passed {@code this} to an interface constructor and stored the
     * anonymous instance in a synthetic {@code this$0} field, which is not valid Java; a plain
     * anonymous class reaches the enclosing adapter implicitly.
     *
     * @return the periodic checkpoint task
     */
    final Runnable createPeriodicCheckpointTask() {
        return new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                long deadline = lastCheckpointRequest + checkpointInterval;
                if (System.currentTimeMillis() > deadline) {
                    // Do not wait for completion (false); request a full checkpoint (true).
                    checkpoint(false, true);
                }
            }
        };
    }

    /**
     * Creates the adapter on top of the given journal.
     *
     * <p>Registers this adapter as the journal's event listener, opens the Kaha store file
     * {@code kaha.db} inside the journal's log directory in "rw" mode, and creates the task
     * runner whose iterations execute {@link #doCheckpoint()}.
     *
     * <p>The decompiled form used {@code new Task(this) {...}} with a synthetic {@code this$0}
     * field, which does not compile; it is now an ordinary anonymous class.
     *
     * @param journal journal to persist to; cast to {@link JournalImpl} to find its log directory
     * @param taskRunnerFactory factory used to create the checkpoint task runner
     * @throws IOException declared for failures while opening the Kaha store
     */
    public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
        this.journal = journal;
        journal.setJournalEventListener(this);
        String storePath = ((JournalImpl) journal).getLogDirectory().getAbsolutePath() + File.separator + "kaha.db";
        this.store = StoreFactory.open(storePath, "rw");
        this.checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
            @Override // org.apache.activemq.thread.Task
            public boolean iterate() {
                // The return value tells the runner whether another iteration is wanted.
                return doCheckpoint();
            }
        }, "ActiveMQ Checkpoint Worker");
    }

    /**
     * Collects every map-container id of the Kaha store that is an {@link ActiveMQDestination}.
     *
     * @return the destinations found; empty (or partial) when reading the ids fails, in which
     *         case the failure is logged rather than thrown
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public Set getDestinations() {
        Set destinations = new HashSet();
        try {
            for (Object containerId : this.store.getMapContainerIds()) {
                if (!(containerId instanceof ActiveMQDestination)) {
                    // Ids such as the "-subscriptions" strings are not destinations.
                    continue;
                }
                destinations.add(containerId);
            }
        } catch (IOException failure) {
            log.error("Failed to get destinations ", failure);
        }
        return destinations;
    }

    /**
     * Returns the message store for a destination, dispatching on whether it is a queue or a topic.
     *
     * @param activeMQDestination destination to look up or create a store for
     * @return the queue or topic message store
     * @throws IOException if the underlying store cannot be created
     */
    private MessageStore createMessageStore(ActiveMQDestination activeMQDestination) throws IOException {
        if (activeMQDestination.isQueue()) {
            return createQueueMessageStore((ActiveMQQueue) activeMQDestination);
        }
        return createTopicMessageStore((ActiveMQTopic) activeMQDestination);
    }

    /**
     * Returns the cached message store for a queue, creating and caching it on first use.
     *
     * @param activeMQQueue queue to get the store for
     * @return the store for the queue
     * @throws IOException if the backing map container cannot be obtained
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public MessageStore createQueueMessageStore(ActiveMQQueue activeMQQueue) throws IOException {
        RapidMessageStore cached = (RapidMessageStore) this.queues.get(activeMQQueue);
        if (cached != null) {
            return cached;
        }
        // NOTE(review): queues use the "topic-data" container name, same as topics — confirm this is intended.
        MapContainer messages = getMapContainer(activeMQQueue, "topic-data");
        RapidMessageStore created = new RapidMessageStore(this, activeMQQueue, messages);
        this.queues.put(activeMQQueue, created);
        return created;
    }

    /**
     * Fetches a map container from the Kaha store, configures its marshallers and loads it.
     *
     * <p>Keys are always marshalled as strings. Values are marshalled as commands through the
     * adapter's wire format unless {@code useExternalMessageReferences} is set, in which case
     * they are marshalled as strings.
     *
     * @param obj container id
     * @param str container name
     * @return the loaded container
     * @throws IOException if the store cannot provide the container
     */
    protected MapContainer getMapContainer(Object obj, String str) throws IOException {
        MapContainer container = this.store.getMapContainer(obj, str);
        container.setKeyMarshaller(new StringMarshaller());
        if (!this.useExternalMessageReferences) {
            container.setValueMarshaller(new CommandMarshaller(this.wireFormat));
        } else {
            container.setValueMarshaller(new StringMarshaller());
        }
        container.load();
        return container;
    }

    /**
     * Returns the cached message store for a topic, creating and caching it on first use.
     *
     * <p>A new store is backed by three containers: the message data, the subscriptions
     * (id is the topic's string form plus "-subscriptions") and the ack counters.
     *
     * @param activeMQTopic topic to get the store for
     * @return the store for the topic
     * @throws IOException if a backing map container cannot be obtained
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
        RapidTopicMessageStore cached = (RapidTopicMessageStore) this.topics.get(activeMQTopic);
        if (cached != null) {
            return cached;
        }
        MapContainer messages = getMapContainer(activeMQTopic, "topic-data");
        String subscriptionsId = activeMQTopic.toString() + "-subscriptions";
        MapContainer subscriptions = getMapContainer(subscriptionsId, "topic-subs");
        // The ack container is configured by hand and, unlike the other two, is not load()ed here.
        MapContainer acks = this.store.getMapContainer(activeMQTopic.toString(), "topic-acks");
        acks.setKeyMarshaller(new StringMarshaller());
        acks.setValueMarshaller(new AtomicIntegerMarshaller());
        RapidTopicMessageStore created = new RapidTopicMessageStore(this, activeMQTopic, messages, subscriptions, acks);
        this.topics.put(activeMQTopic, created);
        return created;
    }

    /**
     * Returns the adapter's single transaction store; nothing is created per call.
     *
     * @return the shared {@link RapidTransactionStore} instance
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }

    /**
     * Always reports 0; this adapter does not track a broker sequence id.
     *
     * @return {@code 0L}
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    /** No-op: this adapter does nothing when a transaction begins. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void beginTransaction(ConnectionContext connectionContext) throws IOException {
    }

    /** No-op: this adapter does nothing when a transaction commits. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void commitTransaction(ConnectionContext connectionContext) throws IOException {
    }

    /** No-op: this adapter does nothing when a transaction rolls back. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void rollbackTransaction(ConnectionContext connectionContext) throws IOException {
    }

    /**
     * Starts the adapter; a second call while already started does nothing.
     *
     * <p>Creates the checkpoint thread pool ({@code maxCheckpointWorkers} threads, 30 second
     * keep-alive, core threads allowed to time out), replays the journal via {@code recover()},
     * and schedules the periodic checkpoint task at one tenth of {@code checkpointInterval}.
     *
     * <p>The decompiled form used {@code new ThreadFactory(this) {...}} with a synthetic
     * {@code this$0} field, which does not compile; it is now an ordinary anonymous class.
     *
     * @throws Exception if journal recovery fails
     */
    @Override // org.apache.activemq.Service
    public synchronized void start() throws Exception {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        ThreadFactory workerFactory = new ThreadFactory() {
            @Override // edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread worker = new Thread(runnable, "Journal checkpoint worker");
                // Slightly above Thread.NORM_PRIORITY (5).
                worker.setPriority(7);
                return worker;
            }
        };
        this.checkpointExecutor = new ThreadPoolExecutor(this.maxCheckpointWorkers, this.maxCheckpointWorkers, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue(), workerFactory);
        this.checkpointExecutor.allowCoreThreadTimeOut(true);
        createTransactionStore();
        recover();
        Scheduler.executePeriodically(this.periodicCheckpointTask, this.checkpointInterval / 10);
    }

    /**
     * Stops the adapter; does nothing unless it is currently started.
     *
     * <p>Cancels the periodic task, requests a final full checkpoint without waiting for it,
     * shuts down the checkpoint runner and pool, forgets the cached stores, then closes the
     * journal and the Kaha store. A journal close failure is remembered and only thrown after
     * the Kaha store has been closed.
     *
     * @throws Exception if closing the journal or the store fails
     */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        Scheduler.cancel(this.periodicCheckpointTask);
        checkpoint(false, true);
        this.checkpointTask.shutdown();
        this.checkpointExecutor.shutdown();
        this.queues.clear();
        this.topics.clear();

        IOException journalCloseFailure = null;
        try {
            this.journal.close();
        } catch (Exception cause) {
            journalCloseFailure = IOExceptionSupport.create("Failed to close journals: " + cause, cause);
        }
        this.store.close();
        if (journalCloseFailure != null) {
            throw journalCloseFailure;
        }
    }

    /**
     * @return the wire format this adapter uses to marshal and unmarshal journal records
     */
    public WireFormat getWireFormat() {
        return this.wireFormat;
    }

    /**
     * Journal callback: reacts by requesting a full checkpoint without waiting for it.
     *
     * @param recordLocation location supplied by the journal; not used here
     */
    @Override // org.apache.activeio.journal.JournalEventListener
    public void overflowNotification(RecordLocation recordLocation) {
        checkpoint(false, true);
    }

    /**
     * Requests a checkpoint by waking up the checkpoint task runner.
     *
     * <p>Records the request time, optionally flags the next checkpoint as a full one, and
     * optionally blocks until the checkpoint that was pending at request time has completed.
     *
     * <p>If the calling thread is interrupted, the interrupt is logged and the thread's
     * interrupt status is restored so that callers further up the stack can still observe it.
     *
     * @param z when {@code true}, block until the pending checkpoint's latch is released
     * @param z2 when {@code true}, mark the next checkpoint as a full checkpoint
     * @throws IllegalStateException if the journal reference is {@code null}
     */
    public void checkpoint(boolean z, boolean z2) {
        try {
            if (this.journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            long requestTime = System.currentTimeMillis();
            CountDownLatch pendingLatch;
            synchronized (this) {
                // Capture the latch before waking the worker: doCheckpoint() replaces it.
                pendingLatch = this.nextCheckpointCountDownLatch;
                this.lastCheckpointRequest = requestTime;
                if (z2) {
                    this.fullCheckPoint = true;
                }
            }
            this.checkpointTask.wakeup();
            if (z) {
                log.debug("Waiting for checkpoint to complete.");
                pendingLatch.await();
            }
        } catch (InterruptedException e) {
            // Restore the flag; swallowing it would hide the interrupt from the caller.
            Thread.currentThread().interrupt();
            log.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /**
     * Runs one checkpoint cycle; invoked by the checkpoint task runner.
     *
     * <p>Topic stores are checkpointed on every cycle. Queue stores are checkpointed, and the
     * journal mark is moved, only when a full checkpoint was requested. Each store is
     * checkpointed as a task on the checkpoint executor and this method waits for all of them.
     * Failures are logged and do not abort the cycle. The latch captured at the start is always
     * released, so threads blocked in {@link #checkpoint(boolean, boolean)} are woken even when
     * this method fails.
     *
     * <p>The decompiled anonymous {@code Callable} classes took constructor arguments and read
     * an undefined {@code r5} register, which does not compile; they now capture a final local.
     *
     * @return {@code true} if another full checkpoint was requested while this one ran, which
     *         asks the task runner to iterate again
     */
    public boolean doCheckpoint() {
        final CountDownLatch latch;
        final boolean full;
        synchronized (this) {
            latch = this.nextCheckpointCountDownLatch;
            this.nextCheckpointCountDownLatch = new CountDownLatch(1);
            full = this.fullCheckPoint;
            this.fullCheckPoint = false;
        }
        try {
            log.debug("Checkpoint started.");
            RecordLocation newMark = null;
            ArrayList futureTasks = new ArrayList(this.queues.size() + this.topics.size());

            if (full) {
                for (Iterator it = this.queues.values().iterator(); it.hasNext();) {
                    try {
                        final RapidMessageStore ms = (RapidMessageStore) it.next();
                        FutureTask task = new FutureTask(new Callable() {
                            @Override // edu.emory.mathcs.backport.java.util.concurrent.Callable
                            public Object call() throws Exception {
                                return ms.checkpoint();
                            }
                        });
                        futureTasks.add(task);
                        this.checkpointExecutor.execute(task);
                    } catch (Exception e) {
                        log.error("Failed to checkpoint a message store: " + e, e);
                    }
                }
            }

            for (Iterator it = this.topics.values().iterator(); it.hasNext();) {
                try {
                    final RapidTopicMessageStore ms = (RapidTopicMessageStore) it.next();
                    FutureTask task = new FutureTask(new Callable() {
                        @Override // edu.emory.mathcs.backport.java.util.concurrent.Callable
                        public Object call() throws Exception {
                            return ms.checkpoint();
                        }
                    });
                    futureTasks.add(task);
                    this.checkpointExecutor.execute(task);
                } catch (Exception e) {
                    log.error("Failed to checkpoint a message store: " + e, e);
                }
            }

            try {
                for (Iterator it = futureTasks.iterator(); it.hasNext();) {
                    RecordLocation mark = (RecordLocation) ((FutureTask) it.next()).get();
                    // The journal mark is only advanced on full checkpoints; keep the greatest location.
                    if (full && mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                        newMark = mark;
                    }
                }
            } catch (Throwable t) {
                log.error("Failed to checkpoint a message store: " + t, t);
            }

            if (full && newMark != null) {
                try {
                    log.debug("Marking journal at: " + newMark);
                    this.journal.setMark(newMark, true);
                } catch (Exception e) {
                    log.error("Failed to mark the Journal: " + e, e);
                }
            }
            log.debug("Checkpoint done.");
        } finally {
            // Release waiters on both the normal and the exceptional path.
            latch.countDown();
        }
        synchronized (this) {
            return this.fullCheckPoint;
        }
    }

    /**
     * Reads the journal record at the given location and unmarshals it into a command.
     *
     * @param recordLocation journal location to read
     * @return the unmarshalled data structure
     * @throws IOException if the read or unmarshal fails or the location is invalid; the
     *         original failure is attached as the cause
     */
    public DataStructure readCommand(RecordLocation recordLocation) throws IOException {
        try {
            Packet record = this.journal.read(recordLocation);
            ByteSequence bytes = toByteSequence(record);
            return (DataStructure) this.wireFormat.unmarshal(bytes);
        } catch (InvalidRecordLocationException badLocation) {
            throw createReadException(recordLocation, badLocation);
        } catch (IOException ioFailure) {
            throw createReadException(recordLocation, ioFailure);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:23:0x0146. Please report as an issue. */
    /**
     * Replays every record in the journal into the message stores and the transaction store.
     *
     * <p>Walks the journal from its first record (next location after {@code null}) until no
     * further location is returned, then writes a "RECOVERED" trace record, marks the journal
     * at it and returns. Non-transactional messages and acks are replayed directly into their
     * destination's store; transactional ones are handed to the transaction store and only
     * replayed when the matching commit record is seen.
     *
     * <p>The counter {@code i} is incremented once per directly replayed message/ack and once
     * per replayed committed transaction; it is only used for the final log line.
     *
     * @throws InvalidRecordLocationException if the journal rejects a record location
     * @throws IOException if reading or unmarshalling a record fails
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
        RecordLocation recordLocation = null;
        int i = 0;
        log.info("Journal Recovery Started.");
        ConnectionContext connectionContext = new ConnectionContext();
        while (true) {
            RecordLocation nextRecordLocation = this.journal.getNextRecordLocation(recordLocation);
            recordLocation = nextRecordLocation;
            if (nextRecordLocation == null) {
                // End of journal: record that recovery completed and move the mark there.
                this.journal.setMark(writeTraceMessage("RECOVERED", true), true);
                log.info(new StringBuffer().append("Journal Recovered: ").append(i).append(" message(s) in transactions recovered.").toString());
                return;
            }
            DataStructure dataStructure = (DataStructure) this.wireFormat.unmarshal(toByteSequence(this.journal.read(recordLocation)));
            if (!(dataStructure instanceof Message)) {
                // NOTE(review): the numeric cases are presumably the DATA_STRUCTURE_TYPE ids of the
                // classes each branch casts to — confirm against the command classes.
                switch (dataStructure.getDataStructureType()) {
                    case 50:
                        // Topic acknowledgement.
                        JournalTopicAck journalTopicAck = (JournalTopicAck) dataStructure;
                        RapidTopicMessageStore rapidTopicMessageStore = (RapidTopicMessageStore) createMessageStore(journalTopicAck.getDestination());
                        if (journalTopicAck.getTransactionId() == null) {
                            rapidTopicMessageStore.replayAcknowledge(connectionContext, journalTopicAck.getClientId(), journalTopicAck.getSubscritionName(), journalTopicAck.getMessageId());
                            i++;
                            break;
                        } else {
                            // Part of a transaction: defer until its commit record is replayed.
                            this.transactionStore.acknowledge(rapidTopicMessageStore, journalTopicAck, recordLocation);
                            break;
                        }
                    case 51:
                    default:
                        // Unrecognised record types are logged and skipped.
                        log.error(new StringBuffer().append("Unknown type of record in transaction log which will be discarded: ").append(dataStructure).toString());
                        break;
                    case 52:
                        // Queue acknowledgement (message removal).
                        JournalQueueAck journalQueueAck = (JournalQueueAck) dataStructure;
                        RapidMessageStore rapidMessageStore = (RapidMessageStore) createMessageStore(journalQueueAck.getDestination());
                        if (!journalQueueAck.getMessageAck().isInTransaction()) {
                            rapidMessageStore.replayRemoveMessage(connectionContext, journalQueueAck.getMessageAck());
                            i++;
                            break;
                        } else {
                            // Part of a transaction: defer until its commit record is replayed.
                            this.transactionStore.removeMessage(rapidMessageStore, journalQueueAck.getMessageAck(), recordLocation);
                            break;
                        }
                    case 53:
                        // Trace record: informational only.
                        log.debug(new StringBuffer().append("TRACE Entry: ").append(((JournalTrace) dataStructure).getMessage()).toString());
                        break;
                    case 54:
                        // Transaction boundary record.
                        JournalTransaction journalTransaction = (JournalTransaction) dataStructure;
                        try {
                            // Type 1 replays a prepare, 2 and 4 replay a commit, 3 and 5 replay a rollback.
                            switch (journalTransaction.getType()) {
                                case 1:
                                    this.transactionStore.replayPrepare(journalTransaction.getTransactionId());
                                    break;
                                case 2:
                                case 4:
                                    RapidTransactionStore.Tx replayCommit = this.transactionStore.replayCommit(journalTransaction.getTransactionId(), journalTransaction.getWasPrepared());
                                    if (replayCommit != null) {
                                        // Apply the transaction's deferred operations in their recorded order.
                                        Iterator it = replayCommit.getOperations().iterator();
                                        while (it.hasNext()) {
                                            RapidTransactionStore.TxOperation txOperation = (RapidTransactionStore.TxOperation) it.next();
                                            // operationType 0 = add message, 1 = remove message, 3 = topic ack.
                                            if (txOperation.operationType == 0) {
                                                txOperation.store.replayAddMessage(connectionContext, (Message) txOperation.data, txOperation.location);
                                            }
                                            if (txOperation.operationType == 1) {
                                                txOperation.store.replayRemoveMessage(connectionContext, (MessageAck) txOperation.data);
                                            }
                                            if (txOperation.operationType == 3) {
                                                JournalTopicAck journalTopicAck2 = (JournalTopicAck) txOperation.data;
                                                ((RapidTopicMessageStore) txOperation.store).replayAcknowledge(connectionContext, journalTopicAck2.getClientId(), journalTopicAck2.getSubscritionName(), journalTopicAck2.getMessageId());
                                            }
                                        }
                                        i++;
                                    }
                                    break;
                                case 3:
                                case 5:
                                    this.transactionStore.replayRollback(journalTransaction.getTransactionId());
                                    break;
                            }
                        } catch (IOException e) {
                            // A failed transaction replay is logged and recovery continues with the next record.
                            log.error(new StringBuffer().append("Recovery Failure: Could not replay: ").append(dataStructure).append(", reason: ").append(e).toString(), e);
                            break;
                        }
                        break;
                }
            } else {
                // Message add record.
                Message message = (Message) dataStructure;
                RapidMessageStore rapidMessageStore2 = (RapidMessageStore) createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    // Part of a transaction: defer until its commit record is replayed.
                    this.transactionStore.addMessage(rapidMessageStore2, message, recordLocation);
                } else {
                    rapidMessageStore2.replayAddMessage(connectionContext, message, recordLocation);
                    i++;
                }
            }
        }
    }

    /**
     * Wraps a journal read failure in an {@link IOException} naming the record location.
     *
     * @param recordLocation location that could not be read
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    private IOException createReadException(RecordLocation recordLocation, Exception exc) {
        String message = "Failed to read to journal for: " + recordLocation + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a journal write failure in an {@link IOException} naming the command being written.
     *
     * @param dataStructure command that could not be written
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected IOException createWriteException(DataStructure dataStructure, Exception exc) {
        String message = "Failed to write to journal for: " + dataStructure + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a journal write failure in an {@link IOException} naming the command by its description.
     *
     * @param str description of the command that could not be written
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected IOException createWriteException(String str, Exception exc) {
        String message = "Failed to write to journal for command: " + str + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a recovery failure in an {@link IOException}.
     *
     * @param exc underlying failure, attached as the cause
     * @return the exception to throw
     */
    protected IOException createRecoveryFailedException(Exception exc) {
        String message = "Failed to recover from journal. Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Marshals a command and appends it to the journal.
     *
     * @param dataStructure command to write
     * @param z passed through as the second argument of {@code Journal.write}
     *          (presumably "sync" — confirm against the Journal API)
     * @return the journal location of the written record
     * @throws IOException with message "closed" if the adapter is not started, or if the write fails
     */
    public RecordLocation writeCommand(DataStructure dataStructure, boolean z) throws IOException {
        if (!this.started.get()) {
            throw new IOException("closed");
        }
        ByteSequence marshalled = this.wireFormat.marshal(dataStructure);
        return this.journal.write(toPacket(marshalled), z);
    }

    /**
     * Writes a {@link JournalTrace} record carrying the given text to the journal.
     *
     * @param str trace text
     * @param z forwarded unchanged to {@link #writeCommand(DataStructure, boolean)}
     * @return the journal location of the trace record
     * @throws IOException if the adapter is not started or the write fails
     */
    private RecordLocation writeTraceMessage(String str, boolean z) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(str);
        RecordLocation location = writeCommand(trace, z);
        return location;
    }

    /**
     * Usage callback: requests a full, non-blocking checkpoint when the third argument exceeds
     * 80 and is greater than the second argument.
     *
     * @param usageManager source of the notification; not used here
     * @param i compared against {@code i2} (presumably the previous usage percentage — confirm)
     * @param i2 compared against 80 (presumably the new usage percentage — confirm)
     */
    @Override // org.apache.activemq.memory.UsageListener
    public void onMemoryUseChanged(UsageManager usageManager, int i, int i2) {
        boolean aboveThreshold = i2 > 80;
        boolean rising = i < i2;
        if (aboveThreshold && rising) {
            checkpoint(false, true);
        }
    }

    /**
     * @return the adapter's transaction store (same instance as createTransactionStore() returns)
     */
    public RapidTransactionStore getTransactionStore() {
        return this.transactionStore;
    }

    /**
     * Discards all persisted messages.
     *
     * <p>Writes a "DELETED" trace record straight to the journal (bypassing the started check in
     * writeCommand), moves the journal mark to it, and then deletes the Kaha store if there is one.
     *
     * @throws IOException if any step fails; non-IO failures are wrapped
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void deleteAllMessages() throws IOException {
        try {
            JournalTrace deletedMarker = new JournalTrace();
            deletedMarker.setMessage("DELETED");
            Packet packet = toPacket(this.wireFormat.marshal(deletedMarker));
            RecordLocation markerLocation = this.journal.write(packet, false);
            this.journal.setMark(markerLocation, true);
            log.info("Journal deleted: ");
            if (this.store != null) {
                this.store.delete();
            }
        } catch (IOException ioFailure) {
            throw ioFailure;
        } catch (Throwable other) {
            throw IOExceptionSupport.create(other);
        }
    }

    /**
     * @return the configured maximum checkpoint message-add size (default 5000); this class
     *         only stores the value
     */
    public int getMaxCheckpointMessageAddSize() {
        return this.maxCheckpointMessageAddSize;
    }

    /**
     * @param i new maximum checkpoint message-add size; stored without validation
     */
    public void setMaxCheckpointMessageAddSize(int i) {
        this.maxCheckpointMessageAddSize = i;
    }

    /**
     * @return the number of threads the checkpoint pool is created with in start() (default 10)
     */
    public int getMaxCheckpointWorkers() {
        return this.maxCheckpointWorkers;
    }

    /**
     * @param i new checkpoint pool size; only read when start() builds the pool, so changing it
     *          after start has no effect on an existing pool
     */
    public void setMaxCheckpointWorkers(int i) {
        this.maxCheckpointWorkers = i;
    }

    /**
     * Always reports {@code false}; the value of the {@code useExternalMessageReferences}
     * field is not consulted.
     *
     * @return {@code false}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public boolean isUseExternalMessageReferences() {
        return false;
    }

    /**
     * Rejects any attempt to enable external message references; passing {@code false} is a no-op.
     *
     * @param z requested setting
     * @throws IllegalArgumentException if {@code z} is {@code true}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setUseExternalMessageReferences(boolean z) {
        if (z) {
            throw new IllegalArgumentException("The journal does not support message references.");
        }
    }

    /**
     * No-op: the supplied usage manager is ignored.
     *
     * @param usageManager ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setUsageManager(UsageManager usageManager) {
    }

    /**
     * @return the Kaha store opened by the constructor
     */
    public Store getStore() {
        return this.store;
    }

    /**
     * Converts an ActiveMQ byte sequence into an ActiveIO packet over the same array, offset and length.
     *
     * @param byteSequence sequence to wrap
     * @return a {@link ByteArrayPacket} built from the sequence's data, offset and length
     */
    public Packet toPacket(ByteSequence byteSequence) {
        org.apache.activeio.packet.ByteSequence ioSequence =
                new org.apache.activeio.packet.ByteSequence(byteSequence.data, byteSequence.offset, byteSequence.length);
        return new ByteArrayPacket(ioSequence);
    }

    /**
     * Converts an ActiveIO packet into an ActiveMQ byte sequence with the same data, offset and length.
     *
     * @param packet packet to convert
     * @return the equivalent ActiveMQ {@link ByteSequence}
     */
    public ByteSequence toByteSequence(Packet packet) {
        org.apache.activeio.packet.ByteSequence ioSequence = packet.asByteSequence();
        byte[] data = ioSequence.getData();
        int offset = ioSequence.getOffset();
        int length = ioSequence.getLength();
        return new ByteSequence(data, offset, length);
    }

    /**
     * Looks up a class by name, converting a missing class into an unchecked error.
     *
     * <p>The decompiled form threw the result of {@code initCause(...)} directly; that
     * expression has static type {@code Throwable}, so the method did not compile. The error is
     * now built first, given its cause, and then thrown with its own type.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            NoClassDefFoundError error = new NoClassDefFoundError();
            error.initCause(e);
            throw error;
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$store$rapid$RapidPersistenceAdapter == null) {
            cls = class$("org.apache.activemq.store.rapid.RapidPersistenceAdapter");
            class$org$apache$activemq$store$rapid$RapidPersistenceAdapter = cls;
        } else {
            cls = class$org$apache$activemq$store$rapid$RapidPersistenceAdapter;
        }
        log = LogFactory.getLog(cls);
    }
}
