/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.pipeline;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.impl.pipeline.transform.BatchSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.util.Preconditions;
import java.util.List;
import javax.annotation.Nonnull;

public final class SourceBuilder<C> {
    // Name supplied to the static factory method; each build() passes it to the source transform it creates.
    private final String name;
    // Maps a Processor.Context to the C object; each build() hands it to a SourceProcessors.convenient*SourceP call.
    private final FunctionEx<? super Processor.Context, ? extends C> createFn;
    // Snapshot producer. The default returns null; FaultTolerant's constructor replaces it.
    private FunctionEx<? super C, Object> createSnapshotFn = ctx -> null;
    // Snapshot restorer. The default does nothing; FaultTolerant.restoreSnapshotFn() replaces it.
    private BiConsumerEx<? super C, ? super List<Object>> restoreSnapshotFn = (ctx, states) -> {};
    // Defaults to ConsumerEx.noop(); Base.destroyFn() replaces it.
    private ConsumerEx<? super C> destroyFn = ConsumerEx.noop();
    // Stays at the int default (0) unless Base.distributed() stores a positive value.
    private int preferredLocalParallelism;

    /**
     * Stores the two mandatory builder inputs. The constructor is private;
     * the static factory methods of this class are its only callers.
     *
     * @param name     name later passed to the source transform by {@code build()}
     * @param createFn function from {@link Processor.Context} to the {@code C} object
     */
    private SourceBuilder(
            @Nonnull String name,
            @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn
    ) {
        this.createFn = createFn;
        this.name = name;
    }

    /**
     * Creates a new {@code SourceBuilder} from the given name and
     * {@code createFn} and returns its {@link Batch} stage with item type
     * {@code Void}. Calling {@code fillBufferFn} on the returned stage
     * yields a stage with the actual item type.
     * <p>
     * The return type is qualified with {@code SourceBuilder<C>} so that the
     * stage stays bound to this method's {@code C}; an unqualified inner-class
     * type in a static context would not carry that binding.
     *
     * @param name     name stored on the builder
     * @param createFn function from {@link Processor.Context} to the {@code C} object
     * @param <C>      type produced by {@code createFn}
     * @return the batch stage of a freshly created builder
     */
    @Nonnull
    public static <C> SourceBuilder<C>.Batch<Void> batch(
            @Nonnull String name,
            @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn
    ) {
        return new SourceBuilder<C>(name, createFn).new Batch<Void>();
    }

    /**
     * Creates a new {@code SourceBuilder} from the given name and
     * {@code createFn} and returns its {@link Stream} stage with item type
     * {@code Void}. Calling {@code fillBufferFn} on the returned stage
     * yields a stage with the actual item type.
     * <p>
     * The return type is qualified with {@code SourceBuilder<C>} so that the
     * stage stays bound to this method's {@code C}; an unqualified inner-class
     * type in a static context would not carry that binding.
     *
     * @param name     name stored on the builder
     * @param createFn function from {@link Processor.Context} to the {@code C} object
     * @param <C>      type produced by {@code createFn}
     * @return the stream stage of a freshly created builder
     */
    @Nonnull
    public static <C> SourceBuilder<C>.Stream<Void> stream(
            @Nonnull String name,
            @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn
    ) {
        return new SourceBuilder<C>(name, createFn).new Stream<Void>();
    }

    /**
     * Creates a new {@code SourceBuilder} from the given name and
     * {@code createFn} and returns its {@link TimestampedStream} stage with
     * item type {@code Void}. Calling {@code fillBufferFn} on the returned
     * stage yields a stage with the actual item type.
     * <p>
     * The return type is qualified with {@code SourceBuilder<C>} so that the
     * stage stays bound to this method's {@code C}; an unqualified inner-class
     * type in a static context would not carry that binding.
     *
     * @param name     name stored on the builder
     * @param createFn function from {@link Processor.Context} to the {@code C} object
     * @param <C>      type produced by {@code createFn}
     * @return the timestamped-stream stage of a freshly created builder
     */
    @Nonnull
    public static <C> SourceBuilder<C>.TimestampedStream<Void> timestampedStream(
            @Nonnull String name,
            @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn
    ) {
        return new SourceBuilder<C>(name, createFn).new TimestampedStream<Void>();
    }

