/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.Pair;

@InterfaceAudience.Private
class ScannerCallableWithReplicas
implements RetryingCallable<Result[]> {
    private final Log LOG = LogFactory.getLog(this.getClass());
    volatile ScannerCallable currentScannerCallable;
    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
    final ClusterConnection cConnection;
    protected final ExecutorService pool;
    protected final int timeBeforeReplicas;
    private final Scan scan;
    private final int retries;
    private Result lastResult;
    private final RpcRetryingCaller<Result[]> caller;
    private final TableName tableName;
    private Configuration conf;
    private int scannerTimeout;
    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();

    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
        this.currentScannerCallable = baseCallable;
        this.cConnection = cConnection;
        this.pool = pool;
        if (timeBeforeReplicas < 0) {
            throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
        }
        this.timeBeforeReplicas = timeBeforeReplicas;
        this.scan = scan;
        this.retries = retries;
        this.tableName = tableName;
        this.conf = conf;
        this.scannerTimeout = scannerTimeout;
        this.caller = caller;
    }

    public void setClose() {
        this.currentScannerCallable.setClose();
    }

    public void setCaching(int caching) {
        this.currentScannerCallable.setCaching(caching);
    }

    public int getCaching() {
        return this.currentScannerCallable.getCaching();
    }

    public HRegionInfo getHRegionInfo() {
        return this.currentScannerCallable.getHRegionInfo();
    }

    @Override
    public Result[] call(int timeout) throws IOException {
        ArrayList<ExecutionException> exceptions;
        block21: {
            Future f;
            if (this.currentScannerCallable != null && this.currentScannerCallable.closed) {
                if (this.LOG.isTraceEnabled()) {
                    this.LOG.trace((Object)("Closing scanner id=" + this.currentScannerCallable.scannerId));
                }
                Result[] r = this.currentScannerCallable.call(timeout);
                this.currentScannerCallable = null;
                return r;
            }
            RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, 0, this.cConnection, this.tableName, this.currentScannerCallable.getRow());
            BoundedCompletionService cs = new BoundedCompletionService((Executor)this.pool, rl.size() * 5);
            exceptions = null;
            int submitted = 0;
            int completed = 0;
            AtomicBoolean done = new AtomicBoolean(false);
            this.replicaSwitched.set(false);
            submitted += this.addCallsForCurrentReplica((BoundedCompletionService<Pair<Result[], ScannerCallable>>)cs, rl);
            try {
                f = cs.poll((long)this.timeBeforeReplicas, TimeUnit.MICROSECONDS);
                if (f != null) {
                    Pair r = (Pair)f.get();
                    if (r != null && r.getSecond() != null) {
                        this.updateCurrentlyServingReplica((ScannerCallable)r.getSecond(), (Result[])r.getFirst(), done, this.pool);
                    }
                    return r == null ? null : (Result[])r.getFirst();
                }
            }
            catch (ExecutionException e) {
                exceptions = new ArrayList<ExecutionException>(rl.size());
                exceptions.add(e);
                ++completed;
            }
            catch (CancellationException e) {
                throw new InterruptedIOException(e.getMessage());
            }
            catch (InterruptedException e) {
                throw new InterruptedIOException(e.getMessage());
            }
            submitted += this.addCallsForOtherReplicas((BoundedCompletionService<Pair<Result[], ScannerCallable>>)cs, rl, 0, rl.size() - 1);
            while (true) {
                if (completed >= submitted) break block21;
                try {
                    f = cs.take();
                    Pair r = (Pair)f.get();
                    if (r != null && r.getSecond() != null) {
                        this.updateCurrentlyServingReplica((ScannerCallable)r.getSecond(), (Result[])r.getFirst(), done, this.pool);
                    }
                    Result[] resultArray = r == null ? null : (Result[])r.getFirst();
                    return resultArray;
                }
                catch (ExecutionException e) {
                    try {
                        if (exceptions == null) {
                            exceptions = new ArrayList(rl.size());
                        }
                        exceptions.add(e);
                        ++completed;
                    }
                    catch (CancellationException e2) {
                        throw new InterruptedIOException(e2.getMessage());
                    }
                    catch (InterruptedException e3) {
                        throw new InterruptedIOException(e3.getMessage());
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                cs.cancelAll(true);
            }
        }
        if (exceptions != null && !exceptions.isEmpty()) {
            RpcRetryingCallerWithReadReplicas.throwEnrichedException((ExecutionException)exceptions.get(0), this.retries);
        }
        return null;
    }

    private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, AtomicBoolean done, ExecutorService pool) {
        if (done.compareAndSet(false, true)) {
            if (this.currentScannerCallable != scanner) {
                this.replicaSwitched.set(true);
            }
            this.currentScannerCallable = scanner;
            if (result != null && result.length != 0) {
                this.lastResult = result[result.length - 1];
            }
            if (this.LOG.isTraceEnabled()) {
                this.LOG.trace((Object)("Setting current scanner as id=" + this.currentScannerCallable.scannerId + " associated with replica=" + this.currentScannerCallable.getHRegionInfo().getReplicaId()));
            }
            this.outstandingCallables.remove(scanner);
            for (ScannerCallable s : this.outstandingCallables) {
                if (this.LOG.isTraceEnabled()) {
                    this.LOG.trace((Object)("Closing scanner id=" + s.scannerId + ", replica=" + s.getHRegionInfo().getRegionId() + " because slow and replica=" + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded"));
                }
                s.setClose();
                RetryingRPC r = new RetryingRPC(s);
                pool.submit(r);
            }
            this.outstandingCallables.clear();
        }
    }

    public boolean switchedToADifferentReplica() {
        return this.replicaSwitched.get();
    }

    private int addCallsForCurrentReplica(BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
        RetryingRPC retryingOnReplica = new RetryingRPC(this.currentScannerCallable);
        this.outstandingCallables.add(this.currentScannerCallable);
        cs.submit((Callable)retryingOnReplica);
        return 1;
    }

    private int addCallsForOtherReplicas(BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min, int max) {
        if (this.scan.getConsistency() == Consistency.STRONG) {
            return 0;
        }
        for (int id = min; id <= max; ++id) {
            if (this.currentScannerCallable.getHRegionInfo().getReplicaId() == id) continue;
            ScannerCallable s = this.currentScannerCallable.getScannerCallableForReplica(id);
            if (this.lastResult != null) {
                s.getScan().setStartRow(this.lastResult.getRow());
            }
            this.outstandingCallables.add(s);
            RetryingRPC retryingOnReplica = new RetryingRPC(s);
            cs.submit((Callable)retryingOnReplica);
        }
        return max - min + 1;
    }

    @Override
    public void prepare(boolean reload) throws IOException {
    }

    @Override
    public void throwable(Throwable t, boolean retrying) {
        this.currentScannerCallable.throwable(t, retrying);
    }

    @Override
    public String getExceptionMessageAdditionalDetail() {
        return this.currentScannerCallable.getExceptionMessageAdditionalDetail();
    }

    @Override
    public long sleep(long pause, int tries) {
        return this.currentScannerCallable.sleep(pause, tries);
    }

    class RetryingRPC
    implements Callable<Pair<Result[], ScannerCallable>> {
        final ScannerCallable callable;

        RetryingRPC(ScannerCallable callable) {
            this.callable = callable;
        }

        @Override
        public Pair<Result[], ScannerCallable> call() throws IOException {
            RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
            if (ScannerCallableWithReplicas.this.scan.getConsistency() == Consistency.TIMELINE) {
                caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).newCaller();
            }
            Result[] res = caller.callWithRetries(this.callable, ScannerCallableWithReplicas.this.scannerTimeout);
            return new Pair((Object)res, (Object)this.callable);
        }
    }
}

