package org.voltdb.join;

import com.google_voltpatches.common.base.Charsets;
import com.google_voltpatches.common.base.Supplier;
import com.google_voltpatches.common.base.Throwables;
import com.google_voltpatches.common.collect.ArrayListMultimap;
import com.google_voltpatches.common.collect.ImmutableList;
import com.google_voltpatches.common.collect.Maps;
import com.google_voltpatches.common.collect.Multimap;
import com.google_voltpatches.common.collect.Multimaps;
import com.google_voltpatches.common.collect.Ordering;
import com.google_voltpatches.common.collect.Sets;
import com.google_voltpatches.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.apache.zookeeper_voltpatches.CreateMode;
import org.apache.zookeeper_voltpatches.KeeperException;
import org.apache.zookeeper_voltpatches.WatchedEvent;
import org.apache.zookeeper_voltpatches.Watcher;
import org.apache.zookeeper_voltpatches.ZooDefs;
import org.apache.zookeeper_voltpatches.ZooKeeper;
import org.apache.zookeeper_voltpatches.data.Stat;
import org.json_voltpatches.JSONException;
import org.json_voltpatches.JSONObject;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.HostMessenger;
import org.voltcore.utils.CoreUtils;
import org.voltcore.zk.CoreZK;
import org.voltcore.zk.LeaderElector;
import org.voltcore.zk.LeaderNoticeHandler;
import org.voltcore.zk.ZKUtil;
import org.voltdb.AbstractTopology;
import org.voltdb.CatalogContext;
import org.voltdb.ClientInterface;
import org.voltdb.ElasticHashinator;
import org.voltdb.ExportStatsBase;
import org.voltdb.SimpleClientResponseAdapter;
import org.voltdb.SnapshotCompletionInterest;
import org.voltdb.SnapshotFormat;
import org.voltdb.StoredProcedureInvocation;
import org.voltdb.TheHashinator;
import org.voltdb.VoltDB;
import org.voltdb.VoltProZK;
import org.voltdb.VoltSystemProcedure;
import org.voltdb.VoltTable;
import org.voltdb.VoltZK;
import org.voltdb.catalog.Database;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.client.SyncCallback;
import org.voltdb.common.Constants;
import org.voltdb.iv2.Cartographer;
import org.voltdb.jni.ExecutionEngine;
import org.voltdb.join.ElasticOperationUtils;
import org.voltdb.rejoin.JoinCoordinator;
import org.voltdb.settings.ClusterSettings;
import org.voltdb.settings.Settings;
import org.voltdb.settings.SettingsException;
import org.voltdb.sysprocs.saverestore.SnapshotPathType;
import org.voltdb.sysprocs.saverestore.SnapshotUtil;
import org.voltdb.utils.CatalogUtil;

/* loaded from: input_file:org/voltdb/join/ElasticJoinCoordinator.class */
public class ElasticJoinCoordinator implements LeaderNoticeHandler, ElasticJoinService {
    private static final VoltLogger log;
    static final String zkNode = "/db/elastic_join/joining";
    private static final int EXECUTETASK_TIMEOUT = 30000;
    private final LeaderElector m_leaderElector;
    private final HostMessenger m_messenger;
    private final ZooKeeper m_zk;
    private final Cartographer m_cartographer;
    private final String m_clSnapshotPath;
    private final Supplier<ClusterSettings> m_clusterSettings;
    private volatile JSONObject m_topo;
    private int m_topoVersion;
    private final int m_expectedHostCount;
    private final ClientInterface m_ci;
    private static final int kStateIdle = -1;
    private static final int kStateInitializing = 0;
    private static final int kStateSnapshotting = 1;
    private static final int kStateDRElasticAdd = 2;
    private static final int kStateIndexing = 3;
    private static final int kStateMigrating = 4;
    private int m_state;
    private String m_snapshotNonce;
    private ElasticHashinator m_goalHashinator;
    private ElasticHashinator m_mpiHashinator;
    private final BalancePartitionsStatistics m_stats;
    private Map<Long, Long> m_snapshotSinkHSIds;
    private Map<Integer, Long> m_lowestSiteSinkHSIds;
    private List<Integer> m_partitionsToAdd;
    private Future<Object> m_migrateFuture;
    private Future<SnapshotCompletionInterest.SnapshotCompletionEvent> m_indexBuildFuture;
    private static final long PING_INTERVAL_MS = 200;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean m_isLeader = false;
    private final Collection<Integer> m_joiningHosts = new HashSet();
    private final ExecutorService m_es = CoreUtils.getCachedSingleThreadExecutor("Elastic join coordinator", 15000);
    private ZKUtil.CancellableWatcher m_watcher = getCancellableWatcher();
    private ZKUtil.CancellableWatcher m_childrenWatcher = getCancellableChildWatcher();
    private volatile boolean m_snapshotFailed = false;
    private final SnapshotUtil.SnapshotResponseHandler m_snapshotResponseHandler = new SnapshotUtil.SnapshotResponseHandler() { // from class: org.voltdb.join.ElasticJoinCoordinator.3
        static final /* synthetic */ boolean $assertionsDisabled;

        @Override // org.voltdb.sysprocs.saverestore.SnapshotUtil.SnapshotResponseHandler
        public void handleResponse(ClientResponse clientResponse) {
            if (clientResponse == null) {
                ElasticJoinCoordinator.log.error("Failed to initiate snapshot");
                ElasticJoinCoordinator.this.m_snapshotFailed = true;
            } else if (clientResponse.getStatus() != 1) {
                ElasticJoinCoordinator.log.error("Failed to initiate snapshot: " + clientResponse.getStatusString());
                ElasticJoinCoordinator.this.m_snapshotFailed = true;
            }
            if (!$assertionsDisabled && clientResponse == null) {
                throw new AssertionError();
            }
            VoltTable[] results = clientResponse.getResults();
            if (!SnapshotUtil.didSnapshotRequestSucceed(results)) {
                ElasticJoinCoordinator.this.m_snapshotFailed = true;
            } else if (clientResponse.getAppStatusString() == null) {
                ElasticJoinCoordinator.log.error("Snapshot request failed: " + clientResponse.getStatusString());
                ElasticJoinCoordinator.this.m_snapshotFailed = true;
            }
            if (ElasticJoinCoordinator.this.m_snapshotFailed && results != null) {
                for (VoltTable voltTable : clientResponse.getResults()) {
                    ElasticJoinCoordinator.log.error(voltTable);
                }
            }
            if (ElasticJoinCoordinator.this.m_snapshotFailed) {
                ElasticJoinCoordinator.this.m_es.execute(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.3.1
                    @Override // java.lang.Runnable
                    public void run() {
                        ElasticJoinCoordinator.this.reset();
                    }
                });
            }
        }

