/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MigrationManager
implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
    private static final int MIGRATION_REQUEST_RETRIES = 3;

    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
    }

    @Override
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
        if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress())) {
            return;
        }
        MigrationManager.rectifySchema(UUID.fromString(value.value), endpoint);
    }

    @Override
    public void onAlive(InetAddress endpoint, EndpointState state) {
        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
        if (value != null) {
            MigrationManager.rectifySchema(UUID.fromString(value.value), endpoint);
        }
    }

    @Override
    public void onDead(InetAddress endpoint, EndpointState state) {
    }

    @Override
    public void onRestart(InetAddress endpoint, EndpointState state) {
    }

    @Override
    public void onRemove(InetAddress endpoint) {
    }

    public static void rectifySchema(UUID theirVersion, final InetAddress endpoint) {
        if (Schema.instance.getVersion().equals(theirVersion)) {
            return;
        }
        StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                Message message = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.MIGRATION_REQUEST, ArrayUtils.EMPTY_BYTE_ARRAY, Gossiper.instance.getVersion(endpoint));
                for (int retries = 0; retries < 3; ++retries) {
                    if (!FailureDetector.instance.isAlive(endpoint)) {
                        logger.error("Can't send migration request: node {} is down.", (Object)endpoint);
                        return;
                    }
                    IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint);
                    try {
                        byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                        DefsTable.mergeRemoteSchema(reply, message.getVersion());
                        return;
                    }
                    catch (TimeoutException e) {
                        continue;
                    }
                }
            }
        });
    }

    public static boolean isReadyForBootstrap() {
        return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
    }

    private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema) {
        try {
            Message msg = MigrationManager.makeMigrationMessage(schema, Gossiper.instance.getVersion(endpoint));
            MessagingService.instance().sendOneWay(msg, endpoint);
        }
        catch (IOException ex) {
            throw new IOError(ex);
        }
    }

    public static void announce(Collection<RowMutation> schema) {
        for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) {
            if (endpoint.equals(FBUtilities.getBroadcastAddress())) continue;
            MigrationManager.pushSchemaMutation(endpoint, schema);
        }
    }

    public static void passiveAnnounce(UUID version) {
        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
        logger.debug("Gossiping my schema version " + version);
    }

    private static Message makeMigrationMessage(Collection<RowMutation> schema, int version) throws IOException {
        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.DEFINITIONS_UPDATE, MigrationManager.serializeSchema(schema, version), version);
    }

    public static byte[] serializeSchema(Collection<RowMutation> schema, int version) throws IOException {
        FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bout);
        dout.writeInt(schema.size());
        for (RowMutation mutation : schema) {
            RowMutation.serializer().serialize(mutation, (DataOutput)dout, version);
        }
        dout.close();
        return bout.toByteArray();
    }

    public static Collection<RowMutation> deserializeMigrationMessage(byte[] data, int version) throws IOException {
        if (version < 4) {
            throw new IOException("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first.");
        }
        ArrayList<RowMutation> schema = new ArrayList<RowMutation>();
        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data));
        int count = in.readInt();
        for (int i = 0; i < count; ++i) {
            schema.add(RowMutation.serializer().deserialize(in, version));
        }
        return schema;
    }
}

