/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.replication;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.FileWrapperJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.Page;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageInSync;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.replication.ReplicatedLargeMessage;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.QuorumManager;

public final class ReplicationEndpoint
implements ChannelHandler,
HornetQComponent {
    private static final boolean trace = HornetQServerLogger.LOGGER.isTraceEnabled();
    private final IOCriticalErrorListener criticalErrorListener;
    private final HornetQServerImpl server;
    private final boolean wantedFailBack;
    private Channel channel;
    private Journal[] journals;
    private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
    private final Map<JournalStorageManager.JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync = new HashMap<JournalStorageManager.JournalContent, Map<Long, JournalSyncFile>>();
    private Map<JournalStorageManager.JournalContent, Journal> journalsHolder = new HashMap<JournalStorageManager.JournalContent, Journal>();
    private StorageManager storage;
    private PagingManager pageManager;
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
    private final ConcurrentMap<Long, ReplicatedLargeMessage> largeMessages = new ConcurrentHashMap<Long, ReplicatedLargeMessage>();
    private boolean deletePages = true;
    private boolean started;
    private QuorumManager quorumManager;

    public ReplicationEndpoint(HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener, boolean wantedFailBack) {
        this.server = server;
        this.criticalErrorListener = criticalErrorListener;
        this.wantedFailBack = wantedFailBack;
    }

    public synchronized void registerJournal(byte id, Journal journal) {
        if (this.journals == null || id >= this.journals.length) {
            Journal[] oldJournals = this.journals;
            this.journals = new Journal[id + 1];
            if (oldJournals != null) {
                for (int i = 0; i < oldJournals.length; ++i) {
                    this.journals[i] = oldJournals[i];
                }
            }
        }
        this.journals[id] = journal;
    }

    public void handlePacket(Packet packet) {
        ReplicationResponseMessage response = new ReplicationResponseMessage();
        byte type = packet.getType();
        try {
            if (!this.started) {
                return;
            }
            if (type == 91) {
                this.handleAppendAddRecord((ReplicationAddMessage)packet);
            } else if (type == 92) {
                this.handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
            } else if (type == 93) {
                this.handleAppendDelete((ReplicationDeleteMessage)packet);
            } else if (type == 94) {
                this.handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
            } else if (type == 95) {
                this.handlePrepare((ReplicationPrepareMessage)packet);
            } else if (type == 96) {
                this.handleCommitRollback((ReplicationCommitMessage)packet);
            } else if (type == 97) {
                this.handlePageWrite((ReplicationPageWriteMessage)packet);
            } else if (type == 98) {
                this.handlePageEvent((ReplicationPageEventMessage)packet);
            } else if (type == 99) {
                this.handleLargeMessageBegin((ReplicationLargeMessageBeginMessage)packet);
            } else if (type == 101) {
                this.handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
            } else if (type == 100) {
                this.handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
            } else if (type == 102) {
                this.handleCompareDataMessage((ReplicationCompareDataMessage)packet);
                response = new NullResponseMessage();
            } else if (type == 120) {
                this.handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
            } else if (type == 103) {
                this.handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
            } else if (type == 121) {
                this.handleLiveStopping();
            } else if (type == 116) {
                this.handleFatalError((BackupReplicationStartFailedMessage)packet);
            } else {
                HornetQServerLogger.LOGGER.invalidPacketForReplication(packet);
            }
        }
        catch (HornetQException e) {
            HornetQServerLogger.LOGGER.errorHandlingReplicationPacket((Exception)((Object)e), packet);
            response = new HornetQExceptionMessage(e);
        }
        catch (Exception e) {
            HornetQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
            response = new HornetQExceptionMessage((HornetQException)HornetQMessageBundle.BUNDLE.replicationUnhandledError(e));
        }
        this.channel.send((Packet)response);
    }

    private void handleFatalError(BackupReplicationStartFailedMessage packet) {
        HornetQServerLogger.LOGGER.errorStartingReplication(packet.getRegistrationProblem());
        this.server.stopTheServer();
    }

    private void handleLiveStopping() throws HornetQException {
        this.server.remoteFailOver();
    }

    public boolean isStarted() {
        return this.started;
    }

    public synchronized void start() throws Exception {
        block3: {
            Configuration config = this.server.getConfiguration();
            try {
                this.storage = this.server.getStorageManager();
                this.storage.start();
                this.server.getManagementService().setStorageManager(this.storage);
                this.journalsHolder.put(JournalStorageManager.JournalContent.BINDINGS, this.storage.getBindingsJournal());
                this.journalsHolder.put(JournalStorageManager.JournalContent.MESSAGES, this.storage.getMessageJournal());
                for (JournalStorageManager.JournalContent jc : EnumSet.allOf(JournalStorageManager.JournalContent.class)) {
                    this.filesReservedForSync.put(jc, new HashMap());
                    this.journalLoadInformation[jc.typeByte] = this.journalsHolder.get((Object)jc).loadSyncOnly(Journal.JournalState.SYNCING);
                }
                this.pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(), config.getJournalBufferSize_NIO(), this.server.getScheduledPool(), this.server.getExecutorFactory(), config.isJournalSyncNonTransactional(), this.criticalErrorListener), this.storage, this.server.getAddressSettingsRepository());
                this.pageManager.start();
                this.started = true;
            }
            catch (Exception e) {
                if (!this.server.isStarted()) break block3;
                throw e;
            }
        }
    }

    public synchronized void stop() throws Exception {
        if (!this.started) {
            return;
        }
        if (this.channel != null) {
            this.channel.close();
        }
        for (ConcurrentMap concurrentMap : this.pageIndex.values()) {
            for (Page page : concurrentMap.values()) {
                try {
                    page.close();
                }
                catch (Exception e) {
                    HornetQServerLogger.LOGGER.errorClosingPageOnReplication(e);
                }
            }
        }
        this.pageIndex.clear();
        for (ReplicatedLargeMessage replicatedLargeMessage : this.largeMessages.values()) {
            replicatedLargeMessage.releaseResources();
        }
        this.largeMessages.clear();
        for (Map.Entry entry : this.filesReservedForSync.entrySet()) {
            for (JournalSyncFile filesReserved : ((Map)entry.getValue()).values()) {
                filesReserved.close();
            }
        }
        this.filesReservedForSync.clear();
        if (this.journals != null) {
            for (Journal j : this.journals) {
                if (!(j instanceof FileWrapperJournal)) continue;
                j.stop();
            }
        }
        this.pageManager.stop();
        this.storage.stop();
        this.started = false;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public void compareJournalInformation(JournalLoadInformation[] journalInformation) throws HornetQException {
        if (!this.server.isRemoteBackupUpToDate()) {
            throw HornetQMessageBundle.BUNDLE.journalsNotInSync();
        }
        if (this.journalLoadInformation == null || this.journalLoadInformation.length != journalInformation.length) {
            throw HornetQMessageBundle.BUNDLE.replicationTooManyJournals();
        }
        for (int i = 0; i < journalInformation.length; ++i) {
            if (journalInformation[i].equals((Object)this.journalLoadInformation[i])) continue;
            HornetQServerLogger.LOGGER.journalcomparisonMismatch(this.journalParametersToString(journalInformation));
            throw HornetQMessageBundle.BUNDLE.replicationTooManyJournals();
        }
    }

    public void setDeletePages(boolean deletePages) {
        this.deletePages = deletePages;
    }

    private String journalParametersToString(JournalLoadInformation[] journalInformation) {
        return "**********************************************************\nparameters:\nBindings = " + journalInformation[0] + "\n" + "Messaging = " + journalInformation[1] + "\n" + "**********************************************************" + "\n" + "Expected:" + "\n" + "Bindings = " + this.journalLoadInformation[0] + "\n" + "Messaging = " + this.journalLoadInformation[1] + "\n" + "**********************************************************";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finishSynchronization(String liveID) throws Exception {
        for (JournalStorageManager.JournalContent jc : EnumSet.allOf(JournalStorageManager.JournalContent.class)) {
            Journal journal = this.journalsHolder.remove((Object)jc);
            journal.synchronizationLock();
            try {
                this.filesReservedForSync.remove((Object)jc);
                this.registerJournal(jc.typeByte, journal);
                journal.stop();
                journal.start();
                journal.loadSyncOnly(Journal.JournalState.SYNCING_UP_TO_DATE);
            }
            finally {
                journal.synchronizationUnlock();
            }
        }
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        for (Map.Entry entry : this.largeMessages.entrySet()) {
            ReplicatedLargeMessage lm = (ReplicatedLargeMessage)entry.getValue();
            if (!(lm instanceof LargeServerMessageInSync)) continue;
            LargeServerMessageInSync lmSync = (LargeServerMessageInSync)lm;
            lmSync.joinSyncedData(buffer);
        }
        this.journalsHolder = null;
        this.quorumManager.setLiveID(liveID);
        this.server.setRemoteBackupUpToDate();
        HornetQServerLogger.LOGGER.backupServerSynched(this.server);
    }

    private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
        SequentialFile channel;
        Long id = msg.getId();
        byte[] data = msg.getData();
        switch (msg.getFileType()) {
            case LARGE_MESSAGE: {
                ReplicatedLargeMessage largeMessage = this.lookupLargeMessage(id, false);
                if (!(largeMessage instanceof LargeServerMessageInSync)) {
                    HornetQServerLogger.LOGGER.largeMessageIncomatible();
                    return;
                }
                LargeServerMessageInSync largeMessageInSync = (LargeServerMessageInSync)largeMessage;
                channel = largeMessageInSync.getSyncFile();
                break;
            }
            case PAGE: {
                Page page = this.getPage(msg.getPageStore(), (int)msg.getId());
                channel = page.getFile();
                break;
            }
            case JOURNAL: {
                JournalSyncFile journalSyncFile = this.filesReservedForSync.get((Object)msg.getJournalContent()).get(id);
                FileChannel channel2 = journalSyncFile.getChannel();
                if (data == null) {
                    channel2.close();
                    return;
                }
                channel2.write(ByteBuffer.wrap(data));
                return;
            }
            default: {
                throw HornetQMessageBundle.BUNDLE.replicationUnhandledFileType(msg.getFileType());
            }
        }
        if (data == null) {
            channel.close();
            return;
        }
        if (!channel.isOpen()) {
            channel.open(1, false);
        }
        channel.writeDirect(ByteBuffer.wrap(data), true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStartReplicationSynchronization(ReplicationStartSyncMessage packet) throws Exception {
        if (this.server.isRemoteBackupUpToDate()) {
            throw HornetQMessageBundle.BUNDLE.replicationBackupUpToDate();
        }
        ReplicationEndpoint replicationEndpoint = this;
        synchronized (replicationEndpoint) {
            if (!this.started) {
                return;
            }
            if (packet.isSynchronizationFinished()) {
                this.finishSynchronization(packet.getNodeID());
                return;
            }
            switch (packet.getDataType()) {
                case LargeMessages: {
                    for (long msgID : packet.getFileIds()) {
                        this.createLargeMessage(msgID, true);
                    }
                    break;
                }
                case JournalBindings: 
                case JournalMessages: {
                    if (this.wantedFailBack && !packet.isServerToFailBack()) {
                        HornetQServerLogger.LOGGER.autoFailBackDenied();
                    }
                    JournalStorageManager.JournalContent journalContent = ReplicationStartSyncMessage.SyncDataType.getJournalContentType(packet.getDataType());
                    Journal journal = this.journalsHolder.get((Object)journalContent);
                    if (packet.getNodeID() != null) {
                        this.quorumManager.setLiveID(packet.getNodeID());
                    }
                    Map<Long, JournalSyncFile> mapToFill = this.filesReservedForSync.get((Object)journalContent);
                    for (Map.Entry entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
                        mapToFill.put((Long)entry.getKey(), new JournalSyncFile((JournalFile)entry.getValue()));
                    }
                    FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
                    this.registerJournal(journalContent.typeByte, (Journal)syncJournal);
                    break;
                }
                default: {
                    throw HornetQMessageBundle.BUNDLE.replicationUnhandledDataType();
                }
            }
        }
    }

    private void handleLargeMessageEnd(ReplicationLargeMessageEndMessage packet) {
        ReplicatedLargeMessage message = this.lookupLargeMessage(packet.getMessageId(), true);
        if (message != null) {
            try {
                message.deleteFile();
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
            }
        }
    }

    private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception {
        ReplicatedLargeMessage message = this.lookupLargeMessage(packet.getMessageId(), false);
        if (message != null) {
            message.addBytes(packet.getBody());
        }
    }

    private void handleCompareDataMessage(ReplicationCompareDataMessage request) throws HornetQException {
        this.compareJournalInformation(request.getJournalInformation());
    }

    private ReplicatedLargeMessage lookupLargeMessage(long messageId, boolean delete) {
        ReplicatedLargeMessage message = delete ? (ReplicatedLargeMessage)this.largeMessages.remove(messageId) : (ReplicatedLargeMessage)this.largeMessages.get(messageId);
        if (message == null) {
            HornetQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
        }
        return message;
    }

    private void handleLargeMessageBegin(ReplicationLargeMessageBeginMessage packet) {
        long id = packet.getMessageId();
        this.createLargeMessage(id, false);
        HornetQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup");
    }

    private void createLargeMessage(long id, boolean sync) {
        ReplicatedLargeMessage msg = sync ? new LargeServerMessageInSync(this.storage) : this.storage.createLargeMessage();
        msg.setDurable(true);
        msg.setMessageID(id);
        this.largeMessages.put(id, msg);
    }

    private void handleCommitRollback(ReplicationCommitMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        if (packet.isRollback()) {
            journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
        } else {
            journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
        }
    }

    private void handlePrepare(ReplicationPrepareMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
    }

    private void handleAppendDeleteTX(ReplicationDeleteTXMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordData());
    }

    private void handleAppendDelete(ReplicationDeleteMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        journalToUse.appendDeleteRecord(packet.getId(), false);
    }

    private void handleAppendAddTXRecord(ReplicationAddTXMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        if (packet.isUpdate()) {
            journalToUse.appendUpdateRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        } else {
            journalToUse.appendAddRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        }
    }

    private void handleAppendAddRecord(ReplicationAddMessage packet) throws Exception {
        Journal journalToUse = this.getJournal(packet.getJournalID());
        if (packet.isUpdate()) {
            if (trace) {
                HornetQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId());
            }
            journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
        } else {
            if (trace) {
                HornetQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId());
            }
            journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
        }
    }

    private void handlePageEvent(ReplicationPageEventMessage packet) throws Exception {
        ConcurrentMap<Integer, Page> pages = this.getPageMap(packet.getStoreName());
        Page page = (Page)pages.remove(packet.getPageNumber());
        if (page == null) {
            page = this.getPage(packet.getStoreName(), packet.getPageNumber());
        }
        if (page != null) {
            if (packet.isDelete()) {
                if (this.deletePages) {
                    page.delete(null);
                }
            } else {
                page.close();
            }
        }
    }

    private void handlePageWrite(ReplicationPageWriteMessage packet) throws Exception {
        PagedMessage pgdMessage = packet.getPagedMessage();
        pgdMessage.initMessage(this.storage);
        ServerMessage msg = pgdMessage.getMessage();
        Page page = this.getPage(msg.getAddress(), packet.getPageNumber());
        page.write(pgdMessage);
    }

    private ConcurrentMap<Integer, Page> getPageMap(SimpleString storeName) {
        ConcurrentMap mapResult;
        ConcurrentMap resultIndex = (ConcurrentHashMap)this.pageIndex.get(storeName);
        if (resultIndex == null && (mapResult = (ConcurrentMap)this.pageIndex.putIfAbsent(storeName, resultIndex = new ConcurrentHashMap())) != null) {
            resultIndex = mapResult;
        }
        return resultIndex;
    }

    private Page getPage(SimpleString storeName, int pageId) throws Exception {
        ConcurrentMap<Integer, Page> map = this.getPageMap(storeName);
        Page page = (Page)map.get(pageId);
        if (page == null) {
            page = this.newPage(pageId, storeName, map);
        }
        return page;
    }

    private synchronized Page newPage(int pageId, SimpleString storeName, ConcurrentMap<Integer, Page> map) throws Exception {
        Page page = (Page)map.get(pageId);
        if (page == null) {
            page = this.pageManager.getPageStore(storeName).createPage(pageId);
            page.open();
            map.put(pageId, page);
        }
        return page;
    }

    private Journal getJournal(byte journalID) {
        return this.journals[journalID];
    }

    public synchronized void setQuorumManager(QuorumManager quorumManager) {
        this.quorumManager = quorumManager;
    }

    public static class JournalSyncFile {
        private FileChannel channel;
        private final File file;
        private FileOutputStream fos;

        public JournalSyncFile(JournalFile jFile) throws Exception {
            SequentialFile seqFile = jFile.getFile();
            this.file = seqFile.getJavaFile();
            seqFile.close();
        }

        synchronized FileChannel getChannel() throws Exception {
            if (this.channel == null) {
                this.fos = new FileOutputStream(this.file);
                this.channel = this.fos.getChannel();
            }
            return this.channel;
        }

        synchronized void close() throws IOException {
            if (this.fos != null) {
                this.fos.close();
            }
            if (this.channel != null) {
                this.channel.close();
            }
        }

        public String toString() {
            return "JournalSyncFile(file=" + this.file.getAbsolutePath() + ")";
        }
    }
}

