/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.handler.LinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class ReceiveLinkHandler
extends LinkHandler {
    private final String linkName;
    private final AtomicBoolean isRemoteActive = new AtomicBoolean();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final Sinks.Many<Delivery> deliveries = Sinks.many().multicast().onBackpressureBuffer();
    private final Set<Delivery> queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap());
    private final String entityPath;

    public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
        super(connectionId, hostname, entityPath, new ClientLogger(ReceiveLinkHandler.class));
        this.linkName = Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        this.entityPath = entityPath;
    }

    public String getLinkName() {
        return this.linkName;
    }

    public Flux<Delivery> getDeliveredMessages() {
        return this.deliveries.asFlux().doOnNext(this.queuedDeliveries::remove);
    }

    @Override
    public void close() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        this.clearAndCompleteDeliveries("Could not emit deliveries.close when closing handler.");
        this.onNext(EndpointState.CLOSED);
    }

    public void onLinkLocalOpen(Event event) {
        Link link = event.getLink();
        if (link instanceof Receiver) {
            this.logger.verbose("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localSource[{}]", new Object[]{this.getConnectionId(), this.entityPath, link.getName(), link.getSource()});
        }
    }

    public void onLinkRemoteOpen(Event event) {
        Link link = event.getLink();
        if (!(link instanceof Receiver)) {
            return;
        }
        if (link.getRemoteSource() != null) {
            this.logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteSource[{}]", new Object[]{this.getConnectionId(), this.entityPath, link.getName(), link.getRemoteSource()});
            if (!this.isRemoteActive.getAndSet(true)) {
                this.onNext(EndpointState.ACTIVE);
            }
        } else {
            this.logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], action[waitingForError]", new Object[]{this.getConnectionId(), this.entityPath, link.getName()});
        }
    }

    public void onDelivery(Event event) {
        if (!this.isRemoteActive.getAndSet(true)) {
            this.onNext(EndpointState.ACTIVE);
        }
        Delivery delivery = event.getDelivery();
        Receiver link = (Receiver)delivery.getLink();
        boolean wasSettled = delivery.isSettled();
        if (!delivery.isPartial()) {
            if (wasSettled) {
                if (link != null) {
                    this.logger.info("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}], remoteCondition[{}], delivery.isSettled[{}] Was already settled.", new Object[]{this.getConnectionId(), this.entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(), delivery.isSettled()});
                } else {
                    this.logger.warning("connectionId[{}], entityPath[{}] delivery.isSettled[{}] Settled delivery with no  link.", new Object[]{this.getConnectionId(), this.entityPath, delivery.isSettled()});
                }
            } else if (link.getLocalState() == EndpointState.CLOSED) {
                delivery.disposition((DeliveryState)new Modified());
                delivery.settle();
            } else {
                this.queuedDeliveries.add(delivery);
                this.deliveries.emitNext((Object)delivery, (signalType, emitResult) -> {
                    this.logger.warning("connectionId[{}], entityPath[{}], linkName[{}], emitResult[{}] Could not emit delivery. {}", new Object[]{this.getConnectionId(), this.entityPath, this.linkName, emitResult, delivery});
                    if (emitResult == Sinks.EmitResult.FAIL_OVERFLOW && link.getLocalState() != EndpointState.CLOSED) {
                        link.setCondition(new ErrorCondition(Symbol.getSymbol((String)"delivery-buffer-overflow"), "Deliveries are not processed fast enough. Closing local link."));
                        link.close();
                        return true;
                    }
                    return false;
                });
            }
        }
        if (link != null) {
            ErrorCondition condition = link.getRemoteCondition();
            this.logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}],remoteCredit[{}], remoteCondition[{}], delivery.isPartial[{}], delivery.isSettled[{}]", new Object[]{this.getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), condition != null && condition.getCondition() != null ? condition : "N/A", delivery.isPartial(), wasSettled});
        }
    }

    @Override
    public void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);
        if (!this.isRemoteActive.get()) {
            this.logger.info("connectionId[{}] linkName[{}] entityPath[{}] Receiver link was never active. Closing endpoint states.", new Object[]{this.getConnectionId(), this.getLinkName(), this.entityPath});
            super.close();
        }
    }

    @Override
    public void onLinkRemoteClose(Event event) {
        this.clearAndCompleteDeliveries("Could not complete 'deliveries' when remotely closed.");
        super.onLinkRemoteClose(event);
    }

    @Override
    public void onLinkFinal(Event event) {
        this.close();
        super.onLinkFinal(event);
    }

    private void clearAndCompleteDeliveries(String errorMessage) {
        this.deliveries.emitComplete((signalType, emitResult) -> {
            this.logger.verbose("connectionId[{}], entityPath[{}], linkName[{}] {}", new Object[]{this.getConnectionId(), this.entityPath, this.linkName, errorMessage});
            return false;
        });
        this.queuedDeliveries.forEach(delivery -> {
            delivery.disposition((DeliveryState)new Modified());
            delivery.settle();
        });
        this.queuedDeliveries.clear();
    }
}

