package org.springframework.cloud.stream.binder.kinesis.provisioning;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kinesis/provisioning/KinesisStreamProvisioner.class */
public class KinesisStreamProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>> {
    private static final Log logger = LogFactory.getLog(KinesisStreamProvisioner.class);
    private final AmazonKinesis amazonKinesis;
    private final KinesisBinderConfigurationProperties configurationProperties;

    public KinesisStreamProvisioner(AmazonKinesis amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
        Assert.notNull(kinesisBinderConfigurationProperties, "'kinesisBinderConfigurationProperties' must not be null");
        this.amazonKinesis = amazonKinesis;
        this.configurationProperties = kinesisBinderConfigurationProperties;
    }

    public ProducerDestination provisionProducerDestination(String str, ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties) throws ProvisioningException {
        if (logger.isInfoEnabled()) {
            logger.info("Using Kinesis stream for outbound: " + str);
        }
        if (extendedProducerProperties.getHeaderMode() == null) {
            extendedProducerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
        }
        return new KinesisProducerDestination(str, createOrUpdate(str, extendedProducerProperties.getPartitionCount(), null));
    }

    public ConsumerDestination provisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties) throws ProvisioningException {
        if (logger.isInfoEnabled()) {
            logger.info("Using Kinesis stream for inbound: " + str);
        }
        if (extendedConsumerProperties.getHeaderMode() == null) {
            extendedConsumerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
        }
        return new KinesisConsumerDestination(str, createOrUpdate(str, extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency(), (KinesisConsumerProperties) extendedConsumerProperties.getExtension()));
    }

    private List<Shard> createOrUpdate(String str, int i, KinesisConsumerProperties kinesisConsumerProperties) {
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        String str2 = null;
        DescribeStreamRequest withStreamName = new DescribeStreamRequest().withStreamName(str);
        while (true) {
            DescribeStreamResult describeStreamResult = null;
            try {
                withStreamName.withExclusiveStartShardId(str2);
                describeStreamResult = this.amazonKinesis.describeStream(withStreamName);
                StreamDescription streamDescription = describeStreamResult.getStreamDescription();
                if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) {
                    arrayList.addAll(streamDescription.getShards());
                    if (!streamDescription.getHasMoreShards().booleanValue()) {
                        int max = Math.max(this.configurationProperties.getMinShardCount(), i);
                        return (arrayList.size() >= max || !this.configurationProperties.isAutoAddShards()) ? arrayList : updateShardCount(str, arrayList.size(), max);
                    }
                    str2 = ((Shard) arrayList.get(arrayList.size() - 1)).getShardId();
                }
            } catch (LimitExceededException e) {
                logger.info("Got LimitExceededException when describing stream [" + str + "]. Backing off for [" + this.configurationProperties.getDescribeStreamBackoff() + "] millis.");
            } catch (ResourceNotFoundException e2) {
                if (!this.configurationProperties.isAutoCreateStream()) {
                    throw new ProvisioningException("The stream [" + str + "] was not found and auto creation is disabled.", e2);
                }
                if (kinesisConsumerProperties != null && kinesisConsumerProperties.isDynamoDbStreams()) {
                    throw new ProvisioningException("The stream [" + str + "] was not found and DynamoDB Streams doesn't support stream creation.", e2);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Stream '" + str + "' not found. Create one...");
                }
                this.amazonKinesis.createStream(str, Integer.valueOf(Math.max(this.configurationProperties.getMinShardCount(), i)));
            }
            if (describeStreamResult == null || !StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
                int i3 = i2;
                i2++;
                if (i3 > this.configurationProperties.getDescribeStreamRetries()) {
                    ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + str + "] isn't ACTIVE or doesn't exist.");
                    resourceNotFoundException.setServiceName("Kinesis");
                    throw new ProvisioningException("Kinesis org.springframework.cloud.stream.binder.kinesis.provisioning error", resourceNotFoundException);
                }
                try {
                    Thread.sleep(this.configurationProperties.getDescribeStreamBackoff());
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    throw new ProvisioningException("The [describeStream] thread for the stream [" + str + "] has been interrupted.", e3);
                }
            }
        }
    }

    private List<Shard> updateShardCount(String str, int i, int i2) {
        if (logger.isInfoEnabled()) {
            logger.info("Stream [" + str + "] has [" + i + "] shards compared to a target configuration of [" + i2 + "], creating shards...");
        }
        this.amazonKinesis.updateShardCount(new UpdateShardCountRequest().withStreamName(str).withTargetShardCount(Integer.valueOf(i2)).withScalingType(ScalingType.UNIFORM_SCALING));
        ArrayList arrayList = new ArrayList();
        int i3 = 0;
        String str2 = null;
        DescribeStreamRequest withStreamName = new DescribeStreamRequest().withStreamName(str);
        while (true) {
            DescribeStreamResult describeStreamResult = null;
            try {
                withStreamName.withExclusiveStartShardId(str2);
                describeStreamResult = this.amazonKinesis.describeStream(withStreamName);
                StreamDescription streamDescription = describeStreamResult.getStreamDescription();
                if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) {
                    arrayList.addAll(streamDescription.getShards());
                    if (!streamDescription.getHasMoreShards().booleanValue()) {
                        return arrayList;
                    }
                    str2 = ((Shard) arrayList.get(arrayList.size() - 1)).getShardId();
                }
            } catch (LimitExceededException e) {
                logger.info("Got LimitExceededException when describing stream [" + str + "]. Backing off for [" + this.configurationProperties.getDescribeStreamBackoff() + "] millis.");
            }
            if (describeStreamResult == null || !StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
                int i4 = i3;
                i3++;
                if (i4 > this.configurationProperties.getDescribeStreamRetries()) {
                    ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + str + "] isn't ACTIVE or doesn't exist.");
                    resourceNotFoundException.setServiceName("Kinesis");
                    throw new ProvisioningException("Kinesis org.springframework.cloud.stream.binder.kinesis.provisioning error", resourceNotFoundException);
                }
                try {
                    Thread.sleep(this.configurationProperties.getDescribeStreamBackoff());
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new ProvisioningException("The [describeStream] thread for the stream [" + str + "] has been interrupted.", e2);
                }
            }
        }
    }
}
