/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.channel;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.channel.EventCallback;
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
import com.datastax.oss.driver.internal.core.channel.StreamIdGenerator;
import com.datastax.oss.driver.internal.core.protocol.FrameDecodingException;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.shaded.guava.common.collect.BiMap;
import com.datastax.oss.driver.shaded.guava.common.collect.HashBiMap;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Promise;
import java.util.HashMap;
import java.util.Map;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class InFlightHandler
extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(InFlightHandler.class);
    private final ProtocolVersion protocolVersion;
    private final StreamIdGenerator streamIds;
    final ChannelPromise closeStartedFuture;
    private final String ownerLogPrefix;
    private final BiMap<Integer, ResponseCallback> inFlight;
    private final Map<Integer, ResponseCallback> orphaned;
    private volatile int orphanedSize;
    private final long setKeyspaceTimeoutMillis;
    private final EventCallback eventCallback;
    private final int maxOrphanStreamIds;
    private boolean closingGracefully;
    private SetKeyspaceRequest setKeyspaceRequest;
    private String logPrefix;

    InFlightHandler(ProtocolVersion protocolVersion, StreamIdGenerator streamIds, int maxOrphanStreamIds, long setKeyspaceTimeoutMillis, ChannelPromise closeStartedFuture, EventCallback eventCallback, String ownerLogPrefix) {
        this.protocolVersion = protocolVersion;
        this.streamIds = streamIds;
        this.maxOrphanStreamIds = maxOrphanStreamIds;
        this.closeStartedFuture = closeStartedFuture;
        this.ownerLogPrefix = ownerLogPrefix;
        this.logPrefix = ownerLogPrefix + "|connecting...";
        this.inFlight = HashBiMap.create(streamIds.getMaxAvailableIds());
        this.orphaned = new HashMap<Integer, ResponseCallback>(maxOrphanStreamIds);
        this.setKeyspaceTimeoutMillis = setKeyspaceTimeoutMillis;
        this.eventCallback = eventCallback;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        String channelId = ctx.channel().toString();
        this.logPrefix = this.ownerLogPrefix + "|" + channelId.substring(1, channelId.length() - 1);
    }

    public void write(ChannelHandlerContext ctx, Object in, ChannelPromise promise) throws Exception {
        if (in == DriverChannel.GRACEFUL_CLOSE_MESSAGE) {
            LOG.debug("[{}] Received graceful close request", (Object)this.logPrefix);
            this.startGracefulShutdown(ctx);
        } else if (in == DriverChannel.FORCEFUL_CLOSE_MESSAGE) {
            LOG.debug("[{}] Received forceful close request, aborting pending queries", (Object)this.logPrefix);
            this.abortAllInFlight(new ClosedConnectionException("Channel was force-closed"));
            ctx.channel().close();
        } else if (in instanceof HeartbeatException) {
            this.abortAllInFlight(new ClosedConnectionException("Heartbeat query failed", (HeartbeatException)in));
            ctx.close();
        } else if (in instanceof DriverChannel.RequestMessage) {
            this.write(ctx, (DriverChannel.RequestMessage)in, promise);
        } else if (in instanceof ResponseCallback) {
            this.cancel(ctx, (ResponseCallback)in, promise);
        } else {
            promise.setFailure((Throwable)new IllegalArgumentException("Unsupported message type " + in.getClass().getName()));
        }
    }

    private void write(ChannelHandlerContext ctx, DriverChannel.RequestMessage message, ChannelPromise promise) {
        if (this.closingGracefully) {
            promise.setFailure((Throwable)new IllegalStateException("Channel is closing"));
            this.streamIds.cancelPreAcquire();
            return;
        }
        int streamId = this.streamIds.acquire();
        if (streamId < 0) {
            promise.setFailure((Throwable)new BusyConnectionException(String.format("Couldn't acquire a stream id from InFlightHandler on %s", ctx.channel())));
            this.streamIds.cancelPreAcquire();
            return;
        }
        if (this.inFlight.containsKey(streamId)) {
            promise.setFailure((Throwable)new IllegalStateException("Found pending callback for stream id " + streamId));
            this.streamIds.cancelPreAcquire();
            return;
        }
        LOG.trace("[{}] Writing {} on stream id {}", new Object[]{this.logPrefix, message.responseCallback, streamId});
        Frame frame = Frame.forRequest((int)this.protocolVersion.getCode(), (int)streamId, (boolean)message.tracing, message.customPayload, (Message)message.request);
        this.inFlight.put(streamId, message.responseCallback);
        ChannelFuture writeFuture = ctx.write((Object)frame, promise);
        writeFuture.addListener(future -> {
            if (future.isSuccess()) {
                message.responseCallback.onStreamIdAssigned(streamId);
            } else {
                this.release(streamId, ctx);
            }
        });
    }

    private void cancel(ChannelHandlerContext ctx, ResponseCallback responseCallback, ChannelPromise promise) {
        Integer streamId = (Integer)this.inFlight.inverse().remove(responseCallback);
        if (streamId == null) {
            LOG.trace("[{}] Received cancellation for unknown or already cancelled callback {}, skipping", (Object)this.logPrefix, (Object)responseCallback);
        } else {
            LOG.trace("[{}] Cancelled callback {} for stream id {}", new Object[]{this.logPrefix, responseCallback, streamId});
            if (this.closingGracefully && this.inFlight.isEmpty()) {
                LOG.debug("[{}] Last pending query was cancelled, closing channel", (Object)this.logPrefix);
                ctx.channel().close();
            } else {
                this.orphaned.put(streamId, responseCallback);
                if (this.orphaned.size() > this.maxOrphanStreamIds) {
                    LOG.debug("[{}] Orphan stream ids exceeded the configured threshold ({}), closing gracefully", (Object)this.logPrefix, (Object)this.maxOrphanStreamIds);
                    this.startGracefulShutdown(ctx);
                } else {
                    this.orphanedSize = this.orphaned.size();
                }
            }
        }
        promise.setSuccess();
    }

    private void startGracefulShutdown(ChannelHandlerContext ctx) {
        if (this.inFlight.isEmpty()) {
            LOG.debug("[{}] No pending queries, completing graceful shutdown now", (Object)this.logPrefix);
            ctx.channel().close();
        } else {
            ChannelHandler heartbeatHandler = ctx.pipeline().get("heartbeat");
            if (heartbeatHandler != null) {
                ctx.pipeline().remove(heartbeatHandler);
            }
            LOG.debug("[{}] There are pending queries, delaying graceful shutdown", (Object)this.logPrefix);
            this.closingGracefully = true;
            this.closeStartedFuture.setSuccess();
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Frame responseFrame = (Frame)msg;
        int streamId = responseFrame.streamId;
        if (streamId < 0) {
            Message event = responseFrame.message;
            if (this.eventCallback == null) {
                LOG.debug("[{}] Received event {} but no callback was registered", (Object)this.logPrefix, (Object)event);
            } else {
                LOG.debug("[{}] Received event {}, notifying callback", (Object)this.logPrefix, (Object)event);
                try {
                    this.eventCallback.onEvent(event);
                }
                catch (Throwable t) {
                    Loggers.warnWithException(LOG, "[{}] Unexpected error while invoking event handler", this.logPrefix, t);
                }
            }
        } else {
            boolean wasInFlight = true;
            ResponseCallback callback = (ResponseCallback)this.inFlight.get(streamId);
            if (callback == null) {
                wasInFlight = false;
                callback = this.orphaned.get(streamId);
                if (callback == null) {
                    LOG.trace("[{}] Got response on unknown stream id {}, skipping", (Object)this.logPrefix, (Object)streamId);
                    return;
                }
            }
            try {
                if (callback.isLastResponse(responseFrame)) {
                    LOG.debug("[{}] Got last response on {} stream id {}, completing and releasing", new Object[]{this.logPrefix, wasInFlight ? "in-flight" : "orphaned", streamId});
                    this.release(streamId, ctx);
                } else {
                    LOG.trace("[{}] Got non-last response on {} stream id {}, still holding", new Object[]{this.logPrefix, wasInFlight ? "in-flight" : "orphaned", streamId});
                }
                if (wasInFlight) {
                    callback.onResponse(responseFrame);
                }
            }
            catch (Throwable t) {
                if (wasInFlight) {
                    this.fail(callback, new IllegalArgumentException("Unexpected error while invoking response handler", t));
                }
                Loggers.warnWithException(LOG, "[{}] Unexpected error while invoking response handler on stream id {}", this.logPrefix, t, streamId);
            }
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable exception) throws Exception {
        if (exception instanceof FrameDecodingException) {
            int streamId = ((FrameDecodingException)((Object)exception)).streamId;
            LOG.debug("[{}] Error while decoding response on stream id {}", (Object)this.logPrefix, (Object)streamId);
            if (streamId >= 0) {
                ResponseCallback responseCallback = (ResponseCallback)this.inFlight.get(streamId);
                if (responseCallback != null) {
                    this.fail(responseCallback, exception.getCause());
                }
                this.release(streamId, ctx);
            } else {
                Loggers.warnWithException(LOG, "[{}] Unexpected error while decoding incoming event frame", this.logPrefix, exception.getCause());
            }
        } else {
            this.abortAllInFlight(exception instanceof HeartbeatException ? (HeartbeatException)exception : new ClosedConnectionException("Unexpected error on channel", exception));
            ctx.close();
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        if (event instanceof DriverChannel.SetKeyspaceEvent) {
            DriverChannel.SetKeyspaceEvent setKeyspaceEvent = (DriverChannel.SetKeyspaceEvent)event;
            if (this.setKeyspaceRequest != null) {
                setKeyspaceEvent.promise.setFailure((Throwable)new IllegalStateException("Can't call setKeyspace while a keyspace switch is already in progress"));
            } else {
                LOG.debug("[{}] Switching to keyspace {}", (Object)this.logPrefix, (Object)setKeyspaceEvent.keyspaceName.asInternal());
                this.setKeyspaceRequest = new SetKeyspaceRequest(ctx, setKeyspaceEvent);
                this.setKeyspaceRequest.send();
            }
        } else {
            super.userEventTriggered(ctx, event);
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.abortAllInFlight(new ClosedConnectionException("Lost connection to remote peer"));
        super.channelInactive(ctx);
    }

    private void release(int streamId, ChannelHandlerContext ctx) {
        LOG.trace("[{}] Releasing stream id {}", (Object)this.logPrefix, (Object)streamId);
        if (this.inFlight.remove(streamId) != null) {
            if (this.closingGracefully && this.inFlight.isEmpty()) {
                LOG.debug("[{}] Done handling the last pending query, closing channel", (Object)this.logPrefix);
                ctx.channel().close();
            }
        } else if (this.orphaned.remove(streamId) != null) {
            this.orphanedSize = this.orphaned.size();
        }
        this.streamIds.release(streamId);
    }

    private void abortAllInFlight(DriverException cause) {
        this.abortAllInFlight(cause, null);
    }

    private void abortAllInFlight(DriverException cause, ResponseCallback ignore) {
        if (!this.inFlight.isEmpty()) {
            ImmutableSet<ResponseCallback> responseCallbacks = ImmutableSet.copyOf(this.inFlight.values());
            this.inFlight.clear();
            for (ResponseCallback responseCallback : responseCallbacks) {
                if (responseCallback == ignore) continue;
                this.fail(responseCallback, cause);
            }
        }
    }

    private void fail(ResponseCallback callback, Throwable failure) {
        try {
            callback.onFailure(failure);
        }
        catch (Throwable throwable) {
            LOG.error("[{}] Unexpected error while failing {}", new Object[]{this.logPrefix, callback, throwable});
        }
    }

    int getAvailableIds() {
        return this.streamIds.getAvailableIds();
    }

    boolean preAcquireId() {
        return this.streamIds.preAcquire();
    }

    int getInFlight() {
        return this.streamIds.getMaxAvailableIds() - this.streamIds.getAvailableIds();
    }

    int getOrphanIds() {
        return this.orphanedSize;
    }

    private class SetKeyspaceRequest
    extends ChannelHandlerRequest {
        private final CqlIdentifier keyspaceName;
        private final Promise<Void> promise;

        SetKeyspaceRequest(ChannelHandlerContext ctx, DriverChannel.SetKeyspaceEvent setKeyspaceEvent) {
            super(ctx, InFlightHandler.this.setKeyspaceTimeoutMillis);
            this.keyspaceName = setKeyspaceEvent.keyspaceName;
            this.promise = setKeyspaceEvent.promise;
        }

        @Override
        String describe() {
            return "[" + InFlightHandler.this.logPrefix + "] Set keyspace request (USE " + this.keyspaceName.asCql(true) + ")";
        }

        @Override
        Message getRequest() {
            return new Query("USE " + this.keyspaceName.asCql(false));
        }

        @Override
        void onResponse(Message response) {
            if (response instanceof SetKeyspace) {
                if (this.promise.trySuccess(null)) {
                    InFlightHandler.this.setKeyspaceRequest = null;
                }
            } else {
                this.failOnUnexpected(response);
            }
        }

        @Override
        void fail(String message, Throwable cause) {
            ClosedConnectionException setKeyspaceException = new ClosedConnectionException(message, cause);
            if (this.promise.tryFailure((Throwable)setKeyspaceException)) {
                InFlightHandler.this.setKeyspaceRequest = null;
                Loggers.warnWithException(LOG, "[{}] Unexpected error while switching keyspace", InFlightHandler.this.logPrefix, setKeyspaceException);
                InFlightHandler.this.abortAllInFlight(setKeyspaceException, this);
                this.ctx.channel().close();
            }
        }
    }
}