    /**
     * Intermediate builder step returned by {@code createSnapshotFn}. Its
     * constructor stores the snapshot-creating function on the enclosing
     * {@code SourceBuilder}; {@link #restoreSnapshotFn} stores the restoring
     * function and hands control back to the stage that created this step.
     *
     * @param <B> type of the builder stage to return to
     * @param <S> type of the snapshot object
     */
    public final class FaultTolerant<B, S> {
        private final B parentBuilder;

        /**
         * @param parentBuilder    stage returned later by {@link #restoreSnapshotFn}
         * @param createSnapshotFn function written to the enclosing builder's
         *                         {@code createSnapshotFn} field
         */
        // The builder's field is typed with Object instead of S. The cast is
        // unchecked but harmless at runtime because generics are erased.
        @SuppressWarnings("unchecked")
        private FaultTolerant(B parentBuilder, FunctionEx<? super C, ? extends S> createSnapshotFn) {
            this.parentBuilder = parentBuilder;
            SourceBuilder.this.createSnapshotFn = (FunctionEx<? super C, Object>) createSnapshotFn;
        }

        /**
         * Stores the given function in the enclosing builder's
         * {@code restoreSnapshotFn} field.
         *
         * @param restoreSnapshotFn consumer of the {@code C} object and a list of {@code S}
         * @return the stage this step was created from
         */
        // Same erasure-only cast as in the constructor: List<S> is widened to
        // List<Object> to match the field's declared type.
        @Nonnull
        @SuppressWarnings("unchecked")
        public B restoreSnapshotFn(@Nonnull BiConsumerEx<? super C, ? super List<S>> restoreSnapshotFn) {
            SourceBuilder.this.restoreSnapshotFn =
                    (BiConsumerEx<? super C, ? super List<Object>>) restoreSnapshotFn;
            return this.parentBuilder;
        }
    }

    /**
     * Builder stage that produces a {@link StreamSource} whose
     * {@code fillBufferFn} receives a {@link TimestampedSourceBuffer}.
     *
     * @param <T> type of the items the fill function adds to the buffer
     */
    public final class TimestampedStream<T> extends Base<T> {
        private BiConsumerEx<? super C, ? super TimestampedSourceBuffer<T>> fillBufferFn;

        private TimestampedStream() {
        }

        /**
         * Stores the fill function and returns this same stage re-typed to
         * the function's item type.
         *
         * @param fillBufferFn consumer of the {@code C} object and the buffer
         * @param <T_NEW>      item type of the supplied function
         * @return this stage, viewed as {@code TimestampedStream<T_NEW>}
         */
        // The stage is re-typed in place: no field other than fillBufferFn
        // depends on T, so the unchecked cast only changes the static view.
        @Nonnull
        @SuppressWarnings("unchecked")
        public <T_NEW> TimestampedStream<T_NEW> fillBufferFn(
                @Nonnull BiConsumerEx<? super C, ? super TimestampedSourceBuffer<T_NEW>> fillBufferFn
        ) {
            TimestampedStream<T_NEW> newThis = (TimestampedStream<T_NEW>) this;
            newThis.fillBufferFn = fillBufferFn;
            return newThis;
        }

        /**
         * Delegates to {@code Base.destroyFn} and narrows the result to this stage type.
         *
         * @param pDestroyFn consumer of the {@code C} object, stored on the enclosing builder
         * @return this stage
         */
        @Override
        @Nonnull
        public TimestampedStream<T> destroyFn(@Nonnull ConsumerEx<? super C> pDestroyFn) {
            return (TimestampedStream<T>) super.destroyFn(pDestroyFn);
        }

        /**
         * Delegates to {@code Base.distributed} and narrows the result to this stage type.
         *
         * @param preferredLocalParallelism value validated and stored by the superclass
         * @return this stage
         */
        @Override
        @Nonnull
        public TimestampedStream<T> distributed(int preferredLocalParallelism) {
            return (TimestampedStream<T>) super.distributed(preferredLocalParallelism);
        }

