/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.map;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.openhft.chronicle.hash.replication.ReplicationHub;
import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig;
import net.openhft.chronicle.hash.replication.UdpTransportConfig;
import net.openhft.chronicle.map.ChronicleMap;
import net.openhft.chronicle.map.ChronicleMapBuilder;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.Replicator;
import net.openhft.chronicle.map.TcpReplicator;
import net.openhft.chronicle.map.UdpReplicator;
import net.openhft.chronicle.map.VanillaChronicleMap;
import net.openhft.lang.collection.DirectBitSet;
import net.openhft.lang.collection.SingleThreadedDirectBitSet;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.RandomDataInput;
import net.openhft.lang.threadlocal.ThreadLocalCopies;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shares the replicators configured on one {@link ReplicationHub} between several replicas,
 * each of which is registered under a numeric channel id.
 *
 * <p>Instances are obtained through {@code getProvider(ReplicationHub)}, which keeps one provider
 * per hub. Channel 0 is registered by the constructor for the internal system message queue.
 */
final class ChannelProvider
implements Closeable {
    /** Logger shared by the provider and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger((String)ChannelProvider.class.getName());
    /**
     * Providers keyed by hub identity (reference equality, not {@code equals}). Accessed from
     * {@code getProvider} and {@code close}, both of which hold the {@code ChannelProvider.class}
     * monitor while touching it.
     */
    static final Map<ReplicationHub, ChannelProvider> implMapping = new IdentityHashMap<ReplicationHub, ChannelProvider>();
    /** Type byte that marks a system message as a bootstrap message (66, ASCII 'B'). */
    private static final byte BOOTSTRAP_MESSAGE = 66;
    /**
     * Channel-aware {@link Replica.EntryExternalizable} that forwards every call to the
     * externalizable registered for the addressed channel.
     *
     * <p>On the write side each entry is prefixed with the stop-bit encoded channel id; on the
     * read side that id is decoded again and used to select the target channel. All methods hold
     * the read side of {@code channelDataLock} while they touch the channel tables.
     */
    final Replica.EntryExternalizable asEntryExternalizable = new Replica.EntryExternalizable(){

        /**
         * Delegates to the externalizable registered for {@code chronicleChannel}.
         *
         * @param entry            the entry whose size is requested
         * @param chronicleChannel id of the channel the entry belongs to
         * @return whatever the channel's own {@code sizeOfEntry} returns
         */
        @Override
        public int sizeOfEntry(@NotNull Bytes entry, int chronicleChannel) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                return ChannelProvider.this.channelEntryExternalizables[chronicleChannel].sizeOfEntry(entry, chronicleChannel);
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }

        /**
         * Delegates to the externalizable registered for {@code chronicleChannel}.
         *
         * @param entry            the entry to check
         * @param chronicleChannel id of the channel the entry belongs to
         * @return whatever the channel's own {@code identifierCheck} returns
         */
        @Override
        public boolean identifierCheck(@NotNull Bytes entry, int chronicleChannel) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                return ChannelProvider.this.channelEntryExternalizables[chronicleChannel].identifierCheck(entry, chronicleChannel);
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }

        /**
         * Writes the channel id as a stop-bit value and then lets the channel's own
         * externalizable append the entry.
         *
         * @param entry            the entry to serialise
         * @param destination      buffer that receives the channel id followed by the entry
         * @param chronicleChannel id of the channel the entry belongs to
         * @param bootstrapTime    passed through unchanged to the channel's externalizable
         */
        @Override
        public void writeExternalEntry(@NotNull Bytes entry, @NotNull Bytes destination, int chronicleChannel, long bootstrapTime) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                destination.writeStopBit((long)chronicleChannel);
                ChannelProvider.this.channelEntryExternalizables[chronicleChannel].writeExternalEntry(entry, destination, chronicleChannel, bootstrapTime);
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }

        /**
         * Reads the stop-bit encoded channel id from {@code source} and hands the remainder to
         * the externalizable registered for that channel.
         *
         * <p>Entries whose id lies outside the channel table are logged and skipped; entries for
         * a channel that has no externalizable registered are silently ignored.
         *
         * @param copies       passed through unchanged to the channel's externalizable
         * @param segmentState passed through unchanged to the channel's externalizable
         * @param source       buffer positioned at the channel id
         */
        @Override
        public void readExternalEntry(@NotNull ThreadLocalCopies copies, @NotNull VanillaChronicleMap.SegmentState segmentState, @NotNull Bytes source) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                int chronicleId = (int)source.readStopBit();
                // The id comes from the wire and the long -> int narrowing can yield a negative
                // value, so both bounds have to be checked before it is used as an array index.
                if (chronicleId < 0 || chronicleId >= ChannelProvider.this.channelEntryExternalizables.length) {
                    LOG.info("skipped entry with chronicleId=" + chronicleId + ", ");
                    return;
                }
                Replica.EntryExternalizable target = ChannelProvider.this.channelEntryExternalizables[chronicleId];
                if (target != null) {
                    target.readExternalEntry(copies, segmentState, source);
                }
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }
    };
    /** Identifier of the local node, taken from {@code hub.identifier()} in the constructor. */
    private final byte localIdentifier;
    /**
     * Aggregate {@link Replica} view over every registered channel. This is the replica that
     * {@code getProvider} hands to the TCP and UDP replicators.
     */
    final Replica asReplica = new Replica(){

        /** @return the identifier of the local node */
        @Override
        public byte identifier() {
            return ChannelProvider.this.localIdentifier;
        }

        /**
         * Returns the aggregate modification iterator for {@code remoteIdentifier}, creating and
         * caching it on first use. The lookup and creation run under the write lock.
         *
         * @param remoteIdentifier identifier of the remote node; used as an index into the cache
         * @return an iterator that walks the per-channel iterators of all registered channels
         */
        @Override
        public Replica.ModificationIterator acquireModificationIterator(final byte remoteIdentifier) {
            ChannelProvider.this.channelDataLock.writeLock().lock();
            try {
                Replica.ModificationIterator cached = ChannelProvider.this.modificationIterator.get(remoteIdentifier);
                if (cached != null) {
                    return cached;
                }
                Replica.ModificationIterator created = new Replica.ModificationIterator(){
                    /** Notifier handed over via setModificationNotifier; null until then. */
                    volatile Replica.ModificationNotifier notifier0;

                    /** @return true as soon as any registered channel reports a pending entry */
                    @Override
                    public boolean hasNext() {
                        ChannelProvider.this.channelDataLock.readLock().lock();
                        try {
                            int len = ChannelProvider.this.chronicleChannelList.size();
                            for (int i = 0; i < len; ++i) {
                                Replica.ModificationIterator channelIterator = ChannelProvider.this.chronicleChannelList.get(i).acquireModificationIterator(remoteIdentifier);
                                if (channelIterator.hasNext()) {
                                    return true;
                                }
                            }
                            return false;
                        }
                        finally {
                            ChannelProvider.this.channelDataLock.readLock().unlock();
                        }
                    }

                    /**
                     * Offers the callback to each channel in registration order until one of them
                     * delivers an entry. The channel's own id is passed on instead of {@code na}.
                     *
                     * @param callback receiver of the entry
                     * @param na       ignored
                     * @return true if some channel delivered an entry
                     */
                    @Override
                    public boolean nextEntry(@NotNull Replica.EntryCallback callback, int na) throws InterruptedException {
                        ChannelProvider.this.channelDataLock.readLock().lock();
                        try {
                            int len = ChannelProvider.this.chronicleChannelList.size();
                            for (int i = 0; i < len; ++i) {
                                Replica.ModificationIterator channelIterator = ChannelProvider.this.chronicleChannelList.get(i).acquireModificationIterator(remoteIdentifier);
                                int channelId = ChannelProvider.this.chronicleChannelIds.get(i);
                                if (channelIterator.nextEntry(callback, channelId)) {
                                    return true;
                                }
                            }
                            return false;
                        }
                        finally {
                            ChannelProvider.this.channelDataLock.readLock().unlock();
                        }
                    }

                    /**
                     * Forwards {@code dirtyEntries} to every channel and signals the notifier, if
                     * one has been set, after each channel.
                     *
                     * @param fromTimeStamp passed through unchanged to every channel iterator
                     * @throws RuntimeException wrapping an {@link InterruptedException}; the
                     *         thread's interrupt flag is restored before it is thrown
                     */
                    @Override
                    public void dirtyEntries(long fromTimeStamp) {
                        ChannelProvider.this.channelDataLock.readLock().lock();
                        try {
                            int len = ChannelProvider.this.chronicleChannelList.size();
                            for (int i = 0; i < len; ++i) {
                                try {
                                    Replica.ModificationIterator channelIterator = ChannelProvider.this.chronicleChannelList.get(i).acquireModificationIterator(remoteIdentifier);
                                    channelIterator.dirtyEntries(fromTimeStamp);
                                }
                                catch (InterruptedException e) {
                                    // Keep the interruption visible to callers further up.
                                    Thread.currentThread().interrupt();
                                    throw new RuntimeException(e);
                                }
                                // A bootstrap can arrive before a notifier has been installed.
                                Replica.ModificationNotifier notifier = this.notifier0;
                                if (notifier != null) {
                                    notifier.onChange();
                                }
                            }
                        }
                        finally {
                            ChannelProvider.this.channelDataLock.readLock().unlock();
                        }
                    }

                    /**
                     * Installs the notifier on the iterator of every currently registered channel
                     * and remembers it for {@code dirtyEntries}.
                     *
                     * @param modificationNotifier notifier to install
                     */
                    @Override
                    public void setModificationNotifier(@NotNull Replica.ModificationNotifier modificationNotifier) {
                        // Take the read lock like the other methods that walk the channel list.
                        ChannelProvider.this.channelDataLock.readLock().lock();
                        try {
                            int len = ChannelProvider.this.chronicleChannelList.size();
                            for (int i = 0; i < len; ++i) {
                                Replica channel = ChannelProvider.this.chronicleChannelList.get(i);
                                channel.acquireModificationIterator(remoteIdentifier).setModificationNotifier(modificationNotifier);
                            }
                            this.notifier0 = modificationNotifier;
                        }
                        finally {
                            ChannelProvider.this.channelDataLock.readLock().unlock();
                        }
                    }
                };
                ChannelProvider.this.modificationIterator.set(remoteIdentifier, created);
                return created;
            }
            finally {
                ChannelProvider.this.channelDataLock.writeLock().unlock();
            }
        }

        /**
         * Returns the smallest last-modification time reported by the channels from list index 1
         * onwards, treating a running value of 0 as "not set yet".
         *
         * @param remoteIdentifier identifier of the remote node
         * @return the combined value, or 0 when there is no channel beyond index 0
         */
        @Override
        public long lastModificationTime(byte remoteIdentifier) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                long earliest = 0L;
                int len = ChannelProvider.this.chronicleChannelList.size();
                // Index 0 is skipped; the constructor registers the system queue there first.
                for (int i = 1; i < len; ++i) {
                    long channelTime = ChannelProvider.this.chronicleChannelList.get(i).lastModificationTime(remoteIdentifier);
                    earliest = earliest == 0L ? channelTime : Math.min(earliest, channelTime);
                }
                return earliest;
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }

        /**
         * Forwards the timestamp to every channel from list index 1 onwards.
         *
         * @param identifier identifier of the remote node
         * @param timestamp  value passed through to each channel
         */
        @Override
        public void setLastModificationTime(byte identifier, long timestamp) {
            ChannelProvider.this.channelDataLock.readLock().lock();
            try {
                int len = ChannelProvider.this.chronicleChannelList.size();
                // Index 0 is skipped; the constructor registers the system queue there first.
                for (int i = 1; i < len; ++i) {
                    ChannelProvider.this.chronicleChannelList.get(i).setLastModificationTime(identifier, timestamp);
                }
            }
            finally {
                ChannelProvider.this.channelDataLock.readLock().unlock();
            }
        }

        /** Closes the enclosing provider. */
        @Override
        public void close() throws IOException {
            ChannelProvider.this.close();
        }
    };
    /** Hub this provider was created for; also its key in {@code implMapping}. */
    private final ReplicationHub hub;
    /** Lock taken by the methods that read (read side) or change (write side) the channel tables below. */
    private final ReadWriteLock channelDataLock = new ReentrantReadWriteLock();
    /** Registered replicas indexed by channel id; a null slot means the id is free. */
    private final Replica[] chronicleChannels;
    /** Registered replicas in registration order; kept parallel to {@code chronicleChannelIds}. */
    private final List<Replica> chronicleChannelList;
    /** Channel id of the replica at the same index of {@code chronicleChannelList}. */
    private final List<Integer> chronicleChannelIds;
    /** Entry externalizables indexed by channel id. */
    private final Replica.EntryExternalizable[] channelEntryExternalizables;
    /** System-queue payload providers indexed by remote identifier. */
    private final AtomicReferenceArray<PayloadProvider> systemModificationIterator = new AtomicReferenceArray(128);
    /** Bit per remote identifier; set once a payload provider exists for that identifier. */
    private final DirectBitSet systemModificationIteratorBitSet = ChannelProvider.newBitSet(this.systemModificationIterator.length());
    /** Aggregate modification iterators indexed by remote identifier. */
    private final AtomicReferenceArray<Replica.ModificationIterator> modificationIterator = new AtomicReferenceArray(128);
    /** Replicators registered through {@code add(Closeable)}; closed by {@code close()}. */
    private final Set<Closeable> replicators = new CopyOnWriteArraySet<Closeable>();
    /** Set to true by {@code close()}. */
    private volatile boolean isClosed = false;
    /** Internal queue registered as channel 0 by the constructor. */
    private final SystemQueue systemMessageQueue;

    /**
     * Returns the provider registered for {@code hub}, creating it together with the TCP and/or
     * UDP replicators configured on the hub when none exists yet.
     *
     * <p>If building a replicator fails, the half-initialised provider is closed again so that
     * replicators which were already created do not leak; nothing is registered in that case.
     *
     * @param hub the hub describing identifier, channel count and transports
     * @return the existing or newly created provider for {@code hub}
     * @throws IOException propagated from replicator construction
     */
    static synchronized ChannelProvider getProvider(ReplicationHub hub) throws IOException {
        ChannelProvider existing = implMapping.get(hub);
        if (existing != null) {
            return existing;
        }
        ChannelProvider channelProvider = new ChannelProvider(hub);
        boolean initialised = false;
        try {
            TcpTransportAndNetworkConfig tcpConfig = hub.tcpTransportAndNetwork();
            if (tcpConfig != null) {
                TcpReplicator tcpReplicator = new TcpReplicator(channelProvider.asReplica, channelProvider.asEntryExternalizable, tcpConfig, hub.remoteNodeValidator(), null, hub.connectionListener());
                channelProvider.add(tcpReplicator);
            }
            UdpTransportConfig udpConfig = hub.udpTransport();
            if (udpConfig != null) {
                UdpReplicator udpReplicator = new UdpReplicator(channelProvider.asReplica, channelProvider.asEntryExternalizable, udpConfig);
                channelProvider.add(udpReplicator);
                if (tcpConfig == null) {
                    LOG.warn("MISSING TCP REPLICATION : The UdpReplicator only attempts to read data (it does not enforce or guarantee delivery), you should use the UdpReplicator if you have a large number of nodes, and you wish to receive the data before it becomes available on TCP/IP. Since data delivery is not guaranteed, it is recommended that you only use the UDP Replicator in conjunction with a TCP Replicator");
                }
            }
            implMapping.put(hub, channelProvider);
            initialised = true;
            return channelProvider;
        }
        finally {
            if (!initialised) {
                // Releases any replicator that was created before the failure.
                channelProvider.close();
            }
        }
    }

    /**
     * Creates a provider for {@code hub}: sizes the channel tables from
     * {@code hub.maxNumberOfChannels()}, builds the system message queue and registers it as
     * channel 0.
     *
     * @param hub the hub supplying the local identifier and the maximum number of channels
     */
    private ChannelProvider(ReplicationHub hub) {
        this.localIdentifier = hub.identifier();
        this.hub = hub;
        int maxChannels = hub.maxNumberOfChannels();
        this.chronicleChannels = new Replica[maxChannels];
        this.channelEntryExternalizables = new Replica.EntryExternalizable[maxChannels];
        this.chronicleChannelList = new ArrayList<Replica>();
        this.chronicleChannelIds = new ArrayList<Integer>();
        // Only bootstrap messages are understood; every other type is logged and dropped.
        MessageHandler bootstrapOnlyHandler = new MessageHandler(){

            @Override
            public void onMessage(Bytes bytes) {
                byte type = bytes.readByte();
                if (type != BOOTSTRAP_MESSAGE) {
                    LOG.info("message of type=" + type + " was ignored.");
                    return;
                }
                ChannelProvider.this.onBootstrapMessage(bytes);
            }
        };
        this.systemMessageQueue = new SystemQueue(this.systemModificationIteratorBitSet, this.systemModificationIterator, bootstrapOnlyHandler);
        this.add(0, this.systemMessageQueue.asReplica, this.systemMessageQueue.asEntryExternalizable);
    }

    /**
     * Creates a heap-backed bit set with room for at least {@code numberOfBits} bits.
     *
     * @param numberOfBits number of bits required; rounded up to whole bytes
     * @return a new {@link SingleThreadedDirectBitSet} over a freshly allocated byte array
     */
    private static DirectBitSet newBitSet(int numberOfBits) {
        int byteCount = (numberOfBits + 7) / 8;
        ByteBuffer backing = ByteBuffer.wrap(new byte[byteCount]);
        Bytes view = new ByteBufferBytes(backing);
        return new SingleThreadedDirectBitSet(view);
    }

    /**
     * Creates a replicator handle bound to the given channel id. The channel is only registered
     * with this provider once the handle's {@code applyTo} is invoked.
     *
     * @param channel the channel id the handle will register under
     * @return a new {@link ChronicleChannel} for {@code channel}
     */
    public ChronicleChannel createChannel(int channel) {
        ChronicleChannel handle = this.new ChronicleChannel(channel);
        return handle;
    }

    /**
     * Handles a bootstrap message: reads the remote identifier (byte), the channel id (unsigned
     * short) and the last modification time (long), then asks the addressed channel to mark its
     * entries dirty from that time for the remote node.
     *
     * <p>Messages for a channel id beyond the channel table, or for a channel that is not
     * registered, are ignored.
     *
     * @param bytes message body positioned just after the type byte
     * @throws RuntimeException wrapping an {@link InterruptedException}; the thread's interrupt
     *         flag is restored before it is thrown
     */
    private void onBootstrapMessage(Bytes bytes) {
        byte remoteIdentifier = bytes.readByte();
        int chronicleChannel = bytes.readUnsignedShort();
        long lastModificationTime = bytes.readLong();
        if (LOG.isDebugEnabled()) {
            LOG.debug("received bootstrap message received for localIdentifier=" + this.localIdentifier + ", " + "remoteIdentifier=" + remoteIdentifier + ",chronicleChannel=" + chronicleChannel + "," + "lastModificationTime=" + lastModificationTime);
        }
        // The id comes from the wire (0..65535) and may exceed the size of the channel table.
        if (chronicleChannel >= this.chronicleChannels.length) {
            LOG.info("skipped bootstrap message with chronicleChannel=" + chronicleChannel);
            return;
        }
        Replica target = this.chronicleChannels[chronicleChannel];
        if (target == null) {
            return;
        }
        try {
            target.acquireModificationIterator(remoteIdentifier).dirtyEntries(lastModificationTime);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to callers further up.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a 12-byte bootstrap message: type byte, local identifier (byte), channel id
     * (unsigned short) and last modification time (long). The buffer is flipped before it is
     * returned.
     *
     * @param chronicleChannel     channel id the message refers to
     * @param lastModificationTime timestamp to carry in the message
     * @param localIdentifier      identifier of the sending node
     * @return the flipped buffer holding the message
     */
    private static ByteBufferBytes toBootstrapMessage(int chronicleChannel, long lastModificationTime, byte localIdentifier) {
        // type (1) + identifier (1) + channel (2) + timestamp (8)
        final int messageSize = 1 + 1 + 2 + 8;
        ByteBufferBytes message = new ByteBufferBytes(ByteBuffer.allocate(messageSize));
        message.writeByte(BOOTSTRAP_MESSAGE);
        message.writeByte((int)localIdentifier);
        message.writeUnsignedShort(chronicleChannel);
        message.writeLong(lastModificationTime);
        message.flip();
        return message;
    }

    /**
     * Registers a replica and its externalizable under {@code chronicleChannel}.
     *
     * <p>For every channel other than 0, a bootstrap message carrying the replica's last
     * modification time is queued for each remote identifier whose bit is set in the system
     * bit set. The whole operation runs under the write lock.
     *
     * @param chronicleChannel     channel id to register under
     * @param replica              replica to register
     * @param entryExternalizable  externalizable used for the channel's entries
     * @throws IllegalStateException if the channel id is already taken
     */
    private void add(int chronicleChannel, Replica replica, @NotNull Replica.EntryExternalizable entryExternalizable) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("adding chronicleChannel=" + chronicleChannel + ",entryExternalizable=" + entryExternalizable);
        }
        this.channelDataLock.writeLock().lock();
        try {
            if (this.chronicleChannels[chronicleChannel] != null) {
                throw new IllegalStateException("chronicleId=" + chronicleChannel + " is already in use.");
            }
            this.chronicleChannels[chronicleChannel] = replica;
            this.chronicleChannelList.add(replica);
            this.chronicleChannelIds.add(chronicleChannel);
            this.channelEntryExternalizables[chronicleChannel] = entryExternalizable;
            if (chronicleChannel != 0) {
                // Walk the set bits; the loop ends on the first non-positive result.
                for (int bit = (int)this.systemModificationIteratorBitSet.nextSetBit(0L);
                     bit > 0;
                     bit = (int)this.systemModificationIteratorBitSet.nextSetBit((long)(bit + 1))) {
                    byte remoteIdentifier = (byte)bit;
                    long lastModificationTime = replica.lastModificationTime(remoteIdentifier);
                    ByteBufferBytes bootstrap = ChannelProvider.toBootstrapMessage(chronicleChannel, lastModificationTime, this.localIdentifier);
                    this.systemModificationIterator.get(remoteIdentifier).addPayload((Bytes)bootstrap);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("sending bootstrap message received for localIdentifier=" + this.localIdentifier + ", remoteIdentifier=" + remoteIdentifier + ",chronicleChannel=" + chronicleChannel + ",lastModificationTime=" + lastModificationTime);
                    }
                }
            }
        }
        finally {
            this.channelDataLock.writeLock().unlock();
        }
    }

    /**
     * Closes every registered replicator and removes this provider from {@code implMapping}.
     * Calling it again after the first call has completed does nothing.
     *
     * <p>An {@link IOException} from an individual replicator is logged and does not stop the
     * remaining replicators from being closed. The provider is removed from the registry even if
     * a replicator fails with an unchecked exception.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by this implementation itself
     */
    @Override
    public void close() throws IOException {
        if (this.isClosed) {
            return;
        }
        synchronized (ChannelProvider.class) {
            // Re-check under the monitor so that two racing callers cannot both run the shutdown.
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;
            try {
                for (Closeable replicator : this.replicators) {
                    try {
                        replicator.close();
                    }
                    catch (IOException e) {
                        LOG.error("", (Throwable)e);
                    }
                }
            }
            finally {
                // Never leave a closed provider registered for the hub.
                implMapping.remove(this.hub);
            }
        }
    }

    /**
     * Registers a replicator so that it is closed together with this provider.
     *
     * @param replicator the resource to close in {@code close()}
     */
    void add(Closeable replicator) {
        Set<Closeable> registered = this.replicators;
        registered.add(replicator);
    }

    /**
     * Replicator handle for a single channel of the enclosing provider. Applying it registers
     * the replica under the channel id; closing it unregisters the channel again.
     */
    public class ChronicleChannel
    extends Replicator
    implements Closeable {
        /** Channel id this handle registers and unregisters. */
        private final int chronicleChannel;

        private ChronicleChannel(int chronicleChannel) {
            this.chronicleChannel = chronicleChannel;
        }

        /** @return the identifier of the local node */
        public byte identifier() {
            return ChannelProvider.this.localIdentifier;
        }

        /**
         * Registers {@code map} and {@code entryExternalizable} under this handle's channel id.
         *
         * @param builder             not used by this implementation
         * @param map                 replica to register
         * @param entryExternalizable externalizable used for the channel's entries
         * @param chronicleMap        not used by this implementation
         * @return this handle, so that closing it unregisters the channel
         */
        @Override
        protected Closeable applyTo(ChronicleMapBuilder builder, Replica map, Replica.EntryExternalizable entryExternalizable, ChronicleMap chronicleMap) {
            ChannelProvider.this.add(this.chronicleChannel, map, entryExternalizable);
            return this;
        }

        /**
         * Unregisters this channel and, when only one entry is left in the channel list
         * afterwards, closes the enclosing provider.
         *
         * @throws IOException propagated from closing the provider
         */
        @Override
        public void close() throws IOException {
            ChannelProvider.this.channelDataLock.writeLock().lock();
            try {
                // Remove by position rather than by equals(): a Replica that overrides equals
                // could match a different element and break the pairing of the two lists.
                int index = ChannelProvider.this.chronicleChannelIds.indexOf(this.chronicleChannel);
                if (index >= 0) {
                    ChannelProvider.this.chronicleChannelList.remove(index);
                    ChannelProvider.this.chronicleChannelIds.remove(index);
                }
                ChannelProvider.this.chronicleChannels[this.chronicleChannel] = null;
                ChannelProvider.this.channelEntryExternalizables[this.chronicleChannel] = null;
                if (ChannelProvider.this.chronicleChannelList.size() == 1) {
                    ChannelProvider.this.close();
                }
            }
            finally {
                ChannelProvider.this.channelDataLock.writeLock().unlock();
            }
        }
    }

    /**
     * Internal channel that carries system messages. It exposes a {@link Replica} whose
     * modification iterators are per-remote payload queues, and an externalizable that copies
     * payloads out verbatim and passes incoming ones to the message handler.
     */
    class SystemQueue {
        /** Replica view: one payload queue per remote identifier, created on first use. */
        final Replica asReplica = new Replica(){

            @Override
            public byte identifier() {
                throw new UnsupportedOperationException();
            }

            /**
             * Returns the payload queue for {@code remoteIdentifier}, creating it on first use
             * and setting the matching bit in the bit set.
             *
             * @param remoteIdentifier identifier of the remote node; used as array index
             * @return the payload queue registered for that identifier
             */
            @Override
            public Replica.ModificationIterator acquireModificationIterator(byte remoteIdentifier) {
                PayloadProvider existing = SystemQueue.this.systemModificationIterator.get(remoteIdentifier);
                if (existing != null) {
                    return existing;
                }
                PayloadProvider created = new PayloadProvider(){
                    final Queue<Bytes> payloads = new LinkedTransferQueue<Bytes>();
                    /** Null until setModificationNotifier has been called. */
                    volatile Replica.ModificationNotifier modificationNotifier0;

                    @Override
                    public boolean hasNext() {
                        return this.payloads.peek() != null;
                    }

                    /**
                     * Hands the next queued payload to the callback with channel id 0 and the
                     * current time.
                     *
                     * @return false when the queue is empty
                     */
                    @Override
                    public boolean nextEntry(@NotNull Replica.EntryCallback callback, int na) {
                        Bytes bytes = this.payloads.poll();
                        if (bytes == null) {
                            return false;
                        }
                        callback.onEntry(bytes, 0, System.currentTimeMillis());
                        return true;
                    }

                    @Override
                    public void dirtyEntries(long fromTimeStamp) {
                        // Nothing to mark dirty for the system queue.
                    }

                    @Override
                    public void setModificationNotifier(@NotNull Replica.ModificationNotifier modificationNotifier) {
                        this.modificationNotifier0 = modificationNotifier;
                    }

                    /**
                     * Queues a non-empty payload and signals the notifier if one is installed.
                     * Empty payloads are dropped.
                     */
                    @Override
                    public void addPayload(Bytes bytes) {
                        if (bytes.remaining() == 0L) {
                            return;
                        }
                        this.payloads.add(bytes);
                        // A payload may be queued before any notifier has been installed.
                        Replica.ModificationNotifier notifier = this.modificationNotifier0;
                        if (notifier != null) {
                            notifier.onChange();
                        }
                    }
                };
                // Publish atomically so that a queue created concurrently by another caller is
                // not overwritten together with payloads it may already hold.
                if (!SystemQueue.this.systemModificationIterator.compareAndSet(remoteIdentifier, null, created)) {
                    return SystemQueue.this.systemModificationIterator.get(remoteIdentifier);
                }
                SystemQueue.this.systemModificationIteratorBitSet.set((long)remoteIdentifier);
                return created;
            }

            @Override
            public long lastModificationTime(byte remoteIdentifier) {
                return 0L;
            }

            @Override
            public void setLastModificationTime(byte identifier, long timestamp) {
                // The system queue keeps no modification time.
            }

            @Override
            public void close() throws IOException {
                // Nothing to release.
            }
        };
        /** Externalizable view: payloads are written as-is; incoming data goes to the handler. */
        final Replica.EntryExternalizable asEntryExternalizable = new Replica.EntryExternalizable(){

            @Override
            public int sizeOfEntry(@NotNull Bytes entry, int chronicleId) {
                return (int)entry.remaining();
            }

            @Override
            public boolean identifierCheck(@NotNull Bytes entry, int chronicleId) {
                return true;
            }

            @Override
            public void writeExternalEntry(@NotNull Bytes entry, @NotNull Bytes destination, int na, long bootstrapTime) {
                destination.write((RandomDataInput)entry, entry.position(), entry.remaining());
            }

            @Override
            public void readExternalEntry(@NotNull ThreadLocalCopies copies, @NotNull VanillaChronicleMap.SegmentState segmentState, @NotNull Bytes source) {
                SystemQueue.this.messageHandler.onMessage(source);
            }
        };
        /** Bit per remote identifier for which a payload queue exists. */
        private final DirectBitSet systemModificationIteratorBitSet;
        /** Payload queues indexed by remote identifier. */
        private final AtomicReferenceArray<PayloadProvider> systemModificationIterator;
        /** Receiver of incoming system messages. */
        private final MessageHandler messageHandler;

        /**
         * @param systemModificationIteratorBitSet bit set updated when a payload queue is created
         * @param systemModificationIterator       table that stores the payload queues
         * @param messageHandler                   receiver of incoming system messages
         */
        SystemQueue(DirectBitSet systemModificationIteratorBitSet, AtomicReferenceArray<PayloadProvider> systemModificationIterator, MessageHandler messageHandler) {
            this.systemModificationIteratorBitSet = systemModificationIteratorBitSet;
            this.systemModificationIterator = systemModificationIterator;
            this.messageHandler = messageHandler;
        }
    }

    private static interface PayloadProvider
    extends Replica.ModificationIterator {
        public void addPayload(Bytes var1);
    }

    private static interface MessageHandler {
        public void onMessage(Bytes var1);
    }
}

