/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.AsyncResult;
import org.apache.cassandra.net.CallbackInfo;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.IMessageCallback;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.IncomingTcpConnection;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDeliveryTask;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.StatusLogger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide singleton (see {@link #instance()}) through which this node sends and
 * receives inter-node messages.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>opening the listening sockets and accepting inbound connections;</li>
 *   <li>handing outbound messages to a per-endpoint {@link OutboundTcpConnectionPool};</li>
 *   <li>tracking reply callbacks in an {@link ExpiringMap} and recording timeouts
 *       (optionally scheduling a hint for expired mutations);</li>
 *   <li>running outbound file streams on one executor per destination;</li>
 *   <li>exposing pending/completed/dropped/timeout statistics through
 *       {@link MessagingServiceMBean}.</li>
 * </ul>
 */
public final class MessagingService
implements MessagingServiceMBean {
    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
    public static final int VERSION_07 = 1;
    public static final int VERSION_080 = 2;
    public static final int VERSION_10 = 3;
    public static final int VERSION_11 = 4;
    /** Messaging version of this build. */
    public static final int version_ = VERSION_11;
    static SerializerType serializerType_ = SerializerType.BINARY;
    /** First int of every stream header; checked by {@link #validateMagic(int)}. */
    static final int PROTOCOL_MAGIC = 0xCA552DFA;
    /** Reply callbacks keyed by message id; entries expire after their timeout. */
    private final ExpiringMap<String, CallbackInfo> callbacks;
    private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
    /** One streaming executor per destination, created lazily by {@link #stream}. */
    private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap<InetAddress, DebuggableThreadPoolExecutor>();
    private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0);
    private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
    private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
    private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
    private final List<SocketThread> socketThreads = Lists.newArrayList();
    /** Signalled once {@link #listen(InetAddress)} has started all accept threads. */
    private final SimpleCondition listenGate;
    /** Verbs for which {@link #incrementDroppedMessages} may legally be called. */
    public static final EnumSet<StorageService.Verb> DROPPABLE_VERBS = EnumSet.of(StorageService.Verb.BINARY, StorageService.Verb.MUTATION, StorageService.Verb.READ_REPAIR, StorageService.Verb.READ, StorageService.Verb.RANGE_SLICE, StorageService.Verb.REQUEST_RESPONSE);
    /** Running total of dropped messages per droppable verb. */
    private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
    /** Totals as of the last {@link #getRecentlyDroppedMessages()} call (JMX view). */
    private final Map<StorageService.Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class));
    /** Totals as of the last periodic log line (internal view, separate from the JMX one). */
    private final Map<StorageService.Verb, Integer> lastDroppedInternal = new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class);
    // NOTE(review): the timeout counters below are written from the callback-expiry function and
    // read by the JMX getters without synchronization; presumably the expiry function runs on a
    // single thread - confirm against ExpiringMap before relying on exact values.
    private long totalTimeouts = 0L;
    private long recentTotalTimeouts = 0L;
    private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
    private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
    private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
    private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout();
    private static final AtomicInteger idGen = new AtomicInteger(0);

    /**
     * @return the lazily created singleton instance
     */
    public static MessagingService instance() {
        return MSHandle.instance;
    }

    /**
     * Initializes the dropped-message counters, schedules the periodic dropped-message log,
     * creates the callback map with its timeout reporter and registers this object as an MBean
     * under {@link #MBEAN_NAME}.
     *
     * @throws RuntimeException if the MBean cannot be registered
     */
    private MessagingService() {
        for (StorageService.Verb verb : DROPPABLE_VERBS) {
            droppedMessages.put(verb, new AtomicInteger());
            lastDropped.put(verb, 0);
            lastDroppedInternal.put(verb, 0);
        }
        listenGate = new SimpleCondition();
        verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);

        Runnable logDropped = new Runnable(){

            @Override
            public void run() {
                logDroppedMessages();
            }
        };
        StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);

        // Invoked by the ExpiringMap for every callback that expires without being removed.
        Function<Pair<String, CallbackInfo>, Object> timeoutReporter = new Function<Pair<String, CallbackInfo>, Object>(){

            @Override
            public Object apply(Pair<String, CallbackInfo> pair) {
                CallbackInfo expiredCallbackInfo = pair.right;
                // A timed-out request is reported to latency subscribers as taking the full rpc timeout.
                maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, DatabaseDescriptor.getRpcTimeout());
                totalTimeouts++;
                String ip = expiredCallbackInfo.target.getHostAddress();
                AtomicLong hostTimeouts = timeoutsPerHost.get(ip);
                if (hostTimeouts == null) {
                    hostTimeouts = new AtomicLong();
                    timeoutsPerHost.put(ip, hostTimeouts);
                }
                hostTimeouts.incrementAndGet();
                // Map entries are only ever created here; getRecentTimeoutsPerHost() just updates the values.
                if (recentTimeoutsPerHost.get(ip) == null) {
                    recentTimeoutsPerHost.put(ip, new AtomicLong());
                }
                if (expiredCallbackInfo.shouldHint()) {
                    assert (expiredCallbackInfo.message != null);
                    try {
                        RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion());
                        return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
                    }
                    catch (IOException e) {
                        // Keep the cause in the log; the hint is lost but the expiry path must not fail.
                        logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target, e);
                    }
                }
                return null;
            }
        };
        callbacks = new ExpiringMap<String, CallbackInfo>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);

        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Forwards a latency sample to the subscribers only if the callback opts in via
     * {@code isLatencyForSnitch()}.
     *
     * @param cb      callback the sample belongs to
     * @param address endpoint the sample was measured against
     * @param latency measured latency
     */
    public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency) {
        if (cb.isLatencyForSnitch()) {
            addLatency(address, latency);
        }
    }

    /**
     * Delivers a latency sample to every registered {@link ILatencySubscriber}.
     *
     * @param address endpoint the sample was measured against
     * @param latency measured latency
     */
    public void addLatency(InetAddress address, double latency) {
        for (ILatencySubscriber subscriber : subscribers) {
            subscriber.receiveTiming(address, latency);
        }
    }

    /**
     * Resets the outbound connection pool of the given endpoint.
     *
     * @param ep endpoint whose pool is reset
     */
    public void convict(InetAddress ep) {
        logger_.debug("Resetting pool for " + ep);
        getConnectionPool(ep).reset();
    }

    /**
     * Opens the server socket(s) for {@code localEp}, starts one accept thread per socket and
     * then releases every thread blocked in {@link #waitUntilListening()}.
     *
     * @param localEp local address to listen on
     * @throws IOException            if a socket cannot be opened or bound
     * @throws ConfigurationException if the address is in use or cannot be assigned
     */
    public void listen(InetAddress localEp) throws IOException, ConfigurationException {
        callbacks.reset();
        for (ServerSocket ss : getServerSocket(localEp)) {
            SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
            th.start();
            socketThreads.add(th);
        }
        listenGate.signalAll();
    }

    /**
     * Creates the listening sockets: an SSL socket when inter-node encryption is not
     * {@code none}, plus the plain storage-port socket (always).
     *
     * @param localEp local address to bind to
     * @return the bound server sockets
     * @throws IOException            on any socket failure not translated below
     * @throws ConfigurationException if the plain socket cannot be bound because the address
     *                                is in use or not assignable
     */
    private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException {
        List<ServerSocket> ss = new ArrayList<ServerSocket>();
        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none) {
            ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
            logger_.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort());
        }
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket socket = serverChannel.socket();
        socket.setReuseAddress(true);
        InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
        try {
            socket.bind(address);
        }
        catch (BindException e) {
            // Nothing is returned to the caller on this path, so release what was opened so far.
            closeQuietly(socket);
            for (ServerSocket opened : ss) {
                closeQuietly(opened);
            }
            // getMessage() may be null; never let the diagnostics mask the original failure.
            String reason = e.getMessage();
            if (reason != null && reason.contains("in use")) {
                throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
            }
            if (reason != null && reason.contains("Cannot assign requested address")) {
                throw new ConfigurationException("Unable to bind to address " + address + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
            }
            throw e;
        }
        logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
        ss.add(socket);
        return ss;
    }

    /** Closes a server socket, logging (not propagating) any failure. */
    private static void closeQuietly(ServerSocket socket) {
        try {
            socket.close();
        }
        catch (IOException e) {
            logger_.debug("Error closing " + socket, e);
        }
    }

    /**
     * Blocks until {@link #listen(InetAddress)} has signalled the listen gate. If the waiting
     * thread is interrupted the method returns early with the interrupt flag restored.
     */
    public void waitUntilListening() {
        try {
            listenGate.await();
        }
        catch (InterruptedException ie) {
            logger_.debug("await interrupted");
            // Preserve the interrupt so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the connection pool for {@code to}, creating it on first use.
     *
     * @param to remote endpoint
     * @return the pool registered for that endpoint
     */
    public OutboundTcpConnectionPool getConnectionPool(InetAddress to) {
        OutboundTcpConnectionPool cp = connectionManagers_.get(to);
        if (cp == null) {
            // Racing creators are resolved by putIfAbsent; everyone re-reads the winning pool.
            connectionManagers_.putIfAbsent(to, new OutboundTcpConnectionPool(to));
            cp = connectionManagers_.get(to);
        }
        return cp;
    }

    /**
     * @param to  remote endpoint
     * @param msg message about to be sent; the pool picks the connection based on it
     * @return the outbound connection to use for {@code msg}
     */
    public OutboundTcpConnection getConnection(InetAddress to, Message msg) {
        return getConnectionPool(to).getConnection(msg);
    }

    /**
     * Registers the handler for a verb. A verb may only be registered once (checked by assertion).
     *
     * @param verb        verb to handle
     * @param verbHandler handler invoked for that verb
     */
    public void registerVerbHandlers(StorageService.Verb verb, IVerbHandler verbHandler) {
        assert (!verbHandlers_.containsKey(verb));
        verbHandlers_.put(verb, verbHandler);
    }

    /**
     * @param type verb to look up
     * @return the registered handler, or {@code null} if none was registered
     */
    public IVerbHandler getVerbHandler(StorageService.Verb type) {
        return verbHandlers_.get(type);
    }

    /**
     * Registers a callback with the default timeout.
     *
     * @see #addCallback(IMessageCallback, Message, InetAddress, long)
     */
    public String addCallback(IMessageCallback cb, Message message, InetAddress to) {
        return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT);
    }

    /**
     * Registers a callback under a freshly generated message id. The message itself is retained
     * in the callback info only for mutations sent while hinted handoff is enabled.
     *
     * @param cb      callback to register
     * @param message message the callback belongs to
     * @param to      endpoint the message is sent to
     * @param timeout expiry timeout passed to the callback map
     * @return the generated message id
     */
    public String addCallback(IMessageCallback cb, Message message, InetAddress to, long timeout) {
        String messageId = MessagingService.nextId();
        boolean keepMessage = DatabaseDescriptor.hintedHandoffEnabled() && message.getVerb() == StorageService.Verb.MUTATION;
        CallbackInfo info = keepMessage ? new CallbackInfo(to, cb, message) : new CallbackInfo(to, cb);
        CallbackInfo previous = callbacks.put(messageId, info, timeout);
        assert (previous == null);
        return messageId;
    }

    /** @return the next message id: a process-wide incrementing counter rendered as a string */
    private static String nextId() {
        return Integer.toString(idGen.incrementAndGet());
    }

    /**
     * Sends a request and registers {@code cb} for the reply, using the default timeout.
     *
     * @return the id of the sent message
     */
    public String sendRR(Message message, InetAddress to, IMessageCallback cb) {
        return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
    }

    /**
     * Sends a request and registers {@code cb} for the reply.
     *
     * @param message message to send
     * @param to      destination endpoint
     * @param cb      callback registered for the reply
     * @param timeout callback expiry timeout
     * @return the id of the sent message
     */
    public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout) {
        String id = addCallback(cb, message, to, timeout);
        sendOneWay(message, id, to);
        return id;
    }

    /**
     * Sends a message under a fresh id without registering any callback.
     *
     * @param message message to send
     * @param to      destination endpoint
     */
    public void sendOneWay(Message message, InetAddress to) {
        sendOneWay(message, MessagingService.nextId(), to);
    }

    /**
     * Sends {@code message} under the given id without registering a callback.
     *
     * @param message reply to send
     * @param id      message id to send under
     * @param to      destination endpoint
     */
    public void sendReply(Message message, String id, InetAddress to) {
        sendOneWay(message, id, to);
    }

    /**
     * Builds the message for the version gossip reports for {@code to}, then sends it as a
     * request with {@code cb} registered for the reply.
     *
     * @return the id of the sent message
     * @throws IOError if the producer fails to build the message
     */
    public String sendRR(MessageProducer producer, InetAddress to, IAsyncCallback cb) {
        try {
            return sendRR(producer.getMessage(Gossiper.instance.getVersion(to)), to, cb);
        }
        catch (IOException ex) {
            throw new IOError(ex);
        }
    }

    /**
     * Sends a message under an explicit id. Messages whose sender equals the destination are
     * delivered locally through {@link #receive}; all others pass through the client sink
     * (which may drop them by returning {@code null}) and are enqueued on an outbound connection.
     *
     * @param message message to send
     * @param id      message id
     * @param to      destination endpoint
     */
    public void sendOneWay(Message message, String id, InetAddress to) {
        if (logger_.isTraceEnabled()) {
            logger_.trace(FBUtilities.getBroadcastAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
        }
        if (message.getFrom().equals(to)) {
            receive(message, id);
            return;
        }
        Message processedMessage = SinkManager.processClientMessage(message, id, to);
        if (processedMessage == null) {
            return;
        }
        OutboundTcpConnection connection = getConnection(to, processedMessage);
        connection.enqueue(processedMessage, id);
    }

    /**
     * Sends a request and returns a handle through which the reply can be obtained.
     *
     * @param message message to send
     * @param to      destination endpoint
     * @return the async result registered as the reply callback
     */
    public IAsyncResult sendRR(Message message, InetAddress to) {
        AsyncResult iar = new AsyncResult();
        sendRR(message, to, iar);
        return iar;
    }

    /**
     * Queues a {@link FileStreamTask} on the executor dedicated to {@code to}, creating that
     * executor (at most one thread) on first use.
     *
     * @param header stream header describing what to stream
     * @param to     destination endpoint
     */
    public void stream(StreamHeader header, InetAddress to) {
        DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
        if (executor == null) {
            executor = new DebuggableThreadPoolExecutor(0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("Streaming to " + to));
            DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, executor);
            if (old != null) {
                // Lost the creation race: discard ours and use the registered executor.
                executor.shutdown();
                executor = old;
            }
        }
        executor.execute(new FileStreamTask(header, to));
    }

    public void incrementActiveStreamsOutbound() {
        activeStreamsOutbound.incrementAndGet();
    }

    public void decrementActiveStreamsOutbound() {
        activeStreamsOutbound.decrementAndGet();
    }

    public int getActiveStreamsOutbound() {
        return activeStreamsOutbound.get();
    }

    /**
     * Adds a subscriber that will receive every latency sample passed to {@link #addLatency}.
     *
     * @param subcriber subscriber to add
     */
    public void register(ILatencySubscriber subcriber) {
        subscribers.add(subcriber);
    }

    /** Resets the callback map, discarding whatever callbacks are currently registered. */
    public void clearCallbacksUnsafe() {
        callbacks.reset();
    }

    /**
     * Shuts down every streaming executor and waits up to 24 hours for each to finish its
     * queued streams. An executor that does not terminate within that time is logged and skipped.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void waitForStreaming() throws InterruptedException {
        for (DebuggableThreadPoolExecutor e : streamExecutors.values()) {
            e.shutdown();
        }
        for (DebuggableThreadPoolExecutor e : streamExecutors.values()) {
            // awaitTermination returns false when the timeout elapsed before termination.
            if (!e.awaitTermination(24L, TimeUnit.HOURS)) {
                logger_.error("Stream took more than 24H to complete; skipping");
            }
        }
    }

    /**
     * Shuts down the callback map and closes all listening sockets. The mutation stage must
     * still be running when this is called (checked by assertion).
     *
     * @throws IOError if closing a server socket fails
     */
    public void shutdown() {
        logger_.info("Waiting for messaging service to quiesce");
        assert (!StageManager.getStage(Stage.MUTATION).isShutdown());
        callbacks.shutdown();
        try {
            for (SocketThread th : socketThreads) {
                th.close();
            }
        }
        catch (IOException e) {
            throw new IOError(e);
        }
    }

    /**
     * Entry point for an inbound (or locally delivered) message: runs it through the server
     * sink, which may drop it by returning {@code null}, then executes a
     * {@link MessageDeliveryTask} on the stage matching the message type.
     *
     * @param message received message
     * @param id      message id
     */
    public void receive(Message message, String id) {
        if (logger_.isTraceEnabled()) {
            logger_.trace(FBUtilities.getBroadcastAddress() + " received " + message.getVerb() + " from " + id + "@" + message.getFrom());
        }
        message = SinkManager.processServerMessage(message, id);
        if (message == null) {
            return;
        }
        Runnable runnable = new MessageDeliveryTask(message, id);
        ThreadPoolExecutor stage = StageManager.getStage(message.getMessageType());
        assert (stage != null) : "No stage for message type " + message.getVerb();
        stage.execute(runnable);
    }

    /**
     * @param messageId id the callback was registered under
     * @return the value returned by the callback map's {@code remove} for that id
     */
    public CallbackInfo removeRegisteredCallback(String messageId) {
        return callbacks.remove(messageId);
    }

    /**
     * @param messageId id the callback was registered under
     * @return the age of that entry as reported by the callback map
     */
    public long getRegisteredCallbackAge(String messageId) {
        return callbacks.getAge(messageId);
    }

    /**
     * @param magic value read from the wire
     * @throws IOException if {@code magic} is not {@link #PROTOCOL_MAGIC}
     */
    public static void validateMagic(int magic) throws IOException {
        if (magic != PROTOCOL_MAGIC) {
            throw new IOException("invalid protocol header");
        }
    }

    /**
     * Extracts the {@code n}-bit field of {@code x} whose most significant bit is at position
     * {@code p} (bit 0 being the least significant), right-aligned in the result.
     *
     * @param x value to extract from
     * @param p position of the field's highest bit
     * @param n width of the field in bits
     * @return the extracted field
     */
    public static int getBits(int x, int p, int n) {
        return x >>> (p + 1 - n) & ~(-1 << n);
    }

    /**
     * Builds the wire header for a stream: magic, a packed header int, the length of the
     * serialized {@link StreamHeader} and the serialized bytes themselves.
     *
     * <p>Packed header layout as written here: serializer ordinal in the low bits, {@code 4}
     * set when {@code compress} is true, {@code 8} always set (presumably the "streaming"
     * flag), and {@code version} shifted left by 8.
     *
     * @param streamHeader header to serialize
     * @param compress     whether to set the compression bit
     * @param version      messaging version to encode and serialize with
     * @return a buffer flipped and ready for reading
     * @throws RuntimeException if serialization fails
     */
    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version) {
        int header = 0;
        header |= serializerType_.ordinal();
        if (compress) {
            header |= 4;
        }
        header |= 8;
        header |= version << 8;

        byte[] bytes;
        try {
            DataOutputBuffer out = new DataOutputBuffer();
            StreamHeader.serializer().serialize(streamHeader, (DataOutput)out, version);
            // NOTE(review): getData() may expose the whole backing array rather than only the
            // bytes written - confirm against DataOutputBuffer before changing the length below.
            bytes = out.getData();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        assert (bytes.length > 0);

        // Three leading ints: magic, packed header, payload length.
        ByteBuffer buffer = ByteBuffer.allocate(12 + bytes.length);
        buffer.putInt(PROTOCOL_MAGIC);
        buffer.putInt(header);
        buffer.putInt(bytes.length);
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }

    /**
     * Counts one dropped message for {@code verb}, which must be in {@link #DROPPABLE_VERBS}
     * (checked by assertion).
     *
     * @param verb verb of the dropped message
     */
    public void incrementDroppedMessages(StorageService.Verb verb) {
        assert (DROPPABLE_VERBS.contains(verb)) : "Verb " + verb + " should not legally be dropped";
        droppedMessages.get(verb).incrementAndGet();
    }

    /**
     * Periodic task: logs, per verb, how many messages were dropped since the previous log
     * line, and dumps the status log if anything was dropped.
     */
    private void logDroppedMessages() {
        boolean logTpstats = false;
        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet()) {
            AtomicInteger dropped = entry.getValue();
            StorageService.Verb verb = entry.getKey();
            int recent = dropped.get() - lastDroppedInternal.get(verb);
            if (recent <= 0) {
                continue;
            }
            logTpstats = true;
            logger_.info("{} {} messages dropped in last {}ms", new Object[]{recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
            lastDroppedInternal.put(verb, dropped.get());
        }
        if (logTpstats) {
            StatusLogger.log();
        }
    }

    /** @return pending message count of each endpoint's command connection, keyed by host address */
    @Override
    public Map<String, Integer> getCommandPendingTasks() {
        Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet()) {
            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getPendingMessages());
        }
        return pendingTasks;
    }

    /** @return completed message count of each endpoint's command connection, keyed by host address */
    @Override
    public Map<String, Long> getCommandCompletedTasks() {
        Map<String, Long> completedTasks = new HashMap<String, Long>();
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet()) {
            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getCompletedMesssages());
        }
        return completedTasks;
    }

    /** @return dropped message count of each endpoint's command connection, keyed by host address */
    @Override
    public Map<String, Long> getCommandDroppedTasks() {
        Map<String, Long> droppedTasks = new HashMap<String, Long>();
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet()) {
            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
        }
        return droppedTasks;
    }

    /** @return pending message count of each endpoint's ack connection, keyed by host address */
    @Override
    public Map<String, Integer> getResponsePendingTasks() {
        Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet()) {
            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getPendingMessages());
        }
        return pendingTasks;
    }

    /** @return completed message count of each endpoint's ack connection, keyed by host address */
    @Override
    public Map<String, Long> getResponseCompletedTasks() {
        Map<String, Long> completedTasks = new HashMap<String, Long>();
        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet()) {
            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getCompletedMesssages());
        }
        return completedTasks;
    }

    /** @return the callback timeout used when none is given explicitly */
    public static long getDefaultCallbackTimeout() {
        return DEFAULT_CALLBACK_TIMEOUT;
    }

    /** @return total dropped messages per verb name */
    @Override
    public Map<String, Integer> getDroppedMessages() {
        Map<String, Integer> map = new HashMap<String, Integer>();
        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue().get());
        }
        return map;
    }

    /**
     * @return dropped messages per verb name since the previous call of this method; calling
     *         it advances the baseline
     */
    @Override
    public Map<String, Integer> getRecentlyDroppedMessages() {
        Map<String, Integer> map = new HashMap<String, Integer>();
        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet()) {
            StorageService.Verb verb = entry.getKey();
            int dropped = entry.getValue().get();
            int recentlyDropped = dropped - lastDropped.get(verb);
            map.put(verb.toString(), recentlyDropped);
            lastDropped.put(verb, dropped);
        }
        return map;
    }

    /** @return number of callbacks that have expired so far */
    @Override
    public long getTotalTimeouts() {
        return totalTimeouts;
    }

    /**
     * @return number of callbacks that expired since the previous call of this method; calling
     *         it advances the baseline
     */
    @Override
    public long getRecentTotalTimouts() {
        long recent = totalTimeouts - recentTotalTimeouts;
        recentTotalTimeouts = totalTimeouts;
        return recent;
    }

    /** @return total expired callbacks per target host address */
    @Override
    public Map<String, Long> getTimeoutsPerHost() {
        Map<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry<String, AtomicLong> entry : timeoutsPerHost.entrySet()) {
            result.put(entry.getKey(), entry.getValue().get());
        }
        return result;
    }

    /**
     * @return expired callbacks per target host address since the previous call of this method;
     *         calling it advances each host's baseline
     */
    @Override
    public Map<String, Long> getRecentTimeoutsPerHost() {
        Map<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry<String, AtomicLong> entry : recentTimeoutsPerHost.entrySet()) {
            String ip = entry.getKey();
            AtomicLong recent = entry.getValue();
            long timeout = timeoutsPerHost.get(ip).get();
            result.put(ip, timeout - recent.getAndSet(timeout));
        }
        return result;
    }

    /**
     * Accept loop for one server socket: every accepted connection is handed to a new
     * {@link IncomingTcpConnection} thread. The loop ends when the socket is closed.
     */
    private static class SocketThread
    extends Thread {
        private final ServerSocket server;

        SocketThread(ServerSocket server, String name) {
            super(name);
            this.server = server;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Socket socket = server.accept();
                    new IncomingTcpConnection(socket).start();
                }
            }
            catch (AsynchronousCloseException e) {
                // Channel-backed socket closed by another thread (see close()).
                logger_.info("MessagingService shutting down server thread.");
            }
            catch (IOException e) {
                // A socket without a channel reports a concurrent close() as a plain
                // SocketException; that is a normal shutdown, not a failure.
                if (server.isClosed()) {
                    logger_.info("MessagingService shutting down server thread.");
                    return;
                }
                throw new RuntimeException(e);
            }
        }

        /** Closes the server socket, which makes the accept loop in {@link #run()} exit. */
        void close() throws IOException {
            server.close();
        }
    }

    /** Holder class: the singleton is created when this class is first initialized. */
    private static class MSHandle {
        public static final MessagingService instance = new MessagingService();

        private MSHandle() {
        }
    }
}

