/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MultipleIOException;

@InterfaceAudience.Private
public class HLogSplitter {
    static final Log LOG = LogFactory.getLog(HLogSplitter.class);
    protected final Path rootDir;
    protected final FileSystem fs;
    protected final Configuration conf;
    OutputSink outputSink;
    EntryBuffers entryBuffers;
    private Set<TableName> disablingOrDisabledTables = new HashSet<TableName>();
    private ZooKeeperWatcher watcher;
    protected AtomicReference<Throwable> thrown = new AtomicReference();
    final Object dataAvailable = new Object();
    private MonitoredTask status;
    protected final LastSequenceId sequenceIdChecker;
    protected boolean distributedLogReplay;
    protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
    protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<String, Map<byte[], Long>>();
    protected String failedServerName = "";
    private final int numWriterThreads;
    private final int minBatchSize;

    HLogSplitter(Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, ZooKeeperProtos.SplitLogTask.RecoveryMode mode) {
        this.conf = HBaseConfiguration.create((Configuration)conf);
        String codecClassName = conf.get("hbase.regionserver.wal.codec", WALCellCodec.class.getName());
        this.conf.set("hbase.client.rpc.codec", codecClassName);
        this.rootDir = rootDir;
        this.fs = fs;
        this.sequenceIdChecker = idChecker;
        this.watcher = zkw;
        this.entryBuffers = new EntryBuffers(this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 0x8000000));
        this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
        this.distributedLogReplay = ZooKeeperProtos.SplitLogTask.RecoveryMode.LOG_REPLAY == mode;
        this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
        if (zkw != null && this.distributedLogReplay) {
            this.outputSink = new LogReplayOutputSink(this.numWriterThreads);
        } else {
            if (this.distributedLogReplay) {
                LOG.info((Object)"ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
            }
            this.distributedLogReplay = false;
            this.outputSink = new LogRecoveredEditsOutputSink(this.numWriterThreads);
        }
    }