        /**
         * Creates the {@link FaultTolerant} step, whose constructor stores
         * {@code createSnapshotFn} on the enclosing builder.
         *
         * @param createSnapshotFn function from the {@code C} object to a snapshot object
         * @param <S>              snapshot object type
         * @return a step whose {@code restoreSnapshotFn} returns this stage
         */
        @Override
        @Nonnull
        public <S> FaultTolerant<TimestampedStream<T>, S> createSnapshotFn(
                @Nonnull FunctionEx<? super C, ? extends S> createSnapshotFn
        ) {
            return new FaultTolerant<>(this, createSnapshotFn);
        }

        /**
         * Creates the source from the values collected so far.
         * {@code Preconditions.checkNotNull} is applied to the fill function first.
         *
         * @return a {@code StreamSourceTransform} whose processor factory calls
         *         {@code SourceProcessors.convenientTimestampedSourceP}
         */
        @Nonnull
        public StreamSource<T> build() {
            Preconditions.checkNotNull(this.fillBufferFn, "fillBufferFn must be set");
            // NOTE(review): the meaning of the two trailing boolean flags is defined by
            // StreamSourceTransform's constructor, which is not visible here.
            return new StreamSourceTransform<>(
                    SourceBuilder.this.name,
                    eventTimePolicy -> SourceProcessors.convenientTimestampedSourceP(
                            SourceBuilder.this.createFn,
                            this.fillBufferFn,
                            eventTimePolicy,
                            SourceBuilder.this.createSnapshotFn,
                            SourceBuilder.this.restoreSnapshotFn,
                            SourceBuilder.this.destroyFn,
                            SourceBuilder.this.preferredLocalParallelism),
                    true,
                    true);
        }
    }

    /**
     * Builder stage that produces a {@link StreamSource} whose
     * {@code fillBufferFn} receives a plain {@link SourceBuffer}.
     *
     * @param <T> type of the items the fill function adds to the buffer
     */
    public final class Stream<T> extends BaseNoTimestamps<T> {
        private Stream() {
        }

        /**
         * Delegates to {@code BaseNoTimestamps.fillBufferFn} and narrows the
         * result to this stage type.
         *
         * @param fillBufferFn consumer of the {@code C} object and the buffer
         * @param <T_NEW>      item type of the supplied function
         * @return this stage, viewed as {@code Stream<T_NEW>}
         */
        @Override
        @Nonnull
        public <T_NEW> Stream<T_NEW> fillBufferFn(
                @Nonnull BiConsumerEx<? super C, ? super SourceBuffer<T_NEW>> fillBufferFn
        ) {
            return (Stream<T_NEW>) super.fillBufferFn(fillBufferFn);
        }

        /**
         * Delegates to {@code Base.destroyFn} and narrows the result to this stage type.
         *
         * @param pDestroyFn consumer of the {@code C} object, stored on the enclosing builder
         * @return this stage
         */
        @Override
        @Nonnull
        public Stream<T> destroyFn(@Nonnull ConsumerEx<? super C> pDestroyFn) {
            return (Stream<T>) super.destroyFn(pDestroyFn);
        }

        /**
         * Delegates to {@code Base.distributed} and narrows the result to this stage type.
         *
         * @param preferredLocalParallelism value validated and stored by the superclass
         * @return this stage
         */
        @Override
        @Nonnull
        public Stream<T> distributed(int preferredLocalParallelism) {
            return (Stream<T>) super.distributed(preferredLocalParallelism);
        }

        /**
         * Creates the {@link FaultTolerant} step, whose constructor stores
         * {@code createSnapshotFn} on the enclosing builder.
         *
         * @param createSnapshotFn function from the {@code C} object to a snapshot object
         * @param <S>              snapshot object type
         * @return a step whose {@code restoreSnapshotFn} returns this stage
         */
        @Override
        @Nonnull
        public <S> FaultTolerant<Stream<T>, S> createSnapshotFn(
                @Nonnull FunctionEx<? super C, ? extends S> createSnapshotFn
        ) {
            return new FaultTolerant<>(this, createSnapshotFn);
        }