        static {
            $assertionsDisabled = !ElasticJoinCoordinator.class.desiredAssertionStatus();
        }
    };
    private final Runnable m_pingWork = new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.9
        @Override // java.lang.Runnable
        public void run() {
            if (ElasticJoinCoordinator.this.m_state == 0) {
                ElasticJoinCoordinator.this.pingPartitions(false);
                ElasticJoinCoordinator.this.schedulePingWork();
            }
        }
    };
    private final SimpleClientResponseAdapter m_adapter = new SimpleClientResponseAdapter(ClientInterface.ELASTIC_JOIN_CID, "ElasticJoinCoordinatorAdaptor");

    private ZKUtil.CancellableWatcher getCancellableWatcher() {
        return new ZKUtil.CancellableWatcher(this.m_es) { // from class: org.voltdb.join.ElasticJoinCoordinator.1
            @Override // org.voltcore.zk.ZKUtil.CancellableWatcher
            public void pProcess(final WatchedEvent watchedEvent) {
                if (ElasticJoinCoordinator.this.m_isLeader) {
                    ElasticJoinCoordinator.this.m_es.submit(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                ElasticJoinCoordinator.this.m_zk.exists(ElasticJoinCoordinator.zkNode, ElasticJoinCoordinator.this.m_watcher);
                            } catch (Exception e) {
                                ElasticJoinCoordinator.log.error("Error setting up new watch on the elastic join ZK node", e);
                            }
                            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                                ElasticJoinCoordinator.this.handleNodeDataChange();
                            }
                        }
                    });
                }
            }
        };
    }

    private ZKUtil.CancellableWatcher getCancellableChildWatcher() {
        return new ZKUtil.CancellableWatcher(this.m_es) { // from class: org.voltdb.join.ElasticJoinCoordinator.2
            @Override // org.voltcore.zk.ZKUtil.CancellableWatcher
            public void pProcess(WatchedEvent watchedEvent) {
                if (ElasticJoinCoordinator.this.m_isLeader) {
                    ElasticJoinCoordinator.this.m_es.submit(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                List<String> children = ElasticJoinCoordinator.this.m_zk.getChildren(ElasticJoinCoordinator.zkNode, ElasticJoinCoordinator.this.m_childrenWatcher);
                                if (children.isEmpty()) {
                                    ElasticJoinCoordinator.this.reset();
                                } else {
                                    ElasticJoinCoordinator.this.handleReadyHostsChanged(children);
                                }
                            } catch (KeeperException e) {
                                if (e.code() != KeeperException.Code.NONODE) {
                                    ElasticJoinCoordinator.log.error("Failed to get the children of ZK join node", e);
                                }
                            } catch (Exception e2) {
                                ElasticJoinCoordinator.log.error("Error setting up new children watch on the elastic join ZK node", e2);
                            }
                        }
                    });
                }
            }
        };
    }

    public ElasticJoinCoordinator(HostMessenger hostMessenger, ClientInterface clientInterface, Cartographer cartographer, BalancePartitionsStatistics balancePartitionsStatistics, String str, int i, Supplier<ClusterSettings> supplier) throws InterruptedException, ExecutionException, KeeperException {
        this.m_messenger = hostMessenger;
        this.m_zk = hostMessenger.getZK();
        this.m_ci = clientInterface;
        this.m_cartographer = cartographer;
        this.m_stats = balancePartitionsStatistics;
        this.m_clSnapshotPath = str;
        this.m_expectedHostCount = i + 1;
        this.m_clusterSettings = supplier;
        this.m_ci.bindAdapter(this.m_adapter, null);
        VoltProZK.createPersistentZKNodes(this.m_zk);
        this.m_leaderElector = new LeaderElector(this.m_zk, VoltProZK.leaders_elastic_coordinator, "elasticjoin", null, this);
        this.m_leaderElector.start(true);
    }

    @Override // org.voltcore.zk.LeaderNoticeHandler
    public void becomeLeader() {
        log.info("This node is promoted as the leader of elastic join coordinators");
        this.m_isLeader = true;
        this.m_es.submit(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ElasticJoinCoordinator.this.resetInternalStates();
                    ElasticJoinCoordinator.this.resumeState();
                } catch (Throwable th) {
                    VoltDB.crashLocalVoltDB("Exception ", true, th);
                }
            }
        });
    }

    @Override // org.voltcore.zk.LeaderNoticeHandler
    public void noticedTopologyChange(boolean z, boolean z2) {
        if (z2 && this.m_isLeader) {
            this.m_es.submit(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.5
                @Override // java.lang.Runnable
                public void run() {
                    ElasticJoinCoordinator.this.checkAndAbortJoin();
                }
            });
        }
    }

    @Override // org.voltdb.join.ElasticJoinService
    public void shutdown() {
        try {
            this.m_leaderElector.shutdown();
            this.m_es.shutdown();
            this.m_es.awaitTermination(356L, TimeUnit.DAYS);
        } catch (Exception e) {
            log.error("Failed to shutdown elastic join coordinator", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean checkAndAbortJoin() {
        if (this.m_state == -1 || this.m_state == 4) {
            return false;
        }
        log.error("Cluster is in the process of resolving node failures, terminating the join in progress. Please retry.");
        this.m_messenger.sendPoisonPill(this.m_joiningHosts, "Elastic join request rejected. Either another join is in progress or the cluster is still rebalancing partitions after command log recovery. It's also possible that the join failed and was rolled back by killing this new node. Please retry later.", 2);
        reset();
        return true;
    }

    private void abortBlockedJoin(String str) {
        log.error(str);
        this.m_messenger.sendPoisonPill(this.m_joiningHosts, "Elastic join request rejected. " + str, 2);
        reset();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resumeState() {
        log.debug("Scanning ZK joining node to determine the current state");
        ElasticJoinNodeInfo elasticJoinNodeInfo = null;
        try {
            elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData(zkNode, this.m_watcher, (Stat) null));
        } catch (KeeperException e) {
            if (e.code() == KeeperException.Code.NONODE) {
                createZKNode();
                return;
            }
            VoltDB.crashLocalVoltDB("Failed to get ZK node data", true, e);
        } catch (Exception e2) {
            VoltDB.crashLocalVoltDB("Failed to check joining host count", true, e2);
        }
        if (!$assertionsDisabled && elasticJoinNodeInfo == null) {
            throw new AssertionError();
        }
        Set<Integer> readyHosts = elasticJoinNodeInfo.getReadyHosts();
        Object obj = "";
        if (readyHosts.size() == this.m_expectedHostCount) {
            this.m_topo = elasticJoinNodeInfo.getTopology();
            this.m_goalHashinator = new ElasticHashinator(elasticJoinNodeInfo.getHashinatorConfig(), true);
            this.m_joiningHosts.clear();
            this.m_joiningHosts.addAll(readyHosts);
            this.m_state = 0;
            obj = "Detected initializing join state";
        }
        if (elasticJoinNodeInfo.getInitializedElasticCoordinatorHSIds().size() == this.m_expectedHostCount) {
            this.m_snapshotSinkHSIds = elasticJoinNodeInfo.getSnapshotSinkHSIds();
            this.m_state = 1;
            obj = "Detected snapshotting join state";
        }
        if (elasticJoinNodeInfo.getFinishedElasticCoordinatorHSIds().size() == this.m_expectedHostCount) {
            this.m_state = 2;
            obj = "Detected DR Elastic Add Event state";
        }
        if (this.m_state == 2 && isDrElasticChangeEventsComplete()) {
            this.m_state = 4;
            obj = "Detected migrating join state";
        }
        if (checkAndAbortJoin()) {
            log.info(obj);
            return;
        }
        if (this.m_state == 4) {
            VoltZK.createActionBlocker(this.m_zk, VoltZK.elasticJoinInProgress, CreateMode.EPHEMERAL, log, "elastic join");
            log.info("Resuming data migration");
        }
        handleStateTransitioned();
    }

    private void resetZK() {
        try {
            ZKUtil.deleteRecursively(this.m_zk, zkNode);
        } catch (InterruptedException e) {
            log.error("Interrupted while deleting elastic join ZK node", e);
        } catch (KeeperException e2) {
            if (e2.code() != KeeperException.Code.NONODE) {
                log.error("Error deleting elastic join ZK node", e2);
            }
        }
        VoltZK.removeActionBlocker(this.m_zk, VoltZK.elasticJoinInProgress, log);
        VoltZK.removeActionBlocker(this.m_zk, VoltZK.elasticJoinMigration, log);
        createZKNode();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetInternalStates() {
        try {
        } catch (CancellationException e) {
        } catch (Exception e2) {
            VoltDB.crashLocalVoltDB("Unexpected exception resetting join coordinator", true, e2);
        } finally {
            this.m_migrateFuture = null;
        }
        if (this.m_migrateFuture != null) {
            this.m_migrateFuture.cancel(false);
            this.m_migrateFuture.get();
        }
        this.m_watcher.cancel();
        this.m_watcher = getCancellableWatcher();
        this.m_childrenWatcher.cancel();
        this.m_childrenWatcher = getCancellableChildWatcher();
        this.m_indexBuildFuture = null;
        this.m_topo = null;
        this.m_topoVersion = -1;
        this.m_state = -1;
        this.m_partitionsToAdd = null;
        this.m_joiningHosts.clear();
        this.m_snapshotNonce = null;
        this.m_goalHashinator = null;
        this.m_mpiHashinator = null;
        this.m_snapshotSinkHSIds = null;
        this.m_lowestSiteSinkHSIds = null;
        this.m_snapshotFailed = false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reset() {
        log.info("Resetting join state to allow new nodes to join");
        resetInternalStates();
        resetZK();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleReadyHostsChanged(List<String> list) throws KeeperException, InterruptedException, JSONException {
        do {
            Stat stat = new Stat();
            ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData(zkNode, false, stat));
            HashSet newHashSet = Sets.newHashSet(elasticJoinNodeInfo.getReadyHosts());
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                newHashSet.remove(Integer.valueOf(Integer.parseInt(it.next())));
            }
            if (!newHashSet.isEmpty()) {
                Iterator it2 = newHashSet.iterator();
                while (it2.hasNext()) {
                    int intValue = ((Integer) it2.next()).intValue();
                    log.info("Removing failed joining host " + intValue);
                    elasticJoinNodeInfo.removeReadyHost(intValue);
                    this.m_joiningHosts.remove(Integer.valueOf(intValue));
                }
            }
            try {
                this.m_zk.setData(zkNode, elasticJoinNodeInfo.toBytes(), stat.getVersion());
                return;
            } catch (KeeperException e) {
            }
        } while (e.code() == KeeperException.Code.BADVERSION);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleNodeDataChange() {
        ElasticJoinNodeInfo elasticJoinNodeInfo = null;
        try {
            elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData(zkNode, false, (Stat) null));
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Failed to retrieve joining information from ZK", true, e);
        }
        int i = this.m_state;
        switch (this.m_state) {
            case -1:
                checkParticipantsReady(elasticJoinNodeInfo);
                checkResumeMigration(elasticJoinNodeInfo);
                break;
            case 0:
                checkNodeInitialized(elasticJoinNodeInfo);
                break;
            case 1:
                checkParticipantsFinished(elasticJoinNodeInfo);
                break;
        }
        if (this.m_state != i) {
            handleStateTransitioned();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleStateTransitioned() {
        if (log.isDebugEnabled()) {
            log.debug("Transitioning to state " + this.m_state);
        }
        switch (this.m_state) {
            case -1:
            default:
                return;
            case 0:
                activateJoin();
                return;
            case 1:
                preSnapshot();
                startSnapshot();
                return;
            case 2:
                generateElasticChangeEvent();
                return;
            case 3:
                buildIndex();
                return;
            case 4:
                preMigrationWork();
                migrateData();
                return;
        }
    }

    private void preMigrationWork() {
        try {
            SyncCallback syncCallback = new SyncCallback();
            this.m_ci.getInternalConnectionHandler().callProcedure(this.m_ci.getInternalUser(), true, -1, (ProcedureCallback) syncCallback, "@RestartDRConsumerNT", new Object[0]);
            syncCallback.waitForResponse();
            ClientResponse response = syncCallback.getResponse();
            if (response.getStatus() != 1) {
                log.error("Failed to restart DR consumer " + (response.getStatusString() != null ? response.getStatusString() : ""));
                return;
            }
            if (response.getResults().length == 1) {
                VoltTable voltTable = response.getResults()[0];
                HashSet hashSet = new HashSet();
                while (voltTable.advanceRow()) {
                    if (voltTable.getLong(ExportStatsBase.Columns.STATUS) != 1) {
                        hashSet.add(Integer.valueOf((int) voltTable.getLong(VoltSystemProcedure.CNAME_HOST_ID)));
                    }
                }
                if (hashSet.isEmpty()) {
                    return;
                }
                log.error("Failed to restart DR consumer on the following hosts: " + hashSet);
            }
        } catch (Throwable th) {
            log.error("Failed to restart DR consumer: " + th.getMessage(), th);
        }
    }

    private void activateJoin() {
        try {
            if (CoreZK.isPartitionCleanupInProgress(this.m_zk)) {
                checkAndAbortJoin();
                return;
            }
            String createActionBlocker = VoltZK.createActionBlocker(this.m_zk, VoltZK.elasticJoinInProgress, CreateMode.EPHEMERAL, log, "elastic join");
            if (createActionBlocker != null) {
                abortBlockedJoin(createActionBlocker);
                return;
            }
            Stat stat = new Stat();
            this.m_topo = new JSONObject(new String(this.m_zk.getData(VoltZK.topology, false, stat), Charsets.UTF_8));
            if (log.isDebugEnabled()) {
                log.debug("Topology before joining:" + this.m_topo.toString(4));
            }
            this.m_topoVersion = stat.getVersion();
            AbstractTopology abstractTopology = AbstractTopology.topologyFromJSON(this.m_topo);
            if (log.isDebugEnabled()) {
                log.debug("Topology before joining:" + abstractTopology.topologyToJSON().toString(4));
            }
            Map<Integer, HostMessenger.HostInfo> hostInfoMapFromZK = this.m_messenger.getHostInfoMapFromZK();
            HashMap newHashMap = Maps.newHashMap();
            HashMap newHashMap2 = Maps.newHashMap();
            hostInfoMapFromZK.forEach((num, hostInfo) -> {
                newHashMap.put(num, hostInfo.m_group);
                newHashMap2.put(num, Integer.valueOf(hostInfo.m_localSitesCount));
            });
            LongAdder longAdder = new LongAdder();
            Stream<Integer> parallelStream = newHashMap2.values().parallelStream();
            longAdder.getClass();
            parallelStream.forEach((v1) -> {
                r1.add(v1);
            });
            int intValue = longAdder.intValue() / (abstractTopology.getReplicationFactor() + 1);
            this.m_partitionsToAdd = ImmutableList.copyOf((Collection) Cartographer.getPartitionsToAdd(this.m_zk, intValue));
            if (!$assertionsDisabled && !Ordering.natural().isOrdered(this.m_partitionsToAdd)) {
                throw new AssertionError();
            }
            log.info("Start joining host with new partitions to add " + this.m_partitionsToAdd);
            this.m_goalHashinator = new ElasticHashinator(TheHashinator.getConfigureBytes(intValue), false);
            this.m_snapshotNonce = JoinCoordinator.makeSnapshotNonce("Join", 0L);
            Multimap<Integer, Integer> partitionToHosts = getPartitionToHosts(newHashMap2);
            if (log.isDebugEnabled()) {
                log.debug("new partitions to hosts:" + partitionToHosts.toString());
            }
            ArrayListMultimap create = ArrayListMultimap.create();
            Multimaps.invertFrom(partitionToHosts, create);
            if (log.isDebugEnabled()) {
                log.debug("new hosts to partitions:" + create.toString());
            }
            AbstractTopology.HostDescription[] hostDescriptionArr = new AbstractTopology.HostDescription[create.asMap().size()];
            int i = 0;
            for (Map.Entry entry : create.asMap().entrySet()) {
                int intValue2 = ((Integer) entry.getKey()).intValue();
                int i2 = i;
                i++;
                hostDescriptionArr[i2] = new AbstractTopology.HostDescription(intValue2, ((Collection) entry.getValue()).size(), (String) newHashMap.get(Integer.valueOf(intValue2)));
            }
            AbstractTopology mutateAddHosts = AbstractTopology.mutateAddHosts(abstractTopology, hostDescriptionArr);
            if (log.isDebugEnabled()) {
                log.debug("Topology with new hosts:" + mutateAddHosts.topologyToJSON().toString(4));
            }
            AbstractTopology.PartitionDescription[] partitionDescriptionArr = new AbstractTopology.PartitionDescription[this.m_partitionsToAdd.size()];
            for (int i3 = 0; i3 < this.m_partitionsToAdd.size(); i3++) {
                partitionDescriptionArr[i3] = new AbstractTopology.PartitionDescription(this.m_expectedHostCount - 1);
            }
            this.m_topo = AbstractTopology.mutateAddPartitionsToEmptyHosts(mutateAddHosts, new HashSet(), partitionDescriptionArr).topologyToJSON();
            if (log.isDebugEnabled()) {
                log.debug("Topology with new partition:" + this.m_topo.toString(4));
            }
            notifyPartitionAssignmentAndSnapshotNonce();
            schedulePingWork();
        } catch (Exception e) {
            log.error("Failed to activate elastic join. Aborting", e);
            checkAndAbortJoin();
        }
    }

    private void createZKNode() {
        try {
            log.debug("Creating elastic join ZK node");
            this.m_zk.create(zkNode, new ElasticJoinNodeInfo().toBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            this.m_zk.exists(zkNode, this.m_watcher);
            this.m_zk.getChildren(zkNode, this.m_childrenWatcher);
            log.debug("Created ZK elastic join node");
        } catch (InterruptedException e) {
            VoltDB.crashLocalVoltDB("Interrupted during join", true, e);
        } catch (KeeperException e2) {
            VoltDB.crashLocalVoltDB("Failed to create joining node to ZooKeeper", true, e2);
        } catch (JSONException e3) {
            VoltDB.crashLocalVoltDB("Failed to serialize joining node info", true, e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Multimap<Integer, ElasticRangeOwnership> calculateRanges() {
        if (this.m_mpiHashinator == null) {
            this.m_mpiHashinator = null;
            while (this.m_mpiHashinator == null) {
                this.m_mpiHashinator = ElasticOperationUtils.retrieveMPIHashinator(this.m_ci, this.m_adapter.connectionId(), this.m_adapter.getSyncCallbackSupplier());
                if (this.m_mpiHashinator == null) {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        Throwables.propagate(e);
                    }
                }
            }
        }
        return ElasticOperationUtils.diffHashinators(this.m_mpiHashinator, this.m_goalHashinator);
    }

    private void buildIndex() {
        updateHostCountInClusterSettings();
        try {
            Multimap<Integer, ElasticRangeOwnership> calculateRanges = calculateRanges();
            String indexSnapshotNonce = getIndexSnapshotNonce();
            final ListenableFuture<SnapshotCompletionInterest.SnapshotCompletionEvent> watchSnapshot = SnapshotUtil.watchSnapshot(indexSnapshotNonce);
            this.m_indexBuildFuture = watchSnapshot;
            requestIndexSnapshot(indexSnapshotNonce, false, VoltDB.instance().getCatalogContext().database, calculateRanges, this.m_snapshotResponseHandler);
            watchSnapshot.addListener(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.6
                @Override // java.lang.Runnable
                public void run() {
                    if (ElasticJoinCoordinator.this.m_indexBuildFuture != watchSnapshot) {
                        return;
                    }
                    if (watchSnapshot.isCancelled()) {
                        ElasticJoinCoordinator.log.error("Timed out waiting for index snapshot to complete, going to attempt migrating");
                        ElasticJoinCoordinator.this.m_state = 4;
                        ElasticJoinCoordinator.this.handleStateTransitioned();
                        return;
                    }
                    try {
                        watchSnapshot.get();
                    } catch (InterruptedException e) {
                        VoltDB.crashLocalVoltDB("Don't expect to get interrupted", true, e);
                    } catch (ExecutionException e2) {
                        ElasticJoinCoordinator.log.error("Failed to wait for the index snapshot to finish", e2.getCause());
                        ElasticJoinCoordinator.this.m_snapshotFailed = true;
                    }
                    if (ElasticJoinCoordinator.this.m_state != 4) {
                        ElasticJoinCoordinator.this.m_state = 4;
                        ElasticJoinCoordinator.this.handleStateTransitioned();
                    }
                }
            }, this.m_es);
            VoltDB.instance().getSES(true).schedule(new Runnable() { // from class: org.voltdb.join.ElasticJoinCoordinator.7
                @Override // java.lang.Runnable
                public void run() {
                    watchSnapshot.cancel(false);
                }
            }, 2L, TimeUnit.HOURS);
        } catch (Exception e) {
            log.error("Failed to request elastic index snapshot", e);
            this.m_snapshotFailed = true;
        }
        if (this.m_snapshotFailed) {
            this.m_state = 4;
            handleStateTransitioned();
        }
    }

    private void migrateData() {
        boolean z = true;
        try {
            try {
                final AtomicReference atomicReference = new AtomicReference(calculateRanges());
                long j = 0;
                TreeSet newTreeSet = Sets.newTreeSet();
                Iterator it = ((Multimap) atomicReference.get()).values().iterator();
                while (it.hasNext()) {
                    j += r0.rangeEnd - r0.rangeStart;
                    newTreeSet.add(Integer.valueOf(((ElasticRangeOwnership) it.next()).partition));
                }
                final long j2 = j;
                log.info("Start rebalancing from partitions " + Sets.newTreeSet(((Multimap) atomicReference.get()).keySet()) + " to partitions " + newTreeSet);
                final AtomicReference atomicReference2 = new AtomicReference(null);
                this.m_migrateFuture = CoreUtils.retryHelper(VoltDB.instance().getSES(true), this.m_es, new Callable<Object>() { // from class: org.voltdb.join.ElasticJoinCoordinator.8
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        String createActionBlocker;
                        VoltZK.removeActionBlocker(ElasticJoinCoordinator.this.m_zk, VoltZK.elasticJoinInProgress, ElasticJoinCoordinator.log);
                        if (!VoltZK.zkNodeExists(ElasticJoinCoordinator.this.m_zk, VoltZK.elasticJoinMigration) && (createActionBlocker = VoltZK.createActionBlocker(ElasticJoinCoordinator.this.m_zk, VoltZK.elasticJoinMigration, CreateMode.EPHEMERAL, ElasticJoinCoordinator.log, "elastic join")) != null) {
                            ElasticJoinCoordinator.log.warn("Failed to create elasticJoinMigration blocker: " + createActionBlocker);
                            throw new CoreUtils.RetryException(createActionBlocker);
                        }
                        ElasticJoinCoordinator.this.m_stats.initialize(j2);
                        if (atomicReference2.get() != ElasticJoinCoordinator.this.m_migrateFuture || ElasticJoinCoordinator.this.m_migrateFuture == null || ElasticJoinCoordinator.this.m_migrateFuture.isCancelled()) {
                            return null;
                        }
                        try {
                            try {
                                try {
                                    if (!ElasticOperationUtils.migrateData(ElasticJoinCoordinator.this.m_ci, ElasticJoinCoordinator.this.m_adapter.connectionId(), ElasticJoinCoordinator.this.m_adapter.getSyncCallbackSupplier(), ElasticJoinCoordinator.this.m_mpiHashinator, ElasticJoinCoordinator.this.m_goalHashinator, (Multimap) atomicReference.get(), ElasticJoinCoordinator.this.m_stats)) {
                                        ElasticJoinCoordinator.log.warn("Data migration failed, retrying...");
                                        ElasticJoinCoordinator.this.m_mpiHashinator = null;
                                        atomicReference.set(ElasticJoinCoordinator.this.calculateRanges());
                                        throw new CoreUtils.RetryException();
                                    }
                                    StringBuilder sb = new StringBuilder();
                                    if (ElasticJoinCoordinator.this.m_joiningHosts.isEmpty()) {
                                        sb.append("all hosts");
                                    } else {
                                        Iterator it2 = ElasticJoinCoordinator.this.m_joiningHosts.iterator();
                                        while (it2.hasNext()) {
                                            int intValue = ((Integer) it2.next()).intValue();
                                            if (sb.length() > 0) {
                                                sb.append(", ");
                                            }
                                            sb.append(ElasticJoinCoordinator.this.m_messenger.getHostnameForHostID(intValue));
                                        }
                                        sb.insert(0, "joining host(s) [");
                                        sb.append("]");
                                    }
                                    ElasticJoinCoordinator.log.info(String.format("Data migration completed to %s", sb.toString()));
                                    ElasticJoinCoordinator.this.writeNewTopoToZK();
                                    if (1 == 0) {
                                        return null;
                                    }
                                    ElasticJoinCoordinator.this.reset();
                                    ElasticJoinCoordinator.this.m_stats.initialize(0L);
                                    return null;
                                } catch (ElasticOperationUtils.NoDestPartitionException e) {
                                    ElasticJoinCoordinator.log.warn("@BalancePartitions found the destination partition didn't exist, aborting migration");
                                    if (1 != 0) {
                                        ElasticJoinCoordinator.this.reset();
                                        ElasticJoinCoordinator.this.m_stats.initialize(0L);
                                    }
                                    return null;
                                }
                            } catch (CoreUtils.RetryException e2) {
                                throw new CoreUtils.RetryException(e2);
                            } catch (Exception e3) {
                                ElasticJoinCoordinator.log.error("Failed to complete rebalancing. There will be a data skew in the cluster. To redistribute data, do a snapshot save and recover the cluster.", e3);
                                if (0 == 0) {
                                    return null;
                                }
                                ElasticJoinCoordinator.this.reset();
                                ElasticJoinCoordinator.this.m_stats.initialize(0L);
                                return null;
                            }
                        } catch (Throwable th) {
                            if (0 != 0) {
                                ElasticJoinCoordinator.this.reset();
                                ElasticJoinCoordinator.this.m_stats.initialize(0L);
                            }
                            throw th;
                        }
                    }
                }, 0L, 10L, TimeUnit.SECONDS, 1L, TimeUnit.HOURS);
                atomicReference2.set(this.m_migrateFuture);
                z = false;
                if (0 != 0) {
                    reset();
                }
            } catch (Exception e) {
                log.error("Failed to complete rebalancing. There will be a data skew in the cluster. To redistribute data, do a snapshot save and recover the cluster.", e);
                if (z) {
                    reset();
                }
            }
        } catch (Throwable th) {
            if (z) {
                reset();
            }
            throw th;
        }
    }

    private void preSnapshot() {
        pingPartitions(true);
    }

    private void startSnapshot() {
        try {
            if (!requestSnapshot(this.m_snapshotNonce, this.m_clSnapshotPath, ElasticOperationUtils.calculateRemoteHosts(this.m_cartographer, this.m_joiningHosts), VoltDB.instance().getCatalogContext().database, this.m_snapshotResponseHandler)) {
                VoltDB.crashLocalVoltDB("Failed to request snapshot to transfer replicated table data", false, null);
            }
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Failed to request snapshot to transfer replicated table data", true, e);
        }
    }

    private void checkParticipantsReady(ElasticJoinNodeInfo elasticJoinNodeInfo) {
        Set<Integer> readyHosts = elasticJoinNodeInfo.getReadyHosts();
        if (readyHosts.size() == this.m_expectedHostCount) {
            log.debug("Handling all joining nodes ready");
            this.m_joiningHosts.clear();
            this.m_joiningHosts.addAll(readyHosts);
            this.m_state = 0;
        }
    }

    private void checkResumeMigration(ElasticJoinNodeInfo elasticJoinNodeInfo) {
        if (elasticJoinNodeInfo.getResumeMigration()) {
            this.m_goalHashinator = new ElasticHashinator(elasticJoinNodeInfo.getHashinatorConfig(), true);
            this.m_state = 4;
        }
    }

    private Multimap<Integer, Integer> getPartitionToHosts(Map<Integer, Integer> map) throws JSONException {
        Integer num;
        ArrayDeque arrayDeque = new ArrayDeque();
        Iterator<Integer> it = this.m_joiningHosts.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            for (int i = 0; i < map.get(Integer.valueOf(intValue)).intValue(); i++) {
                arrayDeque.add(Integer.valueOf(intValue));
            }
        }
        ArrayListMultimap create = ArrayListMultimap.create();
        Iterator<Integer> it2 = this.m_partitionsToAdd.iterator();
        while (!arrayDeque.isEmpty() && (num = (Integer) arrayDeque.poll()) != null) {
            if (!it2.hasNext()) {
                it2 = this.m_partitionsToAdd.iterator();
            }
            create.put(it2.next(), num);
        }
        return create;
    }

    private void notifyPartitionAssignmentAndSnapshotNonce() {
        try {
            ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData(zkNode, (Watcher) null, (Stat) null));
            elasticJoinNodeInfo.setHashinatorConfig(this.m_goalHashinator.getCookedBytes());
            elasticJoinNodeInfo.setSnapshotNonce(this.m_snapshotNonce);
            elasticJoinNodeInfo.setTopology(this.m_topo);
            this.m_zk.setData(zkNode, elasticJoinNodeInfo.toBytes(), -1);
            if (log.isDebugEnabled()) {
                log.debug("Elastic join coordinator leader notified partition assignment: " + this.m_partitionsToAdd);
            }
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Failed to notify partition assignment", true, e);
        }
    }

    private void checkNodeInitialized(ElasticJoinNodeInfo elasticJoinNodeInfo) {
        if (elasticJoinNodeInfo.getInitializedElasticCoordinatorHSIds().size() == this.m_expectedHostCount) {
            if (log.isDebugEnabled()) {
                log.debug("Handling all joining nodes initialized");
            }
            this.m_snapshotSinkHSIds = elasticJoinNodeInfo.getSnapshotSinkHSIds();
            this.m_lowestSiteSinkHSIds = elasticJoinNodeInfo.getLowestSiteSinkHSIds();
            this.m_state = 1;
        }
    }

    private void checkParticipantsFinished(ElasticJoinNodeInfo elasticJoinNodeInfo) {
        if (elasticJoinNodeInfo.getFinishedElasticCoordinatorHSIds().size() == this.m_expectedHostCount) {
            if (log.isDebugEnabled()) {
                log.debug("Handling all joining nodes finished snapshotting");
            }
            this.m_state = 2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeNewTopoToZK() {
        if (this.m_topo == null) {
            return;
        }
        try {
            this.m_zk.setData(VoltZK.topology, this.m_topo.toString().getBytes(Constants.UTF8ENCODING), this.m_topoVersion);
            if (log.isDebugEnabled()) {
                log.debug("Updated topology in ZK: " + this.m_topo.toString(2));
            }
        } catch (KeeperException.BadVersionException e) {
            VoltDB.crashLocalVoltDB("Concurrent join of nodes is not supported", false, null);
        } catch (Exception e2) {
            VoltDB.crashLocalVoltDB("Failed to write new topology to ZooKeeper", true, e2);
        }
    }

    private boolean requestSnapshot(String str, String str2, Map<Integer, Integer> map, Database database, SnapshotUtil.SnapshotResponseHandler snapshotResponseHandler) throws InterruptedException {
        String makeSnapshotRequest = JoinCoordinator.makeSnapshotRequest(DataMigrationSnapshotPlanner.planForPartitions(map, this.m_snapshotSinkHSIds, this.m_lowestSiteSinkHSIds, str2 != null, database, this.m_cartographer));
        if (makeSnapshotRequest == null) {
            log.fatal("Failed to generate data migration snapshot request");
            return false;
        }
        if (log.isDebugEnabled()) {
            String str3 = makeSnapshotRequest;
            try {
                str3 = new JSONObject(makeSnapshotRequest).toString(4);
            } catch (JSONException e) {
            }
            log.debug("Data migration snapshot request: " + str3);
        }
        SnapshotPathType snapshotPathType = SnapshotPathType.SNAP_CL;
        if (str2 == null) {
            str2 = "";
        }
        SnapshotUtil.requestSnapshot(0L, str2, str, false, SnapshotFormat.STREAM, snapshotPathType, makeSnapshotRequest, snapshotResponseHandler, true);
        return true;
    }

    private void generateElasticChangeEvent() {
        log.debug("Generating DR Events for Elastic Add and Stream Start");
        ByteBuffer allocate = ByteBuffer.allocate(12);
        allocate.putInt(ExecutionEngine.TaskType.ELASTIC_CHANGE.ordinal());
        allocate.putInt(this.m_partitionsToAdd.get(0).intValue());
        allocate.putInt(this.m_partitionsToAdd.get(this.m_partitionsToAdd.size() - 1).intValue() + 1);
        try {
            this.m_ci.callExecuteTask(30000L, allocate.array());
        } catch (Exception e) {
            log.warn("Failed to create Elastic Add Events for DR", e);
        }
        this.m_state = 3;
        handleStateTransitioned();
    }

    public static boolean requestIndexSnapshot(String str, boolean z, Database database, Multimap<Integer, ElasticRangeOwnership> multimap, SnapshotUtil.SnapshotResponseHandler snapshotResponseHandler) throws InterruptedException {
        String makeSnapshotRequest = JoinCoordinator.makeSnapshotRequest(IndexSnapshotPlanner.plan(CatalogUtil.getNormalTables(database, false), multimap));
        if (makeSnapshotRequest == null) {
            log.error("Failed to generate index snapshot request");
            return false;
        }
        if (log.isDebugEnabled()) {
            String str2 = makeSnapshotRequest;
            try {
                str2 = new JSONObject(makeSnapshotRequest).toString(4);
            } catch (JSONException e) {
            }
            log.debug("Index snapshot request: " + str2);
        }
        SnapshotUtil.requestSnapshot(0L, "", str, z, SnapshotFormat.INDEX, SnapshotPathType.SNAP_CL, makeSnapshotRequest, snapshotResponseHandler, true);
        return true;
    }

    private String getIndexSnapshotNonce() {
        return this.m_snapshotNonce + "_index";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void pingPartitions(boolean z) {
        StoredProcedureInvocation storedProcedureInvocation = new StoredProcedureInvocation();
        storedProcedureInvocation.setProcName("@PingPartitions");
        Object[] objArr = new Object[1];
        objArr[0] = Byte.valueOf(z ? (byte) 1 : (byte) 0);
        storedProcedureInvocation.setParams(objArr);
        storedProcedureInvocation.setClientHandle(this.m_adapter.registerCallback(SimpleClientResponseAdapter.NULL_CALLBACK));
        this.m_ci.createTransaction(this.m_adapter.connectionId(), storedProcedureInvocation, false, false, false, 16383, 0, System.currentTimeMillis());
    }

    private boolean isDrElasticChangeEventsComplete() {
        if (!VoltDB.instance().getNodeDRGateway().isActive()) {
            return true;
        }
        try {
            ByteBuffer allocate = ByteBuffer.allocate(4);
            allocate.putInt(ExecutionEngine.TaskType.GET_DR_TUPLESTREAM_STATE.ordinal());
            ClientResponse callExecuteTask = this.m_ci.callExecuteTask(30000L, allocate.array());
            if (callExecuteTask == null || callExecuteTask.getStatus() != 1) {
                return false;
            }
            VoltTable[] results = callExecuteTask.getResults();
            if (!$assertionsDisabled && (results == null || results.length != 1)) {
                throw new AssertionError();
            }
            VoltTable voltTable = results[0];
            while (voltTable.advanceRow()) {
                if (this.m_partitionsToAdd.contains(Integer.valueOf(Long.valueOf(voltTable.getLong("PARTITION_ID")).intValue()))) {
                    return voltTable.getLong("SEQUENCE_NUMBER") >= 0;
                }
            }
            return false;
        } catch (Exception e) {
            log.warn("Failed to get current DRIDs during recovery of Elastic Join", e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedulePingWork() {
        VoltDB.instance().scheduleWork(this.m_pingWork, PING_INTERVAL_MS, -1L, TimeUnit.MILLISECONDS);
    }

    @Override // org.voltdb.join.ElasticJoinService
    public void updateConfig(CatalogContext catalogContext) {
        ElasticOperationUtils.updateSettings(catalogContext);
    }

    private void updateHostCountInClusterSettings() {
        try {
            int hostCount = AbstractTopology.topologyFromJSON(this.m_topo).getHostCount();
            if (hostCount > this.m_clusterSettings.get().hostcount()) {
                Properties asProperties = this.m_clusterSettings.get().asProperties();
                asProperties.setProperty(ClusterSettings.HOST_COUNT, Integer.toString(hostCount));
                byte[] propertiesToBytes = Settings.propertiesToBytes(asProperties);
                SyncCallback syncCallback = new SyncCallback();
                try {
                    this.m_ci.getInternalConnectionHandler().callProcedure(this.m_ci.getInternalUser(), false, -1, (ProcedureCallback) syncCallback, "@UpdateSettings", propertiesToBytes);
                    syncCallback.waitForResponse();
                    ClientResponse response = syncCallback.getResponse();
                    if (response.getStatus() != 1) {
                        log.error("Invocation to @UpdateSettings failed:\n    " + response.getStatusString());
                    }
                } catch (InterruptedException e) {
                    log.error("Interrupted while waiting for @UpdateSettings responses", e);
                }
            }
        } catch (JSONException e2) {
            throw new SettingsException("failed to parse topology json", e2);
        }
    }

    static {
        $assertionsDisabled = !ElasticJoinCoordinator.class.desiredAssertionStatus();
        log = new VoltLogger("JOIN");
    }
}