    public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, ZooKeeperWatcher zkw, ZooKeeperProtos.SplitLogTask.RecoveryMode mode) throws IOException {
        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, mode);
        return s.splitLogFile(logfile, reporter);
    }

    public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, FileSystem fs, Configuration conf) throws IOException {
        FileStatus[] logfiles = fs.listStatus(logDir);
        ArrayList<Path> splits = new ArrayList<Path>();
        if (logfiles != null && logfiles.length > 0) {
            FileStatus[] arr$ = logfiles;
            int len$ = arr$.length;
            for (int i$ = 0; i$ < len$; ++i$) {
                HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, ZooKeeperProtos.SplitLogTask.RecoveryMode.LOG_SPLITTING);
                FileStatus logfile = arr$[i$];
                if (!s.splitLogFile(logfile, null)) continue;
                HLogSplitter.finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
                if (s.outputSink.splits == null) continue;
                splits.addAll(s.outputSink.splits);
            }
        }
        if (!fs.delete(logDir, true)) {
            throw new IOException("Unable to delete src dir: " + logDir);
        }
        return splits;
    }

    /*
     * Exception decompiling
     */
    boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [7[TRYBLOCK]], but top level block is 44[WHILELOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
        Path rootdir = FSUtils.getRootDir(conf);
        Path oldLogDir = new Path(rootdir, "oldWALs");
        Path logPath = FSUtils.isStartingWithPath(rootdir, logfile) ? new Path(logfile) : new Path(rootdir, logfile);
        HLogSplitter.finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
    }

    static void finishSplitLogFile(Path rootdir, Path oldLogDir, Path logPath, Configuration conf) throws IOException {
        ArrayList<Path> processedLogs = new ArrayList<Path>();
        ArrayList<Path> corruptedLogs = new ArrayList<Path>();
        FileSystem fs = rootdir.getFileSystem(conf);
        if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
            corruptedLogs.add(logPath);
        } else {
            processedLogs.add(logPath);
        }
        HLogSplitter.archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
        Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
        fs.delete(stagingDir, true);
    }

    private static void archiveLogs(List<Path> corruptedLogs, List<Path> processedLogs, Path oldLogDir, FileSystem fs, Configuration conf) throws IOException {
        Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", "corrupt"));
        if (!fs.mkdirs(corruptDir)) {
            LOG.info((Object)("Unable to mkdir " + corruptDir));
        }
        fs.mkdirs(oldLogDir);
        for (Path corrupted : corruptedLogs) {
            Path p = new Path(corruptDir, corrupted.getName());
            if (!fs.exists(corrupted)) continue;
            if (!fs.rename(corrupted, p)) {
                LOG.warn((Object)("Unable to move corrupted log " + corrupted + " to " + p));
                continue;
            }
            LOG.warn((Object)("Moved corrupted log " + corrupted + " to " + p));
        }
        for (Path p : processedLogs) {
            Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
            if (!fs.exists(p)) continue;
            if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
                LOG.warn((Object)("Unable to move  " + p + " to " + newPath));
                continue;
            }
            LOG.debug((Object)("Archived processed log " + p + " to " + newPath));
        }
    }

    static Path getRegionSplitEditsPath(FileSystem fs, HLog.Entry logEntry, Path rootDir, boolean isCreate) throws IOException {
        Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
        String encodedRegionName = Bytes.toString((byte[])logEntry.getKey().getEncodedRegionName());
        Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
        Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
        if (!fs.exists(regiondir)) {
            LOG.info((Object)("This region's directory doesn't exist: " + regiondir.toString() + ". It is very likely that it was" + " already split so it's safe to discard those edits."));
            return null;
        }
        if (fs.exists(dir) && fs.isFile(dir)) {
            Path tmp = new Path("/tmp");
            if (!fs.exists(tmp)) {
                fs.mkdirs(tmp);
            }
            tmp = new Path(tmp, "recovered.edits_" + encodedRegionName);
            LOG.warn((Object)("Found existing old file: " + dir + ". It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to " + tmp));
            if (!fs.rename(dir, tmp)) {
                LOG.warn((Object)("Failed to sideline old file " + dir));
            }
        }
        if (isCreate && !fs.exists(dir) && !fs.mkdirs(dir)) {
            LOG.warn((Object)("mkdir failed on " + dir));
        }
        String fileName = HLogSplitter.formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
        fileName = HLogSplitter.getTmpRecoveredEditsFileName(fileName);
        return new Path(dir, fileName);
    }

    static String getTmpRecoveredEditsFileName(String fileName) {
        return fileName + ".temp";
    }

    static Path getCompletedRecoveredEditsFilePath(Path srcPath, Long maximumEditLogSeqNum) {
        String fileName = HLogSplitter.formatRecoveredEditsFileName(maximumEditLogSeqNum);
        return new Path(srcPath.getParent(), fileName);
    }

    static String formatRecoveredEditsFileName(long seqid) {
        return String.format("%019d", seqid);
    }

    protected HLog.Reader getReader(FileSystem fs, FileStatus file, Configuration conf, boolean skipErrors, CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
        HLog.Reader in;
        Path path = file.getPath();
        long length = file.getLen();
        if (length <= 0L) {
            LOG.warn((Object)("File " + path + " might be still open, length is 0"));
        }
        try {
            FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
            try {
                in = this.getReader(fs, path, conf, reporter);
            }
            catch (EOFException e) {
                if (length <= 0L) {
                    LOG.warn((Object)("Could not open " + path + " for reading. File is empty"), (Throwable)e);
                    return null;
                }
                return null;
            }
        }
        catch (IOException e) {
            if (e instanceof FileNotFoundException) {
                LOG.warn((Object)("File " + path + " doesn't exist anymore."), (Throwable)e);
                return null;
            }
            if (!skipErrors || e instanceof InterruptedIOException) {
                throw e;
            }
            CorruptedLogFileException t = new CorruptedLogFileException("skipErrors=true Could not open hlog " + path + " ignoring");
            t.initCause(e);
            throw t;
        }
        return in;
    }

    private static HLog.Entry getNextLogLine(HLog.Reader in, Path path, boolean skipErrors) throws CorruptedLogFileException, IOException {
        try {
            return in.next();
        }
        catch (EOFException eof) {
            LOG.info((Object)("EOF from hlog " + path + ".  continuing"));
            return null;
        }
        catch (IOException e) {
            if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof ChecksumException)) {
                LOG.warn((Object)("Parse exception " + e.getCause().toString() + " from hlog " + path + ".  continuing"));
                return null;
            }
            if (!skipErrors) {
                throw e;
            }
            CorruptedLogFileException t = new CorruptedLogFileException("skipErrors=true Ignoring exception while parsing hlog " + path + ". Marking as corrupted");
            t.initCause(e);
            throw t;
        }
    }

    private void writerThreadError(Throwable t) {
        this.thrown.compareAndSet(null, t);
    }

    private void checkForErrors() throws IOException {
        Throwable thrown = this.thrown.get();
        if (thrown == null) {
            return;
        }
        if (thrown instanceof IOException) {
            throw new IOException(thrown);
        }
        throw new RuntimeException(thrown);
    }

    protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException {
        return HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
    }

    protected HLog.Reader getReader(FileSystem fs, Path curLogFile, Configuration conf, CancelableProgressable reporter) throws IOException {
        return HLogFactory.createReader(fs, curLogFile, conf, reporter);
    }

    private int getNumOpenWriters() {
        int result = 0;
        if (this.outputSink != null) {
            result += this.outputSink.getNumOpenWriters();
        }
        return result;
    }

    private static Cell tagReplayLogSequenceNumber(AdminProtos.WALEntry entry, Cell cell) {
        Tag tmpTag;
        boolean needAddRecoveryTag = true;
        if (cell.getTagsLengthUnsigned() > 0 && (tmpTag = Tag.getTag((byte[])cell.getTagsArray(), (int)cell.getTagsOffset(), (int)cell.getTagsLengthUnsigned(), (byte)3)) != null) {
            needAddRecoveryTag = false;
        }
        if (needAddRecoveryTag) {
            ArrayList<Tag> newTags = new ArrayList<Tag>();
            Tag replayTag = new Tag(3, Bytes.toBytes((long)entry.getKey().getLogSequenceNumber()));
            newTags.add(replayTag);
            return KeyValue.cloneAndAddTags((Cell)cell, newTags);
        }
        return cell;
    }

    public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, CellScanner cells, Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
        if (entry == null) {
            return new ArrayList<MutationReplay>();
        }
        int count = entry.getAssociatedCellCount();
        ArrayList<MutationReplay> mutations = new ArrayList<MutationReplay>();
        Cell previousCell = null;
        Delete m = null;
        HLogKey key = null;
        WALEdit val = null;
        if (logEntry != null) {
            val = new WALEdit();
        }
        for (int i = 0; i < count; ++i) {
            boolean isNewRowOrType;
            if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
            }
            Cell cell = cells.current();
            if (val != null) {
                val.add(KeyValueUtil.ensureKeyValue((Cell)cell));
            }
            boolean bl = isNewRowOrType = previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRow(previousCell, (Cell)cell);
            if (isNewRowOrType) {
                if (CellUtil.isDelete((Cell)cell)) {
                    m = new Delete(cell.getRowArray(), cell.getRowOffset(), (int)cell.getRowLength());
                    mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, (Mutation)m, 0L, 0L));
                } else {
                    m = new Put(cell.getRowArray(), cell.getRowOffset(), (int)cell.getRowLength());
                    long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : 0L;
                    long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : 0L;
                    mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, (Mutation)m, nonceGroup, nonce));
                }
            }
            if (CellUtil.isDelete((Cell)cell)) {
                ((Delete)m).addDeleteMarker((Cell)KeyValueUtil.ensureKeyValue((Cell)cell));
            } else {
                Cell tmpNewCell = cell;
                if (addLogReplayTag) {
                    tmpNewCell = HLogSplitter.tagReplayLogSequenceNumber(entry, cell);
                }
                ((Put)m).add((Cell)KeyValueUtil.ensureKeyValue((Cell)tmpNewCell));
            }
            previousCell = cell;
        }
        if (logEntry != null) {
            WALProtos.WALKey walKey = entry.getKey();
            ArrayList<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
            for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
                clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
            }
            key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf((byte[])walKey.getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
            logEntry.setFirst((Object)key);
            logEntry.setSecond((Object)val);
        }
        return mutations;
    }

    public static class MutationReplay {
        public final ClientProtos.MutationProto.MutationType type;
        public final Mutation mutation;
        public final long nonceGroup;
        public final long nonce;

        public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, long nonceGroup, long nonce) {
            this.type = type;
            this.mutation = mutation;
            this.nonceGroup = nonceGroup;
            this.nonce = nonce;
        }
    }

    static class CorruptedLogFileException
    extends Exception {
        private static final long serialVersionUID = 1L;

        CorruptedLogFileException(String s) {
            super(s);
        }
    }

    private static final class RegionServerWriter
    extends SinkWriter {
        final WALEditsReplaySink sink;

        RegionServerWriter(Configuration conf, TableName tableName, HConnection conn) throws IOException {
            this.sink = new WALEditsReplaySink(conf, tableName, conn);
        }

        void close() throws IOException {
        }
    }

    class LogReplayOutputSink
    extends OutputSink {
        private static final double BUFFER_THRESHOLD = 0.35;
        private static final String KEY_DELIMITER = "#";
        private long waitRegionOnlineTimeOut;
        private final Set<String> recoveredRegions;
        private final Map<String, RegionServerWriter> writers;
        private final Map<String, HRegionLocation> onlineRegions;
        private Map<TableName, HConnection> tableNameToHConnectionMap;
        private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap;
        private List<Throwable> thrown;
        private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
        private boolean hasEditsInDisablingOrDisabledTables;

        public LogReplayOutputSink(int numWriters) {
            super(numWriters);
            this.recoveredRegions = Collections.synchronizedSet(new HashSet());
            this.writers = new ConcurrentHashMap<String, RegionServerWriter>();
            this.onlineRegions = new ConcurrentHashMap<String, HRegionLocation>();
            this.tableNameToHConnectionMap = Collections.synchronizedMap(new TreeMap());
            this.serverToBufferQueueMap = new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
            this.thrown = new ArrayList<Throwable>();
            this.hasEditsInDisablingOrDisabledTables = false;
            this.waitRegionOnlineTimeOut = HLogSplitter.this.conf.getInt("hbase.splitlog.manager.timeout", 120000);
            this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
            this.logRecoveredEditsOutputSink.setReporter(this.reporter);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        void append(RegionEntryBuffer buffer) throws IOException {
            List<HLog.Entry> entries = buffer.entryBuffer;
            if (entries.isEmpty()) {
                LOG.warn((Object)"got an empty buffer, skipping");
                return;
            }
            if (HLogSplitter.this.disablingOrDisabledTables.contains(buffer.tableName)) {
                this.logRecoveredEditsOutputSink.append(buffer);
                this.hasEditsInDisablingOrDisabledTables = true;
                this.addToRecoveredRegions(Bytes.toString((byte[])buffer.encodedRegionName));
                return;
            }
            this.groupEditsByServer(entries);
            String maxLocKey = null;
            int maxSize = 0;
            List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
            Map<String, List<Pair<HRegionLocation, HLog.Entry>>> map = this.serverToBufferQueueMap;
            synchronized (map) {
                for (String key : this.serverToBufferQueueMap.keySet()) {
                    List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
                    if (curQueue.size() <= maxSize) continue;
                    maxSize = curQueue.size();
                    maxQueue = curQueue;
                    maxLocKey = key;
                }
                if (maxSize < HLogSplitter.this.minBatchSize && (double)HLogSplitter.this.entryBuffers.totalBuffered < 0.35 * (double)HLogSplitter.this.entryBuffers.maxHeapUsage) {
                    return;
                }
                if (maxSize > 0) {
                    this.serverToBufferQueueMap.remove(maxLocKey);
                }
            }
            if (maxSize > 0) {
                this.processWorkItems(maxLocKey, maxQueue);
            }
        }

        private void addToRecoveredRegions(String encodedRegionName) {
            if (!this.recoveredRegions.contains(encodedRegionName)) {
                this.recoveredRegions.add(encodedRegionName);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void groupEditsByServer(List<HLog.Entry> entries) throws IOException {
            TreeSet<TableName> nonExistentTables = null;
            Long cachedLastFlushedSequenceId = -1L;
            for (HLog.Entry entry : entries) {
                WALEdit edit = entry.getEdit();
                TableName table = entry.getKey().getTablename();
                entry.getKey().setScopes(null);
                String encodeRegionNameStr = Bytes.toString((byte[])entry.getKey().getEncodedRegionName());
                if (nonExistentTables != null && nonExistentTables.contains(table)) {
                    this.skippedEdits.incrementAndGet();
                    continue;
                }
                Map<byte[], Long> maxStoreSequenceIds = null;
                boolean needSkip = false;
                HRegionLocation loc = null;
                String locKey = null;
                ArrayList<KeyValue> kvs = edit.getKeyValues();
                ArrayList<KeyValue> skippedKVs = new ArrayList<KeyValue>();
                HConnection hconn = this.getConnectionByTableName(table);
                for (KeyValue kv : kvs) {
                    Long maxStoreSeqId;
                    byte[] family;
                    block18: {
                        boolean isCompactionEntry;
                        byte[] row;
                        block17: {
                            row = kv.getRow();
                            family = kv.getFamily();
                            isCompactionEntry = false;
                            if (kv.matchingFamily(WALEdit.METAFAMILY)) {
                                WALProtos.CompactionDescriptor compaction = WALEdit.getCompaction((Cell)kv);
                                if (compaction != null && compaction.hasRegionName()) {
                                    try {
                                        byte[][] regionName = HRegionInfo.parseRegionName((byte[])compaction.getRegionName().toByteArray());
                                        row = regionName[1];
                                        family = compaction.getFamilyName().toByteArray();
                                        isCompactionEntry = true;
                                        break block17;
                                    }
                                    catch (Exception ex) {
                                        LOG.warn((Object)("Unexpected exception received, ignoring " + ex));
                                        skippedKVs.add(kv);
                                        continue;
                                    }
                                }
                                skippedKVs.add(kv);
                                continue;
                            }
                        }
                        try {
                            loc = this.locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row, encodeRegionNameStr);
                            if (!isCompactionEntry || encodeRegionNameStr.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) break block18;
                            LOG.info((Object)("Not replaying a compaction marker for an older region: " + encodeRegionNameStr));
                            needSkip = true;
                        }
                        catch (TableNotFoundException ex) {
                            LOG.info((Object)("Table " + table + " doesn't exist. Skip log replay for region " + encodeRegionNameStr));
                            HLogSplitter.this.lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
                            if (nonExistentTables == null) {
                                nonExistentTables = new TreeSet<TableName>();
                            }
                            nonExistentTables.add(table);
                            this.skippedEdits.incrementAndGet();
                            needSkip = true;
                            break;
                        }
                    }
                    cachedLastFlushedSequenceId = HLogSplitter.this.lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
                    if (cachedLastFlushedSequenceId != null && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
                        this.skippedEdits.incrementAndGet();
                        needSkip = true;
                        break;
                    }
                    if (maxStoreSequenceIds == null) {
                        maxStoreSequenceIds = HLogSplitter.this.regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
                    }
                    if (maxStoreSequenceIds == null || (maxStoreSeqId = (Long)maxStoreSequenceIds.get(family)) != null && maxStoreSeqId < entry.getKey().getLogSeqNum()) continue;
                    skippedKVs.add(kv);
                }
                if (loc == null || needSkip) continue;
                if (!skippedKVs.isEmpty()) {
                    kvs.removeAll(skippedKVs);
                }
                Map<String, List<Pair<HRegionLocation, HLog.Entry>>> map = this.serverToBufferQueueMap;
                synchronized (map) {
                    locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
                    List<Object> queue = this.serverToBufferQueueMap.get(locKey);
                    if (queue == null) {
                        queue = Collections.synchronizedList(new ArrayList());
                        this.serverToBufferQueueMap.put(locKey, queue);
                    }
                    queue.add((Pair<HRegionLocation, HLog.Entry>)new Pair((Object)loc, (Object)entry));
                }
                this.addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
            }
        }

        private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
            HRegionLocation loc = this.onlineRegions.get(originalEncodedRegionName);
            if (loc != null) {
                return loc;
            }
            loc = hconn.getRegionLocation(table, row, true);
            if (loc == null) {
                throw new IOException("Can't locate location for row:" + Bytes.toString((byte[])row) + " of table:" + table);
            }
            if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
                HLogSplitter.this.lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
                HRegionLocation tmpLoc = this.onlineRegions.get(loc.getRegionInfo().getEncodedName());
                if (tmpLoc != null) {
                    return tmpLoc;
                }
            }
            Long lastFlushedSequenceId = -1L;
            AtomicBoolean isRecovering = new AtomicBoolean(true);
            loc = this.waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
            if (!isRecovering.get()) {
                HLogSplitter.this.lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
                LOG.info((Object)("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."));
            } else {
                Long cachedLastFlushedSequenceId = HLogSplitter.this.lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
                ZooKeeperProtos.RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(HLogSplitter.this.watcher, HLogSplitter.this.failedServerName, loc.getRegionInfo().getEncodedName());
                if (ids != null) {
                    lastFlushedSequenceId = ids.getLastFlushedSequenceId();
                    TreeMap<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
                    List maxSeqIdInStores = ids.getStoreSequenceIdList();
                    for (ZooKeeperProtos.StoreSequenceId id : maxSeqIdInStores) {
                        storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
                    }
                    HLogSplitter.this.regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
                }
                if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
                    HLogSplitter.this.lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
                }
            }
            this.onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
            return loc;
        }

        private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions) throws IOException {
            RegionServerWriter rsw = null;
            long startTime = System.nanoTime();
            try {
                rsw = this.getRegionServerWriter(key);
                rsw.sink.replayEntries(actions);
                rsw.incrementEdits(actions.size());
                rsw.incrementNanoTime(System.nanoTime() - startTime);
            }
            catch (IOException e) {
                e = RemoteExceptionHandler.checkIOException((IOException)e);
                LOG.fatal((Object)" Got while writing log entry to log", (Throwable)e);
                throw e;
            }
        }

        private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row, long timeout, AtomicBoolean isRecovering) throws IOException {
            long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
            long pause = HLogSplitter.this.conf.getLong("hbase.client.pause", 100L);
            boolean reloadLocation = false;
            TableName tableName = loc.getRegionInfo().getTable();
            int tries = 0;
            Throwable cause = null;
            while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
                block9: {
                    try {
                        HConnection hconn = this.getConnectionByTableName(tableName);
                        if (reloadLocation) {
                            loc = hconn.getRegionLocation(tableName, row, true);
                        }
                        AdminProtos.AdminService.BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
                        HRegionInfo region = loc.getRegionInfo();
                        try {
                            AdminProtos.GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest((byte[])region.getRegionName());
                            AdminProtos.GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
                            if (HRegionInfo.convert((HBaseProtos.RegionInfo)response.getRegionInfo()) != null) {
                                isRecovering.set(response.hasIsRecovering() ? response.getIsRecovering() : true);
                                return loc;
                            }
                        }
                        catch (ServiceException se) {
                            throw ProtobufUtil.getRemoteException((ServiceException)se);
                        }
                    }
                    catch (IOException e) {
                        cause = e.getCause();
                        if (cause instanceof RegionOpeningException) break block9;
                        reloadLocation = true;
                    }
                }
                long expectedSleep = ConnectionUtils.getPauseTime((long)pause, (int)tries);
                try {
                    Thread.sleep(expectedSleep);
                }
                catch (InterruptedException e) {
                    throw new IOException("Interrupted when waiting region " + loc.getRegionInfo().getEncodedName() + " online.", e);
                }
                ++tries;
            }
            throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() + " online for " + timeout + " milliseconds.", cause);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected boolean flush() throws IOException {
            String curLoc = null;
            int curSize = 0;
            List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
            Map<String, List<Pair<HRegionLocation, HLog.Entry>>> map = this.serverToBufferQueueMap;
            synchronized (map) {
                for (String locationKey : this.serverToBufferQueueMap.keySet()) {
                    curQueue = this.serverToBufferQueueMap.get(locationKey);
                    if (curQueue.isEmpty()) continue;
                    curSize = curQueue.size();
                    curLoc = locationKey;
                    break;
                }
                if (curSize > 0) {
                    this.serverToBufferQueueMap.remove(curLoc);
                }
            }
            if (curSize > 0) {
                this.processWorkItems(curLoc, curQueue);
                HLogSplitter.this.dataAvailable.notifyAll();
                return true;
            }
            return false;
        }

        void addWriterError(Throwable t) {
            this.thrown.add(t);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        List<Path> finishWritingAndClose() throws IOException {
            try {
                if (!this.finishWriting()) {
                    List<Path> list = null;
                    return list;
                }
                this.splits = this.hasEditsInDisablingOrDisabledTables ? this.logRecoveredEditsOutputSink.finishWritingAndClose() : new ArrayList();
                List list = this.splits;
                return list;
            }
            finally {
                List<IOException> thrown = this.closeRegionServerWriters();
                if (thrown != null && !thrown.isEmpty()) {
                    throw MultipleIOException.createIOException(thrown);
                }
            }
        }

        @Override
        int getNumOpenWriters() {
            return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private List<IOException> closeRegionServerWriters() throws IOException {
            ArrayList result = null;
            if (this.writersClosed) return result;
            result = Lists.newArrayList();
            try {
                for (WriterThread t : this.writerThreads) {
                    while (t.isAlive()) {
                        t.shouldStop = true;
                        t.interrupt();
                        try {
                            t.join(10L);
                        }
                        catch (InterruptedException e) {
                            InterruptedIOException iie = new InterruptedIOException();
                            iie.initCause(e);
                            throw iie;
                        }
                    }
                    continue;
                    return result;
                }
            }
            finally {
                Map<String, RegionServerWriter> map = this.writers;
                synchronized (map) {
                    for (String locationKey : this.writers.keySet()) {
                        RegionServerWriter tmpW = this.writers.get(locationKey);
                        try {
                            tmpW.close();
                        }
                        catch (IOException ioe) {
                            LOG.error((Object)("Couldn't close writer for region server:" + locationKey), (Throwable)ioe);
                            result.add(ioe);
                        }
                    }
                }
                map = this.tableNameToHConnectionMap;
                synchronized (map) {
                    for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
                        HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
                        try {
                            hconn.clearRegionCache();
                            hconn.close();
                        }
                        catch (IOException ioe) {
                            result.add(ioe);
                        }
                    }
                }
                this.writersClosed = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        Map<byte[], Long> getOutputCounts() {
            TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
            Map<String, RegionServerWriter> map = this.writers;
            synchronized (map) {
                for (Map.Entry<String, RegionServerWriter> entry : this.writers.entrySet()) {
                    ret.put(Bytes.toBytes((String)entry.getKey()), entry.getValue().editsWritten);
                }
            }
            return ret;
        }

        @Override
        int getNumberOfRecoveredRegions() {
            return this.recoveredRegions.size();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
            RegionServerWriter ret = this.writers.get(loc);
            if (ret != null) {
                return ret;
            }
            TableName tableName = this.getTableFromLocationStr(loc);
            if (tableName == null) {
                throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
            }
            HConnection hconn = this.getConnectionByTableName(tableName);
            Map<String, RegionServerWriter> map = this.writers;
            synchronized (map) {
                ret = this.writers.get(loc);
                if (ret == null) {
                    ret = new RegionServerWriter(HLogSplitter.this.conf, tableName, hconn);
                    this.writers.put(loc, ret);
                }
            }
            return ret;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private HConnection getConnectionByTableName(TableName tableName) throws IOException {
            HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
            if (hconn == null) {
                Map<TableName, HConnection> map = this.tableNameToHConnectionMap;
                synchronized (map) {
                    hconn = this.tableNameToHConnectionMap.get(tableName);
                    if (hconn == null) {
                        hconn = HConnectionManager.getConnection((Configuration)HLogSplitter.this.conf);
                        this.tableNameToHConnectionMap.put(tableName, hconn);
                    }
                }
            }
            return hconn;
        }

        private TableName getTableFromLocationStr(String loc) {
            String[] splits = loc.split(KEY_DELIMITER);
            if (splits.length != 2) {
                return null;
            }
            return TableName.valueOf((String)splits[1]);
        }
    }

    private static final class WriterAndPath
    extends SinkWriter {
        final Path p;
        final HLog.Writer w;

        WriterAndPath(Path p, HLog.Writer w) {
            this.p = p;
            this.w = w;
        }
    }

    private static abstract class SinkWriter {
        long editsWritten = 0L;
        long nanosSpent = 0L;

        private SinkWriter() {
        }

        void incrementEdits(int edits) {
            this.editsWritten += (long)edits;
        }

        void incrementNanoTime(long nanos) {
            this.nanosSpent += nanos;
        }
    }

    class LogRecoveredEditsOutputSink
    extends OutputSink {
        public LogRecoveredEditsOutputSink(int numWriters) {
            super(numWriters);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        List<Path> finishWritingAndClose() throws IOException {
            boolean isSuccessful = false;
            List<Path> result = null;
            try {
                isSuccessful = this.finishWriting();
            }
            finally {
                result = this.close();
                List<IOException> thrown = this.closeLogWriters(null);
                if (thrown != null && !thrown.isEmpty()) {
                    throw MultipleIOException.createIOException(thrown);
                }
            }
            if (isSuccessful) {
                this.splits = result;
            }
            return this.splits;
        }

        private List<Path> close() throws IOException {
            Preconditions.checkState((!this.closeAndCleanCompleted ? 1 : 0) != 0);
            final ArrayList<Path> paths = new ArrayList<Path>();
            final ArrayList thrown = Lists.newArrayList();
            ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool((int)this.numThreads, (long)30L, (TimeUnit)TimeUnit.SECONDS, (ThreadFactory)new ThreadFactory(){
                private int count = 1;

                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "split-log-closeStream-" + this.count++);
                    return t;
                }
            });
            ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(closeThreadPool);
            for (final Map.Entry writersEntry : this.writers.entrySet()) {
                LOG.debug((Object)("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p));
                completionService.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        WriterAndPath wap = (WriterAndPath)writersEntry.getValue();
                        LOG.debug((Object)("Closing " + wap.p));
                        try {
                            wap.w.close();
                        }
                        catch (IOException ioe) {
                            LOG.error((Object)("Couldn't close log at " + wap.p), (Throwable)ioe);
                            thrown.add(ioe);
                            return null;
                        }
                        LOG.info((Object)("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " + wap.nanosSpent / 1000L / 1000L + "ms)"));
                        if (wap.editsWritten == 0L) {
                            if (HLogSplitter.this.fs.exists(wap.p) && !HLogSplitter.this.fs.delete(wap.p, false)) {
                                LOG.warn((Object)("Failed deleting empty " + wap.p));
                                throw new IOException("Failed deleting empty  " + wap.p);
                            }
                            return null;
                        }
                        Path dst = HLogSplitter.getCompletedRecoveredEditsFilePath(wap.p, (Long)LogRecoveredEditsOutputSink.this.regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
                        try {
                            if (!dst.equals((Object)wap.p) && HLogSplitter.this.fs.exists(dst)) {
                                LOG.warn((Object)("Found existing old edits file. It could be the result of a previous failed split attempt. Deleting " + dst + ", length=" + HLogSplitter.this.fs.getFileStatus(dst).getLen()));
                                if (!HLogSplitter.this.fs.delete(dst, false)) {
                                    LOG.warn((Object)("Failed deleting of old " + dst));
                                    throw new IOException("Failed deleting of old " + dst);
                                }
                            }
                            if (HLogSplitter.this.fs.exists(wap.p)) {
                                if (!HLogSplitter.this.fs.rename(wap.p, dst)) {
                                    throw new IOException("Failed renaming " + wap.p + " to " + dst);
                                }
                                LOG.debug((Object)("Rename " + wap.p + " to " + dst));
                            }
                        }
                        catch (IOException ioe) {
                            LOG.error((Object)("Couldn't rename " + wap.p + " to " + dst), (Throwable)ioe);
                            thrown.add(ioe);
                            return null;
                        }
                        paths.add(dst);
                        return null;
                    }
                });
            }
            boolean progress_failed = false;
            try {
                int n = this.writers.size();
                for (int i = 0; i < n; ++i) {
                    Future future = completionService.take();
                    future.get();
                    if (progress_failed || this.reporter == null || this.reporter.progress()) continue;
                    progress_failed = true;
                }
            }
            catch (InterruptedException e) {
                InterruptedIOException iie = new InterruptedIOException();
                iie.initCause(e);
                throw iie;
            }
            catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
            finally {
                closeThreadPool.shutdownNow();
            }
            if (!thrown.isEmpty()) {
                throw MultipleIOException.createIOException((List)thrown);
            }
            this.writersClosed = true;
            this.closeAndCleanCompleted = true;
            if (progress_failed) {
                return null;
            }
            return paths;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
            Map map;
            if (this.writersClosed) {
                return thrown;
            }
            if (thrown == null) {
                thrown = Lists.newArrayList();
            }
            try {
                for (WriterThread t : this.writerThreads) {
                    while (t.isAlive()) {
                        t.shouldStop = true;
                        t.interrupt();
                        try {
                            t.join(10L);
                        }
                        catch (InterruptedException e) {
                            InterruptedIOException iie = new InterruptedIOException();
                            iie.initCause(e);
                            throw iie;
                        }
                    }
                }
                map = this.writers;
            }
            catch (Throwable throwable) {
                Map map2 = this.writers;
                synchronized (map2) {
                    WriterAndPath wap = null;
                    for (SinkWriter tmpWAP : this.writers.values()) {
                        try {
                            wap = (WriterAndPath)tmpWAP;
                            wap.w.close();
                        }
                        catch (IOException ioe) {
                            LOG.error((Object)("Couldn't close log at " + wap.p), (Throwable)ioe);
                            thrown.add(ioe);
                            continue;
                        }
                        LOG.info((Object)("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + wap.nanosSpent / 1000L / 1000L + "ms)"));
                    }
                }
                this.writersClosed = true;
                throw throwable;
            }
            synchronized (map) {
                WriterAndPath wap = null;
                for (SinkWriter tmpWAP : this.writers.values()) {
                    try {
                        wap = (WriterAndPath)tmpWAP;
                        wap.w.close();
                    }
                    catch (IOException ioe) {
                        LOG.error((Object)("Couldn't close log at " + wap.p), (Throwable)ioe);
                        thrown.add(ioe);
                        continue;
                    }
                    LOG.info((Object)("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + wap.nanosSpent / 1000L / 1000L + "ms)"));
                }
            }
            this.writersClosed = true;
            return thrown;
        }

        private WriterAndPath getWriterAndPath(HLog.Entry entry) throws IOException {
            byte[] region = entry.getKey().getEncodedRegionName();
            WriterAndPath ret = (WriterAndPath)this.writers.get(region);
            if (ret != null) {
                return ret;
            }
            if (this.blacklistedRegions.contains(region)) {
                return null;
            }
            ret = this.createWAP(region, entry, HLogSplitter.this.rootDir, HLogSplitter.this.fs, HLogSplitter.this.conf);
            if (ret == null) {
                this.blacklistedRegions.add(region);
                return null;
            }
            this.writers.put(region, ret);
            return ret;
        }

        private WriterAndPath createWAP(byte[] region, HLog.Entry entry, Path rootdir, FileSystem fs, Configuration conf) throws IOException {
            Path regionedits = HLogSplitter.getRegionSplitEditsPath(fs, entry, rootdir, true);
            if (regionedits == null) {
                return null;
            }
            if (fs.exists(regionedits)) {
                LOG.warn((Object)("Found old edits file. It could be the result of a previous failed split attempt. Deleting " + regionedits + ", length=" + fs.getFileStatus(regionedits).getLen()));
                if (!fs.delete(regionedits, false)) {
                    LOG.warn((Object)("Failed delete of old " + regionedits));
                }
            }
            HLog.Writer w = HLogSplitter.this.createWriter(fs, regionedits, conf);
            LOG.debug((Object)("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary((byte[])region)));
            return new WriterAndPath(regionedits, w);
        }

        @Override
        void append(RegionEntryBuffer buffer) throws IOException {
            List<HLog.Entry> entries = buffer.entryBuffer;
            if (entries.isEmpty()) {
                LOG.warn((Object)"got an empty buffer, skipping");
                return;
            }
            SinkWriter wap = null;
            long startTime = System.nanoTime();
            try {
                int editsCount = 0;
                for (HLog.Entry logEntry : entries) {
                    if (wap == null && (wap = this.getWriterAndPath(logEntry)) == null) {
                        return;
                    }
                    ((WriterAndPath)wap).w.append(logEntry);
                    this.updateRegionMaximumEditLogSeqNum(logEntry);
                    ++editsCount;
                }
                wap.incrementEdits(editsCount);
                wap.incrementNanoTime(System.nanoTime() - startTime);
            }
            catch (IOException e) {
                e = RemoteExceptionHandler.checkIOException((IOException)e);
                LOG.fatal((Object)" Got while writing log entry to log", (Throwable)e);
                throw e;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        Map<byte[], Long> getOutputCounts() {
            TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
            Map map = this.writers;
            synchronized (map) {
                for (Map.Entry entry : this.writers.entrySet()) {
                    ret.put((byte[])entry.getKey(), ((SinkWriter)entry.getValue()).editsWritten);
                }
            }
            return ret;
        }

        @Override
        int getNumberOfRecoveredRegions() {
            return this.writers.size();
        }
    }

    abstract class OutputSink {
        protected Map<byte[], SinkWriter> writers = Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
        protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
        protected final List<WriterThread> writerThreads = Lists.newArrayList();
        protected final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(new TreeSet(Bytes.BYTES_COMPARATOR));
        protected boolean closeAndCleanCompleted = false;
        protected boolean writersClosed = false;
        protected final int numThreads;
        protected CancelableProgressable reporter = null;
        protected AtomicLong skippedEdits = new AtomicLong();
        protected List<Path> splits = null;

        public OutputSink(int numWriters) {
            this.numThreads = numWriters;
        }

        void setReporter(CancelableProgressable reporter) {
            this.reporter = reporter;
        }

        synchronized void startWriterThreads() {
            for (int i = 0; i < this.numThreads; ++i) {
                WriterThread t = new WriterThread(this, i);
                t.start();
                this.writerThreads.add(t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void updateRegionMaximumEditLogSeqNum(HLog.Entry entry) {
            Map<byte[], Long> map = this.regionMaximumEditLogSeqNum;
            synchronized (map) {
                Long currentMaxSeqNum = this.regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName());
                if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
                    this.regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey().getLogSeqNum());
                }
            }
        }

        Long getRegionMaximumEditLogSeqNum(byte[] region) {
            return this.regionMaximumEditLogSeqNum.get(region);
        }

        int getNumOpenWriters() {
            return this.writers.size();
        }

        long getSkippedEdits() {
            return this.skippedEdits.get();
        }

        protected boolean finishWriting() throws IOException {
            LOG.info((Object)"Waiting for split writer threads to finish");
            boolean progress_failed = false;
            for (WriterThread t : this.writerThreads) {
                t.finish();
            }
            for (WriterThread t : this.writerThreads) {
                if (!progress_failed && this.reporter != null && !this.reporter.progress()) {
                    progress_failed = true;
                }
                try {
                    t.join();
                }
                catch (InterruptedException ie) {
                    InterruptedIOException iie = new InterruptedIOException();
                    iie.initCause(ie);
                    throw iie;
                }
            }
            HLogSplitter.this.checkForErrors();
            LOG.info((Object)"Split writers finished");
            return !progress_failed;
        }

        abstract List<Path> finishWritingAndClose() throws IOException;

        abstract Map<byte[], Long> getOutputCounts();

        abstract int getNumberOfRecoveredRegions();

        abstract void append(RegionEntryBuffer var1) throws IOException;

        protected boolean flush() throws IOException {
            return false;
        }
    }

    class WriterThread
    extends Thread {
        private volatile boolean shouldStop;
        private OutputSink outputSink;

        WriterThread(OutputSink sink, int i) {
            super(Thread.currentThread().getName() + "-Writer-" + i);
            this.shouldStop = false;
            this.outputSink = null;
            this.outputSink = sink;
        }

        @Override
        public void run() {
            try {
                this.doRun();
            }
            catch (Throwable t) {
                LOG.error((Object)"Exiting thread", t);
                HLogSplitter.this.writerThreadError(t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doRun() throws IOException {
            LOG.debug((Object)("Writer thread " + this + ": starting"));
            while (true) {
                RegionEntryBuffer buffer;
                if ((buffer = HLogSplitter.this.entryBuffers.getChunkToWrite()) == null) {
                    Object object = HLogSplitter.this.dataAvailable;
                    synchronized (object) {
                        block12: {
                            if (this.shouldStop && !this.outputSink.flush()) {
                                return;
                            }
                            try {
                                HLogSplitter.this.dataAvailable.wait(500L);
                            }
                            catch (InterruptedException ie) {
                                if (this.shouldStop) break block12;
                                throw new RuntimeException(ie);
                            }
                        }
                    }
                }
                assert (buffer != null);
                try {
                    this.writeBuffer(buffer);
                    continue;
                }
                finally {
                    HLogSplitter.this.entryBuffers.doneWriting(buffer);
                    continue;
                }
                break;
            }
        }

        private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
            this.outputSink.append(buffer);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void finish() {
            Object object = HLogSplitter.this.dataAvailable;
            synchronized (object) {
                this.shouldStop = true;
                HLogSplitter.this.dataAvailable.notifyAll();
            }
        }
    }

    static class RegionEntryBuffer
    implements HeapSize {
        long heapInBuffer = 0L;
        List<HLog.Entry> entryBuffer;
        TableName tableName;
        byte[] encodedRegionName;

        RegionEntryBuffer(TableName tableName, byte[] region) {
            this.tableName = tableName;
            this.encodedRegionName = region;
            this.entryBuffer = new LinkedList<HLog.Entry>();
        }

        long appendEntry(HLog.Entry entry) {
            this.internify(entry);
            this.entryBuffer.add(entry);
            long incrHeap = entry.getEdit().heapSize() + (long)ClassSize.align((int)(2 * ClassSize.REFERENCE)) + 0L;
            this.heapInBuffer += incrHeap;
            return incrHeap;
        }

        private void internify(HLog.Entry entry) {
            HLogKey k = entry.getKey();
            k.internTableName(this.tableName);
            k.internEncodedRegionName(this.encodedRegionName);
        }

        public long heapSize() {
            return this.heapInBuffer;
        }
    }

    class EntryBuffers {
        Map<byte[], RegionEntryBuffer> buffers = new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
        Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        long totalBuffered = 0L;
        long maxHeapUsage;

        EntryBuffers(long maxHeapUsage) {
            this.maxHeapUsage = maxHeapUsage;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void appendEntry(HLog.Entry entry) throws InterruptedException, IOException {
            long incrHeap;
            HLogKey key = entry.getKey();
            Object object = this;
            synchronized (object) {
                RegionEntryBuffer buffer = this.buffers.get(key.getEncodedRegionName());
                if (buffer == null) {
                    buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
                    this.buffers.put(key.getEncodedRegionName(), buffer);
                }
                incrHeap = buffer.appendEntry(entry);
            }
            object = HLogSplitter.this.dataAvailable;
            synchronized (object) {
                this.totalBuffered += incrHeap;
                while (this.totalBuffered > this.maxHeapUsage && HLogSplitter.this.thrown.get() == null) {
                    LOG.debug((Object)("Used " + this.totalBuffered + " bytes of buffered edits, waiting for IO threads..."));
                    HLogSplitter.this.dataAvailable.wait(2000L);
                }
                HLogSplitter.this.dataAvailable.notifyAll();
            }
            HLogSplitter.this.checkForErrors();
        }

        synchronized RegionEntryBuffer getChunkToWrite() {
            long biggestSize = 0L;
            byte[] biggestBufferKey = null;
            for (Map.Entry<byte[], RegionEntryBuffer> entry : this.buffers.entrySet()) {
                long size = entry.getValue().heapSize();
                if (size <= biggestSize || this.currentlyWriting.contains(entry.getKey())) continue;
                biggestSize = size;
                biggestBufferKey = entry.getKey();
            }
            if (biggestBufferKey == null) {
                return null;
            }
            RegionEntryBuffer buffer = this.buffers.remove(biggestBufferKey);
            this.currentlyWriting.add(biggestBufferKey);
            return buffer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void doneWriting(RegionEntryBuffer buffer) {
            EntryBuffers entryBuffers = this;
            synchronized (entryBuffers) {
                boolean removed = this.currentlyWriting.remove(buffer.encodedRegionName);
                assert (removed);
            }
            long size = buffer.heapSize();
            Object object = HLogSplitter.this.dataAvailable;
            synchronized (object) {
                this.totalBuffered -= size;
                HLogSplitter.this.dataAvailable.notifyAll();
            }
        }

        synchronized boolean isRegionCurrentlyWriting(byte[] region) {
            return this.currentlyWriting.contains(region);
        }
    }
}

