/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisCommandInterruptedException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

@ChannelHandler.Sharable
public class CommandHandler<K, V>
extends ChannelDuplexHandler
implements RedisChannelWriter<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    protected BlockingQueue<RedisCommand<K, V, ?>> queue;
    protected BlockingQueue<RedisCommand<K, V, ?>> commandBuffer = new LinkedBlockingQueue();
    protected ByteBuf buffer;
    protected RedisStateMachine<K, V> rsm;
    private AtomicReference<Channel> channel = new AtomicReference();
    private boolean closed;
    private RedisChannelHandler<K, V> redisChannelHandler;
    private final ReentrantLock writeLock = new ReentrantLock();
    private final ReentrantLock readLock = new ReentrantLock();

    public CommandHandler(BlockingQueue<RedisCommand<K, V, ?>> queue) {
        this.queue = queue;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.buffer = ctx.alloc().heapBuffer();
        this.rsm = new RedisStateMachine();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf)msg;
        try {
            if (!input.isReadable() || input.refCnt() == 0) {
                return;
            }
            if (this.buffer == null) {
                logger.warn("CommandHandler is closed, incoming response will be discarded.");
                return;
            }
            try {
                this.readLock.lock();
                this.buffer.writeBytes(input);
                if (logger.isTraceEnabled()) {
                    logger.trace("[" + ctx.channel().remoteAddress() + "] Received: " + this.buffer.toString(Charset.defaultCharset()).trim());
                }
                this.decode(ctx, this.buffer);
            }
            finally {
                this.readLock.unlock();
            }
        }
        finally {
            input.release();
        }
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        while (!this.queue.isEmpty() && this.rsm.decode(buffer, (RedisCommand)this.queue.peek(), ((RedisCommand)this.queue.peek()).getOutput())) {
            RedisCommand<K, V, ?> cmd = this.queue.take();
            cmd.complete();
            if (buffer == null || buffer.refCnt() == 0) continue;
            buffer.discardReadBytes();
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!this.queue.isEmpty()) {
            RedisCommand<K, V, ?> command = this.queue.take();
            command.setException(cause);
            command.complete();
        }
        super.exceptionCaught(ctx, cause);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
        try {
            if (this.closed) {
                throw new RedisException("Connection is closed");
            }
            try {
                this.writeLock.lock();
                Channel channel = this.channel.get();
                if (channel != null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("[" + this + "] write() writeAndFlush Command " + command);
                    }
                    channel.writeAndFlush(command);
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("[" + this + "] write() buffering Command " + command);
                    }
                    this.commandBuffer.put(command);
                }
            }
            finally {
                this.writeLock.unlock();
            }
        }
        catch (InterruptedException e) {
            throw new RedisCommandInterruptedException(e);
        }
        return command;
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        RedisCommand cmd = (RedisCommand)msg;
        ByteBuf buf = ctx.alloc().heapBuffer();
        cmd.encode(buf);
        if (logger.isTraceEnabled()) {
            logger.trace("[" + ctx.channel().remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim());
        }
        if (cmd.getOutput() == null) {
            ctx.write((Object)buf, promise);
            cmd.complete();
        } else {
            this.queue.put(cmd);
            ctx.write((Object)buf, promise);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("[" + this + "] channelActive()");
        ArrayList tmp = new ArrayList(this.queue.size() + this.commandBuffer.size());
        try {
            this.writeLock.lock();
            tmp.addAll(this.commandBuffer);
            tmp.addAll(this.queue);
            this.queue.clear();
            this.commandBuffer.clear();
            this.channel.set(ctx.channel());
            if (this.redisChannelHandler != null) {
                this.redisChannelHandler.activated();
            }
        }
        finally {
            this.writeLock.unlock();
        }
        for (RedisCommand redisCommand : tmp) {
            if (redisCommand.isCancelled()) continue;
            if (logger.isDebugEnabled()) {
                logger.debug("[" + this + "] channelActive() triggering command " + redisCommand);
            }
            ctx.channel().writeAndFlush((Object)redisCommand);
        }
        tmp.clear();
        logger.debug("[" + this + "] channelActive() done");
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("[" + this + "] channelInactive()");
        this.channel.set(null);
        if (this.closed) {
            int size = 0;
            if (this.queue != null) {
                size += this.queue.size();
            }
            if (this.commandBuffer != null) {
                size += this.commandBuffer.size();
            }
            ArrayList toCancel = new ArrayList(size);
            if (this.queue != null) {
                toCancel.addAll(this.queue);
                this.queue.clear();
                this.queue = null;
            }
            if (this.commandBuffer != null) {
                toCancel.addAll(this.commandBuffer);
                this.commandBuffer.clear();
                this.commandBuffer = null;
            }
            for (RedisCommand redisCommand : toCancel) {
                if (redisCommand.getOutput() != null) {
                    redisCommand.getOutput().setError("Connection closed");
                }
                redisCommand.complete();
            }
        }
        if (this.redisChannelHandler != null) {
            this.redisChannelHandler.deactivated();
        }
        logger.debug("[" + this + "] channelInactive() done");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        logger.debug("[" + this + "] close()");
        if (this.closed) {
            return;
        }
        if (this.buffer != null) {
            try {
                this.readLock.lock();
                this.buffer.release();
            }
            finally {
                this.readLock.unlock();
            }
            this.buffer = null;
        }
        this.closed = true;
        if (this.channel.get() != null) {
            ConnectionWatchdog watchdog = (ConnectionWatchdog)this.channel.get().pipeline().get(ConnectionWatchdog.class);
            if (watchdog != null) {
                watchdog.setReconnect(false);
            }
            try {
                this.channel.get().close().sync();
            }
            catch (InterruptedException e) {
                throw new RedisException(e);
            }
            this.channel.set(null);
        }
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Override
    public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {
        this.redisChannelHandler = redisChannelHandler;
    }
}

