package org.voltdb.join;

import com.google_voltpatches.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper_voltpatches.CreateMode;
import org.apache.zookeeper_voltpatches.KeeperException;
import org.apache.zookeeper_voltpatches.WatchedEvent;
import org.apache.zookeeper_voltpatches.Watcher;
import org.apache.zookeeper_voltpatches.ZooDefs;
import org.apache.zookeeper_voltpatches.ZooKeeper;
import org.apache.zookeeper_voltpatches.data.Stat;
import org.json_voltpatches.JSONException;
import org.json_voltpatches.JSONObject;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.HostMessenger;
import org.voltcore.messaging.VoltMessage;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.DBBPool;
import org.voltcore.zk.ZKUtil;
import org.voltdb.AbstractTopology;
import org.voltdb.VoltDB;
import org.voltdb.catalog.Database;
import org.voltdb.iv2.DeterminismHash;
import org.voltdb.messaging.RejoinMessage;
import org.voltdb.rejoin.JoinCoordinator;

/* loaded from: input_file:org/voltdb/join/ElasticJoinNodeCoordinator.class */
public class ElasticJoinNodeCoordinator extends JoinCoordinator {
    private static final VoltLogger log = new VoltLogger("JOIN");
    private static int SNAPSHOT_BUFFER_POOL_SIZE = Integer.getInteger("JOIN_RECEIVE_BUFFER_POOL_SIZE", 3).intValue();
    private final ZooKeeper m_zk;
    private final int m_hostId;
    private volatile JSONObject m_topo;
    private volatile List<Integer> m_partitionsToAdd;
    private final CountDownLatch m_partitionsAssigned;
    private volatile String m_snapshotNonce;
    private CountDownLatch m_initiatedLatch;
    private Map<Integer, Long> m_localSites;
    private final Set<Long> m_sitesWithFirstFragments;
    private final Map<Long, Long> m_snapshotSinks;
    private AtomicInteger m_lowestSite;
    private Long m_lowestSiteSinkHSId;
    private final Queue<DBBPool.BBContainer> m_snapshotDataBufPool;
    private final Queue<DBBPool.BBContainer> m_snapshotCompressedDataBufPool;
    private CountDownLatch m_sitesNotFinishedReplay;
    private final ExecutorService m_es;
    private final Watcher m_watcher;

