/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.core.api.reactor;

import com.microsoft.azure.spring.integration.core.AzureSendFailureException;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.reactor.SendOperation;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;
import reactor.core.publisher.Mono;

public class DefaultMessageHandler
extends AbstractMessageProducingHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageHandler.class);
    private static final long DEFAULT_SEND_TIMEOUT = 10000L;
    private final String destination;
    private final SendOperation sendOperation;
    private boolean sync = false;
    private ListenableFutureCallback<Void> sendCallback;
    private EvaluationContext evaluationContext;
    private Expression sendTimeoutExpression = new ValueExpression((Object)10000L);
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private Expression partitionKeyExpression;
    private MessageChannel sendFailureChannel;
    private String sendFailureChannelName;

    public DefaultMessageHandler(String destination, @NonNull SendOperation sendOperation) {
        Assert.hasText((String)destination, (String)"destination can't be null or empty");
        this.destination = destination;
        this.sendOperation = sendOperation;
    }

    protected void onInit() {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
        LOGGER.info("Started DefaultMessageHandler with properties: {}", this.buildPropertiesMap());
    }

    protected void handleMessageInternal(Message<?> message) {
        PartitionSupplier partitionSupplier = this.toPartitionSupplier(message);
        String destination = this.toDestination(message);
        Mono<Void> mono = this.sendOperation.sendAsync(destination, message, partitionSupplier);
        if (this.sync) {
            this.waitingSendResponse(mono, message);
        } else {
            this.handleSendResponseAsync(mono, message);
        }
    }

    private <T> void handleSendResponseAsync(Mono<T> mono, Message<?> message) {
        mono.doOnError(ex -> {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("{} sent failed in async mode due to {}", (Object)message, (Object)ex.getMessage());
            }
            if (this.sendCallback != null) {
                this.sendCallback.onFailure(ex);
            }
            if (this.getSendFailureChannel() != null) {
                this.messagingTemplate.send((Object)this.getSendFailureChannel(), (Message)this.getErrorMessageStrategy().buildErrorMessage((Throwable)((Object)new AzureSendFailureException(message, (Throwable)ex)), null));
            }
        }).doOnSuccess(t -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("{} sent successfully in async mode", (Object)message);
            }
            if (this.sendCallback != null) {
                this.sendCallback.onSuccess((Object)((Void)t));
            }
        }).subscribe();
    }

    private <T> void waitingSendResponse(Mono<T> mono, Message<?> message) {
        Long sendTimeout = (Long)this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
        if (sendTimeout == null || sendTimeout < 0L) {
            try {
                mono.block();
            }
            catch (Exception e) {
                throw new MessageDeliveryException(e.getMessage());
            }
        }
        try {
            mono.block(Duration.of(sendTimeout, ChronoUnit.MILLIS));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("{} sent successfully in sync mode", message);
            }
        }
        catch (Exception e) {
            if (e.getCause() instanceof TimeoutException) {
                throw new MessageTimeoutException(message, "Timeout waiting for send event hub response");
            }
            throw new MessageDeliveryException(e.getMessage());
        }
    }

    public void setSync(boolean sync) {
        this.sync = sync;
        LOGGER.info("DefaultMessageHandler sync becomes: {}", (Object)sync);
    }

    public void setSendTimeout(long sendTimeout) {
        this.setSendTimeoutExpression((Expression)new ValueExpression((Object)sendTimeout));
    }

    public void setPartitionKey(String partitionKey) {
        this.setPartitionKeyExpression((Expression)new LiteralExpression(partitionKey));
    }

    public void setPartitionKeyExpression(Expression partitionKeyExpression) {
        this.partitionKeyExpression = partitionKeyExpression;
    }

    public void setPartitionKeyExpressionString(String partitionKeyExpression) {
        this.setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression));
    }

    private String toDestination(Message<?> message) {
        if (message.getHeaders().containsKey((Object)"azure_name")) {
            return (String)message.getHeaders().get((Object)"azure_name", String.class);
        }
        return this.destination;
    }

    private PartitionSupplier toPartitionSupplier(Message<?> message) {
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        String partitionKey = (String)message.getHeaders().get((Object)"azure_partition_key", String.class);
        if (!StringUtils.hasText((String)partitionKey) && this.partitionKeyExpression != null) {
            partitionKey = (String)this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
        }
        if (StringUtils.hasText((String)partitionKey)) {
            partitionSupplier.setPartitionKey(partitionKey);
        }
        if (message.getHeaders().containsKey((Object)"azure_partition_id")) {
            partitionSupplier.setPartitionId((String)message.getHeaders().get((Object)"azure_partition_id", String.class));
        }
        return partitionSupplier;
    }

    private Map<String, Object> buildPropertiesMap() {
        HashMap<String, Object> properties = new HashMap<String, Object>();
        properties.put("sync", this.sync);
        properties.put("sendTimeout", this.sendTimeoutExpression);
        properties.put("destination", this.destination);
        return properties;
    }

    public void setSendCallback(ListenableFutureCallback<Void> callback) {
        this.sendCallback = callback;
    }

    public Expression getSendTimeoutExpression() {
        return this.sendTimeoutExpression;
    }

    public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
        Assert.notNull((Object)sendTimeoutExpression, (String)"'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = sendTimeoutExpression;
        LOGGER.info("DefaultMessageHandler syncTimeout becomes: {}", (Object)sendTimeoutExpression);
    }

    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel != null) {
            return this.sendFailureChannel;
        }
        if (this.sendFailureChannelName != null) {
            this.sendFailureChannel = (MessageChannel)this.getChannelResolver().resolveDestination(this.sendFailureChannelName);
            return this.sendFailureChannel;
        }
        return null;
    }

    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    public void setSendFailureChannelName(String sendFailureChannelName) {
        this.sendFailureChannelName = sendFailureChannelName;
    }

    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return this.errorMessageStrategy;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull((Object)errorMessageStrategy, (String)"'errorMessageStrategy' must not be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }
}

