/*
 * Decompiled with CFR 0.152.
 */
package ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.impl;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Queue;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.AsyncResult;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.Context;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.Future;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.Handler;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.Vertx;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.Message;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.MessageConsumer;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.ReplyException;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.ReplyFailure;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.impl.BodyReadStream;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.impl.EventBusImpl;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.impl.HandlerHolder;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.eventbus.impl.MessageImpl;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.impl.Arguments;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.impl.ContextInternal;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.logging.Logger;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.logging.LoggerFactory;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.spi.metrics.EventBusMetrics;
import ru.yandex.clickhouse.jdbcbridge.internal.vertx.core.streams.ReadStream;

public class HandlerRegistration<T>
implements MessageConsumer<T>,
Handler<Message<T>> {
    private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class);
    public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
    private final Vertx vertx;
    private final EventBusMetrics metrics;
    private final EventBusImpl eventBus;
    private final String address;
    private final String repliedAddress;
    private final boolean localOnly;
    private final Handler<AsyncResult<Message<T>>> asyncResultHandler;
    private long timeoutID = -1L;
    private HandlerHolder<T> registered;
    private Handler<Message<T>> handler;
    private ContextInternal handlerContext;
    private AsyncResult<Void> result;
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = 1000;
    private final Queue<Message<T>> pending = new ArrayDeque<Message<T>>(8);
    private long demand = Long.MAX_VALUE;
    private Object metric;

    public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, String repliedAddress, boolean localOnly, Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
        this.vertx = vertx;
        this.metrics = metrics;
        this.eventBus = eventBus;
        this.address = address;
        this.repliedAddress = repliedAddress;
        this.localOnly = localOnly;
        this.asyncResultHandler = asyncResultHandler;
        if (timeout != -1L) {
            this.timeoutID = vertx.setTimer(timeout, tid -> {
                if (metrics != null) {
                    metrics.replyFailure(address, ReplyFailure.TIMEOUT);
                }
                this.sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
        ArrayList<Message<T>> discarded;
        Handler<Message<T>> discardHandler;
        Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            this.maxBufferedMessages = maxBufferedMessages;
            int n = this.pending.size() - maxBufferedMessages;
            if (n <= 0) {
                return this;
            }
            discardHandler = this.discardHandler;
            discarded = new ArrayList<Message<T>>(n);
            while (this.pending.size() > maxBufferedMessages) {
                discarded.add(this.pending.poll());
            }
        }
        for (Message message : discarded) {
            if (this.metrics != null) {
                this.metrics.discardMessage(this.metric, ((MessageImpl)message).isLocal(), message);
            }
            if (discardHandler == null) continue;
            discardHandler.handle(message);
        }
        return this;
    }

    @Override
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    @Override
    public String address() {
        return this.address;
    }

    @Override
    public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        if (this.result != null) {
            AsyncResult<Void> value = this.result;
            this.vertx.runOnContext(v -> completionHandler.handle(value));
        } else {
            this.completionHandler = completionHandler;
        }
    }

    @Override
    public void unregister() {
        this.doUnregister(null);
    }

    @Override
    public void unregister(Handler<AsyncResult<Void>> completionHandler) {
        this.doUnregister(completionHandler);
    }

    public void sendAsyncResultFailure(ReplyException failure) {
        this.unregister();
        this.asyncResultHandler.handle(Future.failedFuture(failure));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doUnregister(Handler<AsyncResult<Void>> doneHandler) {
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            Handler<Object> handler;
            if (this.handler == null) {
                this.callHandlerAsync(Future.succeededFuture(), doneHandler);
                return;
            }
            this.handler = null;
            if (this.timeoutID != -1L) {
                this.vertx.cancelTimer(this.timeoutID);
            }
            if (this.endHandler != null) {
                Handler<Void> theEndHandler = this.endHandler;
                handler = doneHandler;
                doneHandler = ar -> {
                    theEndHandler.handle(null);
                    if (handler != null) {
                        handler.handle(ar);
                    }
                };
            }
            if (this.pending.size() > 0 && this.discardHandler != null) {
                ArrayDeque discarded = new ArrayDeque(this.pending);
                handler = this.discardHandler;
                this.handlerContext.runOnContext(v -> {
                    Message msg;
                    while ((msg = (Message)discarded.poll()) != null) {
                        handler.handle(msg);
                    }
                });
            }
            if (this.metrics != null) {
                Message<T> msg;
                while ((msg = this.pending.poll()) != null) {
                    this.metrics.discardMessage(this.metric, ((MessageImpl)msg).isLocal(), msg);
                }
            } else {
                this.pending.clear();
            }
            this.pending.clear();
            this.discardHandler = null;
            this.eventBus.removeRegistration(this.registered, doneHandler);
            this.registered = null;
            if (this.result == null) {
                this.result = Future.failedFuture("Consumer unregistered before registration completed");
                this.callHandlerAsync(this.result, this.completionHandler);
            } else {
                EventBusMetrics metrics = this.eventBus.metrics;
                if (metrics != null) {
                    metrics.handlerUnregistered(this.metric);
                }
            }
        }
    }

    private void callHandlerAsync(AsyncResult<Void> result, Handler<AsyncResult<Void>> completionHandler) {
        if (completionHandler != null) {
            this.vertx.runOnContext(v -> completionHandler.handle(result));
        }
    }

    synchronized void setHandlerContext(Context context) {
        this.handlerContext = (ContextInternal)context;
    }

    public synchronized void setResult(AsyncResult<Void> result) {
        if (this.result != null) {
            return;
        }
        this.result = result;
        if (result.failed()) {
            log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address);
        } else {
            if (this.metrics != null) {
                this.metric = this.metrics.handlerRegistered(this.address, this.repliedAddress);
            }
            this.callHandlerAsync(result, this.completionHandler);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handle(Message<T> message) {
        ContextInternal ctx;
        Handler<Message<T>> theHandler;
        boolean local = ((MessageImpl)message).isLocal();
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            if (this.registered == null) {
                if (this.metrics != null) {
                    this.metrics.discardMessage(this.metric, local, message);
                }
                return;
            }
            if (this.demand == 0L) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else {
                    if (this.metrics != null) {
                        this.metrics.discardMessage(this.metric, local, message);
                    }
                    if (this.discardHandler != null) {
                        this.discardHandler.handle(message);
                    } else {
                        log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                    }
                }
                return;
            }
            if (this.pending.size() > 0) {
                this.pending.add(message);
                message = this.pending.poll();
            }
            if (this.demand != Long.MAX_VALUE) {
                --this.demand;
            }
            theHandler = this.handler;
            ctx = this.handlerContext;
        }
        this.deliver(theHandler, message, ctx);
    }

    private void deliver(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
        String creditsAddress = message.headers().get("__vertx.credit");
        if (creditsAddress != null) {
            this.eventBus.send(creditsAddress, 1);
        }
        try {
            if (this.metrics != null) {
                this.metrics.beginHandleMessage(this.metric, ((MessageImpl)message).isLocal());
            }
            theHandler.handle(message);
            if (this.metrics != null) {
                this.metrics.endHandleMessage(this.metric, null);
            }
        }
        catch (Exception e) {
            log.error((Object)("Failed to handleMessage. address: " + message.address()), e);
            if (this.metrics != null) {
                this.metrics.endHandleMessage(this.metric, e);
            }
            context.reportException(e);
        }
        this.checkNextTick();
    }

    private synchronized void checkNextTick() {
        if (!this.pending.isEmpty() && this.demand > 0L) {
            this.handlerContext.runOnContext(v -> {
                ContextInternal ctx;
                Handler<Message<T>> theHandler;
                Message<T> message;
                HandlerRegistration handlerRegistration = this;
                synchronized (handlerRegistration) {
                    if (this.demand == 0L || (message = this.pending.poll()) == null) {
                        return;
                    }
                    if (this.demand != Long.MAX_VALUE) {
                        --this.demand;
                    }
                    theHandler = this.handler;
                    ctx = this.handlerContext;
                }
                this.deliver(theHandler, message, ctx);
            });
        }
    }

    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> h2) {
        if (h2 != null) {
            HandlerRegistration handlerRegistration = this;
            synchronized (handlerRegistration) {
                this.handler = h2;
                if (this.registered == null) {
                    this.registered = this.eventBus.addRegistration(this.address, this, this.repliedAddress != null, this.localOnly);
                }
            }
            return this;
        }
        this.unregister();
        return this;
    }

    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    @Override
    public synchronized boolean isRegistered() {
        return this.registered != null;
    }

    @Override
    public synchronized MessageConsumer<T> pause() {
        this.demand = 0L;
        return this;
    }

    @Override
    public MessageConsumer<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    @Override
    public synchronized MessageConsumer<T> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException();
        }
        this.demand += amount;
        if (this.demand < 0L) {
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0L) {
            this.checkNextTick();
        }
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler != null) {
            Context endCtx = this.vertx.getOrCreateContext();
            this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        } else {
            this.endHandler = null;
        }
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    public Handler<Message<T>> getHandler() {
        return this.handler;
    }

    public Object getMetric() {
        return this.metric;
    }
}