    public ElasticJoinNodeCoordinator(HostMessenger hostMessenger, String str) {
        super(hostMessenger);
        this.m_topo = null;
        this.m_partitionsToAdd = null;
        this.m_partitionsAssigned = new CountDownLatch(1);
        this.m_snapshotNonce = null;
        this.m_localSites = null;
        this.m_sitesWithFirstFragments = Collections.synchronizedSet(new HashSet());
        this.m_snapshotSinks = Collections.synchronizedMap(new HashMap());
        this.m_lowestSite = new AtomicInteger(DeterminismHash.HASH_NOT_INCLUDE);
        this.m_lowestSiteSinkHSId = 0L;
        this.m_sitesNotFinishedReplay = null;
        this.m_es = CoreUtils.getCachedSingleThreadExecutor("Elastic join node coordinator", 15000L);
        this.m_watcher = new Watcher() { // from class: org.voltdb.join.ElasticJoinNodeCoordinator.1
            @Override // org.apache.zookeeper_voltpatches.Watcher
            public void process(final WatchedEvent watchedEvent) {
                try {
                    ElasticJoinNodeCoordinator.this.m_es.submit(new Runnable() { // from class: org.voltdb.join.ElasticJoinNodeCoordinator.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                                ElasticJoinNodeCoordinator.this.getPartitionAssignmentAndSnapshotNonce();
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                }
            }
        };
        clearOverflowDir(str);
        this.m_zk = hostMessenger.getZK();
        this.m_hostId = hostMessenger.getHostId();
        this.m_snapshotDataBufPool = new ConcurrentLinkedQueue();
        this.m_snapshotCompressedDataBufPool = new ConcurrentLinkedQueue();
    }

    @Override // org.voltdb.rejoin.JoinCoordinator
    public void initialize(int i) throws InterruptedException {
        registerToZK(i + 1);
        this.m_partitionsAssigned.await();
    }

    @Override // org.voltdb.rejoin.JoinCoordinator
    public JSONObject getTopology() {
        return this.m_topo;
    }

    @Override // org.voltdb.rejoin.JoinCoordinator
    public boolean startJoin(Database database) {
        if (!initiateJoinOnSites(this.m_snapshotNonce)) {
            return false;
        }
        notifyNodeFinishedInitialization();
        waitForSitesToFinishReplay();
        this.m_es.shutdown();
        try {
            this.m_es.awaitTermination(356L, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            log.error("Failed to shutdown elastic join node coordinator", e);
        }
        VoltDB.instance().onExecutionSiteRejoinCompletion(0L);
        notifyNodeFinishedJoin();
        return true;
    }

    @Override // org.voltdb.rejoin.JoinCoordinator
    public void setPartitionsToHSIds(Map<Integer, Long> map) {
        this.m_localSites = ImmutableMap.copyOf((Map) map);
        this.m_sitesNotFinishedReplay = new CountDownLatch(this.m_localSites.size());
    }

    public Map<Long, Long> getSnapshotSinks() {
        return this.m_snapshotSinks;
    }

    public boolean initiateJoinOnSites(String str) {
        this.m_initiatedLatch = new CountDownLatch(this.m_localSites.size());
        initiateJoin(str);
        if (!waitForInitiationResponses()) {
            return false;
        }
        waitForSitesToSeeFragments(this.m_localSites.size());
        log.info("All sites have received the first fragment");
        return true;
    }

    private void initiateJoin(String str) {
        Iterator<Map.Entry<Integer, Long>> it = this.m_localSites.entrySet().iterator();
        while (it.hasNext()) {
            send(it.next().getValue().longValue(), new RejoinMessage(getHSId(), RejoinMessage.Type.INITIATION, str, this.m_snapshotDataBufPool, this.m_snapshotCompressedDataBufPool, false));
        }
    }

    private boolean waitForInitiationResponses() {
        try {
            if (this.m_initiatedLatch.await(5L, TimeUnit.MINUTES)) {
                return true;
            }
            log.fatal("Timed out waiting for initiation responses from local sites");
            return false;
        } catch (InterruptedException e) {
            log.fatal("Interrupted waiting for all local sites finish initialization", e);
            return false;
        }
    }

    private void waitForSitesToSeeFragments(int i) {
        log.debug("Waiting for local sites to see the first fragments");
        while (this.m_sitesWithFirstFragments.size() != i) {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
            }
        }
        log.debug("All local sites have seen the first fragments");
    }

    public void waitForSitesToFinishReplay() {
        try {
            this.m_sitesNotFinishedReplay.await();
        } catch (InterruptedException e) {
            VoltDB.crashLocalVoltDB("Interrupted waiting for all local sites to finish replay", true, e);
        }
        while (this.m_snapshotDataBufPool.size() > 0) {
            this.m_snapshotDataBufPool.poll().discard();
        }
        while (this.m_snapshotCompressedDataBufPool.size() > 0) {
            this.m_snapshotCompressedDataBufPool.poll().discard();
        }
    }

    @Override // org.voltdb.messaging.LocalMailbox, org.voltcore.messaging.Mailbox
    public void deliver(VoltMessage voltMessage) {
        if (!(voltMessage instanceof RejoinMessage)) {
            VoltDB.crashLocalVoltDB("Unknown message type " + voltMessage.getClass().toString() + " sent to the join coordinator", false, null);
        }
        RejoinMessage rejoinMessage = (RejoinMessage) voltMessage;
        if (rejoinMessage.getType() == RejoinMessage.Type.INITIATION_RESPONSE) {
            log.info("ElasticJoinNodeCoordinator received initiation response from " + CoreUtils.hsIdToString(rejoinMessage.m_sourceHSId));
            onSiteInitialized(rejoinMessage.m_sourceHSId, rejoinMessage.getMasterHSId(), rejoinMessage.getSnapshotSinkHSId(), rejoinMessage.schemaHasNoTables());
        } else if (rejoinMessage.getType() == RejoinMessage.Type.FIRST_FRAGMENT_RECEIVED) {
            log.info("ElasticJoinNodeCoordinator received first fragment response from " + CoreUtils.hsIdToString(rejoinMessage.m_sourceHSId));
            this.m_sitesWithFirstFragments.add(Long.valueOf(rejoinMessage.m_sourceHSId));
        } else if (rejoinMessage.getType() != RejoinMessage.Type.REPLAY_FINISHED) {
            VoltDB.crashLocalVoltDB("Wrong join message of type " + rejoinMessage.getType() + " sent to the join coordinator", false, null);
        } else {
            log.info("ElasticJoinNodeCoordinator received replay finish message from " + CoreUtils.hsIdToString(rejoinMessage.m_sourceHSId));
            this.m_sitesNotFinishedReplay.countDown();
        }
    }

    private void onSiteInitialized(long j, long j2, long j3, boolean z) {
        if (log.isDebugEnabled()) {
            log.debug("Initialized site " + CoreUtils.getSiteIdFromHSId(j) + " SinkHSID " + j3 + " current lowest site is " + this.m_lowestSite.get());
        }
        while (true) {
            int i = this.m_lowestSite.get();
            if (i <= CoreUtils.getSiteIdFromHSId(j)) {
                break;
            } else if (this.m_lowestSite.compareAndSet(i, CoreUtils.getSiteIdFromHSId(j))) {
                this.m_lowestSiteSinkHSId = Long.valueOf(j3);
                break;
            }
        }
        this.m_snapshotSinks.put(Long.valueOf(j), Long.valueOf(j3));
        this.m_initiatedLatch.countDown();
    }

    private void registerToZK(int i) {
        log.debug("Registering elastic join node to ZK");
        while (true) {
            try {
                Stat stat = new Stat();
                ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData("/db/elastic_join/joining", this.m_watcher, stat));
                if (elasticJoinNodeInfo.getReadyHosts().size() >= i) {
                    VoltDB.crashLocalVoltDB("The cluster is currently expanding. No more nodes can be added until the current process finishes.Use the @Statistics REBALANCE system procedure to check the status.", false, null);
                }
                elasticJoinNodeInfo.addReadyHosts(this.m_hostId);
                this.m_zk.setData("/db/elastic_join/joining", elasticJoinNodeInfo.toBytes(), stat.getVersion());
                this.m_zk.create(getZKPresenceNode(this.m_hostId), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;
            } catch (InterruptedException e) {
                VoltDB.crashLocalVoltDB("Interrupted during join", true, e);
            } catch (KeeperException e2) {
                if (e2.code() == KeeperException.Code.BADVERSION || e2.code() == KeeperException.Code.NONODE) {
                    log.debug("Recoverable exception thrown while registering to ZK", e2);
                } else {
                    VoltDB.crashLocalVoltDB("Failed to register joining node to ZooKeeper", true, e2);
                }
            } catch (JSONException e3) {
                VoltDB.crashLocalVoltDB("Failed to parse joining node info", true, e3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getPartitionAssignmentAndSnapshotNonce() {
        try {
            ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData("/db/elastic_join/joining", this.m_watcher, (Stat) null));
            this.m_snapshotNonce = elasticJoinNodeInfo.getSnapshotNonce();
            JSONObject topology = elasticJoinNodeInfo.getTopology();
            if (topology != null) {
                this.m_topo = topology;
                this.m_partitionsToAdd = AbstractTopology.topologyFromJSON(this.m_topo).getPartitionIdList(CoreUtils.getHostIdFromHSId(getHSId()));
                this.m_partitionsAssigned.countDown();
            }
            log.debug("Received partition assignment " + this.m_partitionsToAdd + " and snapshot nonce " + this.m_snapshotNonce);
        } catch (Exception e) {
            VoltDB.crashLocalVoltDB("Failed to get partition assignment", true, e);
        }
    }

    private void notifyNodeFinishedInitialization() {
        log.debug("Notifying node initialized for elastic join");
        while (true) {
            try {
                Stat stat = new Stat();
                ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData("/db/elastic_join/joining", (Watcher) null, stat));
                elasticJoinNodeInfo.addInitializedElasticCoordinator(getHSId());
                elasticJoinNodeInfo.addSnapshotSinkHSIds(getSnapshotSinks());
                elasticJoinNodeInfo.addLowestSiteSinkHSId(this.m_hostId, this.m_lowestSiteSinkHSId.longValue());
                this.m_zk.setData("/db/elastic_join/joining", elasticJoinNodeInfo.toBytes(), stat.getVersion());
                log.debug("Notified node initialized for elastic join");
                return;
            } catch (KeeperException.BadVersionException e) {
            } catch (Exception e2) {
                VoltDB.crashLocalVoltDB("Failed to add host to initialized host list", true, e2);
            }
        }
    }

    private void notifyNodeFinishedJoin() {
        log.debug("Notifying node finished elastic join");
        while (true) {
            try {
                Stat stat = new Stat();
                ElasticJoinNodeInfo elasticJoinNodeInfo = new ElasticJoinNodeInfo(this.m_zk.getData("/db/elastic_join/joining", false, stat));
                elasticJoinNodeInfo.addFinishedElasticCoordinator(getHSId());
                this.m_zk.setData("/db/elastic_join/joining", elasticJoinNodeInfo.toBytes(), stat.getVersion());
                log.debug("Notified node finished elastic join");
                return;
            } catch (KeeperException.BadVersionException e) {
            } catch (Exception e2) {
                VoltDB.crashLocalVoltDB("Failed to add host to finished host list", true, e2);
            }
        }
    }

    private static String getZKPresenceNode(int i) {
        return ZKUtil.joinZKPath("/db/elastic_join/joining", Integer.toString(i));
    }
}
