/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kinesis;

import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.Generated;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kinesis.KinesisRecord;
import org.apache.pulsar.io.kinesis.KinesisRecordProcessorFactory;
import org.apache.pulsar.io.kinesis.KinesisSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

@Connector(name="kinesis", type=IOType.SOURCE, help="A source connector that copies messages from Kinesis to Pulsar", configClass=KinesisSourceConfig.class)
public class KinesisSource
extends AbstractAwsConnector
implements Source<byte[]> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KinesisSource.class);
    private LinkedBlockingQueue<KinesisRecord> queue;
    private KinesisSourceConfig kinesisSourceConfig;
    private ConfigsBuilder configsBuilder;
    private ShardRecordProcessorFactory recordProcessorFactory;
    private String workerId;
    private Scheduler scheduler;
    private Thread schedulerThread;
    private Throwable threadEx;

    public void close() throws Exception {
        this.scheduler.shutdown();
    }

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.kinesisSourceConfig = KinesisSourceConfig.load(config, sourceContext);
        this.queue = new LinkedBlockingQueue(this.kinesisSourceConfig.getReceiveQueueSize());
        this.workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        AwsCredentialProviderPlugin credentialsProvider = this.createCredentialProvider(this.kinesisSourceConfig.getAwsCredentialPluginName(), this.kinesisSourceConfig.getAwsCredentialPluginParam());
        KinesisAsyncClient kClient = this.kinesisSourceConfig.buildKinesisAsyncClient(credentialsProvider);
        this.recordProcessorFactory = new KinesisRecordProcessorFactory(this.queue, this.kinesisSourceConfig);
        this.configsBuilder = new ConfigsBuilder(this.kinesisSourceConfig.getAwsKinesisStreamName(), this.kinesisSourceConfig.getApplicationName(), kClient, this.kinesisSourceConfig.buildDynamoAsyncClient(credentialsProvider), this.kinesisSourceConfig.buildCloudwatchAsyncClient(credentialsProvider), this.workerId, this.recordProcessorFactory);
        RetrievalConfig retrievalConfig = this.configsBuilder.retrievalConfig();
        if (!this.kinesisSourceConfig.isUseEnhancedFanOut()) {
            retrievalConfig.retrievalSpecificConfig((RetrievalSpecificConfig)new PollingConfig(this.kinesisSourceConfig.getAwsKinesisStreamName(), kClient));
        }
        retrievalConfig.initialPositionInStreamExtended(this.kinesisSourceConfig.getStreamStartPosition());
        this.scheduler = new Scheduler(this.configsBuilder.checkpointConfig(), this.configsBuilder.coordinatorConfig(), this.configsBuilder.leaseManagementConfig(), this.configsBuilder.lifecycleConfig(), this.configsBuilder.metricsConfig(), this.configsBuilder.processorConfig(), retrievalConfig);
        this.schedulerThread = new Thread((Runnable)this.scheduler);
        this.schedulerThread.setDaemon(true);
        this.threadEx = null;
        this.schedulerThread.setUncaughtExceptionHandler((t, ex) -> {
            this.threadEx = ex;
        });
        this.schedulerThread.start();
    }

    public KinesisRecord read() throws Exception {
        try {
            return this.queue.take();
        }
        catch (InterruptedException ex) {
            log.warn("Got interrupted when trying to fetch out of the queue");
            if (this.threadEx != null) {
                log.error("error from scheduler", this.threadEx);
            }
            throw ex;
        }
    }
}

