/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.cache.journal.EventJournalCacheEvent;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.ToResultSetFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.connector.ConvenientSourceP;
import com.hazelcast.jet.impl.connector.ReadFilesP;
import com.hazelcast.jet.impl.connector.ReadIListP;
import com.hazelcast.jet.impl.connector.ReadJdbcP;
import com.hazelcast.jet.impl.connector.ReadWithPartitionIteratorP;
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
import com.hazelcast.jet.impl.connector.StreamFilesP;
import com.hazelcast.jet.impl.connector.StreamJmsP;
import com.hazelcast.jet.impl.connector.StreamSocketP;
import com.hazelcast.jet.impl.pipeline.SourceBufferImpl;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.projection.Projection;
import com.hazelcast.query.Predicate;
import com.hazelcast.util.Preconditions;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.sql.ResultSet;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public final class SourceProcessors {
    private SourceProcessors() {
    }

    @Nonnull
    public static ProcessorMetaSupplier readMapP(@Nonnull String mapName) {
        return ReadWithPartitionIteratorP.readMapSupplier(mapName);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier readMapP(@Nonnull String mapName, @Nonnull Predicate<? super K, ? super V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projectionFn) {
        return ReadWithPartitionIteratorP.readMapSupplier(mapName, predicate, projectionFn);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier readMapP(@Nonnull String mapName, @Nonnull Predicate<? super K, ? super V> predicate, @Nonnull DistributedFunction<? super Map.Entry<K, V>, ? extends T> projectionFn) {
        return ReadWithPartitionIteratorP.readMapSupplier(mapName, predicate, SourceProcessors.toProjection(projectionFn));
    }

    @Nonnull
    public static <K, V> ProcessorMetaSupplier streamMapP(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super Map.Entry<K, V>> wmGenParams) {
        return SourceProcessors.streamMapP(mapName, Util.mapPutEvents(), Util.mapEventToEntry(), initialPos, wmGenParams);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier streamMapP(@Nonnull String mapName, @Nonnull DistributedPredicate<? super EventJournalMapEvent<K, V>> predicateFn, @Nonnull DistributedFunction<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(predicateFn, "predicateFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(projectionFn, "projectionFn");
        return StreamEventJournalP.streamMapSupplier(mapName, predicateFn, projectionFn, initialPos, wmGenParams);
    }

    @Nonnull
    public static ProcessorMetaSupplier readRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
        return ReadWithPartitionIteratorP.readRemoteMapSupplier(mapName, clientConfig);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier readRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<? super K, ? super V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projection) {
        return ReadWithPartitionIteratorP.readRemoteMapSupplier(mapName, clientConfig, projection, predicate);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier readRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<? super K, ? super V> predicate, @Nonnull DistributedFunction<? super Map.Entry<K, V>, ? extends T> projectionFn) {
        return ReadWithPartitionIteratorP.readRemoteMapSupplier(mapName, clientConfig, SourceProcessors.toProjection(projectionFn), predicate);
    }

    @Nonnull
    public static <K, V> ProcessorMetaSupplier streamRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super Map.Entry<K, V>> wmGenParams) {
        return SourceProcessors.streamRemoteMapP(mapName, clientConfig, Util.mapPutEvents(), Util.mapEventToEntry(), initialPos, wmGenParams);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier streamRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<? super EventJournalMapEvent<K, V>> predicateFn, @Nonnull DistributedFunction<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        return StreamEventJournalP.streamRemoteMapSupplier(mapName, clientConfig, predicateFn, projectionFn, initialPos, wmGenParams);
    }

    @Nonnull
    public static ProcessorMetaSupplier readCacheP(@Nonnull String cacheName) {
        return ReadWithPartitionIteratorP.readCacheSupplier(cacheName);
    }

    @Nonnull
    public static <K, V> ProcessorMetaSupplier streamCacheP(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super Map.Entry<K, V>> wmGenParams) {
        return SourceProcessors.streamCacheP(cacheName, Util.cachePutEvents(), Util.cacheEventToEntry(), initialPos, wmGenParams);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier streamCacheP(@Nonnull String cacheName, @Nonnull DistributedPredicate<? super EventJournalCacheEvent<K, V>> predicateFn, @Nonnull DistributedFunction<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        return StreamEventJournalP.streamCacheSupplier(cacheName, predicateFn, projectionFn, initialPos, wmGenParams);
    }

    @Nonnull
    public static ProcessorMetaSupplier readRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig) {
        return ReadWithPartitionIteratorP.readRemoteCacheSupplier(cacheName, clientConfig);
    }

    @Nonnull
    public static <K, V> ProcessorMetaSupplier streamRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super Map.Entry<K, V>> wmGenParams) {
        return SourceProcessors.streamRemoteCacheP(cacheName, clientConfig, Util.cachePutEvents(), Util.cacheEventToEntry(), initialPos, wmGenParams);
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier streamRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<? super EventJournalCacheEvent<K, V>> predicateFn, @Nonnull DistributedFunction<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        return StreamEventJournalP.streamRemoteCacheSupplier(cacheName, clientConfig, predicateFn, projectionFn, initialPos, wmGenParams);
    }

    @Nonnull
    public static ProcessorMetaSupplier readListP(@Nonnull String listName) {
        return ReadIListP.metaSupplier(listName, null);
    }

    @Nonnull
    public static ProcessorMetaSupplier readRemoteListP(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
        return ReadIListP.metaSupplier(listName, clientConfig);
    }

    @Nonnull
    public static ProcessorMetaSupplier streamSocketP(@Nonnull String host, int port, @Nonnull Charset charset) {
        return StreamSocketP.supplier(host, port, charset.name());
    }

    @Nonnull
    public static <R> ProcessorMetaSupplier readFilesP(@Nonnull String directory, @Nonnull Charset charset, @Nonnull String glob, boolean sharedFileSystem, @Nonnull DistributedBiFunction<? super String, ? super String, ? extends R> mapOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapOutputFn, "mapOutputFn");
        String charsetName = charset.name();
        return ReadFilesP.metaSupplier(directory, glob, sharedFileSystem, path -> Files.lines(path, Charset.forName(charsetName)), mapOutputFn);
    }

    @Nonnull
    public static ProcessorMetaSupplier streamFilesP(@Nonnull String watchedDirectory, @Nonnull Charset charset, @Nonnull String glob, boolean sharedFileSystem, @Nonnull DistributedBiFunction<? super String, ? super String, ?> mapOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapOutputFn, "mapOutputFn");
        return StreamFilesP.metaSupplier(watchedDirectory, charset.name(), glob, sharedFileSystem, mapOutputFn);
    }

    @Nonnull
    public static <T> ProcessorMetaSupplier streamJmsQueueP(@Nonnull DistributedSupplier<? extends Connection> connectionSupplier, @Nonnull DistributedFunction<? super Connection, ? extends Session> sessionFn, @Nonnull DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn, @Nonnull DistributedConsumer<? super Session> flushFn, @Nonnull DistributedFunction<? super Message, ? extends T> projectionFn) {
        return ProcessorMetaSupplier.of(StreamJmsP.supplier(connectionSupplier, sessionFn, consumerFn, flushFn, projectionFn), 4);
    }

    @Nonnull
    public static <T> ProcessorMetaSupplier streamJmsTopicP(@Nonnull DistributedSupplier<? extends Connection> connectionSupplier, @Nonnull DistributedFunction<? super Connection, ? extends Session> sessionFn, @Nonnull DistributedFunction<? super Session, ? extends MessageConsumer> consumerFn, @Nonnull DistributedConsumer<? super Session> flushFn, @Nonnull DistributedFunction<? super Message, ? extends T> projectionFn) {
        return ProcessorMetaSupplier.forceTotalParallelismOne(StreamJmsP.supplier(connectionSupplier, sessionFn, consumerFn, flushFn, projectionFn));
    }

    public static <T> ProcessorMetaSupplier readJdbcP(@Nonnull DistributedSupplier<? extends java.sql.Connection> connectionSupplier, @Nonnull ToResultSetFunction resultSetFn, @Nonnull DistributedFunction<? super ResultSet, ? extends T> mapOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(connectionSupplier, "connectionSupplier");
        com.hazelcast.jet.impl.util.Util.checkSerializable(resultSetFn, "resultSetFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapOutputFn, "mapOutputFn");
        return ReadJdbcP.supplier(connectionSupplier, resultSetFn, mapOutputFn);
    }

    public static <T> ProcessorMetaSupplier readJdbcP(@Nonnull String connectionURL, @Nonnull String query, @Nonnull DistributedFunction<? super ResultSet, ? extends T> mapOutputFn) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(mapOutputFn, "mapOutputFn");
        return ReadJdbcP.supplier(connectionURL, query, mapOutputFn);
    }

    private static <I, O> Projection<I, O> toProjection(final DistributedFunction<I, O> projectionFn) {
        return new Projection<I, O>(){

            @Override
            public O transform(I input) {
                return projectionFn.apply(input);
            }
        };
    }

    @Nonnull
    public static <S, T> ProcessorMetaSupplier convenientSourceP(@Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn, @Nonnull DistributedBiConsumer<? super S, ? super SourceBuilder.SourceBuffer<T>> fillBufferFn, @Nonnull DistributedConsumer<? super S> destroyFn, int preferredLocalParallelism) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(createFn, "createFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(fillBufferFn, "fillBufferFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(destroyFn, "destroyFn");
        Preconditions.checkNotNegative(preferredLocalParallelism + 1, "preferredLocalParallelism must >= -1");
        ProcessorSupplier procSup = ProcessorSupplier.of(() -> new ConvenientSourceP(createFn, fillBufferFn, destroyFn, new SourceBufferImpl.Plain(), null));
        return preferredLocalParallelism != 0 ? ProcessorMetaSupplier.of(procSup, preferredLocalParallelism) : ProcessorMetaSupplier.forceTotalParallelismOne(procSup);
    }

    @Nonnull
    public static <S, T> ProcessorMetaSupplier convenientTimestampedSourceP(@Nonnull DistributedFunction<? super Processor.Context, ? extends S> createFn, @Nonnull DistributedBiConsumer<? super S, ? super SourceBuilder.TimestampedSourceBuffer<T>> fillBufferFn, @Nonnull WatermarkGenerationParams<? super JetEvent<T>> wmParams, @Nonnull DistributedConsumer<? super S> destroyFn, int preferredLocalParallelism) {
        com.hazelcast.jet.impl.util.Util.checkSerializable(createFn, "createFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(fillBufferFn, "fillBufferFn");
        com.hazelcast.jet.impl.util.Util.checkSerializable(destroyFn, "destroyFn");
        Preconditions.checkNotNegative(preferredLocalParallelism + 1, "preferredLocalParallelism must >= -1");
        ProcessorSupplier procSup = ProcessorSupplier.of(() -> new ConvenientSourceP(createFn, fillBufferFn, destroyFn, new SourceBufferImpl.Timestamped(), wmParams));
        return preferredLocalParallelism > 0 ? ProcessorMetaSupplier.of(procSup, preferredLocalParallelism) : ProcessorMetaSupplier.forceTotalParallelismOne(procSup);
    }
}

