package org.apache.hama.bsp.sync;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.util.ZKUtil;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.class */
public class ZooKeeperSyncClientImpl extends ZKSyncClient implements PeerSyncClient {
    public static final Log LOG = LogFactory.getLog(ZooKeeperSyncClientImpl.class);
    private volatile Integer mutex = 0;
    private String quorumServers;
    private ZooKeeper zk;
    private String bspRoot;
    private InetSocketAddress peerAddress;
    private int numBSPTasks;
    private String[] allPeers;

    /* loaded from: input_file:org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl$BarrierWatcher.class */
    private class BarrierWatcher implements Watcher {
        private boolean complete;

        private BarrierWatcher() {
            this.complete = false;
        }

        boolean isComplete() {
            return this.complete;
        }

        public void process(WatchedEvent watchedEvent) {
            this.complete = true;
            synchronized (ZooKeeperSyncClientImpl.this.mutex) {
                ZooKeeperSyncClientImpl.this.mutex.notifyAll();
            }
        }
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void init(Configuration configuration, BSPJobID bSPJobID, TaskAttemptID taskAttemptID) throws Exception {
        this.quorumServers = QuorumPeer.getZKQuorumServersString(configuration);
        this.zk = new ZooKeeper(this.quorumServers, configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
        this.bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT);
        String str = configuration.get(Constants.PEER_HOST, "0.0.0.0");
        int i = configuration.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
        initialize(this.zk, this.bspRoot);
        this.peerAddress = new InetSocketAddress(str, i);
        LOG.info("Start connecting to Zookeeper! At " + this.peerAddress);
        this.numBSPTasks = configuration.getInt(Constants.JOB_PEERS_COUNT, 1);
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void enterBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, long j) throws SyncException {
        LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + j);
        try {
            synchronized (this.zk) {
                String constructKey = constructKey(taskAttemptID.getJobID(), "sync", "" + j);
                writeNode(constructKey, null, true, null);
                BarrierWatcher barrierWatcher = new BarrierWatcher();
                this.zk.exists(constructKey + "/ready", barrierWatcher);
                this.zk.create(getNodeName(taskAttemptID, j), (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                List children = this.zk.getChildren(constructKey, false);
                int size = children.size();
                boolean contains = children.contains("ready");
                if (contains) {
                    size--;
                }
                LOG.debug("===> at superstep :" + j + " current znode size: " + children.size() + " current znodes:" + children);
                LOG.debug("enterBarrier() znode size within " + constructKey + " is " + children.size() + ". Znodes include " + children);
                if (size < this.numBSPTasks) {
                    while (!barrierWatcher.isComplete()) {
                        if (!contains) {
                            synchronized (this.mutex) {
                                this.mutex.wait(1000L);
                            }
                        }
                    }
                    LOG.debug("2. at superstep: " + j + " after waiting ..." + taskAttemptID.toString());
                } else {
                    LOG.debug("---> at superstep: " + j + " task that is creating /ready znode:" + taskAttemptID.toString());
                    writeNode(constructKey + "/ready", null, false, null);
                }
            }
        } catch (Exception e) {
            throw new SyncException(e.toString());
        }
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void leaveBarrier(BSPJobID bSPJobID, final TaskAttemptID taskAttemptID, final long j) throws SyncException {
        String constructKey;
        try {
            constructKey = constructKey(taskAttemptID.getJobID(), "sync", "" + j);
        } catch (Exception e) {
            throw new SyncException(e.getMessage());
        }
        while (true) {
            List children = this.zk.getChildren(constructKey, false);
            LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:" + j + " znode:" + children);
            if (children.contains("ready")) {
                children.remove("ready");
            }
            int size = children.size();
            LOG.debug("leaveBarrier() at superstep:" + j + " znode size: (" + size + ") znodes:" + children);
            if (null == children || children.isEmpty()) {
                return;
            }
            if (1 == size) {
                try {
                    this.zk.delete(getNodeName(taskAttemptID, j), 0);
                    return;
                } catch (KeeperException.NoNodeException e2) {
                    LOG.debug("+++ (znode size is 1). Ignore because znode may disconnect.", e2);
                    return;
                }
            }
            Collections.sort(children);
            String str = (String) children.get(0);
            String str2 = (String) children.get(size - 1);
            synchronized (this.mutex) {
                if (!getNodeName(taskAttemptID, j).equals(constructKey + ZKUtil.ZK_SEPARATOR + str)) {
                    if (null != this.zk.exists(getNodeName(taskAttemptID, j), false)) {
                        try {
                            this.zk.delete(getNodeName(taskAttemptID, j), 0);
                        } catch (KeeperException.NoNodeException e3) {
                            LOG.debug("++++ Ignore because node may be dleted.", e3);
                        }
                    }
                    if (null != this.zk.exists(constructKey + ZKUtil.ZK_SEPARATOR + str, new Watcher() { // from class: org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.2
                        public void process(WatchedEvent watchedEvent) {
                            synchronized (ZooKeeperSyncClientImpl.this.mutex) {
                                ZooKeeperSyncClientImpl.LOG.debug("leaveBarrier() at superstep: " + j + " taskid:" + taskAttemptID.toString() + " lowest notify other nodes.");
                                ZooKeeperSyncClientImpl.this.mutex.notifyAll();
                            }
                        }
                    })) {
                        LOG.debug("leaveBarrier(): superstep:" + j + " taskid:" + taskAttemptID.toString() + " wait for lowest notify.");
                        this.mutex.wait();
                    }
                } else if (null != this.zk.exists(constructKey + ZKUtil.ZK_SEPARATOR + str2, new Watcher() { // from class: org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.1
                    public void process(WatchedEvent watchedEvent) {
                        synchronized (ZooKeeperSyncClientImpl.this.mutex) {
                            ZooKeeperSyncClientImpl.LOG.debug("leaveBarrier() at superstep: " + j + " taskid:" + taskAttemptID.toString() + " highest notify lowest.");
                            ZooKeeperSyncClientImpl.this.mutex.notifyAll();
                        }
                    }
                })) {
                    LOG.debug("leaveBarrier(): superstep:" + j + " taskid:" + taskAttemptID.toString() + " wait for higest notify.");
                    this.mutex.wait();
                }
            }
            throw new SyncException(e.getMessage());
        }
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void register(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        try {
            String constructKey = constructKey(bSPJobID, "peers");
            if (this.zk.exists(constructKey, false) == null) {
                this.zk.create(constructKey, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (InterruptedException e) {
            LOG.error(e);
        } catch (KeeperException e2) {
            LOG.error(e2);
        }
        registerTask(bSPJobID, str, j, taskAttemptID);
    }

    public void registerTask(BSPJobID bSPJobID, String str, long j, TaskAttemptID taskAttemptID) {
        writeNode(constructKey(bSPJobID, "peers", str + ":" + j), taskAttemptID, false, null);
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public String[] getAllPeerNames(TaskAttemptID taskAttemptID) {
        if (this.allPeers == null) {
            TreeMap treeMap = new TreeMap();
            try {
                List children = this.zk.getChildren(constructKey(taskAttemptID.getJobID(), "peers"), this);
                this.allPeers = (String[]) children.toArray(new String[children.size()]);
                TreeMap treeMap2 = new TreeMap();
                for (String str : this.allPeers) {
                    byte[] data = this.zk.getData(constructKey(taskAttemptID.getJobID(), "peers", str), this, (Stat) null);
                    TaskAttemptID taskAttemptID2 = new TaskAttemptID();
                    if (getValueFromBytes(data, taskAttemptID2)) {
                        treeMap2.put(taskAttemptID2, str);
                    }
                }
                for (Map.Entry entry : treeMap2.entrySet()) {
                    TaskAttemptID taskAttemptID3 = (TaskAttemptID) entry.getKey();
                    String str2 = (String) entry.getValue();
                    LOG.debug("TASK mapping from zookeeper: " + taskAttemptID3 + " ID:" + taskAttemptID3.getTaskID().getId() + " : " + str2);
                    treeMap.put(Integer.valueOf(taskAttemptID3.getTaskID().getId()), str2);
                }
                this.allPeers = new String[treeMap.size()];
                int i = 0;
                for (Map.Entry entry2 : treeMap.entrySet()) {
                    int i2 = i;
                    i++;
                    this.allPeers[i2] = (String) entry2.getValue();
                    LOG.debug("TASK mapping from zookeeper: " + entry2.getKey() + " : " + ((String) entry2.getValue()) + " at index " + (i - 1));
                }
            } catch (Exception e) {
                LOG.error(e);
                throw new RuntimeException("All peer names could not be retrieved!");
            }
        }
        return this.allPeers;
    }

    @Override // org.apache.hama.bsp.sync.ZKSyncClient, org.apache.hama.bsp.sync.SyncClient
    public void close() throws IOException {
        try {
            this.zk.close();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void deregisterFromBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.hama.bsp.sync.PeerSyncClient
    public void stopServer() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.hama.bsp.sync.ZKSyncClient
    public void process(WatchedEvent watchedEvent) {
        synchronized (this.mutex) {
            this.mutex.notify();
        }
    }

    public String getPeerName() {
        return this.peerAddress.getHostName() + ":" + this.peerAddress.getPort();
    }
}
