/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MapMaker;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.protocol.ChannelLogDescriptor;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import com.lambdaworks.redis.resource.ClientResources;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

@ChannelHandler.Sharable
public class CommandHandler<K, V>
extends ChannelDuplexHandler
implements RedisChannelWriter<K, V> {
    /** Logger shared by the handler and its nested listeners. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    /** Stateless listener that only logs failed writes; attached to writes in AT_LEAST_ONCE mode. */
    private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener();
    /** IOException messages that are logged at DEBUG level instead of a higher level. */
    private static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = ImmutableSet.of("Connection reset by peer", "Broken pipe", "Connection timed out");
    /** Client options; read for queue size limit, disconnected behavior and reconnect settings. */
    protected final ClientOptions clientOptions;
    /** Client resources; used here to reach the command latency collector. */
    protected final ClientResources clientResources;
    /** Commands that were handed to the channel and are awaiting a response. */
    protected final Queue<RedisCommand<K, V, ?>> queue;
    /** Held while commands are buffered/written and while the command buffer is swapped. */
    protected final ReentrantLock writeLock = new ReentrantLock();
    /** Commands accepted by write() but not yet handed to the channel. */
    protected Queue<RedisCommand<K, V, ?>> commandBuffer = this.newCommandBuffer();
    /** Accumulates inbound bytes between decode attempts; allocated on register, released on unregister. */
    protected ByteBuf buffer;
    /** Response decoder state machine; created when the channel is registered. */
    protected RedisStateMachine<K, V> rsm;
    /** Current channel, or null while no channel is registered. */
    protected Channel channel;
    /** Snapshot of logger.isTraceEnabled() taken at construction time. */
    private final boolean traceEnabled;
    /** Snapshot of logger.isDebugEnabled() taken at construction time. */
    private final boolean debugEnabled;
    /** AT_LEAST_ONCE when auto-reconnect is enabled, otherwise AT_MOST_ONCE. */
    private final Reliability reliability;
    /** Current lifecycle state; written under stateLock by setState(). */
    private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
    /** Optional handler notified on activation/deactivation of the channel. */
    private RedisChannelHandler<K, V> redisChannelHandler;
    /** Last exception seen while not connected; cleared when queued commands are re-executed. */
    private Throwable connectionError;
    /** Lazily computed log prefix; reset to null in channelActive(). */
    private String logPrefix;
    /** When true, write() sends commands immediately if connected; otherwise they are buffered until flushCommands(). */
    private boolean autoFlushCommands = true;
    /** Monitor guarding writes to lifecycleState, channel and autoFlushCommands. */
    private final Object stateLock = new Object();
    /** Weak-keyed map from an in-flight command to its send / first-response timestamps. */
    private final Map<RedisCommand<K, V, ?>, SentReceived> sentTimes = new MapMaker().concurrencyLevel(4).weakKeys().makeMap();

    /**
     * Creates a handler bound to the given options, resources and in-flight command queue.
     *
     * @param clientOptions client options, must not be null
     * @param clientResources client resources, must not be null
     * @param queue queue that receives commands once they were written to the channel, must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    public CommandHandler(ClientOptions clientOptions, ClientResources clientResources, Queue<RedisCommand<K, V, ?>> queue) {
        Preconditions.checkArgument(clientOptions != null, "clientOptions must not be null");
        Preconditions.checkArgument(clientResources != null, "clientResources must not be null");
        Preconditions.checkArgument(queue != null, "queue must not be null");

        this.clientOptions = clientOptions;
        this.clientResources = clientResources;
        this.queue = queue;

        // Log levels are sampled once; later logger reconfiguration is not picked up.
        this.traceEnabled = logger.isTraceEnabled();
        this.debugEnabled = logger.isDebugEnabled();

        if (clientOptions.isAutoReconnect()) {
            this.reliability = Reliability.AT_LEAST_ONCE;
        } else {
            this.reliability = Reliability.AT_MOST_ONCE;
        }
    }

    /**
     * Moves the handler to REGISTERED, allocates the 64 KiB direct receive buffer, creates a fresh
     * state machine and remembers the channel (under the state lock).
     *
     * @param ctx context of the channel being registered
     */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        setState(LifecycleState.REGISTERED);
        buffer = ctx.alloc().directBuffer(65536);
        rsm = new RedisStateMachine<K, V>();
        synchronized (stateLock) {
            channel = ctx.channel();
        }
    }

    /**
     * Releases the receive buffer, cancels all pending commands if the handler was already closed,
     * and forgets the channel (under the state lock).
     *
     * @param ctx context of the channel being unregistered
     */
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        releaseBuffer();

        if (LifecycleState.CLOSED == lifecycleState) {
            cancelCommands("Connection closed");
        }

        synchronized (stateLock) {
            channel = null;
        }
    }

    /**
     * Appends the inbound bytes to the accumulation buffer and attempts to decode responses.
     *
     * <p>The inbound buffer is consumed by this handler and therefore released here on every path
     * on which it still holds a reference, including the paths where nothing is decoded (input not
     * readable, or no accumulation buffer available).
     *
     * @param ctx channel handler context
     * @param msg inbound message; cast to {@link ByteBuf}
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        if (input.refCnt() == 0) {
            // Already released elsewhere: nothing to read and nothing to release.
            return;
        }
        try {
            if (!input.isReadable() || this.buffer == null) {
                return;
            }
            this.buffer.writeBytes(input);
            if (this.traceEnabled) {
                logger.trace("{} Received: {}", (Object) this.logPrefix(), (Object) this.buffer.toString(Charset.defaultCharset()).trim());
            }
            this.decode(ctx, this.buffer);
        } finally {
            input.release();
        }
    }

    /**
     * Decodes as many responses as the buffer currently allows, completing the commands at the head
     * of the queue one after another. Returns as soon as the state machine reports that the response
     * for the head command is not complete yet.
     *
     * @param ctx channel handler context (not used by this implementation)
     * @param buffer buffer holding the bytes received so far
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        while (!queue.isEmpty()) {
            RedisCommand<K, V, ?> head = queue.peek();
            SentReceived timing = sentTimes.get(head);

            if (debugEnabled) {
                logger.debug("{} Queue contains: {} commands", logPrefix(), queue.size());
            }

            // Stamp the first-response time only once per command.
            boolean firstResponsePending = timing != null && timing.firstResponse == -1L;
            if (firstResponsePending) {
                timing.firstResponse = nanoTime();
            }

            boolean responseComplete = rsm.decode(buffer, head, head.getOutput());
            if (!responseComplete) {
                return;
            }

            RedisCommand<K, V, ?> finished = queue.poll();
            sentTimes.remove(finished);
            recordLatency(finished, timing);
            finished.complete();

            if (buffer != null && buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

    /**
     * Reports first-response and completion latency of a command to the latency collector.
     * Nothing is recorded when no timing was captured, no channel or remote address is available,
     * or the collector is disabled.
     *
     * @param command the completed command
     * @param sentReceived timestamps captured for the command, may be null
     */
    private void recordLatency(RedisCommand<K, V, ?> command, SentReceived sentReceived) {
        if (sentReceived == null || this.channel == null || remote() == null) {
            return;
        }
        if (!clientResources.commandLatencyCollector().isEnabled()) {
            return;
        }

        long firstResponseLatency = nanoTime() - sentReceived.firstResponse;
        long completionLatency = nanoTime() - sentReceived.sent;
        clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), command.getType(), firstResponseLatency, completionLatency);
    }

    /**
     * Returns the remote address of the current channel.
     * Callers must ensure {@code channel} is non-null; this method does not check.
     */
    private SocketAddress remote() {
        return this.channel.remoteAddress();
    }

    /**
     * Returns the local address of the current channel, or {@link LocalAddress#ANY} when the
     * channel reports none. Callers must ensure {@code channel} is non-null.
     */
    private SocketAddress local() {
        SocketAddress address = this.channel.localAddress();
        return address != null ? address : LocalAddress.ANY;
    }

    /**
     * Accepts a command for execution.
     *
     * <p>With auto-flush enabled and an active, connected channel the command is written and flushed
     * immediately. Otherwise the command is buffered, unless it is already buffered/queued (returned
     * as-is) or a connection error is stored (the command is completed exceptionally with it).
     * With auto-flush disabled the command is always buffered until {@link #flushCommands()}.
     *
     * @param command the command to write, must not be null
     * @return the same command instance
     * @throws IllegalArgumentException if {@code command} is null
     * @throws RedisException if the handler is closed, the request queue limit is reached, or the
     *         handler is disconnected and configured to reject commands
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T, C extends RedisCommand<K, V, T>> C write(C command) {
        Preconditions.checkArgument(command != null, "command must not be null");
        if (this.lifecycleState == LifecycleState.CLOSED) {
            throw new RedisException("Connection is closed");
        }
        if (this.commandBuffer.size() + this.queue.size() >= this.clientOptions.getRequestQueueSize()) {
            throw new RedisException("Request queue size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops.");
        }
        if ((this.channel == null || !this.isConnected()) && this.isRejectCommand()) {
            throw new RedisException("Currently not connected. Commands are rejected.");
        }

        // Acquire the lock before entering try so that finally never unlocks a lock that is not held.
        this.writeLock.lock();
        try {
            // Snapshot the field: it may be nulled by channelUnregistered() at any time.
            Channel currentChannel = this.channel;

            if (!this.autoFlushCommands) {
                this.bufferCommand(command);
                return command;
            }

            if (currentChannel != null && this.isConnected() && currentChannel.isActive()) {
                if (this.debugEnabled) {
                    logger.debug("{} write() writeAndFlush Command {}", (Object) this.logPrefix(), command);
                }
                if (this.reliability == Reliability.AT_MOST_ONCE) {
                    // No retry on reconnect: fail the command and drop it from the queue if the write fails.
                    currentChannel.writeAndFlush(command).addListener(new AtMostOnceWriteListener(command, this.queue, this.sentTimes));
                }
                if (this.reliability == Reliability.AT_LEAST_ONCE) {
                    // The command stays queued on failure; the listener only logs.
                    currentChannel.writeAndFlush(command).addListener((GenericFutureListener) WRITE_LOG_LISTENER);
                }
                return command;
            }

            if (this.commandBuffer.contains(command) || this.queue.contains(command)) {
                return command;
            }

            if (this.connectionError != null) {
                if (this.debugEnabled) {
                    logger.debug("{} write() completing Command {} due to connection error", (Object) this.logPrefix(), command);
                }
                command.completeExceptionally(this.connectionError);
                return command;
            }

            this.bufferCommand(command);
            return command;
        } finally {
            this.writeLock.unlock();
            if (this.debugEnabled) {
                logger.debug("{} write() done", (Object) this.logPrefix());
            }
        }
    }

    /**
     * Tells whether commands must be rejected while disconnected.
     *
     * @return {@code true} for REJECT_COMMANDS, {@code false} for ACCEPT_COMMANDS or when no client
     *         options are set; for any other behavior, {@code true} only if auto-reconnect is disabled
     */
    private boolean isRejectCommand() {
        if (clientOptions == null) {
            return false;
        }

        switch (clientOptions.getDisconnectedBehavior()) {
            case REJECT_COMMANDS:
                return true;
            case ACCEPT_COMMANDS:
                return false;
            default:
                return !clientOptions.isAutoReconnect();
        }
    }

    /**
     * Appends the command to the command buffer, logging at debug level when enabled.
     *
     * @param command the command to buffer
     */
    private <T> void bufferCommand(RedisCommand<K, V, T> command) {
        if (debugEnabled) {
            logger.debug("{} write() buffering Command {}", logPrefix(), command);
        }
        commandBuffer.add(command);
    }

    /**
     * Tells whether the lifecycle state lies between CONNECTED and DISCONNECTED (both inclusive,
     * in declaration order of {@link LifecycleState}).
     *
     * <p>The state is read under {@code stateLock}, the same monitor {@link #setState} writes
     * under. Synchronizing on the enum value itself would lock a different, globally shared object
     * each time the state changes and would not be ordered against writers.
     *
     * @return {@code true} if the handler is in one of the connected states
     */
    private boolean isConnected() {
        synchronized (this.stateLock) {
            LifecycleState state = this.lifecycleState;
            return state.compareTo(LifecycleState.CONNECTED) >= 0 && state.compareTo(LifecycleState.DISCONNECTED) <= 0;
        }
    }

    /**
     * Hands all buffered commands to the channel in a single write-and-flush.
     *
     * <p>Does nothing when no channel is set or the handler is not connected. The command buffer is
     * swapped for a fresh one under the write lock; the write itself happens outside the swap.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void flushCommands() {
        // Snapshot the field once so a concurrent channelUnregistered() cannot cause an NPE below.
        Channel currentChannel = this.channel;
        if (currentChannel == null || !this.isConnected()) {
            return;
        }

        Queue<RedisCommand<K, V, ?>> queuedCommands;
        // Acquire the lock before entering try so that finally never unlocks a lock that is not held.
        this.writeLock.lock();
        try {
            queuedCommands = this.commandBuffer;
            this.commandBuffer = this.newCommandBuffer();
        } finally {
            this.writeLock.unlock();
        }

        if (this.reliability == Reliability.AT_MOST_ONCE) {
            currentChannel.writeAndFlush(queuedCommands).addListener(new AtMostOnceWriteListener(queuedCommands, this.queue, this.sentTimes));
        }
        if (this.reliability == Reliability.AT_LEAST_ONCE) {
            currentChannel.writeAndFlush(queuedCommands).addListener((GenericFutureListener) WRITE_LOG_LISTENER);
        }
    }

    /**
     * Outbound write hook: registers each command in the response queue (see
     * {@code queueCommand}) and then forwards the message down the pipeline.
     *
     * @param ctx channel handler context
     * @param msg either a single {@link RedisCommand} or a {@link Collection} of commands
     * @param promise promise for the write; failed if queueing a command throws
     */
    @SuppressWarnings("unchecked")
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof Collection) {
            // Unchecked: the only collections written by this class contain RedisCommand<K, V, ?>.
            Collection<RedisCommand<K, V, ?>> commands = (Collection<RedisCommand<K, V, ?>>) msg;
            for (RedisCommand<K, V, ?> command : commands) {
                this.queueCommand(promise, command);
            }
            ctx.write(commands, promise);
            return;
        }

        RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
        this.queueCommand(promise, cmd);
        ctx.write(cmd, promise);
    }

    /**
     * Registers a command that is about to be written. Cancelled commands are skipped, commands
     * without an output are completed right away, all others get a send timestamp and are appended
     * to the response queue.
     *
     * @param promise write promise; failed when registration throws
     * @param cmd the command being written
     * @throws Exception the exception raised during registration, after it was propagated to the
     *         command and the promise
     */
    private void queueCommand(ChannelPromise promise, RedisCommand<K, V, ?> cmd) throws Exception {
        if (cmd.isCancelled()) {
            return;
        }

        try {
            if (cmd.getOutput() != null) {
                sentTimes.put(cmd, new SentReceived(nanoTime()));
                queue.add(cmd);
            } else {
                // Nothing to decode for this command.
                cmd.complete();
            }
        } catch (Exception e) {
            cmd.completeExceptionally(e);
            promise.setFailure(e);
            throw e;
        }
    }

    /** Returns {@link System#nanoTime()}; single access point for the latency timestamps. */
    private long nanoTime() {
        return System.nanoTime();
    }

    /**
     * Marks the handler CONNECTED, re-executes buffered and queued commands, propagates the event
     * and finally schedules a {@link ConnectionEvents.Activated} user event on the channel's event loop.
     *
     * <p>If re-executing commands throws and the client is configured to cancel commands on
     * reconnect failure, the handler is reset before the exception is rethrown.
     *
     * @param ctx channel handler context
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Force the log prefix to be recomputed for the new channel.
        this.logPrefix = null;
        if (this.debugEnabled) {
            logger.debug("{} channelActive()", (Object) this.logPrefix());
        }
        this.setStateIfNotClosed(LifecycleState.CONNECTED);
        try {
            this.executeQueuedCommands(ctx);
        } catch (Exception e) {
            if (this.debugEnabled) {
                logger.debug("{} channelActive() ran into an exception", (Object) this.logPrefix());
            }
            if (this.clientOptions.isCancelCommandsOnReconnectFailure()) {
                this.reset();
            }
            throw e;
        }
        super.channelActive(ctx);

        // Capture the channel now: the field may be nulled by channelUnregistered() before the
        // submitted task runs, which would make the task fail with a NullPointerException.
        final Channel activeChannel = this.channel;
        if (activeChannel != null) {
            activeChannel.eventLoop().submit(new Runnable() {

                @Override
                public void run() {
                    activeChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Activated());
                }
            });
        }
        if (this.debugEnabled) {
            logger.debug("{} channelActive() done", (Object) this.logPrefix());
        }
    }

    /**
     * Re-submits outstanding work on a (re)connected channel.
     *
     * <p>Under the write lock: clears the stored connection error, moves the buffered commands
     * followed by the previously queued commands into a fresh command buffer, clears the response
     * queue and the timing map, records the channel, notifies the channel handler (if any) and
     * flushes the merged buffer.
     *
     * @param ctx context of the channel that became active
     */
    protected void executeQueuedCommands(ChannelHandlerContext ctx) {
        ArrayDeque<RedisCommand<K, V, ?>> tmp = this.newCommandBuffer();
        // Acquire the lock before entering try so that finally never unlocks a lock that is not held.
        this.writeLock.lock();
        try {
            this.connectionError = null;
            tmp.addAll(this.commandBuffer);
            tmp.addAll(this.queue);
            this.queue.clear();
            this.sentTimes.clear();
            this.commandBuffer = tmp;
            if (this.debugEnabled) {
                logger.debug("{} executeQueuedCommands {} command(s) queued", (Object) this.logPrefix(), (Object) tmp.size());
            }
            synchronized (this.stateLock) {
                this.channel = ctx.channel();
            }
            if (this.redisChannelHandler != null) {
                if (this.debugEnabled) {
                    logger.debug("{} activating channel handler", (Object) this.logPrefix());
                }
                this.setStateIfNotClosed(LifecycleState.ACTIVATING);
                this.redisChannelHandler.activated();
            }
            this.setStateIfNotClosed(LifecycleState.ACTIVE);
            // writeLock is reentrant, so flushCommands() may re-acquire it here.
            this.flushCommands();
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Walks the handler through DISCONNECTED, DEACTIVATING (only when a channel handler is set,
     * which is then notified) and DEACTIVATED, resets the decoder state and receive buffer, and
     * propagates the event. State changes are skipped once the handler is CLOSED.
     *
     * @param ctx channel handler context
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (debugEnabled) {
            logger.debug("{} channelInactive()", logPrefix());
        }

        setStateIfNotClosed(LifecycleState.DISCONNECTED);

        RedisChannelHandler<K, V> handler = this.redisChannelHandler;
        if (handler != null) {
            if (debugEnabled) {
                logger.debug("{} deactivating channel handler", logPrefix());
            }
            setStateIfNotClosed(LifecycleState.DEACTIVATING);
            handler.deactivated();
        }

        setStateIfNotClosed(LifecycleState.DEACTIVATED);

        // Partial responses of the old connection are meaningless on the next one.
        if (this.buffer != null) {
            rsm.reset();
            this.buffer.clear();
        }

        if (debugEnabled) {
            logger.debug("{} channelInactive() done", logPrefix());
        }
        super.channelInactive(ctx);
    }

    /**
     * Applies the given state unless the handler is already CLOSED, in which case the call is a no-op.
     *
     * @param lifecycleState the state to move to
     */
    protected void setStateIfNotClosed(LifecycleState lifecycleState) {
        if (this.lifecycleState == LifecycleState.CLOSED) {
            return;
        }
        setState(lifecycleState);
    }

    /**
     * Unconditionally sets the lifecycle state while holding the state lock.
     *
     * @param lifecycleState the new state
     */
    protected void setState(LifecycleState lifecycleState) {
        synchronized (this.stateLock) {
            this.lifecycleState = lifecycleState;
        }
    }

    /**
     * Returns the current lifecycle state. The field is read without taking the state lock.
     */
    protected LifecycleState getState() {
        return this.lifecycleState;
    }

    /**
     * Cancels every command in the response queue and the command buffer.
     *
     * <p>Both collections and the timing map are emptied first; afterwards each collected command
     * gets {@code message} set as error on its output (if it has one) and is cancelled.
     *
     * @param message error message stored on the output of each cancelled command
     */
    private void cancelCommands(String message) {
        int size = 0;
        if (this.queue != null) {
            size += this.queue.size();
        }
        if (this.commandBuffer != null) {
            size += this.commandBuffer.size();
        }

        // Typed collection: iterating a raw list as RedisCommand does not compile.
        Collection<RedisCommand<K, V, ?>> toCancel = new ArrayList<RedisCommand<K, V, ?>>(size);
        if (this.queue != null) {
            toCancel.addAll(this.queue);
            this.queue.clear();
        }
        if (this.commandBuffer != null) {
            toCancel.addAll(this.commandBuffer);
            this.commandBuffer.clear();
        }
        this.sentTimes.clear();

        for (RedisCommand<K, V, ?> redisCommand : toCancel) {
            if (redisCommand.getOutput() != null) {
                redisCommand.getOutput().setError(message);
            }
            redisCommand.cancel();
        }
    }

    /**
     * Handles an exception raised in the pipeline.
     *
     * <p>The command at the head of the response queue (if any) is removed and completed
     * exceptionally. If the channel is missing, inactive or not connected, the exception is stored
     * as connection error. The exception is then logged: at DEBUG when it was stored in a command
     * or as connection error; otherwise at INFO for an {@link IOException} (DEBUG if its message is
     * in the suppression set) and at WARN for anything else.
     *
     * @param ctx channel handler context
     * @param cause the exception that was caught
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InternalLogLevel logLevel = InternalLogLevel.WARN;
        if (!this.queue.isEmpty()) {
            RedisCommand<K, V, ?> command = this.queue.poll();
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in {}", (Object) this.logPrefix(), command);
            }
            logLevel = InternalLogLevel.DEBUG;
            command.completeExceptionally(cause);
        }
        if (this.channel == null || !this.channel.isActive() || !this.isConnected()) {
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in connectionError", (Object) this.logPrefix());
            }
            logLevel = InternalLogLevel.DEBUG;
            this.connectionError = cause;
        }
        if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
            logLevel = InternalLogLevel.INFO;
            if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                logLevel = InternalLogLevel.DEBUG;
            }
        }
        // Use logPrefix() rather than the raw field: the field is null until first computed
        // (and is reset in channelActive()), which would render the prefix as "null".
        logger.log(logLevel, "{} Unexpected exception during request: {}", new Object[]{this.logPrefix(), cause.toString(), cause});
    }

    /**
     * Closes the handler. On the first call the state becomes CLOSED and, if a channel is present,
     * PrepareClose and Close user events are fired before the pipeline is closed; the call waits
     * uninterruptibly for the close to finish while the channel still reports open.
     * Subsequent calls return immediately.
     */
    @Override
    public void close() {
        if (debugEnabled) {
            logger.debug("{} close()", logPrefix());
        }

        if (LifecycleState.CLOSED == lifecycleState) {
            return;
        }
        setStateIfNotClosed(LifecycleState.CLOSED);

        Channel currentChannel = this.channel;
        if (currentChannel == null) {
            return;
        }

        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());

        ChannelFuture closeFuture = currentChannel.pipeline().close();
        if (currentChannel.isOpen()) {
            closeFuture.syncUninterruptibly();
        }
    }

    /** Releases the receive buffer, if one is allocated, and clears the reference to it. */
    private void releaseBuffer() {
        if (this.buffer == null) {
            return;
        }
        this.buffer.release();
        this.buffer = null;
    }

    /**
     * Tells whether the handler reached the CLOSED state.
     *
     * @return {@code true} if the lifecycle state is CLOSED
     */
    public boolean isClosed() {
        return this.lifecycleState == LifecycleState.CLOSED;
    }

    /**
     * Cancels all buffered and queued commands (with error message "Reset") under the write lock
     * and, if a receive buffer exists, resets the decoder state machine and clears the buffer.
     */
    @Override
    public void reset() {
        if (this.debugEnabled) {
            logger.debug("{} reset()", (Object) this.logPrefix());
        }

        // Acquire the lock before entering try so that finally never unlocks a lock that is not held.
        this.writeLock.lock();
        try {
            this.cancelCommands("Reset");
        } finally {
            this.writeLock.unlock();
        }

        if (this.buffer != null) {
            this.rsm.reset();
            this.buffer.clear();
        }
    }

    /**
     * Puts the handler back into NOT_CONNECTED: drops (without cancelling) all queued and buffered
     * commands and, if a channel is present, fires PrepareClose and Close user events and closes
     * the pipeline without waiting for completion.
     */
    public void initialState() {
        setState(LifecycleState.NOT_CONNECTED);
        queue.clear();
        commandBuffer.clear();

        Channel currentChannel = this.channel;
        if (currentChannel == null) {
            return;
        }

        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
        currentChannel.pipeline().close();
    }

    /**
     * Sets the channel handler that is notified via {@code activated()} / {@code deactivated()}
     * when the channel becomes active or inactive.
     *
     * @param redisChannelHandler the handler to notify; a null value disables the notifications
     */
    @Override
    public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {
        this.redisChannelHandler = redisChannelHandler;
    }

    /**
     * Enables or disables auto-flush. The flag is written while holding the state lock.
     *
     * @param autoFlush {@code true} to let write() send commands immediately when connected,
     *        {@code false} to buffer them until flushCommands() is called
     */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        synchronized (this.stateLock) {
            this.autoFlushCommands = autoFlush;
        }
    }

    /**
     * Returns the prefix used in log messages: the channel's log descriptor wrapped in square
     * brackets. The value is computed on first use and cached until it is reset.
     *
     * @return the cached or freshly computed log prefix
     */
    protected String logPrefix() {
        if (this.logPrefix == null) {
            this.logPrefix = "[" + ChannelLogDescriptor.logDescriptor(this.channel) + "]";
        }
        return this.logPrefix;
    }

    /** Creates an empty command buffer with an initial capacity of 512 elements. */
    private ArrayDeque<RedisCommand<K, V, ?>> newCommandBuffer() {
        ArrayDeque<RedisCommand<K, V, ?>> commands = new ArrayDeque<RedisCommand<K, V, ?>>(512);
        return commands;
    }

    /**
     * Timestamps (in {@code System.nanoTime()} units) captured for one in-flight command.
     */
    static class SentReceived {
        /** Time at which the command was registered for sending. */
        final long sent;
        /** Time at which the first response bytes were seen, or -1 while none were seen. */
        long firstResponse;

        public SentReceived(long sent) {
            this.sent = sent;
            this.firstResponse = -1L;
        }
    }

    /**
     * Write listener that only logs failures. Successful writes and failures caused by a
     * {@link ClosedChannelException} are ignored; IOExceptions whose message is in the suppression
     * set are logged at DEBUG, everything else at WARN.
     */
    static class WriteLogListener implements GenericFutureListener<Future<Void>> {
        WriteLogListener() {
        }

        public void operationComplete(Future<Void> future) throws Exception {
            Throwable cause = future.cause();
            if (future.isSuccess() || cause instanceof ClosedChannelException) {
                return;
            }

            boolean suppressed = cause instanceof IOException && SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage());
            InternalLogLevel logLevel = suppressed ? InternalLogLevel.DEBUG : InternalLogLevel.WARN;

            logger.log(logLevel, "Unexpected exception during request: {}", cause.toString(), cause);
        }
    }

    /**
     * Write listener for AT_MOST_ONCE delivery: when the write failed, every command of the write
     * is completed exceptionally with the failure cause and removed from the response queue and
     * the timing map. A successful write leaves everything untouched.
     */
    private static class AtMostOnceWriteListener<K, V, T> implements ChannelFutureListener {
        private final Collection<RedisCommand<K, V, T>> sentCommands;
        private final Queue<?> queue;
        private final Map<RedisCommand<K, V, T>, SentReceived> sentTimes;

        /** Creates a listener for a write that carried a single command. */
        public AtMostOnceWriteListener(RedisCommand<K, V, T> sentCommand, Queue<?> queue, Map<RedisCommand<K, V, T>, SentReceived> sentTimes) {
            this((Collection<RedisCommand<K, V, T>>) ImmutableList.of(sentCommand), queue, sentTimes);
        }

        /** Creates a listener for a write that carried a batch of commands. */
        public AtMostOnceWriteListener(Collection<RedisCommand<K, V, T>> sentCommand, Queue<?> queue, Map<RedisCommand<K, V, T>, SentReceived> sentTimes) {
            this.sentCommands = sentCommand;
            this.queue = queue;
            this.sentTimes = sentTimes;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            future.await();

            Throwable failure = future.cause();
            if (failure == null) {
                return;
            }

            for (RedisCommand<K, V, T> failed : sentCommands) {
                failed.completeExceptionally(failure);
            }

            queue.removeAll(sentCommands);

            for (RedisCommand<K, V, T> failed : sentCommands) {
                sentTimes.remove(failed);
            }
        }
    }

    /**
     * Delivery guarantee applied to written commands; chosen in the constructor from
     * {@code clientOptions.isAutoReconnect()}.
     */
    private static enum Reliability {
        // Auto-reconnect disabled: a failed write completes the command exceptionally and removes it from the queue.
        AT_MOST_ONCE,
        // Auto-reconnect enabled: a failed write is only logged; queued commands are re-executed on channelActive().
        AT_LEAST_ONCE;

    }

    /**
     * Lifecycle states of the handler. Declaration order matters: {@code isConnected()} treats
     * every state from CONNECTED through DISCONNECTED (inclusive) as connected, so constants must
     * not be reordered.
     */
    public static enum LifecycleState {
        // Initial state; also set by initialState().
        NOT_CONNECTED,
        // Set in channelRegistered().
        REGISTERED,
        // Set at the start of channelActive().
        CONNECTED,
        // Set in executeQueuedCommands() just before the channel handler is notified.
        ACTIVATING,
        // Set in executeQueuedCommands() before buffered commands are flushed.
        ACTIVE,
        // Set at the start of channelInactive().
        DISCONNECTED,
        // Set in channelInactive() just before the channel handler is notified.
        DEACTIVATING,
        // Set in channelInactive() after the channel handler was notified.
        DEACTIVATED,
        // Set by close(); setStateIfNotClosed() refuses to leave this state.
        CLOSED;

    }
}

