/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannelMetrics;
import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;

@InterfaceAudience.Private
public class IPCLoggerChannel
implements AsyncLogger {
    private final Configuration conf;
    protected final InetSocketAddress addr;
    private QJournalProtocol proxy;
    private final ListeningExecutorService executor;
    private long ipcSerial = 0L;
    private long epoch = -1L;
    private long committedTxId = -12345L;
    private final String journalId;
    private final NamespaceInfo nsInfo;
    private int httpPort = -1;
    private final IPCLoggerChannelMetrics metrics;
    private int queuedEditsSizeBytes = 0;
    private long highestAckedTxId = 0L;
    private long lastAckNanos = 0L;
    private long lastCommitNanos = 0L;
    private final int queueSizeLimitBytes;
    private boolean outOfSync = false;
    private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
    private static final long HEARTBEAT_INTERVAL_MILLIS = 1000L;
    private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000L;
    static final AsyncLogger.Factory FACTORY = new AsyncLogger.Factory(){

        @Override
        public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) {
            return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
        }
    };

    public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) {
        this.conf = conf;
        this.nsInfo = nsInfo;
        this.journalId = journalId;
        this.addr = addr;
        this.queueSizeLimitBytes = 0x100000 * conf.getInt("dfs.qjournal.queued-edits.limit.mb", 10);
        this.executor = MoreExecutors.listeningDecorator((ExecutorService)this.createExecutor());
        this.metrics = IPCLoggerChannelMetrics.create(this);
    }

    @Override
    public synchronized void setEpoch(long epoch) {
        this.epoch = epoch;
    }

    @Override
    public synchronized void setCommittedTxId(long txid) {
        Preconditions.checkArgument((txid >= this.committedTxId ? 1 : 0) != 0, (String)"Trying to move committed txid backwards in client old: %s new: %s", (Object[])new Object[]{this.committedTxId, txid});
        this.committedTxId = txid;
        this.lastCommitNanos = System.nanoTime();
    }

    @Override
    public void close() {
        this.executor.shutdown();
        if (this.proxy != null) {
            RPC.stopProxy(this.proxy);
        }
    }

    protected QJournalProtocol getProxy() throws IOException {
        if (this.proxy != null) {
            return this.proxy;
        }
        this.proxy = this.createProxy();
        return this.proxy;
    }

    protected QJournalProtocol createProxy() throws IOException {
        final Configuration confCopy = new Configuration(this.conf);
        confCopy.setBoolean("ipc.client.tcpnodelay", true);
        RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class);
        return SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<QJournalProtocol>(){

            @Override
            public QJournalProtocol run() throws IOException {
                RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class);
                QJournalProtocolPB pbproxy = RPC.getProxy(QJournalProtocolPB.class, RPC.getProtocolVersion(QJournalProtocolPB.class), IPCLoggerChannel.this.addr, confCopy);
                return new QJournalProtocolTranslatorPB(pbproxy);
            }
        });
    }

    @VisibleForTesting
    protected ExecutorService createExecutor() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Logger channel to " + this.addr).setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit()).build());
    }

    @Override
    public URL buildURLToFetchLogs(long segmentTxId) {
        Preconditions.checkArgument((segmentTxId > 0L ? 1 : 0) != 0, (String)"Invalid segment: %s", (Object[])new Object[]{segmentTxId});
        Preconditions.checkState((this.httpPort != -1 ? 1 : 0) != 0, (Object)"HTTP port not set yet");
        try {
            String path = GetJournalEditServlet.buildPath(this.journalId, segmentTxId, this.nsInfo);
            return new URL("http", this.addr.getHostName(), this.httpPort, path.toString());
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    private synchronized RequestInfo createReqInfo() {
        Preconditions.checkState((this.epoch > 0L ? 1 : 0) != 0, (Object)("bad epoch: " + this.epoch));
        return new RequestInfo(this.journalId, this.epoch, this.ipcSerial++, this.committedTxId);
    }

    @VisibleForTesting
    synchronized long getNextIpcSerial() {
        return this.ipcSerial;
    }

    public synchronized int getQueuedEditsSize() {
        return this.queuedEditsSizeBytes;
    }

    public InetSocketAddress getRemoteAddress() {
        return this.addr;
    }

    public synchronized boolean isOutOfSync() {
        return this.outOfSync;
    }

    @VisibleForTesting
    void waitForAllPendingCalls() throws InterruptedException {
        try {
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                }
            }).get();
        }
        catch (ExecutionException e) {
            throw new AssertionError((Object)e);
        }
    }

    @Override
    public ListenableFuture<Boolean> isFormatted() {
        return this.executor.submit((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws IOException {
                return IPCLoggerChannel.this.getProxy().isFormatted(IPCLoggerChannel.this.journalId);
            }
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.GetJournalStateResponseProto> getJournalState() {
        return this.executor.submit((Callable)new Callable<QJournalProtocolProtos.GetJournalStateResponseProto>(){

            @Override
            public QJournalProtocolProtos.GetJournalStateResponseProto call() throws IOException {
                QJournalProtocolProtos.GetJournalStateResponseProto ret = IPCLoggerChannel.this.getProxy().getJournalState(IPCLoggerChannel.this.journalId);
                IPCLoggerChannel.this.httpPort = ret.getHttpPort();
                return ret;
            }
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.NewEpochResponseProto> newEpoch(final long epoch) {
        return this.executor.submit((Callable)new Callable<QJournalProtocolProtos.NewEpochResponseProto>(){

            @Override
            public QJournalProtocolProtos.NewEpochResponseProto call() throws IOException {
                return IPCLoggerChannel.this.getProxy().newEpoch(IPCLoggerChannel.this.journalId, IPCLoggerChannel.this.nsInfo, epoch);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ListenableFuture<Void> sendEdits(final long segmentTxId, final long firstTxnId, final int numTxns, final byte[] data) {
        ListenableFuture ret;
        block6: {
            try {
                this.reserveQueueSpace(data.length);
            }
            catch (LoggerTooFarBehindException e) {
                return Futures.immediateFailedFuture((Throwable)e);
            }
            final long submitNanos = System.nanoTime();
            ret = null;
            try {
                ret = this.executor.submit((Callable)new Callable<Void>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public Void call() throws IOException {
                        IPCLoggerChannel.this.throwIfOutOfSync();
                        long rpcSendTimeNanos = System.nanoTime();
                        try {
                            IPCLoggerChannel.this.getProxy().journal(IPCLoggerChannel.this.createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
                        }
                        catch (IOException e) {
                            QuorumJournalManager.LOG.warn((Object)("Remote journal " + IPCLoggerChannel.this + " failed to " + "write txns " + firstTxnId + "-" + (firstTxnId + (long)numTxns - 1L) + ". Will try to write to this JN again after the next " + "log roll."), (Throwable)e);
                            IPCLoggerChannel iPCLoggerChannel = IPCLoggerChannel.this;
                            synchronized (iPCLoggerChannel) {
                                IPCLoggerChannel.this.outOfSync = true;
                            }
                            throw e;
                        }
                        finally {
                            long now = System.nanoTime();
                            long rpcTime = TimeUnit.MICROSECONDS.convert(now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
                            long endToEndTime = TimeUnit.MICROSECONDS.convert(now - submitNanos, TimeUnit.NANOSECONDS);
                            IPCLoggerChannel.this.metrics.addWriteEndToEndLatency(endToEndTime);
                            IPCLoggerChannel.this.metrics.addWriteRpcLatency(rpcTime);
                            if (rpcTime / 1000L > 1000L) {
                                QuorumJournalManager.LOG.warn((Object)("Took " + rpcTime / 1000L + "ms to send a batch of " + numTxns + " edits (" + data.length + " bytes) to " + "remote journal " + IPCLoggerChannel.this));
                            }
                        }
                        IPCLoggerChannel iPCLoggerChannel = IPCLoggerChannel.this;
                        synchronized (iPCLoggerChannel) {
                            IPCLoggerChannel.this.highestAckedTxId = firstTxnId + (long)numTxns - 1L;
                            IPCLoggerChannel.this.lastAckNanos = submitNanos;
                        }
                        return null;
                    }
                });
                if (ret != null) break block6;
                this.unreserveQueueSpace(data.length);
            }
            catch (Throwable throwable) {
                if (ret == null) {
                    this.unreserveQueueSpace(data.length);
                } else {
                    Futures.addCallback(ret, (FutureCallback)new FutureCallback<Void>(data){
                        final /* synthetic */ byte[] val$data;
                        {
                            this.val$data = byArray;
                        }

                        public void onFailure(Throwable t) {
                            IPCLoggerChannel.this.unreserveQueueSpace(this.val$data.length);
                        }

                        public void onSuccess(Void t) {
                            IPCLoggerChannel.this.unreserveQueueSpace(this.val$data.length);
                        }
                    });
                }
                throw throwable;
            }
        }
        Futures.addCallback((ListenableFuture)ret, (FutureCallback)new /* invalid duplicate definition of identical inner class */);
        return ret;
    }

    private void throwIfOutOfSync() throws JournalOutOfSyncException, IOException {
        if (this.isOutOfSync()) {
            this.heartbeatIfNecessary();
            throw new JournalOutOfSyncException("Journal disabled until next roll");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void heartbeatIfNecessary() throws IOException {
        if (this.lastHeartbeatStopwatch.elapsedMillis() > 1000L || !this.lastHeartbeatStopwatch.isRunning()) {
            try {
                this.getProxy().heartbeat(this.createReqInfo());
            }
            finally {
                this.lastHeartbeatStopwatch.reset().start();
            }
        }
    }

    private synchronized void reserveQueueSpace(int size) throws LoggerTooFarBehindException {
        Preconditions.checkArgument((size >= 0 ? 1 : 0) != 0);
        if (this.queuedEditsSizeBytes + size > this.queueSizeLimitBytes && this.queuedEditsSizeBytes > 0) {
            throw new LoggerTooFarBehindException();
        }
        this.queuedEditsSizeBytes += size;
    }

    private synchronized void unreserveQueueSpace(int size) {
        Preconditions.checkArgument((size >= 0 ? 1 : 0) != 0);
        this.queuedEditsSizeBytes -= size;
    }

    @Override
    public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
        return this.executor.submit((Callable)new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                IPCLoggerChannel.this.getProxy().format(IPCLoggerChannel.this.journalId, nsInfo);
                return null;
            }
        });
    }

    @Override
    public ListenableFuture<Void> startLogSegment(final long txid) {
        return this.executor.submit((Callable)new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() throws IOException {
                IPCLoggerChannel.this.getProxy().startLogSegment(IPCLoggerChannel.this.createReqInfo(), txid);
                IPCLoggerChannel iPCLoggerChannel = IPCLoggerChannel.this;
                synchronized (iPCLoggerChannel) {
                    if (IPCLoggerChannel.this.outOfSync) {
                        IPCLoggerChannel.this.outOfSync = false;
                        QuorumJournalManager.LOG.info((Object)("Restarting previously-stopped writes to " + IPCLoggerChannel.this + " in segment starting at txid " + txid));
                    }
                }
                return null;
            }
        });
    }

    @Override
    public ListenableFuture<Void> finalizeLogSegment(final long startTxId, final long endTxId) {
        return this.executor.submit((Callable)new Callable<Void>(){

            @Override
            public Void call() throws IOException {
                IPCLoggerChannel.this.throwIfOutOfSync();
                IPCLoggerChannel.this.getProxy().finalizeLogSegment(IPCLoggerChannel.this.createReqInfo(), startTxId, endTxId);
                return null;
            }
        });
    }

    @Override
    public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
        return this.executor.submit((Callable)new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                IPCLoggerChannel.this.getProxy().purgeLogsOlderThan(IPCLoggerChannel.this.createReqInfo(), minTxIdToKeep);
                return null;
            }
        });
    }

    @Override
    public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(final long fromTxnId, final boolean forReading, final boolean inProgressOk) {
        return this.executor.submit((Callable)new Callable<RemoteEditLogManifest>(){

            @Override
            public RemoteEditLogManifest call() throws IOException {
                QJournalProtocolProtos.GetEditLogManifestResponseProto ret = IPCLoggerChannel.this.getProxy().getEditLogManifest(IPCLoggerChannel.this.journalId, fromTxnId, forReading, inProgressOk);
                IPCLoggerChannel.this.httpPort = ret.getHttpPort();
                return PBHelper.convert(ret.getManifest());
            }
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.PrepareRecoveryResponseProto> prepareRecovery(final long segmentTxId) {
        return this.executor.submit((Callable)new Callable<QJournalProtocolProtos.PrepareRecoveryResponseProto>(){

            @Override
            public QJournalProtocolProtos.PrepareRecoveryResponseProto call() throws IOException {
                if (IPCLoggerChannel.this.httpPort < 0) {
                    IPCLoggerChannel.this.httpPort = IPCLoggerChannel.this.getProxy().getJournalState(IPCLoggerChannel.this.journalId).getHttpPort();
                }
                return IPCLoggerChannel.this.getProxy().prepareRecovery(IPCLoggerChannel.this.createReqInfo(), segmentTxId);
            }
        });
    }

    @Override
    public ListenableFuture<Void> acceptRecovery(final QJournalProtocolProtos.SegmentStateProto log, final URL url) {
        return this.executor.submit((Callable)new Callable<Void>(){

            @Override
            public Void call() throws IOException {
                IPCLoggerChannel.this.getProxy().acceptRecovery(IPCLoggerChannel.this.createReqInfo(), log, url);
                return null;
            }
        });
    }

    public String toString() {
        return InetAddresses.toAddrString((InetAddress)this.addr.getAddress()) + ':' + this.addr.getPort();
    }

    @Override
    public synchronized void appendHtmlReport(StringBuilder sb) {
        sb.append("Written txid ").append(this.highestAckedTxId);
        long behind = this.getLagTxns();
        if (behind > 0L) {
            if (this.lastAckNanos != 0L) {
                long lagMillis = this.getLagTimeMillis();
                sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
            } else {
                sb.append(" (never written");
            }
        }
        if (this.outOfSync) {
            sb.append(" (will try to re-sync on next segment)");
        }
    }

    public synchronized long getLagTxns() {
        return Math.max(this.committedTxId - this.highestAckedTxId, 0L);
    }

    public synchronized long getLagTimeMillis() {
        return TimeUnit.MILLISECONDS.convert(Math.max(this.lastCommitNanos - this.lastAckNanos, 0L), TimeUnit.NANOSECONDS);
    }
}

