/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Queue;

/**
 * Event-bus {@link MessageConsumer} that buffers incoming messages while there is no
 * demand (paused) and hands them to the user handler once demand is available again.
 *
 * <p>All mutable state ({@code handler}, {@code pending}, {@code demand}, ...) is guarded by
 * the monitor of this instance. User discard callbacks are invoked outside of that monitor
 * (or scheduled through the context) wherever this class controls the call site.
 *
 * @param <T> the type of the message body
 */
public class MessageConsumerImpl<T>
extends HandlerRegistration<T>
implements MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);
    private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
    private final Vertx vertx;
    private final ContextInternal context;
    private final EventBusImpl eventBus;
    private final String address;
    private final boolean localOnly;
    // User handler receiving delivered messages; null once unregistered.
    private Handler<Message<T>> handler;
    // Completion handler stored until a registration promise exists to attach it to.
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    // Optional callback notified of messages dropped from (or refused by) the buffer.
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
    // Messages received while demand was zero, in arrival order.
    private Queue<Message<T>> pending = new ArrayDeque<>(8);
    // Outstanding demand; Long.MAX_VALUE is treated as "unbounded" and is never decremented.
    private long demand = Long.MAX_VALUE;
    // Promise tracking the in-flight/completed registration; null while not registered.
    private Promise<Void> result;

    MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
        super(context, eventBus, address, false);
        this.vertx = vertx;
        this.context = context;
        this.eventBus = eventBus;
        this.address = address;
        this.localOnly = localOnly;
    }

    /**
     * Sets the maximum number of messages kept in the pending buffer and trims the buffer
     * if it currently holds more than that.
     *
     * <p>Trimming removes messages from the head of the queue. When a discard handler is
     * set, every removed message is passed to it after the monitor has been released.
     *
     * @param maxBufferedMessages the new limit, must be {@code >= 0}
     * @return this consumer
     */
    @Override
    public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
        Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
        ArrayList<Message<T>> discarded;
        Handler<Message<T>> discardCallback;
        synchronized (this) {
            this.maxBufferedMessages = maxBufferedMessages;
            int overflow = this.pending.size() - maxBufferedMessages;
            if (overflow <= 0) {
                return this;
            }
            discardCallback = this.discardHandler;
            if (discardCallback == null) {
                // Nobody to notify: just drop the excess.
                while (this.pending.size() > maxBufferedMessages) {
                    this.pending.poll();
                }
                return this;
            }
            discarded = new ArrayList<>(overflow);
            while (this.pending.size() > maxBufferedMessages) {
                discarded.add(this.pending.poll());
            }
        }
        // Invoke the user callback outside of the monitor.
        for (Message<T> message : discarded) {
            discardCallback.handle(message);
        }
        return this;
    }

    /**
     * @return the current limit of the pending buffer
     */
    @Override
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    /**
     * @return the address this consumer was created for
     */
    @Override
    public String address() {
        return this.address;
    }

    /**
     * Sets the handler notified with the outcome of the registration.
     *
     * <p>If a registration promise already exists the handler is attached to its future
     * directly; otherwise it is stored and attached when {@link #handler(Handler)} creates
     * the promise.
     *
     * @param handler the completion handler, must not be {@code null}
     */
    @Override
    public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        if (this.result != null) {
            this.result.future().setHandler(handler);
        } else {
            this.completionHandler = handler;
        }
    }

    /**
     * Clears the consumer state on unregistration.
     *
     * <p>The message handler is cleared, the end handler (if any) is invoked, all buffered
     * messages are dropped (and handed to the discard handler on the consumer context when
     * one is set), and a still-referenced registration promise is failed.
     */
    @Override
    protected synchronized void doUnregister() {
        this.handler = null;
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
        if (!this.pending.isEmpty()) {
            // Always detach the buffer so that stale messages can never be delivered to a
            // null handler or to a handler registered later on.
            Queue<Message<T>> discarded = this.pending;
            Handler<Message<T>> discardCallback = this.discardHandler;
            this.pending = new ArrayDeque<>();
            if (discardCallback != null) {
                this.context.runOnContext(v -> {
                    Message<T> msg;
                    while ((msg = discarded.poll()) != null) {
                        discardCallback.handle(msg);
                    }
                });
            }
        }
        this.discardHandler = null;
        if (this.result != null) {
            // tryFail is a no-op when the registration has already completed.
            this.result.tryFail("Consumer unregistered before registration completed");
            this.result = null;
        }
    }

    /**
     * Handles a message arriving for this consumer.
     *
     * <p>When not registered the message is ignored. With zero demand the message is
     * buffered, or discarded once the buffer is full (notifying the discard handler, or
     * logging a warning when none is set). Otherwise the oldest available message is
     * delivered to the handler, preserving arrival order with respect to the buffer.
     *
     * @param message the received message
     */
    @Override
    protected void doReceive(Message<T> message) {
        Handler<Message<T>> target;
        Message<T> next = message;
        boolean discard = false;
        synchronized (this) {
            if (!this.isRegistered()) {
                return;
            }
            if (this.demand == 0L) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                    return;
                }
                target = this.discardHandler;
                if (target == null) {
                    log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                    return;
                }
                discard = true;
            } else {
                if (!this.pending.isEmpty()) {
                    // Keep ordering: enqueue the new message and deliver the oldest one.
                    this.pending.add(message);
                    next = this.pending.poll();
                }
                if (this.demand != Long.MAX_VALUE) {
                    --this.demand;
                }
                target = this.handler;
            }
        }
        // User callbacks run outside of the monitor, as in setMaxBufferedMessages.
        if (discard) {
            target.handle(next);
        } else {
            this.deliver(target, next);
        }
    }

    /**
     * Dispatches a message to the given handler on a duplicate of the consumer context and
     * then schedules delivery of further buffered messages if any.
     *
     * <p>If the message carries a {@code __vertx.credit} header, the value {@code 1} is
     * first sent to the address found in that header.
     */
    private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
        String creditsAddress = message.headers().get("__vertx.credit");
        if (creditsAddress != null) {
            this.eventBus.send(creditsAddress, 1);
        }
        this.dispatch(theHandler, message, this.context.duplicate());
        this.checkNextTick();
    }

    /**
     * Schedules, on the consumer context, the delivery of one buffered message when the
     * buffer is not empty and there is demand. The scheduled task re-checks both
     * conditions under the monitor before delivering.
     */
    private synchronized void checkNextTick() {
        if (!this.pending.isEmpty() && this.demand > 0L) {
            this.context.runOnContext(v -> {
                Handler<Message<T>> theHandler;
                Message<T> message;
                synchronized (this) {
                    if (this.demand == 0L || (message = this.pending.poll()) == null) {
                        return;
                    }
                    if (this.demand != Long.MAX_VALUE) {
                        --this.demand;
                    }
                    theHandler = this.handler;
                }
                this.deliver(theHandler, message);
            });
        }
    }

    /**
     * Sets the callback notified of messages that are discarded instead of delivered.
     *
     * @param handler the discard callback, or {@code null} to remove it
     */
    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /**
     * Sets or clears the message handler.
     *
     * <p>A non-null handler is stored and, if no registration promise exists yet, a new
     * promise is created (with any previously stored completion handler attached) and
     * {@code register} is called; its outcome completes or fails that promise. A
     * {@code null} handler calls {@code unregister()}.
     *
     * @param h the message handler, or {@code null} to unregister
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
        if (h == null) {
            this.unregister();
            return this;
        }
        this.handler = h;
        if (this.result == null) {
            PromiseInternal<Void> p = this.context.promise();
            if (this.completionHandler != null) {
                p.future().setHandler(this.completionHandler);
            }
            this.result = p;
            this.register(null, this.localOnly, ar -> {
                if (ar.succeeded()) {
                    p.tryComplete();
                } else {
                    p.tryFail(ar.cause());
                }
            });
        }
        return this;
    }

    /**
     * @return a new {@link BodyReadStream} wrapping this consumer
     */
    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    /**
     * Sets the demand to zero so that subsequently received messages are buffered.
     *
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> pause() {
        this.demand = 0L;
        return this;
    }

    /**
     * Restores unbounded demand; equivalent to {@code fetch(Long.MAX_VALUE)}.
     *
     * @return this consumer
     */
    @Override
    public MessageConsumer<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /**
     * Adds {@code amount} to the current demand, saturating at {@link Long#MAX_VALUE}, and
     * schedules delivery of buffered messages when the resulting demand is positive.
     *
     * @param amount the number of additional messages requested, must be {@code >= 0}
     * @return this consumer
     * @throws IllegalArgumentException if {@code amount} is negative
     */
    @Override
    public synchronized MessageConsumer<T> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Fetch amount cannot be negative: " + amount);
        }
        this.demand += amount;
        if (this.demand < 0L) {
            // The addition overflowed: treat it as unbounded demand.
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0L) {
            this.checkNextTick();
        }
        return this;
    }

    /**
     * Sets the handler invoked when the consumer is unregistered.
     *
     * <p>The context returned by {@code vertx.getOrCreateContext()} at the time of this
     * call is captured, and the handler is later run on that context.
     *
     * @param endHandler the end handler, or {@code null} to remove it
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler != null) {
            Context endCtx = this.vertx.getOrCreateContext();
            this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        } else {
            this.endHandler = null;
        }
        return this;
    }

    /**
     * No-op: the supplied handler is ignored by this implementation.
     *
     * @param handler ignored
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    /**
     * @return the currently set message handler, or {@code null} if none
     */
    public synchronized Handler<Message<T>> getHandler() {
        return this.handler;
    }
}

