package org.apache.cassandra.streaming;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamInSession.class */
/**
 * Receiving side of a streaming session.
 *
 * <p>A session is identified by the pair (source host, session id) and is kept in a
 * static registry until {@link #closeIfFinished()} removes it. It tracks the files that
 * are still expected, the file currently being received, and the sstable readers of the
 * files that have already been received.
 *
 * <p>NOTE(review): {@code files} and the registry are concurrent collections, but
 * {@code readers}, {@code current}, {@code table} and {@code socket} are plain fields;
 * confirm against callers that they are only touched from a single thread.
 */
public class StreamInSession {
    private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);

    /** Every live session, keyed by (source host, session id). */
    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<>();

    /** Files that have been announced for this session and are not finished yet. */
    private final Set<PendingFile> files = new NonBlockingHashSet<>();
    /** Readers of the files that were received completely. */
    private final List<SSTableReader> readers = new ArrayList<>();
    /** (source host, session id) identifying this session. */
    private final Pair<InetAddress, Long> context;
    /** Run once when the session completes; may be {@code null}. */
    private final Runnable callback;
    private String table;
    private PendingFile current;
    private Socket socket;

    private StreamInSession(Pair<InetAddress, Long> context, Runnable callback) {
        this.context = context;
        this.callback = callback;
    }

    /**
     * Creates and registers a new session for {@code host}, using
     * {@link System#nanoTime()} as the session id.
     *
     * @param host     the source the data will be streamed from
     * @param callback run by {@link #closeIfFinished()} when the session completes; may be {@code null}
     * @return the newly registered session
     */
    public static StreamInSession create(InetAddress host, Runnable callback) {
        Pair<InetAddress, Long> context = new Pair<>(host, Long.valueOf(System.nanoTime()));
        StreamInSession session = new StreamInSession(context, callback);
        sessions.put(context, session);
        return session;
    }

    /**
     * Returns the session registered for {@code (host, sessionId)}; if there is none, a
     * session without a callback is registered and returned.
     *
     * @param host      the source host of the session
     * @param sessionId the session id
     * @return the registered session, never {@code null}
     */
    public static StreamInSession get(InetAddress host, long sessionId) {
        Pair<InetAddress, Long> context = new Pair<>(host, Long.valueOf(sessionId));
        StreamInSession session = sessions.get(context);
        if (session != null) {
            return session;
        }
        StreamInSession fresh = new StreamInSession(context, null);
        // Another thread may have registered the same key in the meantime; its session wins.
        StreamInSession raced = sessions.putIfAbsent(context, fresh);
        return raced != null ? raced : fresh;
    }

    /**
     * Records the file that is currently being received.
     *
     * @param file the file in flight
     */
    public void setCurrentFile(PendingFile file) {
        this.current = file;
    }

    /**
     * Sets the table name that {@link #closeIfFinished()} asserts every received sstable belongs to.
     *
     * @param table the table name
     */
    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Sets the socket used to send replies to the source; it is closed by {@link #closeIfFinished()}.
     *
     * @param socket the socket to reply on
     */
    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    /**
     * Adds files to the set of files still expected by this session.
     *
     * @param pending the files to add
     */
    public void addFiles(Collection<PendingFile> pending) {
        for (PendingFile file : pending) {
            if (logger.isDebugEnabled()) {
                logger.debug("Adding file {} to Stream Request queue", file.getFilename());
            }
            files.add(file);
        }
    }

    /**
     * Marks {@code file} as received: remembers its reader, removes it from the expected
     * files, clears the current file if it is this one, and sends a
     * {@code FILE_FINISHED} reply to the source.
     *
     * @param file    the file that has been received
     * @param sstable the reader for the received file; must not be {@code null}
     * @throws IOException if sending the reply fails
     */
    public void finished(PendingFile file, SSTableReader sstable) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("Finished {} (from {}). Sending ack to {}", new Object[]{file, getHost(), this});
        }
        assert sstable != null;
        readers.add(sstable);
        files.remove(file);
        if (file.equals(current)) {
            current = null;
        }
        StreamReply reply = new StreamReply(file.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
        logger.debug("ack {} sent for {}", reply, file);
    }

    /**
     * Asks the source to stream {@code file} again by sending a {@code FILE_RETRY} reply.
     *
     * @param file the file whose transfer failed
     * @throws IOException if sending the reply fails
     */
    public void retry(PendingFile file) throws IOException {
        StreamReply reply = new StreamReply(file.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
        logger.info("Streaming of file {} from {} failed: requesting a retry.", file, this);
        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
    }

    /**
     * Writes {@code message} to this session's socket, using the session id as the message id.
     *
     * @param message the message to send
     * @throws IOException if writing to the socket fails
     */
    public void sendMessage(Message message) throws IOException {
        OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream()));
    }

    /**
     * Completes the session if no files are pending; does nothing otherwise.
     *
     * <p>On completion every received sstable is added to its column family store,
     * secondary indexes are built per store, a {@code SESSION_FINISHED} reply is sent
     * (when a socket is set), the callback is run, the session is removed from the
     * registry and the socket is closed.
     *
     * @throws IOException if sending the reply or closing the socket fails
     */
    public void closeIfFinished() throws IOException {
        if (!files.isEmpty()) {
            return;
        }

        Map<ColumnFamilyStore, List<SSTableReader>> byStore = new HashMap<>();
        // Tracked separately from byStore so that a failure between acquiring a reference
        // and grouping the reader cannot leak that reference.
        List<SSTableReader> acquired = new ArrayList<>(readers.size());
        try {
            for (SSTableReader sstable : readers) {
                assert sstable.getTableName().equals(table);
                // Hold a reference for the whole completion step, including the index build.
                if (!sstable.acquireReference()) {
                    throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transfered");
                }
                acquired.add(sstable);

                ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
                cfs.addSSTable(sstable);
                List<SSTableReader> group = byStore.get(cfs);
                if (group == null) {
                    group = new ArrayList<>();
                    byStore.put(cfs, group);
                }
                group.add(sstable);
            }

            for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : byStore.entrySet()) {
                ColumnFamilyStore cfs = entry.getKey();
                if (cfs != null) {
                    cfs.indexManager.maybeBuildSecondaryIndexes(entry.getValue(), cfs.indexManager.getIndexedColumns());
                }
            }

            StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
            logger.info("Finished streaming session {} from {}", Long.valueOf(getSessionId()), getHost());
            try {
                if (socket != null) {
                    sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
                } else {
                    logger.debug("No socket to reply to {} with!", getHost());
                }
                if (callback != null) {
                    callback.run();
                }
                sessions.remove(context);
            } finally {
                if (socket != null) {
                    socket.close();
                }
            }
        } finally {
            SSTableReader.releaseReferences(acquired);
        }
    }

    /**
     * @return the id of this session
     */
    public long getSessionId() {
        return context.right.longValue();
    }

    /**
     * @return the host this session receives data from
     */
    public InetAddress getHost() {
        return context.left;
    }

    /**
     * @return a new set holding the source host of every registered session
     */
    public static Set<InetAddress> getSources() {
        Set<InetAddress> sources = new HashSet<>();
        for (StreamInSession session : sessions.values()) {
            sources.add(session.getHost());
        }
        return sources;
    }

    /**
     * Collects the files being received or still expected from {@code host} across all
     * of its registered sessions.
     *
     * @param host the source host
     * @return a new set with each matching session's current file (if any) and pending files
     */
    public static Set<PendingFile> getIncomingFiles(InetAddress host) {
        Set<PendingFile> incoming = new HashSet<>();
        for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet()) {
            if (!entry.getKey().left.equals(host)) {
                continue;
            }
            StreamInSession session = entry.getValue();
            // Read the field once so a concurrent reset to null cannot slip in between check and use.
            PendingFile inFlight = session.current;
            if (inFlight != null) {
                incoming.add(inFlight);
            }
            incoming.addAll(session.files);
        }
        return incoming;
    }
}
