package org.apache.cassandra.streaming;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.streaming.messages.IncomingFileMessage;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.streaming.messages.PrepareMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.RetryMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamSession.class */
/**
 * Handles the streaming exchange with a single {@link #peer}.
 *
 * <p>A session collects {@link StreamRequest}s (data to be pulled from the peer) and
 * {@link StreamTransferTask}s (sstable sections to be pushed to the peer), then drives the
 * PREPARE / FILE / RECEIVED / RETRY / COMPLETE / SESSION_FAILED message exchange through its
 * {@link #handler}. Progress and completion are reported to the {@link StreamResultFuture}
 * supplied to {@link #init(StreamResultFuture)}.
 *
 * <p>While active, the session is registered with the {@link Gossiper} and the
 * {@link FailureDetector}; it closes itself as {@link State#FAILED} when the peer is convicted,
 * removed or restarted.
 */
public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener {
    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);

    /** Executes connection initiation for {@link #start()}; sized to the number of available processors. */
    private static final DebuggableThreadPoolExecutor streamExecutor =
            DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors());

    /** Remote endpoint this session streams with. */
    public final InetAddress peer;

    /** Result future of the owning plan; {@code null} until {@link #init(StreamResultFuture)} is called. */
    private StreamResultFuture streamResult;

    /** Per-peer streaming metrics, looked up once in the constructor. */
    private final StreamingMetrics metrics;

    /** Creates sockets to {@link #peer}; see {@link #createConnection()}. */
    private final StreamConnectionFactory factory;

    /** Number of retries requested so far via {@link #doRetry(FileMessageHeader, Throwable)}. */
    private int retries;

    /** Ranges this side asks the peer to send. */
    private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();

    /** Outgoing tasks, keyed by column family id. */
    private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();

    /** Incoming tasks, keyed by column family id. */
    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();

    /** Flipped exactly once by {@link #closeSession(State)} so that the close logic runs a single time. */
    private final AtomicBoolean isAborted = new AtomicBoolean(false);

    private volatile State state = State.INITIALIZED;

    /** Whether a {@link CompleteMessage} has already been sent to the peer. */
    private volatile boolean completeSent = false;

    /** Connection handler used for every message this session sends. */
    public final ConnectionHandler handler = new ConnectionHandler(this);

    /**
     * An sstable together with the sections of it that are to be streamed.
     */
    public static class SSTableStreamingSections {
        public final SSTableReader sstable;
        /** Position pairs, as produced by {@code SSTableReader.getPositionsForRanges}. */
        public final List<Pair<Long, Long>> sections;
        /** Key estimate, as produced by {@code SSTableReader.estimatedKeysForRanges}. */
        public final long estimatedKeys;

        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys) {
            this.sstable = sstable;
            this.sections = sections;
            this.estimatedKeys = estimatedKeys;
        }
    }

    /**
     * Session life-cycle states.
     *
     * <ul>
     *   <li>{@code INITIALIZED} - initial value of a new session.</li>
     *   <li>{@code PREPARING} - set when the PREPARE exchange starts.</li>
     *   <li>{@code STREAMING} - set when file messages start being sent.</li>
     *   <li>{@code WAIT_COMPLETE} - set once one side has finished and the session waits for the other.</li>
     *   <li>{@code COMPLETE} / {@code FAILED} - terminal values passed to the session close logic.</li>
     * </ul>
     */
    public enum State {
        INITIALIZED,
        PREPARING,
        STREAMING,
        WAIT_COMPLETE,
        COMPLETE,
        FAILED
    }

    /**
     * Creates a session with the given peer.
     *
     * @param peer    remote endpoint to stream with
     * @param factory factory used by {@link #createConnection()} to open sockets to {@code peer}
     */
    public StreamSession(InetAddress peer, StreamConnectionFactory factory) {
        this.peer = peer;
        this.factory = factory;
        this.metrics = StreamingMetrics.get(peer);
    }

    /**
     * @return the plan id of the owning {@link StreamResultFuture}, or {@code null} if
     *         {@link #init(StreamResultFuture)} has not been called yet
     */
    public UUID planId() {
        return streamResult == null ? null : streamResult.planId;
    }

    /**
     * @return the description of the owning {@link StreamResultFuture}, or {@code null} if
     *         {@link #init(StreamResultFuture)} has not been called yet
     */
    public String description() {
        return streamResult == null ? null : streamResult.description;
    }

    /**
     * Binds this session to its result future and subscribes it to gossip and failure-detection events.
     *
     * @param streamResult future that receives prepared / progress / complete notifications
     */
    public void init(StreamResultFuture streamResult) {
        this.streamResult = streamResult;
        Gossiper.instance.register(this);
        FailureDetector.instance.registerFailureDetectionEventListener(this);
    }

    /**
     * Starts the session. When there is nothing to request or transfer the session is closed as
     * {@link State#COMPLETE} right away; otherwise connection initiation is handed to the shared
     * executor, and an {@link IOException} raised there is routed to {@link #onError(Throwable)}.
     */
    public void start() {
        if (requests.isEmpty() && transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", planId());
            closeSession(State.COMPLETE);
            return;
        }

        streamExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    handler.initiate();
                    onInitializationComplete();
                } catch (IOException e) {
                    onError(e);
                }
            }
        });
    }

    /**
     * Opens a socket to {@link #peer} through the connection factory.
     *
     * @return the socket created by the factory
     * @throws IOException if the factory fails to create the connection
     */
    public Socket createConnection() throws IOException {
        assert factory != null;
        return factory.createConnection(peer);
    }

    /**
     * Records a request for the peer to stream the given ranges to this node.
     *
     * @param keyspace       keyspace of the requested data
     * @param ranges         token ranges to request
     * @param columnFamilies column family names passed through to the {@link StreamRequest}
     */
    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) {
        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
    }

    /**
     * Registers the sstable sections covering {@code ranges} for transfer to the peer.
     *
     * @param keyspace       keyspace to stream from
     * @param ranges         token ranges to stream; normalized before use
     * @param columnFamilies column families to stream; when empty, every column family store of the keyspace is used
     * @param flushTables    whether to flush the selected column family stores before collecting sstables
     */
    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables) {
        Collection<ColumnFamilyStore> stores = new HashSet<>();
        if (columnFamilies.isEmpty()) {
            stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
        } else {
            for (String columnFamily : columnFamilies) {
                stores.add(Keyspace.open(keyspace).getColumnFamilyStore(columnFamily));
            }
        }

        if (flushTables) {
            flushSSTables(stores);
        }

        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(Range.normalize(ranges), stores);
        try {
            addTransferFiles(sections);
        } finally {
            // addTransferFiles removes every entry it has dealt with, so whatever is still in the
            // list was not processed and its reference must be given back here. Using finally
            // guarantees this loop runs exactly once, on both the normal and the failure path.
            for (SSTableStreamingSections leftover : sections) {
                leftover.sstable.releaseReference();
            }
        }
    }

    /**
     * Marks the sstables of the given stores that cover {@code ranges} as referenced and computes,
     * for each of them, the sections and estimated key count for those ranges.
     *
     * <p>If anything fails, the sstables collected so far are released before the failure is rethrown.
     *
     * @param ranges token ranges of interest
     * @param stores column family stores to inspect
     * @return one entry per referenced sstable
     */
    // The row-bounds list is deliberately left raw: its element type is whatever
    // Range.toRowBounds() returns, and that type is not imported in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores) {
        List<SSTableReader> referenced = new ArrayList<>();
        try {
            for (ColumnFamilyStore store : stores) {
                List rowBounds = new ArrayList(ranges.size());
                for (Range<Token> range : ranges) {
                    rowBounds.add(range.toRowBounds());
                }
                referenced.addAll(store.markReferenced(rowBounds).sstables);
            }

            List<SSTableStreamingSections> result = new ArrayList<>(referenced.size());
            for (SSTableReader sstable : referenced) {
                result.add(new SSTableStreamingSections(sstable,
                                                        sstable.getPositionsForRanges(ranges),
                                                        sstable.estimatedKeysForRanges(ranges)));
            }
            return result;
        } catch (Throwable t) {
            SSTableReader.releaseReferences(referenced);
            throw t;
        }
    }

    /**
     * Adds the given sstable sections to the transfer task of their column family, creating the
     * task when none exists yet.
     *
     * <p>Every processed entry is removed from {@code sstableDetails} through its iterator, so the
     * collection must support {@link Iterator#remove()}. Entries without sections are not
     * transferred; their sstable reference is released instead.
     *
     * @param sstableDetails sstables and sections to transfer; emptied as entries are processed
     */
    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) {
        for (Iterator<SSTableStreamingSections> iter = sstableDetails.iterator(); iter.hasNext(); ) {
            SSTableStreamingSections details = iter.next();
            if (details.sections.isEmpty()) {
                // nothing to stream from this sstable: give its reference back
                details.sstable.releaseReference();
            } else {
                UUID cfId = details.sstable.metadata.cfId;
                StreamTransferTask task = transfers.get(cfId);
                if (task == null) {
                    task = new StreamTransferTask(this, cfId);
                    transfers.put(cfId, task);
                }
                task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
            }
            iter.remove();
        }
    }

    /**
     * Closes the session with the given final state. Only the first call has any effect.
     *
     * <p>On {@link State#FAILED} every receive and transfer task is aborted. In all cases the
     * handler is closed, the gossip and failure-detection subscriptions are dropped, and the
     * result future is told that this session is complete.
     *
     * @param finalState state to record for the closed session
     */
    private synchronized void closeSession(State finalState) {
        if (!isAborted.compareAndSet(false, true)) {
            return;
        }

        state(finalState);
        if (finalState == State.FAILED) {
            for (StreamTask task : Iterables.concat(receivers.values(), transfers.values())) {
                task.abort();
            }
        }

        handler.close();
        Gossiper.instance.unregister(this);
        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
        streamResult.handleSessionComplete(this);
    }

    /**
     * Sets the current state.
     *
     * @param newState state to switch to
     */
    public void state(State newState) {
        this.state = newState;
    }

    /** @return the current state */
    public State state() {
        return state;
    }

    /** @return {@code true} if the current state is {@link State#COMPLETE} */
    public boolean isSuccess() {
        return state == State.COMPLETE;
    }

    /**
     * Dispatches an incoming message to the matching handler method. Message types without a case
     * below are ignored.
     *
     * @param message message received from the peer
     */
    public void messageReceived(StreamMessage message) {
        switch (message.type) {
            case PREPARE:
                PrepareMessage prepare = (PrepareMessage) message;
                prepare(prepare.requests, prepare.summaries);
                break;
            case FILE:
                receive((IncomingFileMessage) message);
                break;
            case RECEIVED:
                ReceivedMessage received = (ReceivedMessage) message;
                received(received.cfId, received.sequenceNumber);
                break;
            case RETRY:
                RetryMessage retry = (RetryMessage) message;
                retry(retry.cfId, retry.sequenceNumber);
                break;
            case COMPLETE:
                complete();
                break;
            case SESSION_FAILED:
                sessionFailed();
                break;
            default:
                break;
        }
    }

    /**
     * Called once the connection has been initiated: switches to {@link State#PREPARING} and sends
     * a {@link PrepareMessage} carrying this side's requests and transfer summaries. When this
     * side requested nothing, file streaming starts immediately.
     */
    public void onInitializationComplete() {
        state(State.PREPARING);

        PrepareMessage prepare = new PrepareMessage();
        prepare.requests.addAll(requests);
        for (StreamTransferTask task : transfers.values()) {
            prepare.summaries.add(task.getSummary());
        }
        handler.sendMessage(prepare);

        if (requests.isEmpty()) {
            startStreamingFiles();
        }
    }

    /**
     * Logs the error, notifies the peer with a {@link SessionFailedMessage} when the outgoing
     * connection is up, and closes the session as {@link State#FAILED}.
     *
     * @param e the failure
     */
    public void onError(Throwable e) {
        logger.error("[Stream #" + planId() + "] Streaming error occurred", e);
        if (handler.isOutgoingConnected()) {
            handler.sendMessage(new SessionFailedMessage());
        }
        closeSession(State.FAILED);
    }

    /**
     * Handles the peer's PREPARE message: registers transfers for everything the peer requested
     * (flushing first), sets up receive tasks for the announced summaries, answers with this
     * side's own summaries when the peer requested anything, and then starts streaming unless
     * there is nothing left to do.
     *
     * @param incomingRequests  ranges the peer asks this node to send
     * @param incomingSummaries summaries of what the peer is going to send
     */
    public void prepare(Collection<StreamRequest> incomingRequests, Collection<StreamSummary> incomingSummaries) {
        state(State.PREPARING);

        for (StreamRequest request : incomingRequests) {
            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true);
        }
        for (StreamSummary summary : incomingSummaries) {
            prepareReceiving(summary);
        }

        if (!incomingRequests.isEmpty()) {
            PrepareMessage reply = new PrepareMessage();
            for (StreamTransferTask task : transfers.values()) {
                reply.summaries.add(task.getSummary());
            }
            handler.sendMessage(reply);
        }

        if (!maybeCompleted()) {
            startStreamingFiles();
        }
    }

    /**
     * Records an outgoing file in the metrics and, when the transfer task for its column family
     * is still registered, schedules a 12 hour timeout for that file's sequence number.
     *
     * @param header header of the file that was sent
     */
    public void fileSent(FileMessageHeader header) {
        long size = header.size();
        StreamingMetrics.totalOutgoingBytes.inc(size);
        metrics.outgoingBytes.inc(size);

        StreamTransferTask task = transfers.get(header.cfId);
        if (task != null) {
            task.scheduleTimeout(header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    /**
     * Records an incoming file in the metrics, acknowledges it with a {@link ReceivedMessage} and
     * hands the sstable to the receive task of its column family.
     *
     * @param message the received file message
     * @throws NullPointerException if no receive task is registered for the file's column family
     */
    public void receive(IncomingFileMessage message) {
        long size = message.header.size();
        StreamingMetrics.totalIncomingBytes.inc(size);
        metrics.incomingBytes.inc(size);

        handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
        receivers.get(message.header.cfId).received(message.sstable);
    }

    /**
     * Forwards a progress update for the data component of {@code desc} to the result future.
     *
     * @param desc      descriptor of the sstable being streamed
     * @param direction direction of the transfer
     * @param bytes     first progress figure, passed through to {@link ProgressInfo}
     * @param total     second progress figure, passed through to {@link ProgressInfo}
     */
    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) {
        ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
        streamResult.handleProgress(progress);
    }

    /**
     * Handles the peer's acknowledgement of a file by completing it on its transfer task.
     *
     * @param cfId           column family id of the transfer task
     * @param sequenceNumber sequence number of the acknowledged file
     * @throws NullPointerException if no transfer task is registered for {@code cfId}
     */
    public void received(UUID cfId, int sequenceNumber) {
        transfers.get(cfId).complete(sequenceNumber);
    }

    /**
     * Handles the peer's retry request by re-sending the message the transfer task builds for it.
     *
     * @param cfId           column family id of the transfer task
     * @param sequenceNumber sequence number of the file to send again
     * @throws NullPointerException if no transfer task is registered for {@code cfId}
     */
    public void retry(UUID cfId, int sequenceNumber) {
        OutgoingFileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
        handler.sendMessage(message);
    }

    /**
     * Handles the peer's COMPLETE message. When this side is already in
     * {@link State#WAIT_COMPLETE}, a {@link CompleteMessage} is sent (unless one already was) and
     * the session is closed as {@link State#COMPLETE}; otherwise the session only moves to
     * {@link State#WAIT_COMPLETE}.
     */
    public synchronized void complete() {
        if (state != State.WAIT_COMPLETE) {
            state(State.WAIT_COMPLETE);
            return;
        }

        if (!completeSent) {
            handler.sendMessage(new CompleteMessage());
            completeSent = true;
        }
        closeSession(State.COMPLETE);
    }

    /** Handles the peer's SESSION_FAILED message by closing the session as {@link State#FAILED}. */
    public synchronized void sessionFailed() {
        closeSession(State.FAILED);
    }

    /**
     * Asks the peer to send a file again after a receive failure. Once the number of retries
     * exceeds {@code DatabaseDescriptor.getMaxStreamingRetries()}, the session is failed through
     * {@link #onError(Throwable)} instead.
     *
     * @param header header of the file whose reception failed
     * @param e      the failure that triggered the retry
     */
    public void doRetry(FileMessageHeader header, Throwable e) {
        logger.warn("[Stream #" + planId() + "] Retrying for following error", e);
        retries++;
        if (retries > DatabaseDescriptor.getMaxStreamingRetries()) {
            onError(new IOException("Too many retries for " + header, e));
        } else {
            handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
        }
    }

    /**
     * @return a {@link SessionInfo} built from the peer, the summaries of the currently
     *         registered receive and transfer tasks, and the current state
     */
    public SessionInfo getSessionInfo() {
        List<StreamSummary> receivingSummaries = Lists.newArrayList();
        for (StreamReceiveTask task : receivers.values()) {
            receivingSummaries.add(task.getSummary());
        }

        List<StreamSummary> transferSummaries = Lists.newArrayList();
        for (StreamTransferTask task : transfers.values()) {
            transferSummaries.add(task.getSummary());
        }

        return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
    }

    /**
     * Unregisters a finished receive task and completes the session if no task is left.
     *
     * @param completedTask the finished task
     */
    public synchronized void taskCompleted(StreamReceiveTask completedTask) {
        receivers.remove(completedTask.cfId);
        maybeCompleted();
    }

    /**
     * Unregisters a finished transfer task and completes the session if no task is left.
     *
     * @param completedTask the finished task
     */
    public synchronized void taskCompleted(StreamTransferTask completedTask) {
        transfers.remove(completedTask.cfId);
        maybeCompleted();
    }

    /** No-op. */
    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
    }

    /** No-op. */
    @Override
    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    /** No-op. */
    @Override
    public void onChange(InetAddress endpoint, ApplicationState appState, VersionedValue value) {
    }

    /** No-op. */
    @Override
    public void onAlive(InetAddress endpoint, EndpointState epState) {
    }

    /** No-op. */
    @Override
    public void onDead(InetAddress endpoint, EndpointState epState) {
    }

    /** Treats removal of the endpoint as a conviction with the largest possible phi. */
    @Override
    public void onRemove(InetAddress endpoint) {
        convict(endpoint, Double.MAX_VALUE);
    }

    /** Treats a restart of the endpoint as a conviction with the largest possible phi. */
    @Override
    public void onRestart(InetAddress endpoint, EndpointState epState) {
        convict(endpoint, Double.MAX_VALUE);
    }

    /**
     * Closes the session as {@link State#FAILED} when the convicted endpoint is this session's
     * peer and {@code phi} is at least 100 times the configured phi convict threshold.
     *
     * @param endpoint convicted endpoint
     * @param phi      suspicion level reported for it
     */
    @Override
    public void convict(InetAddress endpoint, double phi) {
        if (endpoint.equals(peer) && phi >= 100.0d * DatabaseDescriptor.getPhiConvictThreshold()) {
            closeSession(State.FAILED);
        }
    }

    /**
     * Checks whether every receive and transfer task is done and, if so, advances the completion
     * handshake: from {@link State#WAIT_COMPLETE} the session is closed as {@link State#COMPLETE}
     * (sending a {@link CompleteMessage} first when none was sent yet); from any other state a
     * {@link CompleteMessage} is sent and the session moves to {@link State#WAIT_COMPLETE}.
     *
     * @return {@code true} if no receive or transfer task was left
     */
    private boolean maybeCompleted() {
        if (!receivers.isEmpty() || !transfers.isEmpty()) {
            return false;
        }

        if (state == State.WAIT_COMPLETE) {
            if (!completeSent) {
                handler.sendMessage(new CompleteMessage());
                completeSent = true;
            }
            closeSession(State.COMPLETE);
        } else {
            handler.sendMessage(new CompleteMessage());
            completeSent = true;
            state(State.WAIT_COMPLETE);
        }
        return true;
    }

    /**
     * Triggers a flush on every given store and waits for all of the returned futures.
     *
     * @param stores column family stores to flush
     */
    private void flushSSTables(Iterable<ColumnFamilyStore> stores) {
        List<Future<?>> flushes = new ArrayList<>();
        for (ColumnFamilyStore store : stores) {
            flushes.add(store.forceFlush());
        }
        FBUtilities.waitOnFutures(flushes);
    }

    /**
     * Registers a receive task for the summary's column family, unless the summary announces no files.
     *
     * @param summary what the peer is going to send for one column family
     */
    private void prepareReceiving(StreamSummary summary) {
        if (summary.files > 0) {
            receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
        }
    }

    /**
     * Tells the result future that the session is prepared, switches to {@link State#STREAMING}
     * and sends the file messages of every transfer task. A task without file messages is
     * completed immediately.
     */
    private void startStreamingFiles() {
        streamResult.handleSessionPrepared(this);
        state(State.STREAMING);

        for (StreamTransferTask task : transfers.values()) {
            Collection<OutgoingFileMessage> messages = task.getFileMessages();
            if (messages.isEmpty()) {
                taskCompleted(task);
            } else {
                handler.sendMessages(messages);
            }
        }
    }
}