        /**
         * Creates the source from the values collected so far.
         * {@code Preconditions.checkNotNull} is applied to the fill function first.
         *
         * @return a {@code StreamSourceTransform} whose processor factory calls
         *         {@code SourceProcessors.convenientSourceP} with a final argument of {@code false}
         */
        @Nonnull
        public StreamSource<T> build() {
            Preconditions.checkNotNull(this.fillBufferFn, "fillBufferFn() wasn't called");
            // NOTE(review): the meaning of the trailing boolean arguments is defined by
            // convenientSourceP and StreamSourceTransform, which are not visible here.
            return new StreamSourceTransform<>(
                    SourceBuilder.this.name,
                    eventTimePolicy -> SourceProcessors.convenientSourceP(
                            SourceBuilder.this.createFn,
                            this.fillBufferFn,
                            SourceBuilder.this.createSnapshotFn,
                            SourceBuilder.this.restoreSnapshotFn,
                            SourceBuilder.this.destroyFn,
                            SourceBuilder.this.preferredLocalParallelism,
                            false),
                    false,
                    false);
        }
    }

    /**
     * Builder stage that produces a {@link BatchSource}. Snapshot support is
     * not available on this stage: {@code createSnapshotFn} always throws.
     *
     * @param <T> type of the items the fill function adds to the buffer
     */
    public final class Batch<T> extends BaseNoTimestamps<T> {
        private Batch() {
        }

        /**
         * Delegates to {@code BaseNoTimestamps.fillBufferFn} and narrows the
         * result to this stage type.
         *
         * @param fillBufferFn consumer of the {@code C} object and the buffer
         * @param <T_NEW>      item type of the supplied function
         * @return this stage, viewed as {@code Batch<T_NEW>}
         */
        @Override
        @Nonnull
        public <T_NEW> Batch<T_NEW> fillBufferFn(
                @Nonnull BiConsumerEx<? super C, ? super SourceBuffer<T_NEW>> fillBufferFn
        ) {
            return (Batch<T_NEW>) super.fillBufferFn(fillBufferFn);
        }

        /**
         * Delegates to {@code Base.destroyFn} and narrows the result to this stage type.
         *
         * @param destroyFn consumer of the {@code C} object, stored on the enclosing builder
         * @return this stage
         */
        @Override
        @Nonnull
        public Batch<T> destroyFn(@Nonnull ConsumerEx<? super C> destroyFn) {
            return (Batch<T>) super.destroyFn(destroyFn);
        }

        /**
         * Delegates to {@code Base.distributed} and narrows the result to this stage type.
         *
         * @param preferredLocalParallelism value validated and stored by the superclass
         * @return this stage
         */
        @Override
        @Nonnull
        public Batch<T> distributed(int preferredLocalParallelism) {
            return (Batch<T>) super.distributed(preferredLocalParallelism);
        }

        /**
         * Creates the source from the values collected so far.
         * {@code Preconditions.checkNotNull} is applied to the fill function first.
         *
         * @return a {@code BatchSourceTransform} wrapping the result of
         *         {@code SourceProcessors.convenientSourceP} called with a final argument of {@code true}
         */
        @Nonnull
        public BatchSource<T> build() {
            Preconditions.checkNotNull(this.fillBufferFn, "fillBufferFn must be non-null");
            return new BatchSourceTransform<>(
                    SourceBuilder.this.name,
                    SourceProcessors.convenientSourceP(
                            SourceBuilder.this.createFn,
                            this.fillBufferFn,
                            SourceBuilder.this.createSnapshotFn,
                            SourceBuilder.this.restoreSnapshotFn,
                            SourceBuilder.this.destroyFn,
                            SourceBuilder.this.preferredLocalParallelism,
                            true));
        }

        /**
         * Not supported on this stage. Kept package-private, unlike the public
         * overrides in the streaming stages.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        @Nonnull
        <S> FaultTolerant<Batch<T>, S> createSnapshotFn(
                @Nonnull FunctionEx<? super C, ? extends S> createSnapshotFn
        ) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Shared base of the stages whose fill function receives a plain
     * {@link SourceBuffer}. Holds the fill function itself.
     *
     * @param <T> type of the items the fill function adds to the buffer
     */
    private abstract class BaseNoTimestamps<T> extends Base<T> {
        BiConsumerEx<? super C, ? super SourceBuffer<T>> fillBufferFn;

