/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AsyncProcess;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * Buffers {@link Put} operations in bounded, per-region-location queues and flushes them
 * in the background.
 *
 * <p>Each distinct {@link HRegionLocation} gets its own {@code FlushWorker}, created lazily
 * on first use and scheduled at a fixed rate on a shared scheduled executor. The
 * {@code put} methods never block: they report {@code false} (or return the rejected puts)
 * when a put cannot be queued.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
    private static final Log LOG = LogFactory.getLog((String)HTableMultiplexer.class.getName());
    /** Configuration key for the flush period in milliseconds; the constructor defaults it to 100. */
    public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = "hbase.tablemultiplexer.flush.period.ms";
    /** Configuration key for the core size of the flush scheduler; the constructor defaults it to 10. */
    public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
    /** Configuration key capping how many resubmissions a worker may have pending; workers default it to 10000. */
    public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = "hbase.client.max.retries.in.queue";
    /** One flush worker per region location, created on demand by {@code getQueue}. */
    private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, FlushWorker>();
    /** Copy of the caller's configuration with the client retry count forced to 0; handed to the workers. */
    private final Configuration workerConf;
    /** Connection used to look up region locations and shared with every worker. */
    private final HConnection conn;
    /** Executor obtained from {@code HTable.getDefaultExecutor} and passed to each worker. */
    private final ExecutorService pool;
    /** Retry budget given to puts submitted without an explicit retry count. */
    private final int retryNum;
    /** Capacity of each worker's buffer queue. */
    private final int perRegionServerBufferQueueSize;
    /** Size limit passed to {@code HTable.validatePut} for every submitted put. */
    private final int maxKeyValueSize;
    /** Scheduler that runs the periodic flushes and the delayed resubmissions. */
    private final ScheduledExecutorService executor;
    /** Initial delay and period, in milliseconds, of each worker's flush schedule. */
    private final long flushPeriod;

    /**
     * Creates a multiplexer backed by its own connection and daemon flush threads.
     *
     * @param conf configuration used for the connection, retry count, key-value size limit,
     *        flush period and scheduler size
     * @param perRegionServerBufferQueueSize capacity of each per-location buffer queue
     * @throws IOException if creating the connection fails
     */
    public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) throws IOException {
        this.conn = HConnectionManager.createConnection(conf);
        this.pool = HTable.getDefaultExecutor(conf);

        this.retryNum = conf.getInt(
            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
        this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
        this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100L);

        final int schedulerThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
        this.executor = Executors.newScheduledThreadPool(
            schedulerThreads,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("HTableFlushWorker-%d")
                .build());

        // Workers get a private copy with retries disabled; retrying is handled by re-queueing.
        final Configuration noRetryConf = HBaseConfiguration.create(conf);
        noRetryConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
        this.workerConf = noRetryConf;
    }

    /**
     * Queues a single put using the configured default retry budget.
     *
     * @param tableName table the put targets
     * @param put the put to buffer
     * @return {@code true} if the put was accepted into a buffer queue, {@code false} otherwise
     */
    public boolean put(TableName tableName, Put put) {
        final int defaultRetries = this.retryNum;
        return put(tableName, put, defaultRetries);
    }

    /**
     * Queues every put in the list using the configured default retry budget.
     *
     * @param tableName table the puts target
     * @param puts puts to buffer; may be {@code null}
     * @return the puts that could not be queued, or {@code null} when {@code puts} is
     *         {@code null} or every put was accepted
     */
    public List<Put> put(TableName tableName, List<Put> puts) {
        if (puts == null) {
            return null;
        }
        List<Put> rejected = null;
        for (Put candidate : puts) {
            boolean accepted = put(tableName, candidate, this.retryNum);
            if (!accepted) {
                // Allocate lazily so a fully accepted batch still yields null.
                if (rejected == null) {
                    rejected = new ArrayList<Put>();
                }
                rejected.add(candidate);
            }
        }
        return rejected;
    }

    /**
     * Byte-array table name variant of {@link #put(TableName, List)}.
     *
     * @param tableName raw table name
     * @param puts puts to buffer; may be {@code null}
     * @return the puts that could not be queued, or {@code null} if there are none
     * @deprecated use {@link #put(TableName, List)} instead
     */
    @Deprecated
    public List<Put> put(byte[] tableName, List<Put> puts) {
        final TableName table = TableName.valueOf(tableName);
        return put(table, puts);
    }

    /**
     * Validates a put, resolves its region location and offers it to that location's buffer.
     *
     * @param tableName table the put targets
     * @param put the put to buffer
     * @param retry retry budget attached to the put; values of zero or less are rejected
     * @return {@code true} if the put was accepted into the queue; {@code false} if the retry
     *         budget is exhausted, no location was found, the queue refused it, or an
     *         {@link IOException} occurred
     */
    public boolean put(TableName tableName, Put put, int retry) {
        if (retry > 0) {
            try {
                HTable.validatePut(put, this.maxKeyValueSize);
                final HRegionLocation location =
                    this.conn.getRegionLocation(tableName, put.getRow(), false);
                if (location == null) {
                    return false;
                }
                final LinkedBlockingQueue<PutStatus> target = getQueue(location);
                // offer() never blocks; a full queue simply reports failure to the caller.
                return target.offer(new PutStatus(location.getRegionInfo(), put, retry));
            } catch (IOException ioe) {
                LOG.debug("Cannot process the put " + put, ioe);
            }
        }
        return false;
    }

    /**
     * Byte-array table name variant of {@link #put(TableName, Put, int)}.
     *
     * @param tableName raw table name
     * @param put the put to buffer
     * @param retry retry budget attached to the put
     * @return {@code true} if the put was accepted into a buffer queue
     * @deprecated use {@link #put(TableName, Put, int)} instead
     */
    @Deprecated
    public boolean put(byte[] tableName, Put put, int retry) {
        final TableName table = TableName.valueOf(tableName);
        return put(table, put, retry);
    }

    /**
     * Byte-array table name variant of {@link #put(TableName, Put)}.
     *
     * @param tableName raw table name
     * @param put the put to buffer
     * @return {@code true} if the put was accepted into a buffer queue
     * @deprecated use {@link #put(TableName, Put)} instead
     */
    @Deprecated
    public boolean put(byte[] tableName, Put put) {
        final TableName table = TableName.valueOf(tableName);
        return put(table, put);
    }

    /**
     * Builds a fresh status snapshot from the current set of flush workers.
     *
     * @return a new status object computed at call time
     */
    public HTableMultiplexerStatus getHTableMultiplexerStatus() {
        final HTableMultiplexerStatus snapshot =
            new HTableMultiplexerStatus(this.serverToFlushWorkerMap);
        return snapshot;
    }

    /**
     * Returns the buffer queue for a region location, creating and scheduling its flush
     * worker on first use.
     *
     * <p>The map is consulted once without a lock; only on a miss is the map's monitor taken
     * and the lookup repeated, so a worker is created and scheduled at most once per location.
     *
     * @param addr region location the queue belongs to
     * @return the queue drained by that location's flush worker
     */
    private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
        final FlushWorker existing = this.serverToFlushWorkerMap.get(addr);
        if (existing != null) {
            return existing.getQueue();
        }
        synchronized (this.serverToFlushWorkerMap) {
            FlushWorker worker = this.serverToFlushWorkerMap.get(addr);
            if (worker == null) {
                worker = new FlushWorker(this.workerConf, this.conn, addr, this,
                    this.perRegionServerBufferQueueSize, this.pool, this.executor);
                this.serverToFlushWorkerMap.put(addr, worker);
                this.executor.scheduleAtFixedRate(
                    worker, this.flushPeriod, this.flushPeriod, TimeUnit.MILLISECONDS);
            }
            return worker.getQueue();
        }
    }

    /**
     * Periodic task that drains the puts buffered for one region location, sends them as a
     * single multi-action, and re-queues (through the owning multiplexer) the puts that did
     * not come back with a {@link Result} while their retry budget lasts.
     *
     * <p>It also acts as the {@link AsyncProcess} callback: successful results are recorded
     * by original index, and both failure callbacks return {@code false}.
     */
    private static class FlushWorker
    implements Runnable,
    AsyncProcess.AsyncProcessCallback<Object> {
        private final HRegionLocation addr;
        private final LinkedBlockingQueue<PutStatus> queue;
        private final HTableMultiplexer multiplexer;
        private final AtomicLong totalFailedPutCount = new AtomicLong(0L);
        private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
        private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
        private final AtomicLong maxLatency = new AtomicLong(0L);
        private final AsyncProcess<Object> ap;
        /** Per-flush results, index-aligned with {@link #processingList}. */
        private final List<Object> results = new ArrayList<Object>();
        /** Puts drained from the queue for the flush currently in progress. */
        private final List<PutStatus> processingList = new ArrayList<PutStatus>();
        private final ScheduledExecutorService executor;
        private final int maxRetryInQueue;
        /** Number of resubmissions scheduled but not yet executed. */
        private final AtomicInteger retryInQueue = new AtomicInteger(0);

        /**
         * @param conf configuration used for the RPC factories and the resubmission cap
         * @param conn connection handed to the {@link AsyncProcess}
         * @param addr region location this worker flushes to
         * @param multiplexer owner used to re-queue failed puts
         * @param perRegionServerBufferQueueSize capacity of the buffer queue
         * @param pool executor handed to the {@link AsyncProcess}
         * @param executor scheduler used for delayed resubmissions
         */
        public FlushWorker(Configuration conf, HConnection conn, HRegionLocation addr, HTableMultiplexer multiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, ScheduledExecutorService executor) {
            this.addr = addr;
            this.multiplexer = multiplexer;
            this.queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
            RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
            RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
            this.ap = new AsyncProcess<Object>(conn, null, pool, this, conf, rpcCallerFactory, rpcControllerFactory);
            this.executor = executor;
            this.maxRetryInQueue = conf.getInt(HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
        }

        /** @return the bounded queue that producers offer puts to */
        protected LinkedBlockingQueue<PutStatus> getQueue() {
            return this.queue;
        }

        /** @return the running total of puts counted as failed by this worker */
        public long getTotalFailedCount() {
            return this.totalFailedPutCount.get();
        }

        /** @return puts still queued plus puts in the flush currently being processed */
        public long getTotalBufferedCount() {
            // Widen before adding so the sum of two ints cannot overflow.
            return (long) this.queue.size() + this.currentProcessingCount.get();
        }

        /** @return the counter accumulating flush latencies */
        public AtomicAverageCounter getAverageLatencyCounter() {
            return this.averageLatency;
        }

        /** @return the largest flush latency seen since the previous call; reading resets it to 0 */
        public long getMaxLatency() {
            return this.maxLatency.getAndSet(0L);
        }

        /**
         * Schedules a failed put to be re-offered to the multiplexer after a back-off delay.
         *
         * @param ps the failed put and its remaining retry budget
         * @param oldLoc location the put was sent to; its server's cached locations are cleared
         * @return {@code true} if a resubmission was scheduled; {@code false} if the retry
         *         budget is exhausted or too many resubmissions are already pending
         * @throws IOException declared for callers; propagated from the calls made here
         */
        private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
            final int retryCount = ps.retryCount - 1;
            if (retryCount <= 0) {
                return false;
            }
            int cnt = this.retryInQueue.incrementAndGet();
            if (cnt > this.maxRetryInQueue) {
                this.retryInQueue.decrementAndGet();
                return false;
            }
            boolean scheduled = false;
            try {
                final Put failedPut = ps.put;
                final TableName tableName = ps.regionInfo.getTable();
                this.ap.hConnection.clearCaches(oldLoc.getServerName());
                // A caller-supplied retry budget larger than retryNum would make this index
                // negative; clamp it rather than rely on getPauseTime accepting that
                // (its handling of negative values is not visible from this class).
                int attempt = Math.max(0, this.multiplexer.retryNum - retryCount - 1);
                long delayMs = ConnectionUtils.getPauseTime(this.multiplexer.flushPeriod, attempt);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
                }
                this.executor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        boolean succ = false;
                        try {
                            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
                        }
                        finally {
                            FlushWorker.this.retryInQueue.decrementAndGet();
                            if (!succ) {
                                FlushWorker.this.totalFailedPutCount.incrementAndGet();
                            }
                        }
                    }
                }, delayMs, TimeUnit.MILLISECONDS);
                scheduled = true;
                return true;
            }
            finally {
                // The slot reserved above is normally released by the scheduled task; if the
                // task was never scheduled, release it here so the cap does not shrink.
                if (!scheduled) {
                    this.retryInQueue.decrementAndGet();
                }
            }
        }

        /**
         * Drains the queue, sends the drained puts in one multi-action, resubmits the ones
         * without a {@link Result}, and updates the latency and failure counters.
         *
         * <p>Exceptions are logged at debug level and never propagate out of this method.
         */
        @Override
        public void run() {
            // Puts from this flush that neither succeeded nor were resubmitted.
            int failedCount = 0;
            try {
                long start = EnvironmentEdgeManager.currentTimeMillis();

                this.processingList.clear();
                this.queue.drainTo(this.processingList);
                this.currentProcessingCount.set(this.processingList.size());
                if (this.processingList.isEmpty()) {
                    return;
                }
                // Start from "everything failed" and subtract each success or resubmission,
                // so whatever is left when we exit (normally or not) is a real failure.
                failedCount = this.processingList.size();

                this.results.clear();
                List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(this.processingList.size());
                MultiAction<Row> actions = new MultiAction<Row>();
                for (int i = 0; i < this.processingList.size(); ++i) {
                    PutStatus putStatus = this.processingList.get(i);
                    Action<Row> action = new Action<Row>(putStatus.put, i);
                    actions.add(putStatus.regionInfo.getRegionName(), action);
                    retainedActions.add(action);
                    // Placeholder; replaced by success() when the put goes through.
                    this.results.add(null);
                }

                List<PutStatus> failed = null;
                Map<HRegionLocation, MultiAction<Row>> actionsByServer = Collections.singletonMap(this.addr, actions);
                try {
                    HConnectionManager.ServerErrorTracker errorsByServer = new HConnectionManager.ServerErrorTracker(1L, 10);
                    this.ap.sendMultiAction(retainedActions, actionsByServer, 10, errorsByServer);
                    this.ap.waitUntilDone();
                    if (this.ap.hasError()) {
                        // Only logged: the affected puts are picked up by the scan below.
                        LOG.debug("Caught some exceptions when flushing puts to region server " + this.addr.getHostnamePort(), this.ap.getErrors());
                    }
                }
                finally {
                    // Runs on the exception path too, so successes are still discounted.
                    for (int i = 0; i < this.results.size(); ++i) {
                        if (this.results.get(i) instanceof Result) {
                            --failedCount;
                            continue;
                        }
                        if (failed == null) {
                            failed = new ArrayList<PutStatus>();
                        }
                        failed.add(this.processingList.get(i));
                    }
                }

                if (failed != null) {
                    for (PutStatus putStatus : failed) {
                        if (this.resubmitFailedPut(putStatus, this.addr)) {
                            --failedCount;
                        }
                    }
                }

                long elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
                this.averageLatency.add(elapsed);
                if (elapsed > this.maxLatency.get()) {
                    this.maxLatency.set(elapsed);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Processed " + this.currentProcessingCount + " put requests for " + this.addr.getHostnamePort() + " and " + failedCount + " failed" + ", latency for this send: " + elapsed);
                }
                this.currentProcessingCount.set(0);
            }
            catch (RuntimeException e) {
                LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + this.addr.getHostnamePort(), e);
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + this.addr.getHostnamePort(), e);
            }
            finally {
                // Single place where this flush's failures are added to the running total.
                this.totalFailedPutCount.addAndGet(failedCount);
            }
        }

        /** Records a successful result at its original index; out-of-range indexes are ignored. */
        @Override
        public void success(int originalIndex, byte[] region, Row row, Object result) {
            if (originalIndex >= this.results.size()) {
                return;
            }
            this.results.set(originalIndex, result);
        }

        /** Always returns {@code false}. */
        @Override
        public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
            return false;
        }

        /** Always returns {@code false}. */
        @Override
        public boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception) {
            return false;
        }
    }

    /**
     * Running integer average: accumulates a sum and a sample count, with every accessor
     * synchronized on the instance.
     */
    private static class AtomicAverageCounter {
        private long sum = 0L;
        private int count = 0;

        /** @return the current average (0 when empty), then clears the counter */
        public synchronized long getAndReset() {
            final long average = (this.count != 0) ? this.sum / (long) this.count : 0L;
            this.sum = 0L;
            this.count = 0;
            return average;
        }

        /** @return the truncated average of the samples added so far, or 0 when there are none */
        public synchronized long get() {
            return (this.count != 0) ? this.sum / (long) this.count : 0L;
        }

        /** @return the raw sum (key) and sample count (value) behind the average */
        public synchronized AbstractMap.SimpleEntry<Long, Integer> getComponents() {
            final long currentSum = this.sum;
            final int currentCount = this.count;
            return new AbstractMap.SimpleEntry<Long, Integer>(currentSum, currentCount);
        }

        /** Clears the accumulated sum and sample count. */
        public synchronized void reset() {
            this.count = 0;
            this.sum = 0L;
        }

        /** Adds one sample to the average. */
        public synchronized void add(long value) {
            this.count++;
            this.sum += value;
        }
    }

    /**
     * Immutable queue entry pairing a put with the region it was resolved to and the retry
     * budget it was submitted with.
     */
    private static class PutStatus {
        /** Region the put was routed to when it was queued. */
        public final HRegionInfo regionInfo;
        /** The buffered put. */
        public final Put put;
        /** Retry budget supplied when the put was queued. */
        public final int retryCount;

        /**
         * @param regionInfo region the put was routed to
         * @param put the buffered put
         * @param retryCount retry budget attached to the put
         */
        public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
            this.retryCount = retryCount;
            this.put = put;
            this.regionInfo = regionInfo;
        }
    }

    /**
     * Point-in-time summary of the multiplexer's flush workers: buffered and failed put
     * counts plus average and maximum flush latency, both overall and per region server
     * (keyed by {@code host:port}).
     *
     * <p>Building a status reads and resets each worker's maximum-latency and
     * average-latency counters.
     */
    public static class HTableMultiplexerStatus {
        private long totalFailedPutCounter = 0L;
        private long totalBufferedPutCounter = 0L;
        private long maxLatency = 0L;
        private long overallAverageLatency = 0L;
        private Map<String, Long> serverToFailedCounterMap;
        private Map<String, Long> serverToBufferedCounterMap = new HashMap<String, Long>();
        private Map<String, Long> serverToAverageLatencyMap;
        private Map<String, Long> serverToMaxLatencyMap;

        /**
         * @param serverToFlushWorkerMap workers to summarize; {@code null} yields an empty status
         */
        public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
            this.serverToFailedCounterMap = new HashMap<String, Long>();
            this.serverToAverageLatencyMap = new HashMap<String, Long>();
            this.serverToMaxLatencyMap = new HashMap<String, Long>();
            this.initialize(serverToFlushWorkerMap);
        }

        /** Collects the counters of every worker into the totals and per-server maps. */
        private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
            if (serverToFlushWorkerMap == null) {
                return;
            }
            long averageCalcSum = 0L;
            int averageCalcCount = 0;
            for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
                HRegionLocation addr = entry.getKey();
                FlushWorker worker = entry.getValue();
                String server = addr.getHostnamePort();
                long bufferedCounter = worker.getTotalBufferedCount();
                long failedCounter = worker.getTotalFailedCount();
                long serverMaxLatency = worker.getMaxLatency();
                AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
                AbstractMap.SimpleEntry<Long, Integer> averageComponents;
                long serverAvgLatency;
                // The counter's methods lock on the counter itself, so holding that monitor
                // across both calls makes "read components, then read-and-reset" one atomic
                // step; otherwise a sample added in between would be wiped by the reset
                // without ever reaching the overall average.
                synchronized (averageCounter) {
                    averageComponents = averageCounter.getComponents();
                    serverAvgLatency = averageCounter.getAndReset();
                }
                this.totalBufferedPutCounter += bufferedCounter;
                this.totalFailedPutCounter += failedCounter;
                if (serverMaxLatency > this.maxLatency) {
                    this.maxLatency = serverMaxLatency;
                }
                averageCalcSum += averageComponents.getKey().longValue();
                averageCalcCount += averageComponents.getValue().intValue();
                this.serverToBufferedCounterMap.put(server, bufferedCounter);
                this.serverToFailedCounterMap.put(server, failedCounter);
                this.serverToAverageLatencyMap.put(server, serverAvgLatency);
                this.serverToMaxLatencyMap.put(server, serverMaxLatency);
            }
            // Weighted by sample count rather than averaging the per-server averages.
            this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / (long)averageCalcCount : 0L;
        }

        /** @return total puts buffered or in flight across all workers */
        public long getTotalBufferedCounter() {
            return this.totalBufferedPutCounter;
        }

        /** @return sum of the workers' failed-put totals */
        public long getTotalFailedCounter() {
            return this.totalFailedPutCounter;
        }

        /** @return the largest per-worker maximum latency observed for this snapshot */
        public long getMaxLatency() {
            return this.maxLatency;
        }

        /** @return average flush latency over all workers' samples, or 0 if there were none */
        public long getOverallAverageLatency() {
            return this.overallAverageLatency;
        }

        /** @return buffered put count keyed by {@code host:port} */
        public Map<String, Long> getBufferedCounterForEachRegionServer() {
            return this.serverToBufferedCounterMap;
        }

        /** @return failed put count keyed by {@code host:port} */
        public Map<String, Long> getFailedCounterForEachRegionServer() {
            return this.serverToFailedCounterMap;
        }

        /** @return maximum flush latency keyed by {@code host:port} */
        public Map<String, Long> getMaxLatencyForEachRegionServer() {
            return this.serverToMaxLatencyMap;
        }

        /** @return average flush latency keyed by {@code host:port} */
        public Map<String, Long> getAverageLatencyForEachRegionServer() {
            return this.serverToAverageLatencyMap;
        }
    }
}

