package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.class */
public class ReplicationSourceManager {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    private final AtomicBoolean replicating;
    private final ReplicationZookeeper zkHelper;
    private final Stoppable stopper;
    private final Configuration conf;
    private final FileSystem fs;
    private Path latestPath;
    private final List<String> otherRegionServers;
    private final Path logDir;
    private final Path oldLogDir;
    private final long sleepBeforeFailover;
    private final ThreadPoolExecutor executor;
    private final List<ReplicationSourceInterface> sources = new ArrayList();
    private final SortedSet<String> hlogs = new TreeSet();
    private final List<ReplicationSourceInterface> oldsources = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$NodeFailoverWorker.class */
    public class NodeFailoverWorker extends Thread {
        private String rsZnode;

        public NodeFailoverWorker(String str) {
            super("Failover-for-" + str);
            this.rsZnode = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ReplicationSourceInterface replicationSource;
            try {
                Thread.sleep(ReplicationSourceManager.this.sleepBeforeFailover);
            } catch (InterruptedException e) {
                ReplicationSourceManager.LOG.warn("Interrupted while waiting before transferring a queue.");
                Thread.currentThread().interrupt();
            }
            if (ReplicationSourceManager.this.stopper.isStopped()) {
                ReplicationSourceManager.LOG.info("Not transferring queue since we are shutting down");
                return;
            }
            if (ReplicationSourceManager.this.zkHelper.lockOtherRS(this.rsZnode)) {
                ReplicationSourceManager.LOG.info("Moving " + this.rsZnode + "'s hlogs to my queue");
                SortedMap<String, SortedSet<String>> copyQueuesFromRS = ReplicationSourceManager.this.zkHelper.copyQueuesFromRS(this.rsZnode);
                ReplicationSourceManager.this.zkHelper.deleteRsQueues(this.rsZnode);
                if (copyQueuesFromRS == null || copyQueuesFromRS.size() == 0) {
                    return;
                }
                for (Map.Entry<String, SortedSet<String>> entry : copyQueuesFromRS.entrySet()) {
                    try {
                        replicationSource = ReplicationSourceManager.this.getReplicationSource(ReplicationSourceManager.this.conf, ReplicationSourceManager.this.fs, ReplicationSourceManager.this, ReplicationSourceManager.this.stopper, ReplicationSourceManager.this.replicating, entry.getKey());
                    } catch (IOException e2) {
                        ReplicationSourceManager.LOG.error("Failed creating a source", e2);
                    }
                    if (!ReplicationSourceManager.this.zkHelper.getPeerClusters().containsKey(replicationSource.getPeerClusterId())) {
                        replicationSource.terminate("Recovered queue doesn't belong to any current peer");
                        return;
                    }
                    ReplicationSourceManager.this.oldsources.add(replicationSource);
                    Iterator<String> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        replicationSource.enqueueLog(new Path(ReplicationSourceManager.this.oldLogDir, it.next()));
                    }
                    replicationSource.setSourceEnabled(true);
                    replicationSource.startup();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$OtherRegionServerWatcher.class */
    public class OtherRegionServerWatcher extends ZooKeeperListener {
        public OtherRegionServerWatcher(ZooKeeperWatcher zooKeeperWatcher) {
            super(zooKeeperWatcher);
        }

        @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
        public void nodeCreated(String str) {
            refreshRegionServersList(str);
        }

        @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
        public void nodeDeleted(String str) {
            if (!ReplicationSourceManager.this.stopper.isStopped() && refreshRegionServersList(str)) {
                ReplicationSourceManager.LOG.info(str + " znode expired, trying to lock it");
                ReplicationSourceManager replicationSourceManager = ReplicationSourceManager.this;
                ReplicationZookeeper unused = ReplicationSourceManager.this.zkHelper;
                replicationSourceManager.transferQueues(ReplicationZookeeper.getZNodeName(str));
            }
        }

        @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
        public void nodeChildrenChanged(String str) {
            if (ReplicationSourceManager.this.stopper.isStopped()) {
                return;
            }
            refreshRegionServersList(str);
        }

        private boolean refreshRegionServersList(String str) {
            List<String> registeredRegionServers;
            if (!str.startsWith(ReplicationSourceManager.this.zkHelper.getZookeeperWatcher().rsZNode) || (registeredRegionServers = ReplicationSourceManager.this.zkHelper.getRegisteredRegionServers()) == null) {
                return false;
            }
            synchronized (ReplicationSourceManager.this.otherRegionServers) {
                ReplicationSourceManager.this.otherRegionServers.clear();
                ReplicationSourceManager.this.otherRegionServers.addAll(registeredRegionServers);
            }
            return true;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$PeersWatcher.class */
    public class PeersWatcher extends ZooKeeperListener {
        public PeersWatcher(ZooKeeperWatcher zooKeeperWatcher) {
            super(zooKeeperWatcher);
        }

        @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
        public void nodeDeleted(String str) {
            if (refreshPeersList(str) == null) {
                return;
            }
            ReplicationZookeeper unused = ReplicationSourceManager.this.zkHelper;
            ReplicationSourceManager.this.removePeer(ReplicationZookeeper.getZNodeName(str));
        }

        @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
        public void nodeChildrenChanged(String str) {
            List<String> refreshPeersList = refreshPeersList(str);
            if (refreshPeersList == null) {
                return;
            }
            for (String str2 : refreshPeersList) {
                try {
                    if (ReplicationSourceManager.this.zkHelper.connectToPeer(str2)) {
                        ReplicationSourceManager.this.addSource(str2);
                    }
                } catch (IOException e) {
                    ReplicationSourceManager.LOG.error("Error while adding a new peer", e);
                } catch (KeeperException e2) {
                    ReplicationSourceManager.LOG.error("Error while adding a new peer", e2);
                }
            }
        }

        private List<String> refreshPeersList(String str) {
            if (str.startsWith(ReplicationSourceManager.this.zkHelper.getPeersZNode())) {
                return ReplicationSourceManager.this.zkHelper.listPeersIdsAndWatch();
            }
            return null;
        }
    }

    public ReplicationSourceManager(ReplicationZookeeper replicationZookeeper, Configuration configuration, Stoppable stoppable, FileSystem fileSystem, AtomicBoolean atomicBoolean, Path path, Path path2) {
        this.replicating = atomicBoolean;
        this.zkHelper = replicationZookeeper;
        this.stopper = stoppable;
        this.conf = configuration;
        this.fs = fileSystem;
        this.logDir = path;
        this.oldLogDir = path2;
        this.sleepBeforeFailover = configuration.getLong("replication.sleep.before.failover", 2000L);
        this.zkHelper.registerRegionServerListener(new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
        List<String> registeredRegionServers = this.zkHelper.getRegisteredRegionServers();
        this.zkHelper.registerRegionServerListener(new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
        this.zkHelper.listPeersIdsAndWatch();
        this.otherRegionServers = registeredRegionServers == null ? new ArrayList<>() : registeredRegionServers;
        int i = configuration.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(i, i, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
        threadFactoryBuilder.setNameFormat("ReplicationExecutor-%d");
        this.executor.setThreadFactory(threadFactoryBuilder.build());
    }

    public void logPositionAndCleanOldLogs(Path path, String str, long j, boolean z) {
        String name = path.getName();
        LOG.info("Going to report log #" + name + " for position " + j + " in " + path);
        this.zkHelper.writeReplicationStatus(name.toString(), str, j);
        synchronized (this.hlogs) {
            if (!z) {
                if (this.hlogs.first() != name) {
                    SortedSet<String> headSet = this.hlogs.headSet(name);
                    LOG.info("Removing " + headSet.size() + " logs in the list: " + headSet);
                    Iterator<String> it = headSet.iterator();
                    while (it.hasNext()) {
                        this.zkHelper.removeLogFromList(it.next().toString(), str);
                    }
                    headSet.clear();
                }
            }
        }
    }

    public void init() throws IOException {
        Iterator<String> it = this.zkHelper.getPeerClusters().keySet().iterator();
        while (it.hasNext()) {
            addSource(it.next());
        }
        List<String> listOfReplicators = this.zkHelper.getListOfReplicators();
        if (listOfReplicators == null || listOfReplicators.size() == 0) {
            return;
        }
        synchronized (this.otherRegionServers) {
            LOG.info("Current list of replicators: " + listOfReplicators + " other RSs: " + this.otherRegionServers);
        }
        for (String str : listOfReplicators) {
            synchronized (this.otherRegionServers) {
                if (!this.otherRegionServers.contains(str)) {
                    transferQueues(str);
                }
            }
        }
    }

    public ReplicationSourceInterface addSource(String str) throws IOException {
        ReplicationSourceInterface replicationSource = getReplicationSource(this.conf, this.fs, this, this.stopper, this.replicating, str);
        replicationSource.setSourceEnabled(true);
        synchronized (this.hlogs) {
            this.sources.add(replicationSource);
            if (this.hlogs.size() > 0) {
                this.zkHelper.addLogToList(this.hlogs.last(), this.sources.get(0).getPeerClusterZnode());
                replicationSource.enqueueLog(this.latestPath);
            }
        }
        replicationSource.startup();
        return replicationSource;
    }

    public void join() {
        this.executor.shutdown();
        if (this.sources.size() == 0) {
            this.zkHelper.deleteOwnRSZNode();
        }
        Iterator<ReplicationSourceInterface> it = this.sources.iterator();
        while (it.hasNext()) {
            it.next().terminate("Region server is closing");
        }
    }

    protected SortedSet<String> getHLogs() {
        return new TreeSet((SortedSet) this.hlogs);
    }

    public List<ReplicationSourceInterface> getSources() {
        return this.sources;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void logRolled(Path path) {
        if (!this.replicating.get()) {
            LOG.warn("Replication stopped, won't add new log");
            return;
        }
        synchronized (this.hlogs) {
            if (this.sources.size() > 0) {
                this.zkHelper.addLogToList(path.getName(), this.sources.get(0).getPeerClusterZnode());
            } else {
                this.hlogs.clear();
            }
            this.hlogs.add(path.getName());
        }
        this.latestPath = path;
        Iterator<ReplicationSourceInterface> it = this.sources.iterator();
        while (it.hasNext()) {
            it.next().enqueueLog(path);
        }
    }

    public ReplicationZookeeper getRepZkWrapper() {
        return this.zkHelper;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface] */
    public ReplicationSourceInterface getReplicationSource(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, Stoppable stoppable, AtomicBoolean atomicBoolean, String str) throws IOException {
        ReplicationSource replicationSource;
        try {
            replicationSource = (ReplicationSourceInterface) Class.forName(configuration.get("replication.replicationsource.implementation", ReplicationSource.class.getCanonicalName())).newInstance();
        } catch (Exception e) {
            LOG.warn("Passed replication source implemention throws errors, defaulting to ReplicationSource", e);
            replicationSource = new ReplicationSource();
        }
        replicationSource.init(configuration, fileSystem, replicationSourceManager, stoppable, atomicBoolean, str);
        return replicationSource;
    }

    public void transferQueues(String str) {
        try {
            this.executor.execute(new NodeFailoverWorker(str));
        } catch (RejectedExecutionException e) {
            LOG.info("Cancelling the transfer of " + str + " because of " + e.getMessage());
        }
    }

    public void closeRecoveredQueue(ReplicationSourceInterface replicationSourceInterface) {
        LOG.info("Done with the recovered queue " + replicationSourceInterface.getPeerClusterZnode());
        this.oldsources.remove(replicationSourceInterface);
        this.zkHelper.deleteSource(replicationSourceInterface.getPeerClusterZnode(), false);
    }

    public void removePeer(String str) {
        LOG.info("Closing the following queue " + str + ", currently have " + this.sources.size() + " and another " + this.oldsources.size() + " that were recovered");
        ReplicationSourceInterface replicationSourceInterface = null;
        ArrayList arrayList = new ArrayList();
        for (ReplicationSourceInterface replicationSourceInterface2 : this.oldsources) {
            if (str.equals(replicationSourceInterface2.getPeerClusterId())) {
                arrayList.add(replicationSourceInterface2);
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            closeRecoveredQueue((ReplicationSourceInterface) it.next());
        }
        LOG.info("Number of deleted recovered sources for " + str + ": " + arrayList.size());
        Iterator<ReplicationSourceInterface> it2 = this.sources.iterator();
        while (true) {
            if (!it2.hasNext()) {
                break;
            }
            ReplicationSourceInterface next = it2.next();
            if (str.equals(next.getPeerClusterId())) {
                replicationSourceInterface = next;
                break;
            }
        }
        if (replicationSourceInterface == null) {
            LOG.error("The queue we wanted to close is missing " + str);
            return;
        }
        replicationSourceInterface.terminate("Replication stream was removed by a user");
        this.sources.remove(replicationSourceInterface);
        this.zkHelper.deleteSource(str, true);
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }
}
