package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.journal.EventJournalCacheEvent;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.HazelcastClientProxy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Partition;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.journal.EventJournalInitialSubscriberState;
import com.hazelcast.journal.EventJournalReader;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.nio.Address;
import com.hazelcast.projection.Projection;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.util.function.Predicate;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalP.class */
public final class StreamEventJournalP<E, T> extends AbstractProcessor {
    private static final int MAX_FETCH_SIZE = 128;
    private final EventJournalReader<E> eventJournalReader;
    private final Set<Integer> assignedPartitions;
    private final SerializablePredicate<E> predicate;
    private final Projection<E, T> projection;
    private final boolean startFromNewest;
    private final boolean isRemoteReader;
    private Traverser<T> eventTraverser;
    private Traverser<Map.Entry<BroadcastKey<Integer>, Long>> snapshotTraverser;
    private long pendingItemOffset;
    private int pendingItemPartition;
    private Iterator<Map.Entry<Integer, ICompletableFuture<ReadResultSet<T>>>> iterator;
    private final Map<Integer, Long> emitOffsets = new HashMap();
    private final Map<Integer, Long> readOffsets = new HashMap();
    private final Map<Integer, ICompletableFuture<ReadResultSet<T>>> readFutures = new HashMap();
    private Consumer<T> updateOffsetFn = obj -> {
        this.emitOffsets.put(Integer.valueOf(this.pendingItemPartition), Long.valueOf(this.pendingItemOffset + 1));
    };

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalP$ClusterMetaSupplier.class */
    private static class ClusterMetaSupplier<E, T> implements ProcessorMetaSupplier {
        static final long serialVersionUID = 1;
        private final SerializableClientConfig serializableConfig;
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        private final DistributedPredicate<E> predicate;
        private final DistributedFunction<E, T> projection;
        private final boolean startFromNewest;
        private transient int remotePartitionCount;
        private transient Map<Address, List<Integer>> addrToPartitions;

        ClusterMetaSupplier(ClientConfig clientConfig, DistributedFunction<HazelcastInstance, EventJournalReader<E>> distributedFunction, DistributedPredicate<E> distributedPredicate, DistributedFunction<E, T> distributedFunction2, boolean z) {
            this.serializableConfig = clientConfig == null ? null : new SerializableClientConfig(clientConfig);
            this.eventJournalReaderSupplier = distributedFunction;
            this.predicate = distributedPredicate;
            this.projection = distributedFunction2;
            this.startFromNewest = z;
        }

        @Override // com.hazelcast.jet.core.ProcessorMetaSupplier
        public int preferredLocalParallelism() {
            return this.serializableConfig != null ? 1 : 2;
        }

        @Override // com.hazelcast.jet.core.ProcessorMetaSupplier
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            if (this.serializableConfig != null) {
                initRemote();
            } else {
                initLocal(context.jetInstance().getHazelcastInstance().getPartitionService().getPartitions());
            }
        }

        private void initRemote() {
            HazelcastInstance newHazelcastClient = HazelcastClient.newHazelcastClient(this.serializableConfig.asClientConfig());
            try {
                this.remotePartitionCount = ((HazelcastClientProxy) newHazelcastClient).client.getClientPartitionService().getPartitionCount();
            } finally {
                newHazelcastClient.shutdown();
            }
        }

        private void initLocal(Set<Partition> set) {
            this.addrToPartitions = (Map) set.stream().collect(Collectors.groupingBy(partition -> {
                return partition.getOwner().getAddress();
            }, Collectors.mapping((v0) -> {
                return v0.getPartitionId();
            }, Collectors.toList())));
        }

