/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.proton.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonDeliveryImpl;
import io.vertx.proton.impl.ProtonLinkImpl;
import java.io.ByteArrayOutputStream;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;

/**
 * Receiving-side link implementation backed by a proton {@link Receiver}.
 *
 * <p>Incoming delivery bytes are accumulated until a delivery is complete, decoded into a
 * {@link Message} and handed to the registered {@link ProtonMessageHandler}.
 *
 * <p>Credit handling has two modes, selected by the prefetch value (default 1000):
 * <ul>
 *   <li>prefetch &gt; 0: credit is granted on {@link #open()} and one credit is replenished
 *       after each handled message; {@link #flow(int)} and {@link #drain(long, Handler)}
 *       reject calls with an {@link IllegalStateException}.</li>
 *   <li>prefetch == 0: credit is managed by the caller through {@link #flow(int)} and
 *       {@link #drain(long, Handler)}.</li>
 * </ul>
 */
public class ProtonReceiverImpl
extends ProtonLinkImpl<ProtonReceiver>
implements ProtonReceiver {
    private static final String MSG_PREFETCH_ACTIVE =
        "Manual credit management not available while prefetch is non-zero";
    private static final String MSG_DRAIN_PENDING =
        "A previous drain operation has not yet completed";
    /** Size of the scratch buffer used when reading delivery bytes from the receiver. */
    private static final int READ_BUFFER_SIZE = 1024;

    private ProtonMessageHandler handler;
    private int prefetch = 1000;
    /** Non-null while a drain operation is outstanding. */
    private Handler<AsyncResult<Void>> drainCompleteHandler;
    /** Id of the timer guarding the outstanding drain, or null when no timer is scheduled. */
    private Long drainTimeoutTaskId = null;
    /** Accumulates the bytes of the delivery currently being received. */
    protected ByteArrayOutputStream current = new ByteArrayOutputStream();
    byte[] buffer = new byte[READ_BUFFER_SIZE];
    private boolean autoAccept = true;

    ProtonReceiverImpl(Receiver receiver) {
        super(receiver);
    }

    @Override
    protected ProtonReceiverImpl self() {
        return this;
    }

    private Receiver getReceiver() {
        return (Receiver)this.link;
    }

    /**
     * Delegates directly to {@link Receiver#recv(byte[], int, int)} on the underlying receiver.
     *
     * @param bytes  destination array
     * @param offset offset into {@code bytes}
     * @param size   maximum number of bytes to read
     * @return the value returned by the underlying receiver
     */
    public int recv(byte[] bytes, int offset, int size) {
        return this.getReceiver().recv(bytes, offset, size);
    }

    /**
     * Starts a drain operation and reports its outcome to {@code completionHandler}.
     *
     * <p>If credit minus locally queued deliveries is not positive, no drain request is sent:
     * the handler is completed immediately when nothing is queued, otherwise completion is
     * deferred until the queued deliveries have been processed. In every other case the
     * receiver is asked to drain and the connection is flushed.
     *
     * @param timeout           delay in milliseconds after which the drain is failed; values
     *                          {@code <= 0} schedule no timeout
     * @param completionHandler notified with success once drained, or failure on timeout
     * @return this receiver
     * @throws IllegalStateException    if prefetch is non-zero or a drain is already pending
     * @throws IllegalArgumentException if {@code completionHandler} is null
     */
    @Override
    public ProtonReceiver drain(long timeout, Handler<AsyncResult<Void>> completionHandler) {
        if (this.prefetch > 0) {
            throw new IllegalStateException(MSG_PREFETCH_ACTIVE);
        }
        if (completionHandler == null) {
            throw new IllegalArgumentException("A completion handler must be provided");
        }
        if (this.drainCompleteHandler != null) {
            throw new IllegalStateException(MSG_DRAIN_PENDING);
        }
        if (this.getCredit() - this.getQueued() <= 0) {
            if (this.getQueued() == 0) {
                // Nothing queued and nothing to drain: complete straight away.
                completionHandler.handle(Future.succeededFuture());
            } else {
                // Deliveries are still queued; completion is signalled from
                // processForDrainCompletion() once they have been handled.
                this.setDrainHandlerAndTimeoutTask(timeout, completionHandler);
            }
        } else {
            this.setDrainHandlerAndTimeoutTask(timeout, completionHandler);
            this.getReceiver().drain(0);
            this.flushConnection();
        }
        return this;
    }

    /**
     * Records the pending drain handler and, when {@code delay > 0}, schedules a timer that
     * fails the drain with "Drain attempt timed out".
     */
    private void setDrainHandlerAndTimeoutTask(long delay, Handler<AsyncResult<Void>> completionHandler) {
        // Resolve the Vertx instance before touching any state: if there is no current
        // context this throws, and the receiver must not be left with a drain handler
        // registered that nothing will ever clear.
        Vertx vertx = delay > 0L ? Vertx.currentContext().owner() : null;
        this.drainCompleteHandler = completionHandler;
        if (vertx != null) {
            this.drainTimeoutTaskId = vertx.setTimer(delay, firedTimerId -> {
                this.drainTimeoutTaskId = null;
                this.drainCompleteHandler = null;
                completionHandler.handle(Future.failedFuture("Drain attempt timed out"));
            });
        }
    }

    /**
     * Grants {@code credits} to the sender and flushes the connection.
     *
     * @param credits the credit amount passed to the underlying receiver
     * @return this receiver
     * @throws IllegalStateException if prefetch is non-zero or a drain is pending
     */
    @Override
    public ProtonReceiver flow(int credits) throws IllegalStateException {
        this.flow(credits, true);
        return this;
    }

    /**
     * Internal credit grant.
     *
     * @param checkPrefetch when true, reject the call while prefetch is non-zero; internal
     *                      callers replenishing prefetch credit pass false
     */
    private void flow(int credits, boolean checkPrefetch) throws IllegalStateException {
        if (checkPrefetch && this.prefetch > 0) {
            throw new IllegalStateException(MSG_PREFETCH_ACTIVE);
        }
        if (this.drainCompleteHandler != null) {
            throw new IllegalStateException(MSG_DRAIN_PENDING);
        }
        this.getReceiver().flow(credits);
        this.flushConnection();
    }

    /** Returns the draining flag reported by the underlying receiver. */
    public boolean draining() {
        return this.getReceiver().draining();
    }

    /**
     * Sets the drain flag on the underlying receiver.
     *
     * @return this receiver
     */
    public ProtonReceiver setDrain(boolean drain) {
        this.getReceiver().setDrain(drain);
        return this;
    }

    /**
     * Registers the message handler and immediately attempts to process the receiver's
     * current delivery, if there is one.
     *
     * @return this receiver
     */
    @Override
    public ProtonReceiver handler(ProtonMessageHandler handler) {
        this.handler = handler;
        this.onDelivery();
        return this;
    }

    private void flushConnection() {
        this.getSession().getConnectionImpl().flush();
    }

    /**
     * Processes the receiver's current delivery: reads the available bytes, and once the
     * delivery is no longer partial, decodes it, advances the receiver and invokes the
     * message handler. Does nothing while no handler is registered.
     */
    void onDelivery() {
        if (this.handler == null) {
            return;
        }
        Receiver receiver = this.getReceiver();
        Delivery delivery = receiver.current();
        if (delivery != null) {
            int count;
            while ((count = receiver.recv(this.buffer, 0, this.buffer.length)) > 0) {
                this.current.write(this.buffer, 0, count);
            }
            if (delivery.isPartial()) {
                // More bytes are expected; keep what has been accumulated and wait.
                return;
            }
            byte[] data = this.current.toByteArray();
            this.current.reset();
            Message msg = Proton.message();
            msg.decode(data, 0, data.length);
            receiver.advance();
            ProtonDeliveryImpl delImpl = new ProtonDeliveryImpl(delivery);
            this.handler.handle(delImpl, msg);
            // Only auto-accept when the handler has not set a local state itself.
            if (this.autoAccept && delivery.getLocalState() == null) {
                ProtonHelper.accepted(delImpl, true);
            }
            if (this.prefetch > 0) {
                // Replenish the credit consumed by this message.
                this.flow(1, false);
            } else {
                this.processForDrainCompletion();
            }
        }
    }

    @Override
    public boolean isAutoAccept() {
        return this.autoAccept;
    }

    @Override
    public ProtonReceiver setAutoAccept(boolean autoAccept) {
        this.autoAccept = autoAccept;
        return this;
    }

    /**
     * Sets the prefetch value; zero selects manual credit management.
     *
     * @param messages the prefetch value
     * @return this receiver
     * @throws IllegalArgumentException if {@code messages} is negative
     */
    @Override
    public ProtonReceiver setPrefetch(int messages) {
        if (messages < 0) {
            throw new IllegalArgumentException("Value must not be negative");
        }
        this.prefetch = messages;
        return this;
    }

    @Override
    public int getPrefetch() {
        return this.prefetch;
    }

    /**
     * Opens the link and, when prefetch is non-zero, grants the prefetch amount as credit.
     *
     * @return this receiver
     */
    @Override
    public ProtonReceiver open() {
        super.open();
        if (this.prefetch > 0) {
            this.flow(this.prefetch, false);
        }
        return this;
    }

    @Override
    void handleLinkFlow() {
        this.processForDrainCompletion();
    }

    /**
     * Completes a pending drain successfully once both credit and the queued count have
     * reached zero (or below). If a timeout timer was scheduled and can no longer be
     * cancelled, the pending state is cleared but the handler is not invoked here.
     */
    private void processForDrainCompletion() {
        Handler<AsyncResult<Void>> h = this.drainCompleteHandler;
        if (h != null && this.getCredit() <= 0 && this.getQueued() <= 0) {
            Long timerId = this.drainTimeoutTaskId;
            // No timer scheduled counts as "cleared"; otherwise rely on cancelTimer's result.
            boolean timeoutTaskCleared =
                timerId == null || Vertx.currentContext().owner().cancelTimer(timerId.longValue());
            this.drainTimeoutTaskId = null;
            this.drainCompleteHandler = null;
            if (timeoutTaskCleared) {
                h.handle(Future.succeededFuture());
            }
        }
    }
}

