/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.servicebus;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

public abstract class ServiceBusMessageHandler<U>
implements IMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusMessageHandler.class);
    protected final Consumer<Message<U>> consumer;
    protected final Class<U> payloadType;
    protected final CheckpointConfig checkpointConfig;
    protected final ServiceBusMessageConverter messageConverter;

    public ServiceBusMessageHandler(Consumer<Message<U>> consumer, Class<U> payloadType, CheckpointConfig checkpointConfig, ServiceBusMessageConverter messageConverter) {
        this.consumer = consumer;
        this.payloadType = payloadType;
        this.checkpointConfig = checkpointConfig;
        this.messageConverter = messageConverter;
    }

    public CompletableFuture<Void> onMessageAsync(IMessage serviceBusMessage) {
        HashMap<String, AzureCheckpointer> headers = new HashMap<String, AzureCheckpointer>();
        AzureCheckpointer checkpointer = new AzureCheckpointer(() -> this.success(serviceBusMessage.getLockToken()), () -> this.failure(serviceBusMessage.getLockToken()));
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
            headers.put("azure_checkpointer", checkpointer);
        }
        Message message = this.messageConverter.toMessage(serviceBusMessage, (Map)new MessageHeaders(headers), this.payloadType);
        this.consumer.accept(message);
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD) {
            return checkpointer.success().whenComplete((v, t) -> this.checkpointHandler((Message<?>)message, (Throwable)t));
        }
        return CompletableFuture.completedFuture(null);
    }

    public void notifyException(Throwable exception, ExceptionPhase phase) {
        LOG.error(String.format("Exception encountered in phase %s", phase), exception);
    }

    protected abstract CompletableFuture<Void> success(UUID var1);

    protected abstract CompletableFuture<Void> failure(UUID var1);

    protected abstract String buildCheckpointFailMessage(Message<?> var1);

    protected abstract String buildCheckpointSuccessMessage(Message<?> var1);

    protected void checkpointHandler(Message<?> message, Throwable t) {
        if (t != null) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(this.buildCheckpointFailMessage(message), t);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(this.buildCheckpointSuccessMessage(message));
        }
    }
}

