/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.kinesis.impl.sink;

import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.kinesis.impl.AwsConfig;
import com.hazelcast.jet.kinesis.impl.sink.KinesisSinkP;
import com.hazelcast.jet.kinesis.impl.sink.NoopShardCountMonitor;
import com.hazelcast.jet.kinesis.impl.sink.ShardCountMonitorImpl;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.logging.ILogger;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Supplies {@link KinesisSinkP} processors that write items of type {@code T}
 * to a single Kinesis stream.
 *
 * <p>The configuration passed to the constructor is kept in final fields. The
 * Kinesis client, the member count and the logger are {@code transient} and
 * are only assigned in {@link #init(ProcessorSupplier.Context)}.
 *
 * @param <T> type of the items handed to the key and value functions
 */
public class KinesisSinkPSupplier<T>
implements ProcessorSupplier {
    private static final long serialVersionUID = 1L;

    /** Configuration used to build the Kinesis client in {@code init}. */
    @Nonnull
    private final AwsConfig awsConfig;
    /** Stream identifier passed to every processor and to the shard count monitor. */
    @Nonnull
    private final String stream;
    /** Maps an item to a {@code String} key. */
    @Nonnull
    private final FunctionEx<T, String> keyFn;
    /** Maps an item to a {@code byte[]} value. */
    @Nonnull
    private final FunctionEx<T, byte[]> valueFn;
    /** Retry strategy passed to the shard count monitor and to every processor. */
    @Nonnull
    private final RetryStrategy retryStrategy;

    // Runtime state: excluded from serialization, populated by init().
    private transient AmazonKinesisAsync client;
    private transient int memberCount;
    private transient ILogger logger;

    /**
     * Stores the given configuration; no client is created here.
     *
     * @param awsConfig     configuration from which the client is built in {@code init}
     * @param stream        stream identifier handed to the processors
     * @param keyFn         function producing the key of an item
     * @param valueFn       function producing the value bytes of an item
     * @param retryStrategy retry strategy handed to the monitor and the processors
     */
    public KinesisSinkPSupplier(@Nonnull AwsConfig awsConfig, @Nonnull String stream, @Nonnull FunctionEx<T, String> keyFn, @Nonnull FunctionEx<T, byte[]> valueFn, @Nonnull RetryStrategy retryStrategy) {
        this.awsConfig = awsConfig;
        this.stream = stream;
        this.keyFn = keyFn;
        this.valueFn = valueFn;
        this.retryStrategy = retryStrategy;
    }

    /**
     * Captures the member count and logger from the context and builds the
     * Kinesis client from the stored {@link AwsConfig}.
     *
     * @param context context providing the member count and the logger
     */
    public void init(@Nonnull ProcessorSupplier.Context context) {
        memberCount = context.memberCount();
        logger = context.logger();
        client = awsConfig.buildClient();
    }

    /**
     * Creates {@code count} processors sharing the same client.
     *
     * <p>One {@link ShardCountMonitorImpl} is created per call. Only the
     * processor at index 0 receives it; every other processor receives a
     * single shared {@link NoopShardCountMonitor} wrapping the counter
     * obtained from {@code getSharedShardCounter()} of the real monitor.
     *
     * @param count number of processors to create
     * @return the list of newly created processors
     */
    @Nonnull
    public Collection<? extends Processor> get(int count) {
        ShardCountMonitorImpl activeMonitor =
                new ShardCountMonitorImpl(memberCount, client, stream, retryStrategy, logger);
        NoopShardCountMonitor passiveMonitor =
                new NoopShardCountMonitor(activeMonitor.getSharedShardCounter());

        return IntStream.range(0, count)
                .mapToObj(index -> new KinesisSinkP<T>(
                        client,
                        stream,
                        keyFn,
                        valueFn,
                        // every index except the first gets the no-op monitor
                        index != 0 ? passiveMonitor : activeMonitor,
                        retryStrategy))
                .collect(Collectors.toList());
    }

    /**
     * Shuts the Kinesis client down if one has been created.
     *
     * @param error not used by this implementation
     */
    public void close(@Nullable Throwable error) {
        if (client == null) {
            // init() never assigned a client, so there is nothing to release
            return;
        }
        client.shutdown();
    }
}

