package org.springframework.integration.aws.outbound;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.SerializingConverter;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/integration/aws/outbound/KinesisMessageHandler.class */
public class KinesisMessageHandler extends AbstractAwsMessageHandler {
    private final AmazonKinesisAsync amazonKinesis;
    private Converter<Object, byte[]> converter = new SerializingConverter();
    private volatile Expression streamExpression;
    private volatile Expression partitionKeyExpression;
    private volatile Expression explicitHashKeyExpression;
    private volatile Expression sequenceNumberExpression;

    public KinesisMessageHandler(AmazonKinesisAsync amazonKinesisAsync) {
        Assert.notNull(amazonKinesisAsync, "'amazonKinesis' must not be null.");
        this.amazonKinesis = amazonKinesisAsync;
    }

    public void setConverter(Converter<Object, byte[]> converter) {
        Assert.notNull(converter, "'converter' must not be null.");
        this.converter = converter;
    }

    public void setStream(String str) {
        setStreamExpression(new LiteralExpression(str));
    }

    public void setStreamExpressionString(String str) {
        setStreamExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setStreamExpression(Expression expression) {
        this.streamExpression = expression;
    }

    public void setPartitionKey(String str) {
        setPartitionKeyExpression(new LiteralExpression(str));
    }

    public void setPartitionKeyExpressionString(String str) {
        setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setPartitionKeyExpression(Expression expression) {
        this.partitionKeyExpression = expression;
    }

    public void setExplicitHashKey(String str) {
        setExplicitHashKeyExpression(new LiteralExpression(str));
    }

    public void setExplicitHashKeyExpressionString(String str) {
        setExplicitHashKeyExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setExplicitHashKeyExpression(Expression expression) {
        this.explicitHashKeyExpression = expression;
    }

    public void setSequenceNumberExpressionString(String str) {
        setSequenceNumberExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setSequenceNumberExpression(Expression expression) {
        this.sequenceNumberExpression = expression;
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    protected Future<?> handleMessageToAws(Message<?> message) {
        if (message.getPayload() instanceof PutRecordsRequest) {
            return this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload()));
        }
        PutRecordRequest buildPutRecordRequest = message.getPayload() instanceof PutRecordRequest ? (PutRecordRequest) message.getPayload() : buildPutRecordRequest(message);
        return this.amazonKinesis.putRecordAsync(buildPutRecordRequest, obtainAsyncHandler(message, buildPutRecordRequest));
    }

    private PutRecordRequest buildPutRecordRequest(Message<?> message) {
        ByteBuffer wrap;
        String str = (String) message.getHeaders().get(AwsHeaders.STREAM, String.class);
        if (!StringUtils.hasText(str) && this.streamExpression != null) {
            str = (String) this.streamExpression.getValue(getEvaluationContext(), message, String.class);
        }
        Assert.state(str != null, "'stream' must not be null for sending a Kinesis record. Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an 'aws_stream' message header.");
        String str2 = (String) message.getHeaders().get(AwsHeaders.PARTITION_KEY, String.class);
        if (!StringUtils.hasText(str2) && this.partitionKeyExpression != null) {
            str2 = (String) this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
        }
        Assert.state(str2 != null, "'partitionKey' must not be null for sending a Kinesis record. Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an 'aws_partitionKey' message header.");
        String str3 = this.explicitHashKeyExpression != null ? (String) this.explicitHashKeyExpression.getValue(getEvaluationContext(), message, String.class) : null;
        String str4 = (String) message.getHeaders().get(AwsHeaders.SEQUENCE_NUMBER, String.class);
        if (!StringUtils.hasText(str4) && this.sequenceNumberExpression != null) {
            str4 = (String) this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
        }
        Object payload = message.getPayload();
        if (payload instanceof ByteBuffer) {
            wrap = (ByteBuffer) payload;
        } else {
            wrap = ByteBuffer.wrap(payload instanceof byte[] ? (byte[]) payload : (byte[]) this.converter.convert(payload));
        }
        return new PutRecordRequest().withStreamName(str).withPartitionKey(str2).withExplicitHashKey(str3).withSequenceNumberForOrdering(str4).withData(wrap);
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder<?> abstractIntegrationMessageBuilder, AmazonWebServiceRequest amazonWebServiceRequest, Object obj) {
        if (obj instanceof PutRecordResult) {
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.SHARD, ((PutRecordResult) obj).getShardId()).setHeader(AwsHeaders.SEQUENCE_NUMBER, ((PutRecordResult) obj).getSequenceNumber());
        }
    }
}
