package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.impl.JetEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/ConvenientSourceP.class */
/**
 * Processor that drives a source assembled from user-supplied callbacks:
 * one creating a context object, one filling a buffer, two for snapshot
 * save/restore and one for cleanup.
 *
 * @param <C> type of the context object produced by {@code createFn}
 * @param <T> type of the event payload handled by the {@link EventTimeMapper}
 * @param <S> type of the snapshot state object
 */
public class ConvenientSourceP<C, T, S> extends AbstractProcessor {
    private final Function<? super Processor.Context, ? extends C> createFn;
    private final BiConsumer<? super C, ? super SourceBufferConsumerSide<?>> fillBufferFn;
    private final FunctionEx<? super C, ? extends S> createSnapshotFn;
    private final BiConsumerEx<? super C, ? super List<S>> restoreSnapshotFn;
    private final Consumer<? super C> destroyFn;
    private final SourceBufferConsumerSide<?> buffer;
    /** {@code null} when no event time policy was supplied to the constructor. */
    private final EventTimeMapper<T> eventTimeMapper;
    private BroadcastKey<Integer> snapshotKey;
    /** Set at the end of {@link #init}; guards the {@code destroyFn} call in {@link #close()}. */
    private boolean initialized;
    private C ctx;
    /** Traverser currently being emitted; {@code null} when a new one must be built. */
    private Traverser<?> traverser;
    /** Snapshot state created but not yet accepted by {@code tryEmitToSnapshot}. */
    private S pendingState;
    /** States collected by {@link #restoreFromSnapshot}; {@code null} if none were received. */
    private List<S> restoredStates;

    /**
     * The view of the source buffer that this processor reads from.
     *
     * @param <T> type of the items the traverser yields
     */
    /* loaded from: input_file:com/hazelcast/jet/impl/connector/ConvenientSourceP$SourceBufferConsumerSide.class */
    public interface SourceBufferConsumerSide<T> {
        /** Returns a traverser over the buffered items. */
        Traverser<T> traverse();

        /** Tells whether the buffer currently holds no items. */
        boolean isEmpty();

        /** Tells whether the buffer was closed; once it is, {@link #complete()} can finish. */
        boolean isClosed();
    }

    /**
     * Stores the callbacks and, if an event time policy is given, creates an
     * {@link EventTimeMapper} with a single partition.
     *
     * @param function                 creates the context object from the processor context
     * @param biConsumer               fills the buffer; called with the context object and the buffer
     * @param functionEx               creates the snapshot state from the context object
     * @param biConsumerEx             restores the context object from the list of saved states
     * @param consumer                 destroys the context object
     * @param sourceBufferConsumerSide the buffer this processor drains
     * @param eventTimePolicy          event time policy, or {@code null} to emit buffered items as they are
     */
    public ConvenientSourceP(
            @Nonnull Function<? super Processor.Context, ? extends C> function,
            @Nonnull BiConsumer<? super C, ? super SourceBufferConsumerSide<?>> biConsumer,
            @Nonnull FunctionEx<? super C, ? extends S> functionEx,
            @Nonnull BiConsumerEx<? super C, ? super List<S>> biConsumerEx,
            @Nonnull Consumer<? super C> consumer,
            @Nonnull SourceBufferConsumerSide<?> sourceBufferConsumerSide,
            @Nullable EventTimePolicy<? super T> eventTimePolicy
    ) {
        this.createFn = function;
        this.fillBufferFn = biConsumer;
        this.createSnapshotFn = functionEx;
        this.restoreSnapshotFn = biConsumerEx;
        this.destroyFn = consumer;
        this.buffer = sourceBufferConsumerSide;
        if (eventTimePolicy == null) {
            this.eventTimeMapper = null;
        } else {
            this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
            // All events are mapped through partition index 0 (see nextTraverser()).
            this.eventTimeMapper.addPartitions(1);
        }
    }

    /** Declares this processor as non-cooperative. */
    @Override // com.hazelcast.jet.core.Processor
    public boolean isCooperative() {
        return false;
    }

    /**
     * Creates the context object and derives the snapshot key from the
     * global processor index.
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void init(@Nonnull Processor.Context context) {
        this.ctx = this.createFn.apply(context);
        this.snapshotKey = BroadcastKey.broadcastKey(context.globalProcessorIndex());
        // Set last, so close() skips destroyFn if createFn threw.
        this.initialized = true;
    }

    /**
     * Fills the buffer (unless a previous batch is still being emitted) and
     * emits its contents.
     *
     * @return {@code true} only when the current batch was fully emitted and
     *         the buffer reports it is closed
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        if (this.traverser == null) {
            this.fillBufferFn.accept(this.ctx, this.buffer);
            this.traverser = nextTraverser();
        }
        boolean batchEmitted = emitFromTraverser(this.traverser);
        if (batchEmitted) {
            this.traverser = null;
        }
        return batchEmitted && this.buffer.isClosed();
    }

    /** Builds the traverser for the batch that was just put into the buffer. */
    private Traverser<?> nextTraverser() {
        if (this.eventTimeMapper == null) {
            return this.buffer.traverse();
        }
        if (this.buffer.isEmpty()) {
            return this.eventTimeMapper.flatMapIdle();
        }
        return this.buffer.traverse().flatMap(item -> {
            // The original code casts unconditionally on this path, i.e. it relies
            // on the buffer holding JetEvent items whenever an event time mapper exists.
            @SuppressWarnings("unchecked")
            JetEvent<T> event = (JetEvent<T>) item;
            return this.eventTimeMapper.flatMapEvent(event.payload(), 0, event.timestamp());
        });
    }

    /**
     * Finishes emitting the current batch, then saves the state produced by
     * {@code createSnapshotFn} under this processor's snapshot key.
     *
     * @return {@code false} if the current batch or the state object could
     *         not be fully emitted yet, {@code true} otherwise
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (this.traverser != null && !emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.buffer.isClosed()) {
            // Nothing is saved once the buffer is closed.
            return true;
        }
        this.traverser = null;
        if (this.pendingState == null) {
            this.pendingState = this.createSnapshotFn.apply(this.ctx);
        }
        // A null state means there is nothing to save. Otherwise the state is kept
        // in pendingState until tryEmitToSnapshot accepts it.
        if (this.pendingState != null && !tryEmitToSnapshot(this.snapshotKey, this.pendingState)) {
            return false;
        }
        this.pendingState = null;
        return true;
    }

    /**
     * Collects a restored state object; the collected list is passed to
     * {@code restoreSnapshotFn} in {@link #finishSnapshotRestore()}.
     *
     * @param obj  the snapshot key (not used)
     * @param obj2 the restored state object
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (this.restoredStates == null) {
            this.restoredStates = new ArrayList<>();
        }
        // Unchecked: S is erased at runtime. Presumably the value is one this class
        // saved in saveToSnapshot() - verify against the snapshot contract.
        @SuppressWarnings("unchecked")
        S state = (S) obj2;
        this.restoredStates.add(state);
    }

    /**
     * Hands the collected states, if any, to {@code restoreSnapshotFn} and
     * releases the list.
     *
     * @return always {@code true}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean finishSnapshotRestore() {
        if (this.restoredStates != null) {
            this.restoreSnapshotFn.accept(this.ctx, this.restoredStates);
        }
        this.restoredStates = null;
        return true;
    }

    /** Calls {@code destroyFn} with the context object, but only if {@link #init} completed. */
    @Override // com.hazelcast.jet.core.Processor
    public void close() {
        if (this.initialized) {
            this.destroyFn.accept(this.ctx);
        }
    }
}
