package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Service;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.class */
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
    private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
    private HConnection conn;
    private Configuration conf;
    private long sleepForRetries;
    private int maxRetriesMultiplier;
    private int socketTimeoutMultiplier;
    private MetricsSource metrics;
    private ReplicationSinkManager replicationSinkMgr;
    private boolean peersSelected = false;
    private ThreadPoolExecutor exec;
    private int maxThreads;

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint$Replicator.class */
    public class Replicator implements Callable<Integer> {
        private List<WAL.Entry> entries;
        private int ordinal;

        public Replicator(List<WAL.Entry> list, int i) {
            this.entries = list;
            this.ordinal = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws IOException {
            ReplicationSinkManager.SinkPeer sinkPeer = null;
            try {
                sinkPeer = HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.getReplicationSink();
                ReplicationProtbufUtil.replicateWALEntry(sinkPeer.getRegionServer(), (WAL.Entry[]) this.entries.toArray(new WAL.Entry[this.entries.size()]));
                HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportSinkSuccess(sinkPeer);
                return Integer.valueOf(this.ordinal);
            } catch (IOException e) {
                if (sinkPeer != null) {
                    HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportBadSink(sinkPeer);
                }
                throw e;
            }
        }
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(this.ctx.getConfiguration());
        decorateConf();
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.maxRetriesMultiplier);
        this.conn = HConnectionManager.createConnection(this.conf);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.metrics = context.getMetrics();
        this.replicationSinkMgr = new ReplicationSinkManager(this.conn, this.ctx.getPeerId(), this, this.conf);
        this.maxThreads = this.conf.getInt("hbase.replication.source.maxthreads", 10);
        this.exec = new ThreadPoolExecutor(this.maxThreads, this.maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.exec.allowCoreThreadTimeOut(true);
    }

    private void decorateConf() {
        String str = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(str)) {
            this.conf.set("hbase.client.rpc.codec", str);
        }
    }

    private void connectToPeers() {
        getRegionServers();
        int i = 1;
        while (isRunning() && this.replicationSinkMgr.getNumSinks() == 0) {
            this.replicationSinkMgr.chooseSinks();
            if (isRunning() && this.replicationSinkMgr.getNumSinks() == 0 && sleepForRetries("Waiting for peers", i)) {
                i++;
            }
        }
    }

    protected boolean sleepForRetries(String str, int i) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(str + ", sleeping " + this.sleepForRetries + " times " + i);
            }
            Thread.sleep(this.sleepForRetries * i);
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
        }
        return i < this.maxRetriesMultiplier;
    }

    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.exec);
        List<WAL.Entry> entries = replicateContext.getEntries();
        String walGroupId = replicateContext.getWalGroupId();
        int i = 1;
        int i2 = 0;
        if (!this.peersSelected && isRunning()) {
            connectToPeers();
            this.peersSelected = true;
        }
        int numSinks = this.replicationSinkMgr.getNumSinks();
        if (numSinks == 0) {
            LOG.warn("No replication sinks found, returning without replicating. The source should retry with the same set of edits.");
            return false;
        }
        int min = Math.min(Math.min(this.maxThreads, (entries.size() / 100) + 1), numSinks);
        ArrayList arrayList = new ArrayList(min);
        if (min == 1) {
            arrayList.add(entries);
        } else {
            for (int i3 = 0; i3 < min; i3++) {
                arrayList.add(new ArrayList((entries.size() / min) + 1));
            }
            for (WAL.Entry entry : entries) {
                ((List) arrayList.get(Math.abs(Bytes.hashCode(entry.getKey().getEncodedRegionName()) % min))).add(entry);
            }
        }
        while (isRunning()) {
            if (isPeerEnabled()) {
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Replicating " + entries.size() + " entries of total size " + replicateContext.getSize());
                    }
                    int i4 = 0;
                    for (int i5 = 0; i5 < arrayList.size(); i5++) {
                        if (!((List) arrayList.get(i5)).isEmpty()) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Submitting " + ((List) arrayList.get(i5)).size() + " entries of total size " + replicateContext.getSize());
                            }
                            executorCompletionService.submit(createReplicator((List) arrayList.get(i5), i5));
                            i4++;
                        }
                    }
                    IOException iOException = null;
                    for (int i6 = 0; i6 < i4; i6++) {
                        try {
                            int intValue = ((Integer) executorCompletionService.take().get()).intValue();
                            int size = ((List) arrayList.get(intValue)).size();
                            arrayList.set(intValue, Collections.emptyList());
                            i2 += size;
                        } catch (InterruptedException e) {
                            iOException = new IOException(e);
                        } catch (ExecutionException e2) {
                            iOException = (IOException) e2.getCause();
                        }
                    }
                    if (iOException != null) {
                        throw iOException;
                    }
                    if (i2 != entries.size()) {
                        LOG.warn("The number of edits replicated is different from the number received, failing for now.");
                        return false;
                    }
                    this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
                    return true;
                } catch (IOException e3) {
                    this.metrics.refreshAgeOfLastShippedOp(walGroupId);
                    if (e3 instanceof RemoteException) {
                        IOException unwrapRemoteException = e3.unwrapRemoteException();
                        LOG.warn("Can't replicate because of an error on the remote cluster: ", unwrapRemoteException);
                        if ((unwrapRemoteException instanceof TableNotFoundException) && sleepForRetries("A table is missing in the peer cluster. Replication cannot proceed without losing data.", i)) {
                            i++;
                        }
                    } else if (e3 instanceof SocketTimeoutException) {
                        sleepForRetries("Encountered a SocketTimeoutException. Since the call to the remote cluster timed out, which is usually caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier);
                    } else if (e3 instanceof ConnectException) {
                        LOG.warn("Peer is unavailable, rechecking all sinks: ", e3);
                        this.replicationSinkMgr.chooseSinks();
                    } else {
                        LOG.warn("Can't replicate because of a local or network error: ", e3);
                    }
                    if (sleepForRetries("Since we are unable to replicate", i)) {
                        i++;
                    }
                }
            } else if (sleepForRetries("Replication is disabled", i)) {
                i++;
            }
        }
        return false;
    }

    protected boolean isPeerEnabled() {
        return this.ctx.getReplicationPeer().getPeerState() == ReplicationPeer.PeerState.ENABLED;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint
    public void doStop() {
        disconnect();
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            } catch (IOException e) {
                LOG.warn("Failed to close the connection");
            }
        }
        this.exec.shutdownNow();
        notifyStopped();
    }

    public Service.State stopAndWait() {
        doStop();
        return super.stopAndWait();
    }

    @VisibleForTesting
    protected Replicator createReplicator(List<WAL.Entry> list, int i) {
        return new Replicator(list, i);
    }
}
