package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.class */
public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport implements DisposableBean {
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    private final AmazonKinesis amazonKinesis;
    private final String[] streams;
    private final Set<KinesisShardOffset> shardOffsets = new HashSet();
    private final Map<KinesisShardOffset, ShardConsumer> shardConsumers = new ConcurrentHashMap();
    private final Set<String> inResharding = new ConcurrentSkipListSet();
    private final List<ConsumerInvoker> consumerInvokers = new ArrayList();
    private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager();
    private final ExecutorService shardLocksExecutor;
    private String consumerGroup;
    private ConcurrentMetadataStore checkpointStore;
    private Executor dispatcherExecutor;
    private boolean dispatcherExecutorExplicitlySet;
    private Executor consumerExecutor;
    private boolean consumerExecutorExplicitlySet;
    private int maxConcurrency;
    private int concurrency;
    private KinesisShardOffset streamInitialSequence;
    private Converter<byte[], Object> converter;
    private ListenerMode listenerMode;
    private CheckpointMode checkpointMode;
    private int recordsLimit;
    private int idleBetweenPolls;
    private int consumerBackoff;
    private int startTimeout;
    private int describeStreamBackoff;
    private int describeStreamRetries;
    private boolean resetCheckpoints;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    private LockRegistry lockRegistry;
    private volatile boolean active;
    private volatile int consumerInvokerMaxCapacity;
    private volatile Future<?> shardConsumerManagerFuture;

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.class */
    private final class ConsumerDispatcher implements SchedulingAwareRunnable {
        private final Set<String> inReshardingProcess;

        private ConsumerDispatcher() {
            this.inReshardingProcess = new HashSet();
        }

        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                for (String str : KinesisMessageDrivenChannelAdapter.this.inResharding) {
                    if (this.inReshardingProcess.add(str)) {
                        if (KinesisMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                            KinesisMessageDrivenChannelAdapter.this.logger.debug("Resharding has happened for stream [" + str + "]. Rebalancing...");
                        }
                        KinesisMessageDrivenChannelAdapter.this.populateShardsForStream(str, null);
                    }
                }
                Iterator it = KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator();
                while (it.hasNext()) {
                    ShardConsumer shardConsumer = (ShardConsumer) it.next();
                    shardConsumer.execute();
                    if (ConsumerState.STOP == shardConsumer.state) {
                        it.remove();
                        if (KinesisMessageDrivenChannelAdapter.this.streams != null && shardConsumer.shardIterator == null) {
                            KinesisShardOffset kinesisShardOffset = shardConsumer.shardOffset;
                            String stream = kinesisShardOffset.getStream();
                            if (KinesisMessageDrivenChannelAdapter.this.inResharding.add(stream)) {
                                this.inReshardingProcess.remove(stream);
                                synchronized (KinesisMessageDrivenChannelAdapter.this.shardOffsets) {
                                    KinesisMessageDrivenChannelAdapter.this.shardOffsets.remove(kinesisShardOffset);
                                }
                            } else {
                                continue;
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("ConsumerDispatcher Thread [" + this + "] has been interrupted", e);
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerInvoker.class */
    public final class ConsumerInvoker implements SchedulingAwareRunnable {
        private final Queue<ShardConsumer> consumers = new ConcurrentLinkedQueue();
        private final Semaphore processBarrier = new Semaphore(0);
        private final Runnable notifier = this::notifyBarrier;

        ConsumerInvoker(Collection<ShardConsumer> collection) {
            Iterator<ShardConsumer> it = collection.iterator();
            while (it.hasNext()) {
                addConsumer(it.next());
            }
        }

        void addConsumer(ShardConsumer shardConsumer) {
            shardConsumer.setNotifier(this.notifier);
            this.consumers.add(shardConsumer);
        }

        void notifyBarrier() {
            this.processBarrier.release();
        }

        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                try {
                    this.processBarrier.acquire();
                    Iterator<ShardConsumer> it = this.consumers.iterator();
                    while (it.hasNext()) {
                        ShardConsumer next = it.next();
                        if (ConsumerState.STOP == next.state) {
                            it.remove();
                        } else if (next.task != null) {
                            try {
                                next.task.run();
                            } catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("Got an exception " + e + " during [" + next + "] task invocation.\nProcess will be retried on the next iteration.");
                            }
                        }
                    }
                    synchronized (KinesisMessageDrivenChannelAdapter.this.consumerInvokers) {
                        if (this.consumers.isEmpty()) {
                            KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
                            return;
                        }
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", e2);
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerState.class */
    public enum ConsumerState {
        NEW,
        EXPIRED,
        CONSUME,
        SLEEP,
        STOP
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ShardConsumer.class */
    public final class ShardConsumer {
        private final KinesisShardOffset shardOffset;
        private final ShardCheckpointer checkpointer;
        private final String key;
        private Runnable notifier;
        private volatile Runnable task;
        private volatile String shardIterator;
        private volatile long sleepUntil;
        private final Runnable processTask = processTask();
        private volatile ConsumerState state = ConsumerState.NEW;

        ShardConsumer(KinesisShardOffset kinesisShardOffset) {
            this.shardOffset = new KinesisShardOffset(kinesisShardOffset);
            this.key = KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard());
            this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key);
        }

        void setNotifier(Runnable runnable) {
            this.notifier = runnable;
        }

        void stop() {
            this.state = ConsumerState.STOP;
            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(this.key);
            }
            if (this.notifier != null) {
                this.notifier.run();
            }
        }

        void close() {
            stop();
            this.checkpointer.close();
        }

        void execute() {
            if (this.task == null) {
                switch (this.state) {
                    case NEW:
                    case EXPIRED:
                        this.task = () -> {
                            try {
                                if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled() && this.state == ConsumerState.NEW) {
                                    KinesisMessageDrivenChannelAdapter.this.logger.info("The [" + this + "] has been started.");
                                }
                                if (this.shardOffset.isReset()) {
                                    this.checkpointer.remove();
                                } else {
                                    String checkpoint = this.checkpointer.getCheckpoint();
                                    if (checkpoint != null) {
                                        this.shardOffset.setSequenceNumber(checkpoint);
                                        this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                                    }
                                }
                                this.shardIterator = KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getShardIterator(this.shardOffset.toShardIteratorRequest()).getShardIterator();
                                if (ConsumerState.STOP != this.state) {
                                    this.state = ConsumerState.CONSUME;
                                }
                            } finally {
                                this.task = null;
                            }
                        };
                        break;
                    case CONSUME:
                        this.task = this.processTask;
                        break;
                    case SLEEP:
                        if (System.currentTimeMillis() >= this.sleepUntil) {
                            this.state = ConsumerState.CONSUME;
                        }
                        this.task = null;
                        break;
                    case STOP:
                        if (this.shardIterator == null) {
                            if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("Stopping the [" + this + "] on the checkpoint [" + this.checkpointer.getCheckpoint() + "] because the shard has been CLOSED and exhausted.");
                            }
                        } else if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                            KinesisMessageDrivenChannelAdapter.this.logger.info("Stopping the [" + this + "].");
                        }
                        this.task = null;
                        break;
                }
                if (this.task != null) {
                    if (this.notifier != null) {
                        this.notifier.run();
                    }
                    if (KinesisMessageDrivenChannelAdapter.this.concurrency == 0) {
                        KinesisMessageDrivenChannelAdapter.this.consumerExecutor.execute(this.task);
                    }
                }
            }
        }

        private Runnable processTask() {
            return () -> {
                GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
                getRecordsRequest.setShardIterator(this.shardIterator);
                getRecordsRequest.setLimit(Integer.valueOf(KinesisMessageDrivenChannelAdapter.this.recordsLimit));
                GetRecordsResult getRecordsResult = null;
                try {
                    getRecordsResult = getRecords(getRecordsRequest);
                    if (getRecordsResult != null) {
                        List<Record> records = getRecordsResult.getRecords();
                        if (!records.isEmpty()) {
                            processRecords(records);
                        }
                    }
                    KinesisMessageDrivenChannelAdapter.attributesHolder.remove();
                    if (getRecordsResult != null) {
                        this.shardIterator = getRecordsResult.getNextShardIterator();
                        if (this.shardIterator == null) {
                            stop();
                        }
                        if (ConsumerState.STOP != this.state && getRecordsResult.getRecords().isEmpty()) {
                            if (KinesisMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                                KinesisMessageDrivenChannelAdapter.this.logger.debug("No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.");
                            }
                            prepareSleepState();
                        }
                    }
                    this.task = null;
                } catch (Throwable th) {
                    KinesisMessageDrivenChannelAdapter.attributesHolder.remove();
                    if (getRecordsResult != null) {
                        this.shardIterator = getRecordsResult.getNextShardIterator();
                        if (this.shardIterator == null) {
                            stop();
                        }
                        if (ConsumerState.STOP != this.state && getRecordsResult.getRecords().isEmpty()) {
                            if (KinesisMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                                KinesisMessageDrivenChannelAdapter.this.logger.debug("No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.");
                            }
                            prepareSleepState();
                        }
                    }
                    this.task = null;
                    throw th;
                }
            };
        }

        private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
            try {
                return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest);
            } catch (ExpiredIteratorException e) {
                if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                    KinesisMessageDrivenChannelAdapter.this.logger.info("Shard iterator for [" + this + "] expired.\nA new one will be started from the check pointed sequence number.");
                }
                this.state = ConsumerState.EXPIRED;
                return null;
            } catch (ProvisionedThroughputExceededException e2) {
                if (KinesisMessageDrivenChannelAdapter.this.logger.isWarnEnabled()) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn("GetRecords request throttled for [" + this + "] with the reason: " + e2.getErrorMessage());
                }
                prepareSleepState();
                return null;
            }
        }

        private void prepareSleepState() {
            this.sleepUntil = System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.consumerBackoff;
            this.state = ConsumerState.SLEEP;
        }

        private void processRecords(List<Record> list) {
            ArrayList arrayList;
            ArrayList arrayList2;
            if (KinesisMessageDrivenChannelAdapter.this.logger.isTraceEnabled()) {
                KinesisMessageDrivenChannelAdapter.this.logger.trace("Processing records: " + list + " for [" + this + "]");
            }
            this.checkpointer.setHighestSequence(list.get(list.size() - 1).getSequenceNumber());
            switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) {
                case record:
                    for (Record record : list) {
                        performSend(prepareMessageForRecord(record), record);
                        if (CheckpointMode.record.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                            this.checkpointer.checkpoint(record.getSequenceNumber());
                        }
                    }
                    break;
                case batch:
                    Object obj = list;
                    if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                        obj = list.stream().map(this::prepareMessageForRecord).collect(Collectors.toList());
                    }
                    if (KinesisMessageDrivenChannelAdapter.this.converter != null) {
                        arrayList = new ArrayList();
                        arrayList2 = new ArrayList();
                        obj = list.stream().map(record2 -> {
                            arrayList.add(record2.getPartitionKey());
                            arrayList2.add(record2.getSequenceNumber());
                            return KinesisMessageDrivenChannelAdapter.this.converter.convert(record2.getData().array());
                        }).collect(Collectors.toList());
                    } else {
                        arrayList = null;
                        arrayList2 = null;
                    }
                    performSend(KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(obj).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, arrayList).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, arrayList2), list);
                    break;
            }
            if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpointer.checkpoint();
            }
        }

        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            Object array = record.getData().array();
            Message message = null;
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    message = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage((byte[]) array);
                    array = message.getPayload();
                } catch (Exception e) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn("Could not parse embedded headers. Remain payload untouched.", e);
                }
            }
            if ((array instanceof byte[]) && KinesisMessageDrivenChannelAdapter.this.converter != null) {
                array = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[]) array);
            }
            AbstractIntegrationMessageBuilder<Object> header = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(array).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber());
            if (message != null) {
                header.copyHeadersIfAbsent(message.getHeaders());
            }
            return header;
        }

        private void performSend(AbstractIntegrationMessageBuilder<?> abstractIntegrationMessageBuilder, Object obj) {
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream()).setHeader(AwsHeaders.SHARD, this.shardOffset.getShard());
            if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                abstractIntegrationMessageBuilder.setHeader(AwsHeaders.CHECKPOINTER, this.checkpointer);
            }
            Message build = abstractIntegrationMessageBuilder.build();
            KinesisMessageDrivenChannelAdapter.this.setAttributesIfNecessary(obj, build);
            try {
                KinesisMessageDrivenChannelAdapter.this.sendMessage(build);
            } catch (Exception e) {
                KinesisMessageDrivenChannelAdapter.this.logger.info("Got an exception during sending a '" + build + "'\nfor the '" + obj + "'.\nConsider to use 'errorChannel' flow for the compensation logic.", e);
            }
        }

        public String toString() {
            return "ShardConsumer{shardOffset=" + this.shardOffset + ", state=" + this.state + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ShardConsumerManager.class */
    public final class ShardConsumerManager implements SchedulingAwareRunnable {
        private final Map<String, KinesisShardOffset> shardOffsetsToConsumer = new ConcurrentHashMap();
        private final Map<String, Lock> locks = new HashMap();
        private final Queue<String> forUnlocking = new ConcurrentLinkedQueue();

        ShardConsumerManager() {
        }

        void addShardToConsume(KinesisShardOffset kinesisShardOffset) {
            this.shardOffsetsToConsumer.put(KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()), kinesisShardOffset);
        }

        void unlock(String str) {
            this.forUnlocking.add(str);
        }

        public void run() {
            String poll;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    this.shardOffsetsToConsumer.entrySet().removeIf(entry -> {
                        boolean z = true;
                        if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                            String str = (String) entry.getKey();
                            Lock obtain = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(str);
                            try {
                                if (obtain.tryLock()) {
                                    this.locks.put(str, obtain);
                                } else {
                                    z = false;
                                }
                            } catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error("Error during locking: " + obtain, e);
                            }
                        }
                        if (z) {
                            KinesisMessageDrivenChannelAdapter.this.populateConsumer((KinesisShardOffset) entry.getValue());
                        }
                        return z;
                    });
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (poll = this.forUnlocking.poll()) != null) {
                        Lock remove = this.locks.remove(poll);
                        if (remove != null) {
                            try {
                                remove.unlock();
                            } catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error("Error during unlocking: " + remove, e);
                            }
                        }
                    }
                    try {
                        Thread.sleep(250L);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted", e2);
                    }
                } catch (Throwable th) {
                    Iterator<Lock> it = this.locks.values().iterator();
                    while (it.hasNext()) {
                        Lock next = it.next();
                        try {
                            try {
                                next.unlock();
                                it.remove();
                            } catch (Exception e3) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error("Error during unlocking: " + next, e3);
                                it.remove();
                            }
                        } catch (Throwable th2) {
                            it.remove();
                            throw th2;
                        }
                    }
                    throw th;
                }
            }
            Iterator<Lock> it2 = this.locks.values().iterator();
            while (it2.hasNext()) {
                Lock next2 = it2.next();
                try {
                    try {
                        next2.unlock();
                        it2.remove();
                    } catch (Throwable th3) {
                        it2.remove();
                        throw th3;
                    }
                } catch (Exception e4) {
                    KinesisMessageDrivenChannelAdapter.this.logger.error("Error during unlocking: " + next2, e4);
                    it2.remove();
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String... strArr) {
        this.shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-"));
        this.consumerGroup = "SpringIntegration";
        this.checkpointStore = new SimpleMetadataStore();
        this.streamInitialSequence = KinesisShardOffset.latest();
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointMode = CheckpointMode.batch;
        this.recordsLimit = 10000;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.startTimeout = 60000;
        this.describeStreamBackoff = 1000;
        this.describeStreamRetries = 50;
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(strArr, "'streams' must not be null.");
        this.amazonKinesis = amazonKinesis;
        this.streams = (String[]) Arrays.copyOf(strArr, strArr.length);
    }

    public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset... kinesisShardOffsetArr) {
        this.shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-"));
        this.consumerGroup = "SpringIntegration";
        this.checkpointStore = new SimpleMetadataStore();
        this.streamInitialSequence = KinesisShardOffset.latest();
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointMode = CheckpointMode.batch;
        this.recordsLimit = 10000;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.startTimeout = 60000;
        this.describeStreamBackoff = 1000;
        this.describeStreamRetries = 50;
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(kinesisShardOffsetArr, "'shardOffsets' must not be null.");
        Assert.noNullElements(kinesisShardOffsetArr, "'shardOffsets' must not contain null elements.");
        for (KinesisShardOffset kinesisShardOffset : kinesisShardOffsetArr) {
            Assert.isTrue(StringUtils.hasText(kinesisShardOffset.getStream()) && StringUtils.hasText(kinesisShardOffset.getShard()), "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values.");
            this.shardOffsets.add(new KinesisShardOffset(kinesisShardOffset));
        }
        this.amazonKinesis = amazonKinesis;
        this.streams = null;
    }

    public void setConsumerGroup(String str) {
        Assert.hasText(str, "'consumerGroup' must not be empty");
        this.consumerGroup = str;
    }

    public void setCheckpointStore(ConcurrentMetadataStore concurrentMetadataStore) {
        Assert.notNull(concurrentMetadataStore, "'checkpointStore' must not be null");
        this.checkpointStore = concurrentMetadataStore;
    }

    public void setConsumerExecutor(Executor executor) {
        Assert.notNull(executor, "'executor' must not be null");
        this.consumerExecutor = executor;
        this.consumerExecutorExplicitlySet = true;
    }

    public void setDispatcherExecutor(Executor executor) {
        this.dispatcherExecutor = executor;
        this.dispatcherExecutorExplicitlySet = true;
    }

    public void setStreamInitialSequence(KinesisShardOffset kinesisShardOffset) {
        Assert.notNull(kinesisShardOffset, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = kinesisShardOffset;
    }

    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    public void setRecordsLimit(int i) {
        Assert.isTrue(i > 0, "'recordsLimit' must be more than 0");
        this.recordsLimit = Math.min(10000, i);
    }

    public void setConsumerBackoff(int i) {
        this.consumerBackoff = Math.max(1000, i);
    }

    public void setDescribeStreamBackoff(int i) {
        this.describeStreamBackoff = Math.max(1000, i);
    }

    public void setDescribeStreamRetries(int i) {
        Assert.isTrue(i > 0, "'describeStreamRetries' must be more than 0");
        this.describeStreamRetries = i;
    }

    public void setStartTimeout(int i) {
        Assert.isTrue(i > 0, "'startTimeout' must be more than 0");
        this.startTimeout = i;
    }

    public void setConcurrency(int i) {
        this.maxConcurrency = i;
    }

    public void setIdleBetweenPolls(int i) {
        this.idleBetweenPolls = Math.max(250, i);
    }

    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> inboundMessageMapper) {
        this.embeddedHeadersMapper = inboundMessageMapper;
    }

    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    protected void onInit() {
        super.onInit();
        if (this.consumerExecutor == null) {
            this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-consumer-"));
        }
        if (this.dispatcherExecutor == null) {
            this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-dispatcher-"));
        }
        if (this.streams == null) {
            if (this.lockRegistry != null) {
                this.logger.warn("The LockRegistry is ignored when explicit shards configuration is used.");
            }
            this.lockRegistry = null;
        }
    }

    public void destroy() {
        if (!this.consumerExecutorExplicitlySet) {
            ((ExecutorService) this.consumerExecutor).shutdown();
        }
        if (this.dispatcherExecutorExplicitlySet) {
            return;
        }
        ((ExecutorService) this.dispatcherExecutor).shutdown();
    }

    @ManagedOperation
    public void stopConsumer(String str, String str2) {
        ShardConsumer remove = this.shardConsumers.remove(KinesisShardOffset.latest(str, str2));
        if (remove != null) {
            remove.stop();
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("There is no ShardConsumer for shard [" + str2 + "] in stream [" + str2 + "] to stop.");
        }
    }

    @ManagedOperation
    public void startConsumer(String str, String str2) {
        KinesisShardOffset latest = KinesisShardOffset.latest(str, str2);
        ShardConsumer shardConsumer = this.shardConsumers.get(latest);
        if (shardConsumer != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("The [" + shardConsumer + "] has been started before.");
                return;
            }
            return;
        }
        synchronized (this.shardOffsets) {
            Iterator<KinesisShardOffset> it = this.shardOffsets.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                KinesisShardOffset next = it.next();
                if (latest.equals(next)) {
                    this.shardConsumerManager.addShardToConsume(next);
                    break;
                }
            }
        }
    }

    @ManagedOperation
    public void resetCheckpointForShardToLatest(String str, String str2) {
        restartShardConsumerForOffset(KinesisShardOffset.latest(str, str2));
    }

    @ManagedOperation
    public void resetCheckpointForShardToTrimHorizon(String str, String str2) {
        restartShardConsumerForOffset(KinesisShardOffset.trimHorizon(str, str2));
    }

    @ManagedOperation
    public void resetCheckpointForShardToSequenceNumber(String str, String str2, String str3) {
        restartShardConsumerForOffset(KinesisShardOffset.atSequenceNumber(str, str2, str3));
    }

    @ManagedOperation
    public void resetCheckpointForShardAtTimestamp(String str, String str2, long j) {
        restartShardConsumerForOffset(KinesisShardOffset.atTimestamp(str, str2, new Date(j)));
    }

    private void restartShardConsumerForOffset(KinesisShardOffset kinesisShardOffset) {
        Assert.isTrue(this.shardOffsets.contains(kinesisShardOffset), "The [" + this + "] doesn't operate shard [" + kinesisShardOffset.getShard() + "] for stream [" + kinesisShardOffset.getStream() + "]");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Resetting consumer for [" + kinesisShardOffset + "]...");
        }
        kinesisShardOffset.reset();
        synchronized (this.shardOffsets) {
            this.shardOffsets.remove(kinesisShardOffset);
            this.shardOffsets.add(kinesisShardOffset);
        }
        if (this.active) {
            ShardConsumer remove = this.shardConsumers.remove(kinesisShardOffset);
            if (remove != null) {
                remove.close();
            }
            kinesisShardOffset.setReset(true);
            this.shardConsumerManager.addShardToConsume(kinesisShardOffset);
        }
    }

    @ManagedOperation
    public void resetCheckpoints() {
        this.resetCheckpoints = true;
        if (this.active) {
            stopConsumers();
            populateConsumers();
        }
    }

    protected void doStart() {
        super.doStart();
        if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        if (this.streams != null) {
            populateShardsForStreams();
        }
        populateConsumers();
        this.active = true;
        this.concurrency = Math.min(this.maxConcurrency, this.shardOffsets.size());
        this.dispatcherExecutor.execute(new ConsumerDispatcher());
        this.shardConsumerManagerFuture = this.shardLocksExecutor.submit((Runnable) this.shardConsumerManager);
    }

    private Collection<ShardConsumer> shardConsumerSubset(int i) {
        ArrayList arrayList = new ArrayList(this.shardConsumers.values());
        if (this.concurrency == 1) {
            return arrayList;
        }
        int size = arrayList.size();
        if (size == this.concurrency) {
            return Collections.singleton(arrayList.get(i));
        }
        int i2 = size / this.concurrency;
        return i == this.concurrency - 1 ? arrayList.subList(i * i2, size) : arrayList.subList(i * i2, (i + 1) * i2);
    }

    private void populateShardsForStreams() {
        this.shardOffsets.clear();
        CountDownLatch countDownLatch = new CountDownLatch(this.streams.length);
        for (String str : this.streams) {
            populateShardsForStream(str, countDownLatch);
        }
        try {
            if (!countDownLatch.await(this.startTimeout, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("The [ " + this + "] could not start during timeout: " + this.startTimeout);
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("The [ " + this + "] has been interrupted from start.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void populateShardsForStream(String str, CountDownLatch countDownLatch) {
        this.dispatcherExecutor.execute(() -> {
            int i;
            ArrayList arrayList;
            String str2;
            try {
                i = 0;
                arrayList = new ArrayList();
                str2 = null;
            } finally {
                if (countDownLatch != null) {
                }
                this.inResharding.remove(str);
            }
            while (true) {
                DescribeStreamResult describeStreamResult = null;
                try {
                    describeStreamResult = this.amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(str).withExclusiveStartShardId(str2));
                } catch (Exception e) {
                    this.logger.info("Got an exception when describing stream [" + str + "]. Backing off for [" + this.describeStreamBackoff + "] millis.", e);
                }
                if (describeStreamResult != null && StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
                    List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
                    try {
                        for (Shard shard : shards) {
                            String buildCheckpointKeyForShard = buildCheckpointKeyForShard(str, shard.getShardId());
                            String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
                            if (endingSequenceNumber != null) {
                                String str3 = this.checkpointStore.get(buildCheckpointKeyForShard);
                                boolean z = str3 != null && new BigInteger(endingSequenceNumber).compareTo(new BigInteger(str3)) <= 0;
                                if (this.logger.isTraceEnabled()) {
                                    this.logger.trace("The shard [" + shard + "] in stream [" + str + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber + "].\nThe last processed checkpoint is [" + str3 + "]." + (z ? "\nThe shard will be skipped." : ""));
                                }
                                if (z) {
                                }
                            }
                            arrayList.add(shard);
                        }
                    } catch (Exception e2) {
                        this.logger.info("Got an exception when processing shards in stream [" + str + "].\nRetrying...", e2);
                    }
                    if (!describeStreamResult.getStreamDescription().getHasMoreShards().booleanValue()) {
                        break;
                    }
                    str2 = ((Shard) shards.get(shards.size() - 1)).getShardId();
                    i = 0;
                } else {
                    int i2 = i;
                    i++;
                    if (i2 > this.describeStreamRetries) {
                        ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + str + "] isn't ACTIVE or doesn't exist.");
                        resourceNotFoundException.setServiceName("Kinesis");
                        throw resourceNotFoundException;
                    }
                    try {
                        Thread.sleep(this.describeStreamBackoff);
                    } catch (InterruptedException e3) {
                        Thread.interrupted();
                        throw new IllegalStateException("The [describeStream] thread for the stream [" + str + "] has been interrupted.", e3);
                    }
                }
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
                this.inResharding.remove(str);
            }
        });
    }

    private void populateConsumers() {
        synchronized (this.shardOffsets) {
            Iterator<KinesisShardOffset> it = this.shardOffsets.iterator();
            while (it.hasNext()) {
                this.shardConsumerManager.addShardToConsume(it.next());
            }
        }
        this.resetCheckpoints = false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void populateConsumer(KinesisShardOffset kinesisShardOffset) {
        kinesisShardOffset.setReset(this.resetCheckpoints);
        ShardConsumer shardConsumer = new ShardConsumer(kinesisShardOffset);
        if (this.active) {
            synchronized (this.consumerInvokers) {
                if (this.consumerInvokers.size() < this.maxConcurrency) {
                    SchedulingAwareRunnable consumerInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer));
                    this.consumerInvokers.add(consumerInvoker);
                    this.consumerExecutor.execute(consumerInvoker);
                } else {
                    for (ConsumerInvoker consumerInvoker2 : this.consumerInvokers) {
                        if (consumerInvoker2.consumers.size() < this.consumerInvokerMaxCapacity) {
                            consumerInvoker2.addConsumer(shardConsumer);
                            return;
                        }
                    }
                    if (this.concurrency != 0) {
                        ConsumerInvoker consumerInvoker3 = this.consumerInvokers.get(0);
                        consumerInvoker3.addConsumer(shardConsumer);
                        this.consumerInvokerMaxCapacity = consumerInvoker3.consumers.size();
                    }
                }
            }
        }
        this.shardConsumers.put(kinesisShardOffset, shardConsumer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String buildCheckpointKeyForShard(String str, String str2) {
        return this.consumerGroup + ":" + str + ":" + str2;
    }

    protected void doStop() {
        Iterator<ConsumerInvoker> it = this.consumerInvokers.iterator();
        while (it.hasNext()) {
            it.next().notifyBarrier();
        }
        super.doStop();
        stopConsumers();
        this.shardConsumerManagerFuture.cancel(true);
        this.active = false;
    }

    private void stopConsumers() {
        Iterator<ShardConsumer> it = this.shardConsumers.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        this.shardConsumers.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setAttributesIfNecessary(Object obj, Message<?> message) {
        if (getErrorChannel() != null) {
            AttributeAccessor attributeAccessor = ErrorMessageUtils.getAttributeAccessor(message, (Message) null);
            attributesHolder.set(attributeAccessor);
            attributeAccessor.setAttribute(AwsHeaders.RAW_RECORD, obj);
        }
    }

    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributeAccessor = attributesHolder.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }

    public String toString() {
        return "KinesisMessageDrivenChannelAdapter{shardOffsets=" + this.shardOffsets + ", consumerGroup='" + this.consumerGroup + "'}";
    }
}