        private BaseNoTimestamps() {
        }

        /**
         * Stores the fill function and returns this same stage re-typed to
         * the function's item type.
         *
         * @param fillBufferFn consumer of the {@code C} object and the buffer
         * @param <T_NEW>      item type of the supplied function
         * @return this stage, viewed as {@code BaseNoTimestamps<T_NEW>}
         */
        // The stage is re-typed in place: no field other than fillBufferFn
        // depends on T, so the unchecked cast only changes the static view.
        @Nonnull
        @SuppressWarnings("unchecked")
        public <T_NEW> BaseNoTimestamps<T_NEW> fillBufferFn(
                @Nonnull BiConsumerEx<? super C, ? super SourceBuffer<T_NEW>> fillBufferFn
        ) {
            BaseNoTimestamps<T_NEW> newThis = (BaseNoTimestamps<T_NEW>) this;
            newThis.fillBufferFn = fillBufferFn;
            return newThis;
        }
    }

    /**
     * Root of the builder stages. Its setters write straight into the fields
     * of the enclosing {@code SourceBuilder} and return the stage for chaining.
     *
     * @param <T> item type carried by the concrete stage
     */
    private abstract class Base<T> {
        private Base() {
        }

        /**
         * Replaces the enclosing builder's destroy function.
         *
         * @param destroyFn consumer of the {@code C} object
         * @return this stage
         */
        @Nonnull
        public Base<T> destroyFn(@Nonnull ConsumerEx<? super C> destroyFn) {
            SourceBuilder<C> owner = SourceBuilder.this;
            owner.destroyFn = destroyFn;
            return this;
        }

        /**
         * Validates the argument with {@code Preconditions.checkPositive} and
         * then stores it on the enclosing builder.
         *
         * @param preferredLocalParallelism value to store after validation
         * @return this stage
         */
        @Nonnull
        public Base<T> distributed(int preferredLocalParallelism) {
            Preconditions.checkPositive(preferredLocalParallelism, "Preferred local parallelism must >= 1");
            SourceBuilder<C> owner = SourceBuilder.this;
            owner.preferredLocalParallelism = preferredLocalParallelism;
            return this;
        }

        /**
         * Implemented by each concrete stage to start (or refuse) the
         * snapshot-related builder step.
         *
         * @param <S> snapshot object type
         */
        @Nonnull
        abstract <S> FaultTolerant<? extends Base<T>, S> createSnapshotFn(@Nonnull FunctionEx<? super C, ? extends S> var1);
    }

    /**
     * A {@link SourceBuffer} whose primary {@code add} takes an extra
     * {@code long} alongside the item.
     *
     * @param <T> type of the items added
     */
    public interface TimestampedSourceBuffer<T> extends SourceBuffer<T> {
        /**
         * Adds an item together with a {@code long} value.
         *
         * @param item      the item to add
         * @param timestamp value associated with the item; the single-argument
         *                  overload supplies {@code System.currentTimeMillis()} here,
         *                  so this is presumably epoch milliseconds (confirm
         *                  against the implementation)
         */
        void add(@Nonnull T item, long timestamp);

        /**
         * Adds the item, passing {@code System.currentTimeMillis()} as the
         * second argument of {@link #add(Object, long)}.
         *
         * @param item the item to add
         */
        @Override
        default void add(@Nonnull T item) {
            add(item, System.currentTimeMillis());
        }
    }

    /**
     * Buffer type handed to the {@code fillBufferFn} of the non-timestamped
     * builder stages.
     *
     * @param <T> type of the items added
     */
    public interface SourceBuffer<T> {
        /**
         * @return the buffer's size (presumably the number of items currently
         *         held; confirm against the implementation)
         */
        int size();

        /**
         * Closes the buffer.
         *
         * @throws JetException declared by this method; the conditions are up to the implementation
         */
        void close() throws JetException;

        /**
         * Adds an item to the buffer.
         *
         * @param item the item to add
         */
        void add(@Nonnull T item);
    }
}

