package com.microsoft.azure.spring.integration.core;

import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.SendOperation;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.AttributeAccessor;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:com/microsoft/azure/spring/integration/core/DefaultMessageHandler.class */
public class DefaultMessageHandler extends AbstractMessageProducingHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultMessageHandler.class);
    private static final long DEFAULT_SEND_TIMEOUT = 10000;
    private final String destination;
    private final SendOperation sendOperation;
    private ListenableFutureCallback<Void> sendCallback;
    private EvaluationContext evaluationContext;
    private Expression partitionKeyExpression;
    private MessageChannel sendFailureChannel;
    private String sendFailureChannelName;
    private boolean sync = false;
    private Expression sendTimeoutExpression = new ValueExpression(Long.valueOf(DEFAULT_SEND_TIMEOUT));
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

    public DefaultMessageHandler(String str, @NonNull SendOperation sendOperation) {
        Assert.hasText(str, "destination can't be null or empty");
        this.destination = str;
        this.sendOperation = sendOperation;
    }

    protected void onInit() {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        LOG.info("Started DefaultMessageHandler with properties: {}", buildPropertiesMap());
    }

    protected void handleMessageInternal(Message<?> message) {
        PartitionSupplier partitionSupplier = toPartitionSupplier(message);
        CompletableFuture<Void> sendAsync = this.sendOperation.sendAsync(toDestination(message), message, partitionSupplier);
        if (this.sync) {
            waitingSendResponse(sendAsync, message);
        } else {
            handleSendResponseAsync(message, sendAsync);
        }
    }

    private void handleSendResponseAsync(Message<?> message, CompletableFuture<?> completableFuture) {
        completableFuture.handle((obj, th) -> {
            if (th == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} sent successfully in async mode", message);
                }
                if (this.sendCallback == null) {
                    return null;
                }
                this.sendCallback.onSuccess((Void) obj);
                return null;
            }
            if (LOG.isWarnEnabled()) {
                LOG.warn("{} sent failed in async mode due to {}", message, th.getMessage());
            }
            if (this.sendCallback != null) {
                this.sendCallback.onFailure(th);
            }
            if (getSendFailureChannel() == null) {
                return null;
            }
            this.messagingTemplate.send(getSendFailureChannel(), getErrorMessageStrategy().buildErrorMessage(new AzureSendFailureException(message, th), (AttributeAccessor) null));
            return null;
        });
    }

    private void waitingSendResponse(CompletableFuture<?> completableFuture, Message<?> message) {
        Long l = (Long) this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
        if (l == null || l.longValue() < 0) {
            try {
                completableFuture.get();
                return;
            } catch (Exception e) {
                throw new MessageDeliveryException(e.getMessage());
            }
        }
        try {
            completableFuture.get(l.longValue(), TimeUnit.MILLISECONDS);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} sent successfully in sync mode", message);
            }
        } catch (TimeoutException e2) {
            throw new MessageTimeoutException(message, "Timeout waiting for send event hub response", e2);
        } catch (Exception e3) {
            throw new MessageDeliveryException(e3.getMessage());
        }
    }

    public void setSync(boolean z) {
        this.sync = z;
        LOG.info("DefaultMessageHandler sync becomes: {}", Boolean.valueOf(z));
    }

    public void setSendTimeout(long j) {
        setSendTimeoutExpression(new ValueExpression(Long.valueOf(j)));
    }

    public void setPartitionKey(String str) {
        setPartitionKeyExpression(new LiteralExpression(str));
    }

    public void setPartitionKeyExpression(Expression expression) {
        this.partitionKeyExpression = expression;
    }

    public void setPartitionKeyExpressionString(String str) {
        setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    private String toDestination(Message<?> message) {
        return message.getHeaders().containsKey(AzureHeaders.NAME) ? (String) message.getHeaders().get(AzureHeaders.NAME, String.class) : this.destination;
    }

    private PartitionSupplier toPartitionSupplier(Message<?> message) {
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        String str = (String) message.getHeaders().get(AzureHeaders.PARTITION_KEY, String.class);
        if (!StringUtils.hasText(str) && this.partitionKeyExpression != null) {
            str = (String) this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
        }
        if (StringUtils.hasText(str)) {
            partitionSupplier.setPartitionKey(str);
        }
        if (message.getHeaders().containsKey(AzureHeaders.PARTITION_ID)) {
            partitionSupplier.setPartitionId((String) message.getHeaders().get(AzureHeaders.PARTITION_ID, String.class));
        }
        return partitionSupplier;
    }

    private Map<String, Object> buildPropertiesMap() {
        HashMap hashMap = new HashMap();
        hashMap.put("sync", Boolean.valueOf(this.sync));
        hashMap.put("sendTimeout", this.sendTimeoutExpression);
        hashMap.put("destination", this.destination);
        return hashMap;
    }

    public void setSendCallback(ListenableFutureCallback<Void> listenableFutureCallback) {
        this.sendCallback = listenableFutureCallback;
    }

    public Expression getSendTimeoutExpression() {
        return this.sendTimeoutExpression;
    }

    public void setSendTimeoutExpression(Expression expression) {
        Assert.notNull(expression, "'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = expression;
        LOG.info("DefaultMessageHandler syncTimeout becomes: {}", expression);
    }

    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel != null) {
            return this.sendFailureChannel;
        }
        if (this.sendFailureChannelName == null) {
            return null;
        }
        this.sendFailureChannel = (MessageChannel) getChannelResolver().resolveDestination(this.sendFailureChannelName);
        return this.sendFailureChannel;
    }

    public void setSendFailureChannel(MessageChannel messageChannel) {
        this.sendFailureChannel = messageChannel;
    }

    public void setSendFailureChannelName(String str) {
        this.sendFailureChannelName = str;
    }

    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return this.errorMessageStrategy;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' must not be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }
}
