/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.util.concurrent.Uninterruptibles;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Checksum;
import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CoalescingStrategies;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyOutputStream;

public class OutboundTcpConnection
extends Thread {
    // Class logger; assigned in the static initializer.
    private static final Logger logger;
    // Common prefix of the system property names declared below.
    private static final String PREFIX = "cassandra.";
    // System property controlling TCP_NODELAY for connections to hosts in the local datacenter.
    private static final String INTRADC_TCP_NODELAY_PROPERTY = "cassandra.otc_intradc_tcp_nodelay";
    // Value of the property above (defaults to "true" in the static initializer); applied in connect().
    private static final boolean INTRADC_TCP_NODELAY;
    // System property controlling the size of the buffered output stream created in connect().
    private static final String BUFFER_SIZE_PROPERTY = "cassandra.otc_buffer_size";
    // Value of the property above (defaults to 65536 in the static initializer).
    private static final int BUFFER_SIZE;
    // Marker message: when run() dequeues it, the socket is disconnected instead of anything being written.
    private static final MessageOut CLOSE_SENTINEL;
    // Set by closeSocket(destroyThread); when true, run() exits after processing the next CLOSE_SENTINEL.
    private volatile boolean isStopped = false;
    // NOTE(review): not referenced by name in this file; connect() sleeps a literal 100 ms between
    // attempts, presumably this constant inlined by the compiler — confirm.
    private static final int OPEN_RETRY_DELAY = 100;
    // NOTE(review): not referenced by name in this file; handshakeVersion() waits a literal 5000 ms,
    // presumably this constant inlined by the compiler — confirm.
    public static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
    // Same value (Integer.MIN_VALUE) that handshakeVersion() returns when no version could be read.
    private static final int NO_VERSION = Integer.MIN_VALUE;
    // Same value connect() passes as the seed of the XXHash checksum used for LZ4 compression.
    static final int LZ4_HASH_SEED = -1756908916;
    // Messages waiting to be written; filled by enqueue() and drained by run().
    private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<QueuedMessage>();
    // Pool this connection was created from; supplies the remote endpoint and new sockets.
    private final OutboundTcpConnectionPool poolReference;
    // Strategy used by run() to drain the backlog in batches.
    private final CoalescingStrategies.CoalescingStrategy cs;
    // Stream to the remote endpoint; null while disconnected.
    private DataOutputStreamPlus out;
    // Socket to the remote endpoint; null while disconnected.
    private Socket socket;
    // Number of messages written by writeConnected().
    private volatile long completed;
    // Number of messages discarded because they timed out before being written.
    private final AtomicLong dropped = new AtomicLong();
    // Number of messages of the current drained batch that run() has not finished processing.
    private volatile int currentMsgBufferCount = 0;
    // Messaging version used for the current connection; refreshed at the start of each connect() attempt.
    private int targetVersion;

    /**
     * Builds a coalescing strategy from the strategy name and window currently configured in
     * {@link DatabaseDescriptor}.
     *
     * @param displayName name handed to the strategy factory to identify the strategy instance
     * @return the strategy created by {@link CoalescingStrategies#newCoalescingStrategy}
     */
    private static CoalescingStrategies.CoalescingStrategy newCoalescingStrategy(String displayName) {
        final String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
        final int window = DatabaseDescriptor.getOtcCoalescingWindow();
        return CoalescingStrategies.newCoalescingStrategy(strategyName, window, logger, displayName);
    }

    public OutboundTcpConnection(OutboundTcpConnectionPool pool) {
        super("MessagingService-Outgoing-" + pool.endPoint());
        this.poolReference = pool;
        this.cs = OutboundTcpConnection.newCoalescingStrategy(pool.endPoint().getHostAddress());
    }

    /**
     * Tells whether the snitch places {@code targetHost} in the same datacenter as this node's
     * broadcast address.
     *
     * @param targetHost address of the remote host
     * @return {@code true} when both datacenter names are equal
     */
    private static boolean isLocalDC(InetAddress targetHost) {
        final String theirs = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
        final String ours = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
        final boolean sameDatacenter = theirs.equals(ours);
        return sameDatacenter;
    }

    /**
     * Appends a message to the backlog for the sender thread to write.
     * When the backlog already holds more than 1024 entries, timed-out droppable messages are
     * expired first.
     *
     * @param message message to send
     * @param id      message id written on the wire alongside the message
     */
    public void enqueue(MessageOut<?> message, int id) {
        final boolean backlogIsLong = this.backlog.size() > 1024;
        if (backlogIsLong) {
            this.expireMessages();
        }
        final QueuedMessage queued = new QueuedMessage(message, id);
        try {
            this.backlog.put(queued);
        }
        catch (InterruptedException interrupted) {
            // Interruption is treated as a programming error by this class.
            throw new AssertionError((Object)interrupted);
        }
    }

    /**
     * Discards every queued message and asks the sender thread to close the socket.
     *
     * @param destroyThread when {@code true}, the sender thread also terminates once it has
     *                      processed the close request
     */
    void closeSocket(boolean destroyThread) {
        backlog.clear();
        isStopped = destroyThread;
        // Same request softCloseSocket() makes: queue the close sentinel behind the (now empty) backlog.
        softCloseSocket();
    }

    /**
     * Queues the close sentinel without clearing the backlog, so the socket is closed only after
     * the messages already queued ahead of it have been processed.
     */
    void softCloseSocket() {
        final int sentinelId = -1;
        enqueue(CLOSE_SENTINEL, sentinelId);
    }

    /**
     * @return the messaging version recorded by the most recent connection attempt
     */
    public int getTargetVersion() {
        final int version = targetVersion;
        return version;
    }

    /**
     * Sender loop: repeatedly drains a batch of up to 128 messages from the backlog through the
     * coalescing strategy and processes each one in order.
     * <ul>
     *   <li>The close sentinel disconnects the socket and, if {@code isStopped} is set, ends the thread.</li>
     *   <li>Timed-out messages are counted as dropped.</li>
     *   <li>Other messages are written, connecting first if necessary; the stream is flushed only
     *       for the last message of a batch when the backlog is empty.</li>
     *   <li>If connecting fails, the backlog and the rest of the current batch are discarded.
     *       Continuing with the batch would make every remaining message block in
     *       {@code connect()} for up to the RPC timeout.</li>
     * </ul>
     */
    @Override
    public void run() {
        final int maxBatchSize = 128;
        // Typed list: the batch is iterated as QueuedMessage below.
        final ArrayList<QueuedMessage> drainedMessages = new ArrayList<QueuedMessage>(maxBatchSize);
        outer:
        while (true) {
            try {
                this.cs.coalesce(this.backlog, drainedMessages, maxBatchSize);
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            int count = drainedMessages.size();
            this.currentMsgBufferCount = count;
            for (QueuedMessage qm : drainedMessages) {
                try {
                    MessageOut<?> m = qm.message;
                    if (m == CLOSE_SENTINEL) {
                        this.disconnect();
                        if (this.isStopped) {
                            break outer;
                        }
                        // The sentinel is a processed batch entry too: account for it so that the
                        // "last message of the batch" flush test below stays accurate.
                        this.currentMsgBufferCount = --count;
                        continue;
                    }
                    if (qm.isTimedOut()) {
                        this.dropped.incrementAndGet();
                    } else if (this.socket != null || this.connect()) {
                        this.writeConnected(qm, count == 1 && this.backlog.isEmpty());
                    } else {
                        // Could not connect: drop what is queued and abandon the rest of this batch
                        // instead of retrying the connection once per remaining message.
                        this.backlog.clear();
                        break;
                    }
                }
                catch (Exception e) {
                    JVMStabilityInspector.inspectThrowable(e);
                    logger.error("error processing a message intended for {}", (Object)this.poolReference.endPoint(), (Object)e);
                }
                this.currentMsgBufferCount = --count;
            }
            drainedMessages.clear();
            // Nothing of this batch is pending any more, including entries skipped by the break above.
            this.currentMsgBufferCount = 0;
        }
    }

    /**
     * @return the number of messages still in the backlog plus those of the current batch that
     *         the sender thread has not finished processing
     */
    public int getPendingMessages() {
        final int queued = backlog.size();
        final int inCurrentBatch = currentMsgBufferCount;
        return queued + inCurrentBatch;
    }

    /**
     * @return the number of messages written so far by this connection
     */
    public long getCompletedMesssages() {
        final long written = completed;
        return written;
    }

    /**
     * @return the number of messages discarded because they timed out before being written
     */
    public long getDroppedMessages() {
        final long discarded = dropped.get();
        return discarded;
    }

    /**
     * Decides whether traffic on this connection is compressed, based on the configured
     * internode compression mode.
     *
     * @return {@code true} for mode {@code all}, or for mode {@code dc} when the endpoint is not
     *         in the local datacenter; {@code false} otherwise
     */
    private boolean shouldCompressConnection() {
        if (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all) {
            return true;
        }
        if (DatabaseDescriptor.internodeCompression() != Config.InternodeCompression.dc) {
            return false;
        }
        // Mode "dc": compress only traffic that leaves the local datacenter.
        return !isLocalDC(this.poolReference.endPoint());
    }

    /**
     * Writes one message to the already connected stream, emitting a trace event first when the
     * message carries a {@code TraceSession} parameter.
     * <p>
     * Any failure disconnects the socket. An {@link IOException} is logged at debug level and,
     * if the message asks for it ({@code qm.shouldRetry()}), the message is put back on the
     * backlog as a {@link RetriedQueuedMessage}. Every other exception is logged at error level.
     *
     * @param qm    message to write
     * @param flush whether to flush the stream after writing
     */
    private void writeConnected(QueuedMessage qm, boolean flush) {
        try {
            byte[] sessionBytes = qm.message.parameters.get("TraceSession");
            if (sessionBytes != null) {
                UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                TraceState state = Tracing.instance.get(sessionId);
                String message = String.format("Sending %s message to %s", qm.message.verb, this.poolReference.endPoint());
                if (state == null) {
                    // No local trace state for this session: trace by raw session id.
                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
                } else {
                    state.trace(message);
                    if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE) {
                        Tracing.instance.doneWithNonLocalSession(state);
                    }
                }
            }
            long timestampMillis = NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos);
            this.writeInternal(qm.message, qm.id, timestampMillis);
            ++this.completed;
            if (flush) {
                this.out.flush();
            }
        }
        catch (Exception e) {
            this.disconnect();
            if (e instanceof IOException) {
                if (logger.isDebugEnabled()) {
                    logger.debug("error writing to {}", (Object)this.poolReference.endPoint(), (Object)e);
                }
                // Re-queue the message so it is retried after reconnecting; the retried wrapper
                // refuses a second retry.
                if (qm.shouldRetry()) {
                    try {
                        this.backlog.put(new RetriedQueuedMessage(qm));
                    }
                    catch (InterruptedException e1) {
                        throw new AssertionError((Object)e1);
                    }
                }
            } else {
                // Non-IO failures are unexpected here, so they are reported at error level.
                // IO failures were already reported at debug level above.
                logger.error("error writing to {}", (Object)this.poolReference.endPoint(), (Object)e);
            }
        }
    }

    /**
     * Serializes one message frame to the current output stream: a fixed marker int, the message
     * id, the low 32 bits of the timestamp, then the message itself.
     *
     * @param message   message to serialize
     * @param id        message id; written as a UTF string when the target version is below 7,
     *                  as an int otherwise
     * @param timestamp creation time in milliseconds; only its low-order 32 bits are written
     * @throws IOException if writing to the stream fails
     */
    private void writeInternal(MessageOut message, int id, long timestamp) throws IOException {
        final DataOutputStreamPlus stream = this.out;
        // Same marker value connect() writes at the start of the handshake
        // (presumably the protocol magic — confirm).
        stream.writeInt(-900387334);
        final boolean usesStringIds = this.targetVersion < 7;
        if (usesStringIds) {
            stream.writeUTF(String.valueOf(id));
        } else {
            stream.writeInt(id);
        }
        final int truncatedTimestamp = (int)timestamp;
        stream.writeInt(truncatedTimestamp);
        message.serialize(stream, this.targetVersion);
    }

    /**
     * Writes the 32-bit connection header: bit 2 (value 4) is set when compression is enabled,
     * and the version is stored shifted left by 8 bits.
     *
     * @param out                destination of the header int
     * @param version            messaging version to advertise
     * @param compressionEnabled whether the compression flag is set
     * @throws IOException if writing fails
     */
    private static void writeHeader(DataOutput out, int version, boolean compressionEnabled) throws IOException {
        final int compressionFlag = compressionEnabled ? 4 : 0;
        final int versionBits = version << 8;
        out.writeInt(compressionFlag | versionBits);
    }

    /**
     * Closes the current socket, if any, and forgets both the socket and its output stream.
     * A failure to close is only logged at trace level. Does nothing when not connected.
     */
    private void disconnect() {
        if (this.socket == null) {
            return;
        }
        try {
            this.socket.close();
            if (logger.isTraceEnabled()) {
                logger.trace("Socket to {} closed", (Object)this.poolReference.endPoint());
            }
        }
        catch (IOException closeFailure) {
            if (logger.isTraceEnabled()) {
                logger.trace("exception closing connection to " + this.poolReference.endPoint(), (Throwable)closeFailure);
            }
        }
        // Cleared whether or not close() succeeded.
        this.out = null;
        this.socket = null;
    }

    /**
     * Tries to open a socket to the pool's endpoint and complete the version handshake, retrying
     * until the RPC timeout has elapsed.
     * <p>
     * Each attempt configures the socket (keep-alive, TCP_NODELAY, optional send buffer size),
     * writes the marker int and connection header, then waits for the peer's version:
     * <ul>
     *   <li>no version received: disconnect and retry;</li>
     *   <li>peer's maximum version is lower than the one used: record it, disconnect and return
     *       {@code false} so the caller reconnects with that version;</li>
     *   <li>otherwise finish the handshake and, if compression applies, wrap the socket stream in
     *       a Snappy (target version below 8) or LZ4 compressing stream.</li>
     * </ul>
     * An {@link IOException} during an attempt closes the half-open socket, sleeps 100 ms and retries.
     *
     * @return {@code true} if the connection is established and ready for messages
     */
    private boolean connect() {
        if (logger.isDebugEnabled()) {
            logger.debug("attempting to connect to {}", (Object)this.poolReference.endPoint());
        }
        long start = System.nanoTime();
        long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
        while (System.nanoTime() - start < timeout) {
            this.targetVersion = MessagingService.instance().getVersion(this.poolReference.endPoint());
            try {
                this.socket = this.poolReference.newSocket();
                this.socket.setKeepAlive(true);
                if (OutboundTcpConnection.isLocalDC(this.poolReference.endPoint())) {
                    this.socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
                } else {
                    this.socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay());
                }
                if (DatabaseDescriptor.getInternodeSendBufferSize() != null) {
                    try {
                        this.socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize());
                    }
                    catch (SocketException se) {
                        // Best effort: keep going with the default buffer size.
                        logger.warn("Failed to set send buffer size on internode socket.", (Throwable)se);
                    }
                }
                this.out = new DataOutputStreamPlus(new BufferedOutputStream(this.socket.getOutputStream(), BUFFER_SIZE));
                // Same marker value writeInternal() puts in front of every message.
                this.out.writeInt(-900387334);
                OutboundTcpConnection.writeHeader(this.out, this.targetVersion, this.shouldCompressConnection());
                this.out.flush();
                DataInputStream in = new DataInputStream(this.socket.getInputStream());
                int maxTargetVersion = this.handshakeVersion(in);
                if (maxTargetVersion == Integer.MIN_VALUE) {
                    // handshakeVersion() yields Integer.MIN_VALUE when no version was read in time.
                    logger.debug("Target max version is {}; no version information yet, will retry", (Object)maxTargetVersion);
                    if (DatabaseDescriptor.getSeeds().contains(this.poolReference.endPoint())) {
                        logger.warn("Seed gossip version is {}; will not connect with that version", (Object)maxTargetVersion);
                    }
                    this.disconnect();
                    continue;
                }
                MessagingService.instance().setVersion(this.poolReference.endPoint(), maxTargetVersion);
                if (this.targetVersion > maxTargetVersion) {
                    logger.debug("Target max version is {}; will reconnect with that version", (Object)maxTargetVersion);
                    this.disconnect();
                    return false;
                }
                if (this.targetVersion < maxTargetVersion && this.targetVersion < 8) {
                    logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done", (Object)maxTargetVersion, (Object)this.targetVersion);
                    this.softCloseSocket();
                }
                // NOTE(review): 8 is presumably this node's current messaging version — confirm.
                this.out.writeInt(8);
                CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), this.out);
                if (this.shouldCompressConnection()) {
                    // Push the uncompressed handshake bytes out before swapping the stream.
                    this.out.flush();
                    logger.trace("Upgrading OutputStream to be compressed");
                    if (this.targetVersion < 8) {
                        this.out = new DataOutputStreamPlus((OutputStream)new SnappyOutputStream(this.socket.getOutputStream()));
                    } else {
                        LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
                        Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(-1756908916).asChecksum();
                        this.out = new DataOutputStreamPlus((OutputStream)new LZ4BlockOutputStream(this.socket.getOutputStream(), 16384, compressor, checksum, true));
                    }
                }
                return true;
            }
            catch (IOException e) {
                // Close a socket that was opened before the failure rather than just dropping the
                // reference, which would leak it; disconnect() also clears the stale stream and is
                // a no-op when newSocket() itself failed.
                this.disconnect();
                if (logger.isTraceEnabled()) {
                    logger.trace("unable to connect to " + this.poolReference.endPoint(), (Throwable)e);
                }
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            }
        }
        return false;
    }

    /**
     * Reads the peer's version int from {@code inputStream} on a separate
     * {@code HANDSHAKE-<endpoint>} thread, waiting at most 5000 ms for it.
     *
     * @param inputStream stream of the freshly opened socket
     * @return the version read, or {@link Integer#MIN_VALUE} if none was read in time or the read failed
     */
    private int handshakeVersion(final DataInputStream inputStream) {
        final AtomicInteger receivedVersion = new AtomicInteger(Integer.MIN_VALUE);
        final CountDownLatch done = new CountDownLatch(1);
        final Runnable reader = new Runnable(){

            @Override
            public void run() {
                try {
                    logger.info("Handshaking version with {}", (Object)OutboundTcpConnection.this.poolReference.endPoint());
                    receivedVersion.set(inputStream.readInt());
                }
                catch (IOException readFailure) {
                    String msg = "Cannot handshake version with " + OutboundTcpConnection.this.poolReference.endPoint();
                    if (logger.isTraceEnabled()) {
                        logger.trace(msg, (Throwable)readFailure);
                    } else {
                        logger.info(msg);
                    }
                }
                finally {
                    // Release the waiting caller whether or not a version was read.
                    done.countDown();
                }
            }
        };
        final Thread handshaker = new Thread(reader, "HANDSHAKE-" + this.poolReference.endPoint());
        handshaker.start();
        try {
            done.await(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interrupted) {
            throw new AssertionError((Object)interrupted);
        }
        return receivedVersion.get();
    }

    /**
     * Removes timed-out droppable messages from the head of the backlog, counting each as dropped.
     * Non-droppable messages are skipped; the scan stops at the first droppable message that has
     * not timed out yet.
     */
    private void expireMessages() {
        for (Iterator<QueuedMessage> it = this.backlog.iterator(); it.hasNext(); ) {
            final QueuedMessage candidate = it.next();
            if (candidate.droppable) {
                if (!candidate.isTimedOut()) {
                    // Stop scanning at the first droppable message that is still live.
                    return;
                }
                it.remove();
                this.dropped.incrementAndGet();
            }
        }
    }

    static {
        String strategy;
        logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
        INTRADC_TCP_NODELAY = Boolean.valueOf(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
        BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 65536);
        switch (strategy = DatabaseDescriptor.getOtcCoalescingStrategy()) {
            case "TIMEHORIZON": {
                break;
            }
            case "MOVINGAVERAGE": 
            case "FIXED": 
            case "DISABLED": {
                logger.info("OutboundTcpConnection using coalescing strategy " + strategy);
                break;
            }
            default: {
                OutboundTcpConnection.newCoalescingStrategy("dummy");
            }
        }
        int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow();
        if (coalescingWindow != 200) {
            logger.info("OutboundTcpConnection coalescing window set to " + coalescingWindow + "\u03bcs");
        }
        if (coalescingWindow < 0) {
            throw new ExceptionInInitializerError("Value provided for coalescing window must be greather than 0: " + coalescingWindow);
        }
        CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
    }

    /**
     * A message that has already been re-queued once after a failed write. It copies the
     * original's message and id (the superclass constructor stamps it with a fresh creation time)
     * and never asks for another retry.
     */
    private static class RetriedQueuedMessage
    extends QueuedMessage {
        RetriedQueuedMessage(QueuedMessage original) {
            super(original.message, original.id);
        }

        /** Always {@code false}: a message is retried at most once. */
        @Override
        boolean shouldRetry() {
            final boolean retryAgain = false;
            return retryAgain;
        }
    }

    /**
     * A message waiting in the backlog, together with its id, its creation time
     * ({@link System#nanoTime()} at construction) and whether its verb is droppable.
     */
    private static class QueuedMessage
    implements CoalescingStrategies.Coalescable {
        final MessageOut<?> message;
        final int id;
        final long timestampNanos;
        final boolean droppable;

        QueuedMessage(MessageOut<?> message, int id) {
            this.message = message;
            this.id = id;
            this.timestampNanos = System.nanoTime();
            this.droppable = MessagingService.DROPPABLE_VERBS.contains((Object)message.verb);
        }

        /**
         * @return {@code true} if the message is droppable and has been queued for longer than
         *         its own timeout
         */
        boolean isTimedOut() {
            if (!this.droppable) {
                return false;
            }
            // nanoTime values may only be compared through their difference; comparing the raw
            // values breaks when the counter overflows.
            long elapsedNanos = System.nanoTime() - this.timestampNanos;
            return elapsedNanos > TimeUnit.MILLISECONDS.toNanos(this.message.getTimeout());
        }

        /** @return {@code true} for non-droppable messages, which are re-queued after a failed write */
        boolean shouldRetry() {
            return !this.droppable;
        }

        @Override
        public long timestampNanos() {
            return this.timestampNanos;
        }
    }
}

