/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.IStreamCallback;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.UUIDSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that streams a set of token ranges of one column family between two endpoints
 * ({@link #src} and {@link #dst}) and signals a callback once streaming has succeeded.
 *
 * <p>A task is created on its owner through {@link #create} and registered in a static map keyed
 * by its id. When {@linkplain #run() run}, the task either starts streaming itself (when this node
 * is {@code src}) or forwards itself to {@code src} in a {@code STREAMING_REPAIR_REQUEST} message.
 * A forwarded task is rebuilt on {@code src} by the serializer with a callback that sends a
 * {@code STREAMING_REPAIR_RESPONSE} back to the owner after success has been reported twice (the
 * same callback is handed to both the outgoing session and the incoming range request).
 */
public class StreamingRepairTask
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
    /** Tasks created on this node through {@link #create}, keyed by task id. */
    private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
    public static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
    /** Identifier of this task; also the key under which it is registered in {@link #tasks}. */
    public final UUID id;
    /** Endpoint on which the task was created. */
    private final InetAddress owner;
    /** Endpoint that initiates the streaming. */
    public final InetAddress src;
    /** Endpoint that {@link #src} streams with. */
    public final InetAddress dst;
    private final String tableName;
    private final String cfName;
    private final Collection<Range<Token>> ranges;
    private final IStreamCallback callback;

    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback) {
        this.id = id;
        this.owner = owner;
        this.src = src;
        this.dst = dst;
        this.tableName = tableName;
        this.cfName = cfName;
        this.ranges = ranges;
        this.callback = callback;
    }

    /**
     * Creates a task owned by the local node and registers it under a fresh time-based UUID.
     *
     * <p>If {@code ep2} is the local broadcast address it becomes the source and {@code ep1} the
     * destination; otherwise {@code ep1} is the source and {@code ep2} the destination.
     *
     * @param ep1 first endpoint involved in the repair
     * @param ep2 second endpoint involved in the repair
     * @param tableName name of the table (keyspace) to stream
     * @param cfName name of the column family to stream
     * @param ranges token ranges to stream
     * @param callback run once the task has succeeded; may be {@code null}
     * @return the newly registered task
     */
    public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback) {
        InetAddress local = FBUtilities.getBroadcastAddress();
        UUID id = UUIDGen.getTimeUUID();
        boolean ep2IsLocal = ep2.equals(local);
        InetAddress src = ep2IsLocal ? ep2 : ep1;
        InetAddress dst = ep2IsLocal ? ep1 : ep2;
        StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, StreamingRepairTask.wrapCallback(callback, id, local.equals(src)));
        tasks.put(id, task);
        return task;
    }

    /**
     * @return {@code true} when the owner of this task is also its streaming source
     */
    public boolean isLocalTask() {
        return this.owner.equals(this.src);
    }

    /**
     * Starts streaming if this node is the source of the task, otherwise forwards the task to the
     * source endpoint.
     */
    @Override
    public void run() {
        if (this.src.equals(FBUtilities.getBroadcastAddress())) {
            this.initiateStreaming();
        } else {
            this.forwardToSource();
        }
    }

    /**
     * Transfers the currently referenced sstables for {@link #ranges} to {@link #dst} and requests
     * the same ranges back from it. The task callback is handed to both operations.
     *
     * @throws RuntimeException wrapping any exception raised while setting up the streams
     */
    private void initiateStreaming() {
        ColumnFamilyStore cfstore = Table.open(this.tableName).getColumnFamilyStore(this.cfName);
        try {
            logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", this.id, this.ranges.size(), this.dst));
            // References are acquired here on behalf of transferSSTables.
            // NOTE(review): presumably the out-session releases them; confirm against StreamOut.
            Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
            StreamOutSession outsession = StreamOutSession.create(this.tableName, this.dst, this.callback);
            StreamOut.transferSSTables(outsession, sstables, this.ranges, OperationType.AES);
            StreamIn.requestRanges(this.dst, this.tableName, Collections.singleton(cfstore), this.ranges, this.callback, OperationType.AES);
        }
        catch (Exception e) {
            throw new RuntimeException("Streaming repair failed", e);
        }
    }

    /** Sends this task one-way to {@link #src} as a {@code STREAMING_REPAIR_REQUEST}. */
    private void forwardToSource() {
        logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", this.id, this.ranges.size(), this.src, this.dst));
        MessageOut<StreamingRepairTask> msg = new MessageOut<StreamingRepairTask>(MessagingService.Verb.STREAMING_REPAIR_REQUEST, this, serializer);
        MessagingService.instance().sendOneWay(msg, this.src);
    }

    /**
     * Builds the callback used by a task received from another node: after the second reported
     * success it sends a response for {@code taskId} to {@code taskOwner}.
     *
     * @param taskOwner endpoint to notify
     * @param taskId id of the task being executed
     */
    private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId) {
        return new IStreamCallback(){
            // Two successes are expected before replying (outgoing transfer and incoming request).
            private final AtomicInteger outstanding = new AtomicInteger(2);

            @Override
            public void onSuccess() {
                if (this.outstanding.decrementAndGet() > 0) {
                    return;
                }
                StreamingRepairResponse.reply(taskOwner, taskId);
            }

            @Override
            public void onFailure() {
                // No failure message is sent to the owner; at least make the failure visible here.
                logger.error(String.format("[streaming task #%s] Streaming failed; no response will be sent to %s", taskId, taskOwner));
            }
        };
    }

    /**
     * Builds the callback stored on a task created by {@link #create}. Once the expected number of
     * successes has been reported, the task is unregistered and {@code callback} (if any) is run.
     *
     * @param callback completion action; may be {@code null}
     * @param taskid id under which the task is registered
     * @param isLocalTask whether the local node is the streaming source; a local task waits for
     *        two successes, a forwarded one for a single success
     */
    private static IStreamCallback wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask) {
        return new IStreamCallback(){
            private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);

            @Override
            public void onSuccess() {
                if (this.outstanding.decrementAndGet() > 0) {
                    return;
                }
                tasks.remove(taskid);
                if (callback != null) {
                    callback.run();
                }
            }

            @Override
            public void onFailure() {
                // The success count can no longer reach zero for this task, so drop the registry
                // entry instead of leaking it. The completion callback is deliberately not run.
                tasks.remove(taskid);
                logger.error(String.format("[streaming task #%s] Streaming failed; completion callback will not be run", taskid));
            }
        };
    }

    /**
     * Wire format: task id, owner, src, dst, table name, column family name, range count, then
     * each range. The callback is not serialized; a replying callback is created on
     * deserialization.
     */
    private static class StreamingRepairTaskSerializer
    implements IVersionedSerializer<StreamingRepairTask> {
        private StreamingRepairTaskSerializer() {
        }

        @Override
        public void serialize(StreamingRepairTask task, DataOutput dos, int version) throws IOException {
            UUIDSerializer.serializer.serialize(task.id, dos, version);
            CompactEndpointSerializationHelper.serialize(task.owner, dos);
            CompactEndpointSerializationHelper.serialize(task.src, dos);
            CompactEndpointSerializationHelper.serialize(task.dst, dos);
            dos.writeUTF(task.tableName);
            dos.writeUTF(task.cfName);
            dos.writeInt(task.ranges.size());
            for (Range<Token> range : task.ranges) {
                AbstractBounds.serializer.serialize(range, dos, version);
            }
        }

        /**
         * Reads a task in the order written by {@link #serialize} and attaches a callback that
         * replies to the deserialized owner.
         *
         * @throws IOException if the stream cannot be read or carries a negative range count
         */
        @Override
        public StreamingRepairTask deserialize(DataInput dis, int version) throws IOException {
            UUID id = UUIDSerializer.serializer.deserialize(dis, version);
            InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis);
            InetAddress src = CompactEndpointSerializationHelper.deserialize(dis);
            InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis);
            String tableName = dis.readUTF();
            String cfName = dis.readUTF();
            int rangesCount = dis.readInt();
            if (rangesCount < 0) {
                // Fail with a meaningful I/O error rather than ArrayList's IllegalArgumentException.
                throw new IOException("Invalid range count in streaming repair task " + id + ": " + rangesCount);
            }
            Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(rangesCount);
            for (int i = 0; i < rangesCount; ++i) {
                // serialize() only ever writes Range<Token> instances, so the downcast is expected
                // to hold; the raw intermediate cast mirrors the original expression.
                @SuppressWarnings({"unchecked", "rawtypes"})
                Range<Token> range = (Range<Token>)((AbstractBounds)AbstractBounds.serializer.deserialize(dis, version)).toTokenBounds();
                ranges.add(range);
            }
            return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, StreamingRepairTask.makeReplyingCallback(owner, id));
        }

        @Override
        public long serializedSize(StreamingRepairTask task, int version) {
            long size = UUIDSerializer.serializer.serializedSize(task.id, version);
            // Size each endpoint on its own: the three addresses are written separately and need
            // not share the same serialized length.
            size += CompactEndpointSerializationHelper.serializedSize(task.owner);
            size += CompactEndpointSerializationHelper.serializedSize(task.src);
            size += CompactEndpointSerializationHelper.serializedSize(task.dst);
            size += TypeSizes.NATIVE.sizeof(task.tableName);
            size += TypeSizes.NATIVE.sizeof(task.cfName);
            size += TypeSizes.NATIVE.sizeof(task.ranges.size());
            for (Range<Token> range : task.ranges) {
                size += AbstractBounds.serializer.serializedSize(range, version);
            }
            return size;
        }
    }

    /**
     * Handles the response for a forwarded task: looks the task up by id and reports success to
     * its callback.
     */
    public static class StreamingRepairResponse
    implements IVerbHandler<UUID> {
        @Override
        public void doVerb(MessageIn<UUID> message, String id) {
            UUID taskid = (UUID)message.payload;
            StreamingRepairTask task = tasks.get(taskid);
            if (task == null) {
                logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.from, taskid));
                return;
            }
            assert (task.owner.equals(FBUtilities.getBroadcastAddress()));
            logger.info(String.format("[streaming task #%s] task succeeded", task.id));
            if (task.callback != null) {
                task.callback.onSuccess();
            }
        }

        /** Sends the id of a completed task one-way to {@code remote}. */
        private static void reply(InetAddress remote, UUID taskid) {
            logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote));
            MessageOut<UUID> message = new MessageOut<UUID>(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDSerializer.serializer);
            MessagingService.instance().sendOneWay(message, remote);
        }
    }

    /** Handles a task forwarded by its owner by running it on this node. */
    public static class StreamingRepairRequest
    implements IVerbHandler<StreamingRepairTask> {
        @Override
        public void doVerb(MessageIn<StreamingRepairTask> message, String id) {
            StreamingRepairTask task = (StreamingRepairTask)message.payload;
            // Sanity checks only: these are skipped when assertions are disabled.
            assert (task.src.equals(FBUtilities.getBroadcastAddress()));
            assert (task.owner.equals(message.from));
            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.from, task.ranges.size(), task.dst));
            task.run();
        }
    }
}

