/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.store.journal;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activeio.adapter.PacketInputStream;
import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation;
import org.activeio.journal.active.JournalImpl;
import org.activeio.journal.howl.HowlJournal;
import org.activemq.io.WireFormat;
import org.activemq.io.impl.StatelessDefaultWireFormat;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.message.Packet;
import org.activemq.service.MessageIdentity;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.TopicMessageStore;
import org.activemq.store.TransactionStore;
import org.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.activemq.store.journal.JournalAck;
import org.activemq.store.journal.JournalMessageStore;
import org.activemq.store.journal.JournalTopicMessageStore;
import org.activemq.store.journal.JournalTransactionStore;
import org.activemq.store.journal.TxCommand;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.objectweb.howl.log.Configuration;

public class JournalPersistenceAdapter
implements PersistenceAdapter,
JournalEventListener {
    private static final Log log = LogFactory.getLog((Class)JournalPersistenceAdapter.class);
    public static final String DEFAULT_JOURNAL_TYPE = "default";
    public static final String HOWL_JOURNAL_TYPE = "howl";
    private Journal journal;
    private String journalType = "default";
    private PersistenceAdapter longTermPersistence;
    private File directory = new File("logs");
    private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
    private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
    private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();
    private static final int PACKET_RECORD_TYPE = 0;
    private static final int COMMAND_RECORD_TYPE = 1;
    private static final int TX_COMMAND_RECORD_TYPE = 2;
    private static final int ACK_RECORD_TYPE = 3;
    private Channel checkpointRequests = new LinkedQueue();
    private QueuedExecutor checkpointExecutor;
    ClockDaemon clockDaemon;
    private Object clockTicket;
    private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    private int logFileSize = 0x1400000;
    private int logFileCount = 2;
    private long checkpointInterval = 300000L;
    static /* synthetic */ Class class$EDU$oswego$cs$dl$util$concurrent$Latch;

    public JournalPersistenceAdapter() {
        this.checkpointExecutor = new QueuedExecutor((Channel)new LinkedQueue());
        this.checkpointExecutor.setThreadFactory(new ThreadFactory(){

            public Thread newThread(Runnable runnable) {
                Thread answer = new Thread(runnable, "Checkpoint Worker");
                answer.setDaemon(true);
                answer.setPriority(10);
                return answer;
            }
        });
    }

    public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
        this();
        this.directory = directory;
        this.longTermPersistence = longTermPersistence;
    }

    public Map getInitialDestinations() {
        return this.longTermPersistence.getInitialDestinations();
    }

    private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
        if (isQueue) {
            return this.createQueueMessageStore(destination);
        }
        return this.createTopicMessageStore(destination);
    }

    public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
        JournalMessageStore store = (JournalMessageStore)this.messageStores.get((Object)destinationName);
        if (store == null) {
            MessageStore checkpointStore = this.longTermPersistence.createQueueMessageStore(destinationName);
            store = new JournalMessageStore(this, checkpointStore, destinationName);
            this.messageStores.put((Object)destinationName, (Object)store);
        }
        return store;
    }

    public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
        JournalTopicMessageStore store = (JournalTopicMessageStore)this.topicMessageStores.get((Object)destinationName);
        if (store == null) {
            TopicMessageStore checkpointStore = this.longTermPersistence.createTopicMessageStore(destinationName);
            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
            this.topicMessageStores.put((Object)destinationName, (Object)store);
        }
        return store;
    }

    public TransactionStore createTransactionStore() throws JMSException {
        return this.transactionStore;
    }

    public void beginTransaction() throws JMSException {
        this.longTermPersistence.beginTransaction();
    }

    public void commitTransaction() throws JMSException {
        this.longTermPersistence.commitTransaction();
    }

    public void rollbackTransaction() {
        this.longTermPersistence.rollbackTransaction();
    }

    public synchronized void start() throws JMSException {
        if (this.longTermPersistence instanceof JDBCPersistenceAdapter) {
            ((JDBCPersistenceAdapter)this.longTermPersistence).setCleanupPeriod(0);
        }
        this.longTermPersistence.start();
        this.createTransactionStore();
        if (this.journal == null) {
            try {
                log.info((Object)"Opening journal.");
                this.journal = this.createJournal();
                log.info((Object)("Opened journal: " + this.journal));
                this.journal.setJournalEventListener((JournalEventListener)this);
            }
            catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
            }
            try {
                this.recover();
            }
            catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
            }
        }
        this.clockTicket = this.getClockDaemon().executePeriodically(this.checkpointInterval, new Runnable(){

            public void run() {
                JournalPersistenceAdapter.this.checkpoint(false);
            }
        }, false);
    }

    public synchronized void stop() throws JMSException {
        if (this.clockTicket != null) {
            ClockDaemon.cancel((Object)this.clockTicket);
            this.clockTicket = null;
            this.clockDaemon.shutDown();
        }
        this.checkpoint(true);
        this.checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
        JMSException firstException = null;
        if (this.journal != null) {
            try {
                this.journal.close();
                this.journal = null;
            }
            catch (Exception e) {
                firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
            }
        }
        this.longTermPersistence.stop();
        if (firstException != null) {
            throw firstException;
        }
    }

    public PersistenceAdapter getLongTermPersistence() {
        return this.longTermPersistence;
    }

    public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
        this.longTermPersistence = longTermPersistence;
    }

    public File getDirectory() {
        return this.directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public WireFormat getWireFormat() {
        return this.wireFormat;
    }

    public String getJournalType() {
        return this.journalType;
    }

    public void setJournalType(String journalType) {
        this.journalType = journalType;
    }

    protected Journal createJournal() throws IOException {
        if (DEFAULT_JOURNAL_TYPE.equals(this.journalType)) {
            return new JournalImpl(this.directory, this.logFileCount, this.logFileSize);
        }
        if (HOWL_JOURNAL_TYPE.equals(this.journalType)) {
            try {
                Configuration config = new Configuration();
                config.setLogFileDir(this.directory.getCanonicalPath());
                return new HowlJournal(config);
            }
            catch (IOException e) {
                throw e;
            }
            catch (Exception e) {
                throw (IOException)new IOException("Could not open HOWL journal: " + e.getMessage()).initCause(e);
            }
        }
        throw new IllegalStateException("Unsupported valued for journalType attribute: " + this.journalType);
    }

    public void overflowNotification(RecordLocation safeLocation) {
        this.checkpoint(false);
    }

    public void checkpoint(boolean sync) {
        try {
            if (this.journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            Latch latch = null;
            if (sync) {
                latch = new Latch();
                this.checkpointRequests.put((Object)latch);
            } else {
                this.checkpointRequests.put((Object)Boolean.TRUE);
            }
            this.checkpointExecutor.execute(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Unable to fully structure code
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                public void run() {
                    block29: {
                        block28: {
                            listners = new ArrayList<Object>();
                            try {
                                block27: {
                                    try {
                                        requested = false;
                                        while ((t = JournalPersistenceAdapter.access$000(JournalPersistenceAdapter.this).poll(0L)) != null) {
                                            if (t.getClass() == (JournalPersistenceAdapter.class$EDU$oswego$cs$dl$util$concurrent$Latch == null ? JournalPersistenceAdapter.class$("EDU.oswego.cs.dl.util.concurrent.Latch") : JournalPersistenceAdapter.class$EDU$oswego$cs$dl$util$concurrent$Latch)) {
                                                listners.add(t);
                                            }
                                            requested = true;
                                        }
                                        if (requested) break block27;
                                    }
                                    catch (InterruptedException e1) {
                                        var7_7 = null;
                                        iter = listners.iterator();
                                        while (true) {
                                            if (!iter.hasNext()) {
                                                return;
                                            }
                                            latch = (Latch)iter.next();
                                            latch.release();
                                        }
                                    }
                                    var7_6 = null;
                                    iter = listners.iterator();
lbl25:
                                    // 2 sources

                                    while (true) {
                                        if (!iter.hasNext()) {
                                            return;
                                        }
                                        break block28;
                                        break;
                                    }
                                }
                                JournalPersistenceAdapter.access$100().debug((Object)"Checkpoint started.");
                                newMark = null;
                                iterator = JournalPersistenceAdapter.access$200(JournalPersistenceAdapter.this).values().iterator();
                                while (iterator.hasNext()) {
                                    try {
                                        ms = (JournalMessageStore)iterator.next();
                                        mark = ms.checkpoint();
                                        if (mark == null || newMark != null && newMark.compareTo((Object)mark) >= 0) continue;
                                        newMark = mark;
                                    }
                                    catch (Exception e) {
                                        JournalPersistenceAdapter.access$100().error((Object)("Failed to checkpoint a message store: " + e), (Throwable)e);
                                    }
                                }
                                iterator = JournalPersistenceAdapter.access$300(JournalPersistenceAdapter.this).values().iterator();
                                while (iterator.hasNext()) {
                                    try {
                                        ms = (JournalTopicMessageStore)iterator.next();
                                        mark = ms.checkpoint();
                                        if (mark == null || newMark != null && newMark.compareTo((Object)mark) >= 0) continue;
                                        newMark = mark;
                                    }
                                    catch (Exception e) {
                                        JournalPersistenceAdapter.access$100().error((Object)("Failed to checkpoint a message store: " + e), (Throwable)e);
                                    }
                                }
                                try {
                                    if (newMark != null) {
                                        if (JournalPersistenceAdapter.access$100().isDebugEnabled()) {
                                            JournalPersistenceAdapter.access$100().debug((Object)("Marking journal: " + newMark));
                                        }
                                        JournalPersistenceAdapter.access$400(JournalPersistenceAdapter.this).setMark(newMark, true);
                                    }
                                }
                                catch (Exception e) {
                                    JournalPersistenceAdapter.access$100().error((Object)("Failed to mark the Journal: " + e), (Throwable)e);
                                }
                                if (JournalPersistenceAdapter.access$500(JournalPersistenceAdapter.this) instanceof JDBCPersistenceAdapter) {
                                    try {
                                        ((JDBCPersistenceAdapter)JournalPersistenceAdapter.access$500(JournalPersistenceAdapter.this)).cleanup();
                                    }
                                    catch (SQLException sqle) {
                                        JournalPersistenceAdapter.access$100().error((Object)("Cleanup failed due to: " + sqle), (Throwable)sqle);
                                    }
                                }
                                JournalPersistenceAdapter.access$100().debug((Object)"Checkpoint done.");
                                break block29;
                            }
                            catch (Throwable var6_24) {
                                var7_9 = null;
                                iter = listners.iterator();
                                while (true) {
                                    if (!iter.hasNext()) {
                                        throw var6_24;
                                    }
                                    latch = (Latch)iter.next();
                                    latch.release();
                                }
                            }
                        }
                        latch = (Latch)iter.next();
                        latch.release();
                        ** while (true)
                    }
                    var7_8 = null;
                    iter = listners.iterator();
                    while (true) {
                        if (!iter.hasNext()) {
                            return;
                        }
                        latch = (Latch)iter.next();
                        latch.release();
                    }
                }
            });
            if (sync) {
                latch.acquire();
            }
        }
        catch (InterruptedException e) {
            log.warn((Object)("Request to start checkpoint failed: " + e), (Throwable)e);
        }
    }

    public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
        try {
            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream((OutputStream)pos);
            os.writeByte(0);
            os.writeUTF(destination);
            os.close();
            org.activeio.Packet p = this.wireFormat.writePacket(packet, pos);
            return this.journal.write(p, sync);
        }
        catch (IOException e) {
            throw this.createWriteException(packet, (Exception)e);
        }
    }

    public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
        try {
            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream((OutputStream)pos);
            os.writeByte(1);
            os.writeUTF(command);
            os.close();
            return this.journal.write(pos.getPacket(), sync);
        }
        catch (IOException e) {
            throw this.createWriteException(command, (Exception)e);
        }
    }

    public Packet readPacket(RecordLocation location) throws JMSException {
        try {
            org.activeio.Packet data = this.journal.read(location);
            DataInputStream is = new DataInputStream((InputStream)new PacketInputStream(data));
            byte type = is.readByte();
            if (type != 0) {
                throw new IOException("Record is not a packet type.");
            }
            String destination = is.readUTF();
            Packet packet = this.wireFormat.readPacket(data);
            is.close();
            return packet;
        }
        catch (InvalidRecordLocationException e) {
            throw this.createReadException(location, (Exception)((Object)e));
        }
        catch (IOException e) {
            throw this.createReadException(location, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        pos = null;
        transactionCounter = 0;
        JournalPersistenceAdapter.log.info((Object)"Journal Recovery Started.");
lbl4:
        // 11 sources

        block17: while ((pos = this.journal.getNextRecordLocation(pos)) != null) {
            data = this.journal.read(pos);
            is = new DataInputStream((InputStream)new PacketInputStream(data));
            destination = null;
            packet = null;
            try {
                type = is.readByte();
                switch (type) {
                    case 0: {
                        destination = is.readUTF();
                        packet = this.wireFormat.readPacket(data);
                        if (packet instanceof ActiveMQMessage) {
                            msg = (ActiveMQMessage)packet;
                            store = (JournalMessageStore)this.createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
                            if (msg.getTransactionId() != null) {
                                this.transactionStore.addMessage(store, msg, pos);
                                ** break;
                            }
                            store.replayAddMessage(msg);
                            ++transactionCounter;
                            ** break;
                        }
                        if (packet instanceof MessageAck) {
                            ack = (MessageAck)packet;
                            store = (JournalMessageStore)this.createMessageStore(destination, ack.getDestination().isQueue());
                            if (ack.getTransactionId() != null) {
                                this.transactionStore.removeMessage(store, ack, pos);
                                ** break;
                            }
                            store.replayRemoveMessage(ack);
                            ++transactionCounter;
                            ** break;
                        }
                        JournalPersistenceAdapter.log.error((Object)("Unknown type of packet in transaction log which will be discarded: " + packet));
                        ** break;
                    }
                    case 2: {
                        command = new TxCommand();
                        command.setType(is.readByte());
                        command.setWasPrepared(is.readBoolean());
                        switch (command.getType()) {
                            case 4: 
                            case 5: {
                                command.setTransactionId(is.readUTF());
                                break;
                            }
                            default: {
                                command.setTransactionId(ActiveMQXid.read(is));
                            }
                        }
                        switch (command.getType()) {
                            case 1: {
                                this.transactionStore.replayPrepare(command.getTransactionId());
                                break;
                            }
                            case 2: 
                            case 4: {
                                tx = this.transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                                if (tx == null) break;
                                tx.getOperations();
                                iter = tx.getOperations().iterator();
                                while (iter.hasNext()) {
                                    op = (JournalTransactionStore.TxOperation)iter.next();
                                    if (op.operationType == 0) {
                                        op.store.replayAddMessage((ActiveMQMessage)op.data);
                                    }
                                    if (op.operationType == 1) {
                                        op.store.replayRemoveMessage((MessageAck)op.data);
                                    }
                                    if (op.operationType != 3) continue;
                                    ack = (JournalAck)op.data;
                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
                                }
                                ++transactionCounter;
                                break;
                            }
                            case 3: 
                            case 5: {
                                this.transactionStore.replayRollback(command.getTransactionId());
                            }
                        }
                        continue block17;
                    }
                    case 3: {
                        destination = is.readUTF();
                        subscription = is.readUTF();
                        messageId = is.readUTF();
                        transactionId = null;
                        store = (JournalTopicMessageStore)this.createMessageStore(destination, false);
                        if (transactionId != null) {
                            ack = new JournalAck(destination, subscription, messageId, transactionId);
                            this.transactionStore.acknowledge(store, ack, pos);
                            ** break;
                        }
                        store.replayAcknowledge(subscription, new MessageIdentity(messageId));
                        ++transactionCounter;
                    }
                    case 1: {
                        ** break;
                    }
                    default: {
                        JournalPersistenceAdapter.log.error((Object)("Unknown type of record in transaction log which will be discarded: " + type));
                        continue block17;
                    }
                }
            }
            finally {
                is.close();
            }
        }
        location = this.writeCommand("RECOVERED", true);
        this.journal.setMark(location, true);
        JournalPersistenceAdapter.log.info((Object)("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."));
    }

    private JMSException createReadException(RecordLocation location, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    protected JMSException createWriteException(Packet packet, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    private XAException createWriteException(TxCommand command, Exception e) {
        return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
    }

    protected JMSException createWriteException(String command, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    protected JMSException createRecoveryFailedException(Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
    }

    public ClockDaemon getClockDaemon() {
        if (this.clockDaemon == null) {
            this.clockDaemon = new ClockDaemon();
            this.clockDaemon.setThreadFactory(new ThreadFactory(){

                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "Checkpoint Timmer");
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        return this.clockDaemon;
    }

    public void setClockDaemon(ClockDaemon clockDaemon) {
        this.clockDaemon = clockDaemon;
    }

    public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
        try {
            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream((OutputStream)pos);
            os.writeByte(2);
            os.writeByte(command.getType());
            os.writeBoolean(command.getWasPrepared());
            switch (command.getType()) {
                case 4: 
                case 5: {
                    os.writeUTF((String)command.getTransactionId());
                    break;
                }
                default: {
                    ActiveMQXid xid = (ActiveMQXid)command.getTransactionId();
                    xid.write(os);
                }
            }
            os.close();
            return this.journal.write(pos.getPacket(), sync);
        }
        catch (IOException e) {
            throw this.createWriteException(command, (Exception)e);
        }
    }

    public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException {
        try {
            PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream((OutputStream)pos);
            os.writeByte(3);
            os.writeUTF(destinationName);
            os.writeUTF(subscription);
            os.writeUTF(messageIdentity.getMessageID());
            os.close();
            return this.journal.write(pos.getPacket(), sync);
        }
        catch (IOException e) {
            throw this.createWriteException("Ack for message: " + messageIdentity, (Exception)e);
        }
    }

    public JournalTransactionStore getTransactionStore() {
        return this.transactionStore;
    }

    public int getLogFileCount() {
        return this.logFileCount;
    }

    public void setLogFileCount(int logFileCount) {
        this.logFileCount = logFileCount;
    }

    public int getLogFileSize() {
        return this.logFileSize;
    }

    public void setLogFileSize(int logFileSize) {
        this.logFileSize = logFileSize;
    }

    public boolean deadLetterAlreadySent(long seq, boolean useLocking) {
        return this.longTermPersistence.deadLetterAlreadySent(seq, useLocking);
    }

    public long getCheckpointInterval() {
        return this.checkpointInterval;
    }

    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    static /* synthetic */ Channel access$000(JournalPersistenceAdapter x0) {
        return x0.checkpointRequests;
    }

    static /* synthetic */ Log access$100() {
        return log;
    }

    static /* synthetic */ ConcurrentHashMap access$200(JournalPersistenceAdapter x0) {
        return x0.messageStores;
    }

    static /* synthetic */ ConcurrentHashMap access$300(JournalPersistenceAdapter x0) {
        return x0.topicMessageStores;
    }

    static /* synthetic */ Journal access$400(JournalPersistenceAdapter x0) {
        return x0.journal;
    }

    static /* synthetic */ PersistenceAdapter access$500(JournalPersistenceAdapter x0) {
        return x0.longTermPersistence;
    }
}