        @Override // com.hazelcast.jet.core.ProcessorMetaSupplier
        @Nonnull
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> list) {
            if (this.addrToPartitions == null) {
                this.addrToPartitions = (Map) IntStream.range(0, this.remotePartitionCount).boxed().collect(Collectors.groupingBy(num -> {
                    return (Address) list.get(num.intValue() % list.size());
                }));
            }
            return address -> {
                return new ClusterProcessorSupplier(this.addrToPartitions.get(address), this.serializableConfig, this.eventJournalReaderSupplier, this.predicate, this.projection, this.startFromNewest);
            };
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalP$ClusterProcessorSupplier.class */
    private static class ClusterProcessorSupplier<E, T> implements ProcessorSupplier {
        static final long serialVersionUID = 1;
        private final List<Integer> ownedPartitions;
        private final SerializableClientConfig serializableClientConfig;
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        private final DistributedPredicate<E> predicate;
        private final DistributedFunction<E, T> projection;
        private final boolean startFromNewest;
        private transient HazelcastInstance client;
        private transient EventJournalReader<E> eventJournalReader;

        ClusterProcessorSupplier(List<Integer> list, SerializableClientConfig serializableClientConfig, DistributedFunction<HazelcastInstance, EventJournalReader<E>> distributedFunction, DistributedPredicate<E> distributedPredicate, DistributedFunction<E, T> distributedFunction2, boolean z) {
            this.ownedPartitions = list;
            this.serializableClientConfig = serializableClientConfig;
            this.eventJournalReaderSupplier = distributedFunction;
            this.predicate = distributedPredicate;
            this.projection = distributedFunction2;
            this.startFromNewest = z;
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance hazelcastInstance = context.jetInstance().getHazelcastInstance();
            if (this.serializableClientConfig != null) {
                this.client = HazelcastClient.newHazelcastClient(this.serializableClientConfig.asClientConfig());
                hazelcastInstance = this.client;
            }
            this.eventJournalReader = this.eventJournalReaderSupplier.apply(hazelcastInstance);
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void complete(Throwable th) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        @Nonnull
        public List<Processor> get(int i) {
            return (List) Util.processorToPartitions(i, this.ownedPartitions).values().stream().map(this::processorForPartitions).collect(Collectors.toList());
        }

        private Processor processorForPartitions(List<Integer> list) {
            if (list.isEmpty()) {
                return Processors.noopP().get();
            }
            return new StreamEventJournalP(this.eventJournalReader, list, this.predicate, this.projection, this.startFromNewest, this.client != null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalP$SerializablePredicate.class */
    public interface SerializablePredicate<E> extends Predicate<E>, Serializable {
    }

    StreamEventJournalP(EventJournalReader<E> eventJournalReader, List<Integer> list, DistributedPredicate<E> distributedPredicate, DistributedFunction<E, T> distributedFunction, boolean z, boolean z2) {
        SerializablePredicate<E> serializablePredicate;
        this.eventJournalReader = eventJournalReader;
        this.assignedPartitions = new HashSet(list);
        if (distributedPredicate == null) {
            serializablePredicate = null;
        } else {
            distributedPredicate.getClass();
            serializablePredicate = distributedPredicate::test;
        }
        this.predicate = serializablePredicate;
        this.projection = distributedFunction == null ? null : toProjection(distributedFunction);
        this.startFromNewest = z;
        this.isRemoteReader = z2;
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void init(@Nonnull Processor.Context context) throws Exception {
        ((Map) this.assignedPartitions.stream().map(num -> {
            return com.hazelcast.jet.Util.entry(num, this.eventJournalReader.subscribeToEventJournal(num.intValue()));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).forEach((num2, iCompletableFuture) -> {
            Util.uncheckRun(() -> {
                this.readOffsets.put(num2, Long.valueOf(getSequence((EventJournalInitialSubscriberState) iCompletableFuture.get())));
            });
        });
        this.emitOffsets.putAll(this.readOffsets);
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        Traverser<T> nextTraverser;
        if (this.readFutures.isEmpty()) {
            initialRead();
        }
        if (this.eventTraverser == null && (nextTraverser = nextTraverser()) != null) {
            this.eventTraverser = nextTraverser.onFirstNull(() -> {
                this.eventTraverser = null;
            });
        }
        if (this.eventTraverser == null) {
            return false;
        }
        emitFromTraverser(this.eventTraverser, this.updateOffsetFn);
        return false;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.emitOffsets.entrySet()).map(entry -> {
                return com.hazelcast.jet.Util.entry(BroadcastKey.broadcastKey(entry.getKey()), entry.getValue());
            }).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        boolean emitFromTraverserToSnapshot = emitFromTraverserToSnapshot(this.snapshotTraverser);
        if (emitFromTraverserToSnapshot) {
            LoggingUtil.logFinest(getLogger(), "Saved snapshot. Offsets: %s", this.emitOffsets);
        }
        return emitFromTraverserToSnapshot;
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        int intValue = ((Integer) ((BroadcastKey) obj).key()).intValue();
        long longValue = ((Long) obj2).longValue();
        if (this.assignedPartitions.contains(Integer.valueOf(intValue))) {
            this.readOffsets.put(Integer.valueOf(intValue), Long.valueOf(longValue));
            this.emitOffsets.put(Integer.valueOf(intValue), Long.valueOf(longValue));
        }
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean finishSnapshotRestore() {
        LoggingUtil.logFinest(getLogger(), "Restored snapshot. Offsets: %s", this.readOffsets);
        return true;
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor, com.hazelcast.jet.core.Processor
    public boolean isCooperative() {
        return false;
    }

    private void initialRead() {
        this.readOffsets.forEach((num, l) -> {
            this.readFutures.put(num, readFromJournal(num.intValue(), l.longValue()));
        });
        this.iterator = this.readFutures.entrySet().iterator();
    }

    private long getSequence(EventJournalInitialSubscriberState eventJournalInitialSubscriberState) {
        return this.startFromNewest ? eventJournalInitialSubscriberState.getNewestSequence() + 1 : eventJournalInitialSubscriberState.getOldestSequence();
    }

    private Traverser<T> nextTraverser() {
        ReadResultSet<T> nextResultSet = nextResultSet();
        if (nextResultSet == null) {
            return null;
        }
        return peekIndex(Traversers.traverseIterable(nextResultSet), i -> {
            this.pendingItemOffset = nextResultSet.getSequence(i);
        });
    }

    private ReadResultSet<T> nextResultSet() {
        while (this.iterator.hasNext()) {
            Map.Entry<Integer, ICompletableFuture<ReadResultSet<T>>> next = this.iterator.next();
            int intValue = next.getKey().intValue();
            if (next.getValue().isDone()) {
                ReadResultSet<T> resultSet = toResultSet(intValue, next.getValue());
                if (resultSet != null && resultSet.size() != 0) {
                    this.pendingItemPartition = intValue;
                    next.setValue(readFromJournal(intValue, this.readOffsets.merge(Integer.valueOf(intValue), Long.valueOf(resultSet.readCount()), (v0, v1) -> {
                        return Long.sum(v0, v1);
                    }).longValue()));
                    return resultSet;
                }
                next.setValue(readFromJournal(intValue, this.readOffsets.get(Integer.valueOf(intValue)).longValue()));
            }
        }
        this.iterator = this.readFutures.entrySet().iterator();
        return null;
    }

    private ReadResultSet<T> toResultSet(int i, ICompletableFuture<ReadResultSet<T>> iCompletableFuture) {
        try {
            return iCompletableFuture.get();
        } catch (InterruptedException e) {
            throw ExceptionUtil.rethrow(e);
        } catch (ExecutionException e2) {
            Throwable peel = ExceptionUtil.peel(e2);
            if ((peel instanceof HazelcastInstanceNotActiveException) && !this.isRemoteReader) {
                return null;
            }
            if (!(peel instanceof StaleSequenceException)) {
                throw ExceptionUtil.rethrow(peel);
            }
            long headSeq = ((StaleSequenceException) e2.getCause()).getHeadSeq();
            long longValue = this.readOffsets.put(Integer.valueOf(i), Long.valueOf(headSeq)).longValue();
            this.emitOffsets.put(Integer.valueOf(i), Long.valueOf(headSeq));
            getLogger().warning("Events lost for partition " + i + " due to journal overflow when reading from event journal. Increase journal size to avoid this error. Requested was: " + longValue + ", current head is: " + headSeq);
            return null;
        }
    }

    private ICompletableFuture<ReadResultSet<T>> readFromJournal(int i, long j) {
        LoggingUtil.logFine(getLogger(), "Reading from partition %s and offset %s", Integer.valueOf(i), Long.valueOf(j));
        return this.eventJournalReader.readFromEventJournal(j, 1, 128, i, this.predicate, this.projection);
    }

    private static <T> Traverser<T> peekIndex(Traverser<T> traverser, IntConsumer intConsumer) {
        int[] iArr = {0};
        return () -> {
            Object next = traverser.next();
            if (next != null) {
                int i = iArr[0];
                iArr[0] = i + 1;
                intConsumer.accept(i);
            }
            return next;
        };
    }

    private static <E, T> Projection<E, T> toProjection(final Function<E, T> function) {
        return new Projection<E, T>() { // from class: com.hazelcast.jet.impl.connector.StreamEventJournalP.1
            @Override // com.hazelcast.projection.Projection
            public T transform(E e) {
                return (T) function.apply(e);
            }
        };
    }

    public static <K, V, T> ProcessorMetaSupplier streamMapP(String str, DistributedPredicate<EventJournalMapEvent<K, V>> distributedPredicate, DistributedFunction<EventJournalMapEvent<K, V>, T> distributedFunction, boolean z) {
        return new ClusterMetaSupplier(null, hazelcastInstance -> {
            return (EventJournalReader) hazelcastInstance.getMap(str);
        }, distributedPredicate, distributedFunction, z);
    }

    public static <K, V, T> ProcessorMetaSupplier streamRemoteMapP(String str, ClientConfig clientConfig, DistributedPredicate<EventJournalMapEvent<K, V>> distributedPredicate, DistributedFunction<EventJournalMapEvent<K, V>, T> distributedFunction, boolean z) {
        return new ClusterMetaSupplier(clientConfig, hazelcastInstance -> {
            return (EventJournalReader) hazelcastInstance.getMap(str);
        }, distributedPredicate, distributedFunction, z);
    }

    public static <K, V, T> ProcessorMetaSupplier streamCacheP(String str, DistributedPredicate<EventJournalCacheEvent<K, V>> distributedPredicate, DistributedFunction<EventJournalCacheEvent<K, V>, T> distributedFunction, boolean z) {
        return new ClusterMetaSupplier(null, hazelcastInstance -> {
            return (EventJournalReader) hazelcastInstance.getCacheManager().getCache(str);
        }, distributedPredicate, distributedFunction, z);
    }

    public static <K, V, T> ProcessorMetaSupplier streamRemoteCacheP(String str, ClientConfig clientConfig, DistributedPredicate<EventJournalCacheEvent<K, V>> distributedPredicate, DistributedFunction<EventJournalCacheEvent<K, V>, T> distributedFunction, boolean z) {
        return new ClusterMetaSupplier(clientConfig, hazelcastInstance -> {
            return (EventJournalReader) hazelcastInstance.getCacheManager().getCache(str);
        }, distributedPredicate, distributedFunction, z);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2143512694:
                if (implMethodName.equals("lambda$streamRemoteMapP$5b046214$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1364593222:
                if (implMethodName.equals("lambda$streamRemoteCacheP$6c7c6ed6$1")) {
                    z = true;
                    break;
                }
                break;
            case -689282880:
                if (implMethodName.equals("lambda$streamMapP$e7131c71$1")) {
                    z = 4;
                    break;
                }
                break;
            case 3556498:
                if (implMethodName.equals("test")) {
                    z = 2;
                    break;
                }
                break;
            case 1571012306:
                if (implMethodName.equals("lambda$streamCacheP$ffff98f3$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lcom/hazelcast/core/HazelcastInstance;)Lcom/hazelcast/journal/EventJournalReader;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return hazelcastInstance -> {
                        return (EventJournalReader) hazelcastInstance.getCacheManager().getCache(str);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lcom/hazelcast/core/HazelcastInstance;)Lcom/hazelcast/journal/EventJournalReader;")) {
                    String str2 = (String) serializedLambda.getCapturedArg(0);
                    return hazelcastInstance2 -> {
                        return (EventJournalReader) hazelcastInstance2.getCacheManager().getCache(str2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP$SerializablePredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("java/util/function/Predicate") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    DistributedPredicate distributedPredicate = (DistributedPredicate) serializedLambda.getCapturedArg(0);
                    return distributedPredicate::test;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lcom/hazelcast/core/HazelcastInstance;)Lcom/hazelcast/journal/EventJournalReader;")) {
                    String str3 = (String) serializedLambda.getCapturedArg(0);
                    return hazelcastInstance3 -> {
                        return (EventJournalReader) hazelcastInstance3.getMap(str3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lcom/hazelcast/core/HazelcastInstance;)Lcom/hazelcast/journal/EventJournalReader;")) {
                    String str4 = (String) serializedLambda.getCapturedArg(0);
                    return hazelcastInstance4 -> {
                        return (EventJournalReader) hazelcastInstance4.getMap(str4);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
