/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.journal.EventJournalCacheEvent;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.HazelcastClientProxy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Partition;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.impl.connector.SerializableClientConfig;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.journal.EventJournalInitialSubscriberState;
import com.hazelcast.journal.EventJournalReader;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.nio.Address;
import com.hazelcast.projection.Projection;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.util.function.Predicate;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public final class StreamEventJournalP<E, T>
extends AbstractProcessor {
    // NOTE(review): not referenced by name in this file; readFromJournal() passes the
    // literal 128 as the third argument of readFromEventJournal, which matches this value.
    private static final int MAX_FETCH_SIZE = 128;
    // Journal reader that both the subscription and the read requests are sent to.
    private final EventJournalReader<E> eventJournalReader;
    // Partition ids this processor instance reads; snapshot entries for other ids are ignored.
    private final Set<Integer> assignedPartitions;
    // Optional filter handed to the journal read call; null when no predicate was supplied.
    private final SerializablePredicate<E> predicate;
    // Optional mapping handed to the journal read call; null when no projection was supplied.
    private final Projection<E, T> projection;
    // When true the initial offset is newestSequence + 1, otherwise the oldest sequence.
    private final boolean startFromNewest;
    // When false, HazelcastInstanceNotActiveException from a read is ignored instead of rethrown.
    private final boolean isRemoteReader;
    // partition id -> offset written to the snapshot (see saveToSnapshot()).
    private final Map<Integer, Long> emitOffsets = new HashMap<Integer, Long>();
    // partition id -> offset used for the next readFromJournal() request of that partition.
    private final Map<Integer, Long> readOffsets = new HashMap<Integer, Long>();
    // partition id -> the most recently issued read request for that partition.
    private final Map<Integer, ICompletableFuture<ReadResultSet<T>>> readFutures = new HashMap<Integer, ICompletableFuture<ReadResultSet<T>>>();
    // Traverser over the result set currently being emitted; reset to null once it is exhausted.
    private Traverser<T> eventTraverser;
    // Traverser over the snapshot entries; reset to null once it is exhausted.
    private Traverser<Map.Entry<BroadcastKey<Integer>, Long>> snapshotTraverser;
    // Journal sequence of the item most recently pulled from the current result set.
    private long pendingItemOffset;
    // Partition the current result set was read from.
    private int pendingItemPartition;
    // Callback given to emitFromTraverser() in complete(): stores pendingItemOffset + 1 as the
    // emit offset of pendingItemPartition. Presumably invoked per emitted item - confirm
    // against AbstractProcessor.emitFromTraverser.
    private Consumer<T> updateOffsetFn = e -> this.emitOffsets.put(this.pendingItemPartition, this.pendingItemOffset + 1L);
    // Cursor over readFutures entries; re-created by nextResultSet() whenever it runs out.
    private Iterator<Map.Entry<Integer, ICompletableFuture<ReadResultSet<T>>>> iterator;

    /**
     * Creates a processor that reads the given partitions of an event journal.
     *
     * @param eventJournalReader reader used to subscribe to and read from the journal
     * @param assignedPartitions ids of the partitions this instance reads; copied into a set
     * @param predicateFn        optional filter for journal events, may be {@code null}
     * @param projectionFn       optional mapping applied to journal events, may be {@code null}
     * @param startFromNewest    whether to start after the newest sequence instead of at the oldest
     * @param isRemoteReader     whether the reader belongs to a remote cluster client
     */
    StreamEventJournalP(EventJournalReader<E> eventJournalReader, List<Integer> assignedPartitions, DistributedPredicate<E> predicateFn, DistributedFunction<E, T> projectionFn, boolean startFromNewest, boolean isRemoteReader) {
        this.isRemoteReader = isRemoteReader;
        this.startFromNewest = startFromNewest;
        this.eventJournalReader = eventJournalReader;
        this.assignedPartitions = new HashSet<>(assignedPartitions);

        // Adapt the Jet functional types to the types the journal read call accepts.
        if (predicateFn == null) {
            this.predicate = null;
        } else {
            this.predicate = predicateFn::test;
        }
        if (projectionFn == null) {
            this.projection = null;
        } else {
            this.projection = toProjection(projectionFn);
        }
    }

    /**
     * Subscribes to the journal of every assigned partition and stores the
     * resulting start sequence as both the read offset and the emit offset.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        // Issue every subscription request before waiting on any of them.
        Map<Integer, ICompletableFuture<EventJournalInitialSubscriberState>> subscriptions = new HashMap<>();
        for (Integer partitionId : assignedPartitions) {
            subscriptions.put(partitionId, eventJournalReader.subscribeToEventJournal(partitionId));
        }

        // Now wait for each answer and derive the starting sequence from it.
        for (Map.Entry<Integer, ICompletableFuture<EventJournalInitialSubscriberState>> subscription
                : subscriptions.entrySet()) {
            com.hazelcast.jet.impl.util.Util.uncheckRun(
                    () -> readOffsets.put(subscription.getKey(), getSequence(subscription.getValue().get())));
        }

        emitOffsets.putAll(readOffsets);
    }

    /**
     * Drives the processor: starts the initial reads if none are registered,
     * picks up the next available result set and emits from it.
     *
     * @return always {@code false}; this method never reports completion
     */
    @Override
    public boolean complete() {
        if (readFutures.isEmpty()) {
            initialRead();
        }

        if (eventTraverser == null) {
            Traverser<T> next = nextTraverser();
            if (next != null) {
                // Drop the traverser as soon as it is drained so the next call fetches a new one.
                eventTraverser = next.onFirstNull(() -> eventTraverser = null);
            }
        }

        if (eventTraverser != null) {
            emitFromTraverser(eventTraverser, updateOffsetFn);
        }
        return false;
    }

    /**
     * Writes the emit offset of every partition to the snapshot, each keyed
     * by a broadcast key wrapping the partition id.
     *
     * @return {@code true} once all entries were emitted to the snapshot
     */
    @Override
    public boolean saveToSnapshot() {
        if (snapshotTraverser == null) {
            Traverser<Map.Entry<BroadcastKey<Integer>, Long>> offsetEntries =
                    Traversers.traverseIterable(emitOffsets.entrySet())
                            .map(offset -> Util.entry(BroadcastKey.broadcastKey(offset.getKey()), offset.getValue()));
            // Forget the traverser once drained so the next snapshot starts from scratch.
            snapshotTraverser = offsetEntries.onFirstNull(() -> snapshotTraverser = null);
        }

        if (!emitFromTraverserToSnapshot(snapshotTraverser)) {
            return false;
        }
        LoggingUtil.logFinest(getLogger(), "Saved snapshot. Offsets: %s", emitOffsets);
        return true;
    }

    /**
     * Restores the offset of one partition from a snapshot entry. Entries for
     * partitions not assigned to this processor are ignored.
     *
     * @param key   a {@link BroadcastKey} wrapping the partition id
     * @param value the saved offset as a {@code Long}
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        final int partitionId = (Integer) ((BroadcastKey) key).key();
        final long restoredOffset = (Long) value;
        if (!assignedPartitions.contains(partitionId)) {
            return;
        }
        // Reading resumes from, and the next snapshot reports, the restored offset.
        emitOffsets.put(partitionId, restoredOffset);
        readOffsets.put(partitionId, restoredOffset);
    }

    /**
     * Logs the read offsets in effect after the snapshot restore.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        final Map<Integer, Long> restored = readOffsets;
        LoggingUtil.logFinest(getLogger(), "Restored snapshot. Offsets: %s", restored);
        return true;
    }

    /**
     * Declares this processor as non-cooperative.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Issues one journal read per known read offset, registers the resulting
     * futures and positions the cursor at the first of them.
     */
    private void initialRead() {
        for (Map.Entry<Integer, Long> start : readOffsets.entrySet()) {
            Integer partitionId = start.getKey();
            readFutures.put(partitionId, readFromJournal(partitionId, start.getValue()));
        }
        iterator = readFutures.entrySet().iterator();
    }

    /**
     * Picks the sequence to start reading from.
     *
     * @param state subscription state reported by the journal
     * @return the newest sequence plus one when starting from newest, the oldest sequence otherwise
     */
    private long getSequence(EventJournalInitialSubscriberState state) {
        if (startFromNewest) {
            return state.getNewestSequence() + 1L;
        }
        return state.getOldestSequence();
    }

    /**
     * Wraps the next available result set in a traverser that records the
     * journal sequence of each item as it is pulled.
     *
     * @return the traverser, or {@code null} when no result set is available
     */
    private Traverser<T> nextTraverser() {
        ReadResultSet<T> batch = nextResultSet();
        if (batch == null) {
            return null;
        }
        // The index of the pulled item within the batch maps to its journal sequence.
        IntConsumer rememberSequence = index -> pendingItemOffset = batch.getSequence(index);
        return peekIndex(Traversers.traverseIterable(batch), rememberSequence);
    }

    /**
     * Advances the cursor over the registered reads until it finds a finished
     * one that produced items. Every finished read is immediately replaced by
     * a new read request for the same partition.
     *
     * @return a non-empty result set, or {@code null} after the cursor reached
     *         the end (it is then reset to the beginning)
     */
    private ReadResultSet<T> nextResultSet() {
        while (iterator.hasNext()) {
            Map.Entry<Integer, ICompletableFuture<ReadResultSet<T>>> slot = iterator.next();
            int partitionId = slot.getKey();
            ICompletableFuture<ReadResultSet<T>> pendingRead = slot.getValue();
            if (!pendingRead.isDone()) {
                continue;
            }

            ReadResultSet<T> batch = toResultSet(partitionId, pendingRead);
            boolean hasItems = batch != null && batch.size() != 0;
            if (!hasItems) {
                // Nothing usable: retry from the current read offset of the partition.
                slot.setValue(readFromJournal(partitionId, readOffsets.get(partitionId)));
                continue;
            }

            pendingItemPartition = partitionId;
            // Move the read offset past everything this read consumed and request the next batch.
            long advancedOffset = readOffsets.merge(partitionId, (long) batch.readCount(), Long::sum);
            slot.setValue(readFromJournal(partitionId, advancedOffset));
            return batch;
        }

        // One full pass completed; start over on the next call.
        iterator = readFutures.entrySet().iterator();
        return null;
    }

    /**
     * Obtains the result of a journal read and translates the failures that
     * are handled locally.
     *
     * <ul>
     * <li>{@link HazelcastInstanceNotActiveException} on a non-remote reader is
     *     ignored (presumably the local member is shutting down).</li>
     * <li>{@link StaleSequenceException} moves both the read and the emit offset
     *     of the partition to the journal's current head sequence and logs a
     *     warning about the lost events.</li>
     * </ul>
     *
     * @param partition id of the partition the read was issued for
     * @param future    the read request to take the result from
     * @return the result set, or {@code null} when one of the failures above occurred
     */
    private ReadResultSet<T> toResultSet(int partition, ICompletableFuture<ReadResultSet<T>> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable ex = ExceptionUtil.peel(e);
            if (ex instanceof HazelcastInstanceNotActiveException && !isRemoteReader) {
                return null;
            }
            if (ex instanceof StaleSequenceException) {
                // Cast the peeled exception that was just type-checked; e.getCause() is not
                // guaranteed to be the same object if peeling removed more than one layer.
                long headSeq = ((StaleSequenceException) ex).getHeadSeq();
                // Kept boxed so a missing previous offset is logged as "null" instead of failing.
                Long oldOffset = readOffsets.put(partition, headSeq);
                emitOffsets.put(partition, headSeq);
                getLogger().warning("Events lost for partition " + partition + " due to journal overflow "
                        + "when reading from event journal. Increase journal size to avoid this error. "
                        + "Requested was: " + oldOffset + ", current head is: " + headSeq);
                return null;
            }
            throw ExceptionUtil.rethrow(ex);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Issues a read request against the journal of one partition, passing the
     * configured predicate and projection along.
     *
     * @param partition id of the partition to read
     * @param offset    journal sequence to start reading from
     * @return the future of the read request
     */
    private ICompletableFuture<ReadResultSet<T>> readFromJournal(int partition, long offset) {
        LoggingUtil.logFine(getLogger(), "Reading from partition %s and offset %s", partition, offset);
        // Request at least one item and at most MAX_FETCH_SIZE items per read.
        return eventJournalReader.readFromEventJournal(offset, 1, MAX_FETCH_SIZE, partition, predicate, projection);
    }

    /**
     * Decorates a traverser so that {@code action} receives the zero-based
     * index of every non-null item just before that item is returned.
     *
     * @param traverser the traverser to decorate
     * @param action    callback receiving the index of each returned item
     * @param <T>       item type
     * @return the decorated traverser
     */
    private static <T> Traverser<T> peekIndex(Traverser<T> traverser, IntConsumer action) {
        // Single-element array: a mutable counter the lambda is allowed to capture.
        int[] count = {0};
        return () -> {
            T item = traverser.next();
            if (item != null) {
                action.accept(count[0]++);
            }
            return item;
        };
    }

    /**
     * Adapts a plain function to a Hazelcast {@link Projection} whose
     * {@code transform} delegates to {@code projectionFn.apply}.
     *
     * @param projectionFn the function to delegate to
     * @return a projection applying {@code projectionFn} to its input
     */
    private static <E, T> Projection<E, T> toProjection(final Function<E, T> projectionFn) {
        // NOTE(review): the anonymous class captures projectionFn; if the projection is
        // serialized, the function presumably has to be serializable as well - confirm.
        return new Projection<E, T>(){

            @Override
            public T transform(E input) {
                return projectionFn.apply(input);
            }
        };
    }

    /**
     * Creates a meta-supplier of processors that read the event journal of the
     * map named {@code mapName}, obtained from the Hazelcast instance of the
     * Jet member (no client configuration is used).
     *
     * @param mapName         name of the map whose journal is read
     * @param predicate       filter passed on to the processors
     * @param projection      mapping passed on to the processors
     * @param startFromNewest whether to start after the newest journal sequence
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamMapP(String mapName, DistributedPredicate<EventJournalMapEvent<K, V>> predicate, DistributedFunction<EventJournalMapEvent<K, V>, T> projection, boolean startFromNewest) {
        @SuppressWarnings("unchecked")
        DistributedFunction<HazelcastInstance, EventJournalReader<EventJournalMapEvent<K, V>>> readerFn =
                instance -> (EventJournalReader<EventJournalMapEvent<K, V>>) instance.getMap(mapName);
        return new ClusterMetaSupplier<>(null, readerFn, predicate, projection, startFromNewest);
    }

    /**
     * Creates a meta-supplier of processors that read the event journal of the
     * map named {@code mapName} through a client built from {@code clientConfig}.
     *
     * @param mapName         name of the map whose journal is read
     * @param clientConfig    configuration of the client used to reach the map
     * @param predicate       filter passed on to the processors
     * @param projection      mapping passed on to the processors
     * @param startFromNewest whether to start after the newest journal sequence
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamRemoteMapP(String mapName, ClientConfig clientConfig, DistributedPredicate<EventJournalMapEvent<K, V>> predicate, DistributedFunction<EventJournalMapEvent<K, V>, T> projection, boolean startFromNewest) {
        @SuppressWarnings("unchecked")
        DistributedFunction<HazelcastInstance, EventJournalReader<EventJournalMapEvent<K, V>>> readerFn =
                instance -> (EventJournalReader<EventJournalMapEvent<K, V>>) instance.getMap(mapName);
        return new ClusterMetaSupplier<>(clientConfig, readerFn, predicate, projection, startFromNewest);
    }

    /**
     * Creates a meta-supplier of processors that read the event journal of the
     * cache named {@code cacheName}, obtained from the Hazelcast instance of
     * the Jet member (no client configuration is used).
     *
     * @param cacheName       name of the cache whose journal is read
     * @param predicate       filter passed on to the processors
     * @param projection      mapping passed on to the processors
     * @param startFromNewest whether to start after the newest journal sequence
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamCacheP(String cacheName, DistributedPredicate<EventJournalCacheEvent<K, V>> predicate, DistributedFunction<EventJournalCacheEvent<K, V>, T> projection, boolean startFromNewest) {
        @SuppressWarnings("unchecked")
        DistributedFunction<HazelcastInstance, EventJournalReader<EventJournalCacheEvent<K, V>>> readerFn =
                inst -> (EventJournalReader<EventJournalCacheEvent<K, V>>) inst.getCacheManager().getCache(cacheName);
        return new ClusterMetaSupplier<>(null, readerFn, predicate, projection, startFromNewest);
    }

    /**
     * Creates a meta-supplier of processors that read the event journal of the
     * cache named {@code cacheName} through a client built from {@code clientConfig}.
     *
     * @param cacheName       name of the cache whose journal is read
     * @param clientConfig    configuration of the client used to reach the cache
     * @param predicate       filter passed on to the processors
     * @param projection      mapping passed on to the processors
     * @param startFromNewest whether to start after the newest journal sequence
     * @return the meta-supplier
     */
    public static <K, V, T> ProcessorMetaSupplier streamRemoteCacheP(String cacheName, ClientConfig clientConfig, DistributedPredicate<EventJournalCacheEvent<K, V>> predicate, DistributedFunction<EventJournalCacheEvent<K, V>, T> projection, boolean startFromNewest) {
        @SuppressWarnings("unchecked")
        DistributedFunction<HazelcastInstance, EventJournalReader<EventJournalCacheEvent<K, V>>> readerFn =
                inst -> (EventJournalReader<EventJournalCacheEvent<K, V>>) inst.getCacheManager().getCache(cacheName);
        return new ClusterMetaSupplier<>(clientConfig, readerFn, predicate, projection, startFromNewest);
    }

    static interface SerializablePredicate<E>
    extends Predicate<E>,
    Serializable {
    }

    /**
     * Per-member supplier of {@link StreamEventJournalP} processors. It obtains
     * the journal reader once in {@link #init} and splits the member's
     * partitions among the requested number of processors.
     *
     * @param <E> journal event type
     * @param <T> emitted item type
     */
    private static class ClusterProcessorSupplier<E, T> implements ProcessorSupplier {
        static final long serialVersionUID = 1L;

        private final List<Integer> ownedPartitions;
        private final SerializableClientConfig serializableClientConfig;
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        private final DistributedPredicate<E> predicate;
        private final DistributedFunction<E, T> projection;
        private final boolean startFromNewest;

        // Set in init(); the client stays null when no client configuration was given.
        private transient HazelcastInstance client;
        private transient EventJournalReader<E> eventJournalReader;

        ClusterProcessorSupplier(List<Integer> ownedPartitions, SerializableClientConfig serializableClientConfig, DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier, DistributedPredicate<E> predicate, DistributedFunction<E, T> projection, boolean startFromNewest) {
            this.startFromNewest = startFromNewest;
            this.projection = projection;
            this.predicate = predicate;
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.serializableClientConfig = serializableClientConfig;
            this.ownedPartitions = ownedPartitions;
        }

        /**
         * Resolves the journal reader, either from the member's own Hazelcast
         * instance or from a newly created client when a client configuration
         * is present.
         */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance memberInstance = context.jetInstance().getHazelcastInstance();
            if (serializableClientConfig == null) {
                eventJournalReader = eventJournalReaderSupplier.apply(memberInstance);
                return;
            }
            client = HazelcastClient.newHazelcastClient(serializableClientConfig.asClientConfig());
            eventJournalReader = eventJournalReaderSupplier.apply(client);
        }

        /** Shuts down the client created in {@link #init}, if there is one. */
        @Override
        public void complete(Throwable error) {
            if (client == null) {
                return;
            }
            client.shutdown();
        }

        /**
         * Creates {@code count} processors, each responsible for its share of
         * the owned partitions.
         */
        @Nonnull
        @Override
        public List<Processor> get(int count) {
            Map<Integer, List<Integer>> partitionsByProcessor =
                    com.hazelcast.jet.impl.util.Util.processorToPartitions(count, ownedPartitions);
            return partitionsByProcessor.values()
                    .stream()
                    .map(this::processorForPartitions)
                    .collect(Collectors.toList());
        }

        /** Returns a no-op processor for an empty share, a journal reader processor otherwise. */
        private Processor processorForPartitions(List<Integer> partitions) {
            if (partitions.isEmpty()) {
                return Processors.noopP().get();
            }
            boolean remote = client != null;
            return new StreamEventJournalP<E, T>(eventJournalReader, partitions, predicate, projection, startFromNewest, remote);
        }
    }

    /**
     * Meta-supplier that decides which partitions each member reads and hands
     * out a {@link ClusterProcessorSupplier} per member address.
     *
     * <p>Without a client configuration the partitions of the member's own
     * cluster are grouped by their owner. With a client configuration the
     * partition count of the remote cluster is looked up once and the
     * partitions are spread over the member addresses by
     * {@code partition % addresses.size()}.
     *
     * @param <E> journal event type
     * @param <T> emitted item type
     */
    private static class ClusterMetaSupplier<E, T>
    implements ProcessorMetaSupplier {
        static final long serialVersionUID = 1L;
        private final SerializableClientConfig serializableConfig;
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        private final DistributedPredicate<E> predicate;
        private final DistributedFunction<E, T> projection;
        private final boolean startFromNewest;
        // Filled by initRemote(); only meaningful when a client configuration is present.
        private transient int remotePartitionCount;
        // Filled by initLocal(), or lazily by get() in the remote case.
        private transient Map<Address, List<Integer>> addrToPartitions;

        ClusterMetaSupplier(ClientConfig clientConfig, DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier, DistributedPredicate<E> predicate, DistributedFunction<E, T> projection, boolean startFromNewest) {
            this.serializableConfig = clientConfig == null ? null : new SerializableClientConfig(clientConfig);
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.predicate = predicate;
            this.projection = projection;
            this.startFromNewest = startFromNewest;
        }

        /** Returns 1 when reading through a client, 2 otherwise. */
        @Override
        public int preferredLocalParallelism() {
            return this.serializableConfig != null ? 1 : 2;
        }

        /** Collects the partition information needed by {@link #get}. */
        @Override
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            if (this.serializableConfig != null) {
                this.initRemote();
            } else {
                this.initLocal(context.jetInstance().getHazelcastInstance().getPartitionService().getPartitions());
            }
        }

        /** Connects a temporary client just to learn the remote partition count. */
        private void initRemote() {
            HazelcastInstance client = HazelcastClient.newHazelcastClient(this.serializableConfig.asClientConfig());
            try {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy)client;
                this.remotePartitionCount = clientProxy.client.getClientPartitionService().getPartitionCount();
            }
            finally {
                // The client is needed only for the lookup above; always release it.
                client.shutdown();
            }
        }

        /** Groups the given partitions' ids by the address of their owner. */
        private void initLocal(Set<Partition> partitions) {
            this.addrToPartitions = partitions.stream().collect(Collectors.groupingBy(p -> p.getOwner().getAddress(), Collectors.mapping(Partition::getPartitionId, Collectors.toList())));
        }

        /**
         * Returns the function creating the processor supplier for a member
         * address.
         *
         * @param addresses addresses of the members that will run the processors
         * @return function from member address to its processor supplier
         */
        @Override
        @Nonnull
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            if (this.addrToPartitions == null) {
                // Remote case: spread the remote partitions over the members by modulo.
                this.addrToPartitions = IntStream.range(0, this.remotePartitionCount).boxed().collect(Collectors.groupingBy(partition -> addresses.get(partition % addresses.size())));
            }
            // An address that owns no partition has no map entry. Fall back to an empty list so
            // the processor supplier receives a non-null partition list and can create no-op
            // processors instead of failing on null.
            return address -> new ClusterProcessorSupplier<E, T>(this.addrToPartitions.getOrDefault(address, Collections.emptyList()), this.serializableConfig, this.eventJournalReaderSupplier, this.predicate, this.projection, this.startFromNewest);
        }
    }
}

