/*
 * Decompiled with CFR 0.152.
 */
package net.rubyeye.xmemcached.impl;

import com.google.code.yanf4j.buffer.IoBuffer;
import com.google.code.yanf4j.core.Session;
import com.google.code.yanf4j.core.impl.AbstractSession;
import com.google.code.yanf4j.core.impl.HandlerAdapter;
import com.google.code.yanf4j.util.SystemUtils;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientStateListener;
import net.rubyeye.xmemcached.auth.AuthMemcachedConnectListener;
import net.rubyeye.xmemcached.command.Command;
import net.rubyeye.xmemcached.command.CommandType;
import net.rubyeye.xmemcached.command.MapReturnValueAware;
import net.rubyeye.xmemcached.command.OperationStatus;
import net.rubyeye.xmemcached.command.StoreCommand;
import net.rubyeye.xmemcached.command.binary.BinaryGetCommand;
import net.rubyeye.xmemcached.command.binary.BinaryVersionCommand;
import net.rubyeye.xmemcached.command.text.TextGetOneCommand;
import net.rubyeye.xmemcached.command.text.TextVersionCommand;
import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
import net.rubyeye.xmemcached.impl.ReconnectRequest;
import net.rubyeye.xmemcached.monitor.StatisticsHandler;
import net.rubyeye.xmemcached.networking.MemcachedSessionConnectListener;
import net.rubyeye.xmemcached.transcoders.CachedData;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
import net.rubyeye.xmemcached.utils.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemcachedHandler
extends HandlerAdapter {
    private static final int MAX_HEARTBEAT_THREADS = Integer.parseInt(System.getProperty("xmemcached.heartbeat.max_threads", String.valueOf(Runtime.getRuntime().availableProcessors())));
    private final StatisticsHandler statisticsHandler;
    private ExecutorService heartBeatThreadPool;
    private final MemcachedSessionConnectListener listener;
    private final MemcachedClient client;
    private static final Logger log = LoggerFactory.getLogger(MemcachedHandler.class);
    private volatile boolean enableHeartBeat = true;
    public static final IoBuffer EMPTY_BUF = IoBuffer.allocate(0);
    private static final String HEART_BEAT_FAIL_COUNT_ATTR = "heartBeatFailCount";
    private static final int MAX_HEART_BEAT_FAIL_COUNT = Integer.parseInt(System.getProperty("xmemcached.heartbeat.max.fail.times", "3"));
    final long HEARTBEAT_PERIOD = Long.parseLong(System.getProperty("xmemcached.heartbeat.period", "5000"));

    public final void onMessageReceived(Session session, Object msg) {
        Command command = (Command)msg;
        if (this.statisticsHandler.isStatistics()) {
            if (command.getCopiedMergeCount() > 0 && command instanceof MapReturnValueAware) {
                Map<String, CachedData> returnValues = ((MapReturnValueAware)((Object)command)).getReturnValues();
                int size = returnValues.size();
                this.statisticsHandler.statistics(CommandType.GET_HIT, size);
                this.statisticsHandler.statistics(CommandType.GET_MISS, command.getCopiedMergeCount() - size);
            } else if (command instanceof TextGetOneCommand || command instanceof BinaryGetCommand) {
                if (command.getResult() != null) {
                    this.statisticsHandler.statistics(CommandType.GET_HIT);
                } else {
                    this.statisticsHandler.statistics(CommandType.GET_MISS);
                }
            } else if (command.getCopiedMergeCount() > 0) {
                this.statisticsHandler.statistics(command.getCommandType(), command.getCopiedMergeCount());
            } else {
                this.statisticsHandler.statistics(command.getCommandType());
            }
        }
    }

    public void setEnableHeartBeat(boolean enableHeartBeat) {
        this.enableHeartBeat = enableHeartBeat;
    }

    public final void onMessageSent(Session session, Object msg) {
        Command command = (Command)msg;
        command.setStatus(OperationStatus.SENT);
        if (!command.isNoreply() || this.client.getProtocol() == Protocol.Binary) {
            ((MemcachedTCPSession)session).addCommand(command);
        }
        command.setIoBuffer(EMPTY_BUF);
        switch (command.getCommandType()) {
            case SET: 
            case SET_MANY: {
                ((StoreCommand)((Object)command)).setValue(null);
            }
        }
    }

    public void onExceptionCaught(Session session, Throwable throwable) {
        log.error("XMemcached network layout exception", throwable);
    }

    public void onSessionStarted(Session session) {
        session.setUseBlockingRead(true);
        session.setAttribute(HEART_BEAT_FAIL_COUNT_ATTR, new AtomicInteger(0));
        for (MemcachedClientStateListener listener : this.client.getStateListeners()) {
            listener.onConnected(this.client, session.getRemoteSocketAddress());
        }
        this.listener.onConnect((MemcachedTCPSession)session, this.client);
    }

    public final void onSessionClosed(Session session) {
        this.client.getConnector().removeSession(session);
        ((AbstractSession)session).clearWriteQueue();
        MemcachedTCPSession memcachedSession = (MemcachedTCPSession)session;
        memcachedSession.destroy();
        if (this.client.getConnector().isStarted() && memcachedSession.isAllowReconnect()) {
            this.reconnect(memcachedSession);
        }
        for (MemcachedClientStateListener listener : this.client.getStateListeners()) {
            listener.onDisconnected(this.client, session.getRemoteSocketAddress());
        }
    }

    public void onSessionIdle(Session session) {
        this.checkHeartBeat(session);
    }

    private void checkHeartBeat(Session session) {
        if (this.enableHeartBeat) {
            log.debug("Check session ({}) is alive,send heartbeat", (Object)(session.getRemoteSocketAddress() == null ? "unknown" : SystemUtils.getRawAddress(session.getRemoteSocketAddress()) + ":" + session.getRemoteSocketAddress().getPort()));
            Command versionCommand = null;
            CountDownLatch latch = new CountDownLatch(1);
            versionCommand = this.client.getProtocol() == Protocol.Binary ? new BinaryVersionCommand(latch, session.getRemoteSocketAddress()) : new TextVersionCommand(latch, session.getRemoteSocketAddress());
            session.write(versionCommand);
            if (this.heartBeatThreadPool != null) {
                this.heartBeatThreadPool.execute(new CheckHeartResultThread(versionCommand, session));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void reconnect(MemcachedTCPSession session) {
        if (!this.client.isShutdown()) {
            MemcachedTCPSession memcachedTCPSession = session;
            synchronized (memcachedTCPSession) {
                if (!session.isAllowReconnect()) {
                    return;
                }
                session.setAllowReconnect(false);
            }
            MemcachedTCPSession memcachedTCPSession2 = session;
            InetSocketAddressWrapper inetSocketAddressWrapper = memcachedTCPSession2.getInetSocketAddressWrapper();
            this.client.getConnector().addToWatingQueue(new ReconnectRequest(inetSocketAddressWrapper, 0, this.client.getHealSessionInterval()));
        }
    }

    public void stop() {
        this.heartBeatThreadPool.shutdown();
    }

    public void start() {
        final String name = "XMemcached-HeartBeatPool[" + this.client.getName() + "]";
        final AtomicInteger threadCounter = new AtomicInteger();
        long keepAliveTime = this.client.getConnector().getSessionIdleTimeout() * 3L / 2L;
        this.heartBeatThreadPool = new ThreadPoolExecutor(1, MAX_HEARTBEAT_THREADS, keepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){

            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, name + "-" + threadCounter.getAndIncrement());
                t.setDaemon(true);
                if (t.getPriority() != 5) {
                    t.setPriority(5);
                }
                return t;
            }
        }, new ThreadPoolExecutor.DiscardPolicy());
    }

    public MemcachedHandler(MemcachedClient client) {
        this.client = client;
        this.listener = new AuthMemcachedConnectListener();
        this.statisticsHandler = new StatisticsHandler();
    }

    static final class CheckHeartResultThread
    implements Runnable {
        private final Command versionCommand;
        private final Session session;

        public CheckHeartResultThread(Command versionCommand, Session session) {
            this.versionCommand = versionCommand;
            this.session = session;
        }

        public void run() {
            try {
                AtomicInteger heartBeatFailCount = (AtomicInteger)this.session.getAttribute(MemcachedHandler.HEART_BEAT_FAIL_COUNT_ATTR);
                if (heartBeatFailCount != null) {
                    if (!this.versionCommand.getLatch().await(2000L, TimeUnit.MILLISECONDS)) {
                        heartBeatFailCount.incrementAndGet();
                    }
                    if (this.versionCommand.getResult() == null) {
                        heartBeatFailCount.incrementAndGet();
                    } else {
                        heartBeatFailCount.set(0);
                    }
                    if (heartBeatFailCount.get() > MAX_HEART_BEAT_FAIL_COUNT) {
                        log.warn("Session(" + SystemUtils.getRawAddress(this.session.getRemoteSocketAddress()) + ":" + this.session.getRemoteSocketAddress().getPort() + ") heartbeat fail " + heartBeatFailCount.get() + " times,close session and try to heal it");
                        this.session.close();
                        heartBeatFailCount.set(0);
                    }
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

