/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop.shaded.com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestIPCLoggerChannel {
    private static final Log LOG = LogFactory.getLog(TestIPCLoggerChannel.class);
    private final Configuration conf = new Configuration();
    private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, "mycluster", "my-bp", 0L);
    private static final String JID = "test-journalid";
    private static final InetSocketAddress FAKE_ADDR = new InetSocketAddress(0);
    private static final byte[] FAKE_DATA = new byte[4096];
    private final QJournalProtocol mockProxy = (QJournalProtocol)Mockito.mock(QJournalProtocol.class);
    private IPCLoggerChannel ch;
    private static final int LIMIT_QUEUE_SIZE_MB = 1;
    private static final int LIMIT_QUEUE_SIZE_BYTES = 0x100000;

    @Before
    public void setupMock() {
        this.conf.setInt("dfs.qjournal.queued-edits.limit.mb", 1);
        this.ch = new IPCLoggerChannel(this.conf, FAKE_NSINFO, JID, FAKE_ADDR){

            @Override
            protected QJournalProtocol getProxy() throws IOException {
                return TestIPCLoggerChannel.this.mockProxy;
            }
        };
        this.ch.setEpoch(1L);
    }

    @Test
    public void testSimpleCall() throws Exception {
        this.ch.sendEdits(1L, 1L, 3, FAKE_DATA).get();
        ((QJournalProtocol)Mockito.verify((Object)this.mockProxy)).journal((RequestInfo)Mockito.any(), Mockito.eq((long)1L), Mockito.eq((long)1L), Mockito.eq((int)3), (byte[])Mockito.same((Object)FAKE_DATA));
    }

    @Test
    public void testQueueLimiting() throws Exception {
        GenericTestUtils.DelayAnswer delayer;
        block3: {
            delayer = new GenericTestUtils.DelayAnswer(LOG);
            ((QJournalProtocol)Mockito.doAnswer((Answer)delayer).when((Object)this.mockProxy)).journal((RequestInfo)Mockito.any(), Mockito.eq((long)1L), Mockito.eq((long)1L), Mockito.eq((int)1), (byte[])Mockito.same((Object)FAKE_DATA));
            int numToQueue = 0x100000 / FAKE_DATA.length;
            for (int i = 1; i <= numToQueue; ++i) {
                this.ch.sendEdits(1L, i, 1, FAKE_DATA);
            }
            Assert.assertEquals((long)0x100000L, (long)this.ch.getQueuedEditsSize());
            try {
                this.ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA).get(1L, TimeUnit.SECONDS);
                Assert.fail((String)"Did not fail to queue more calls after queue was full");
            }
            catch (ExecutionException ee) {
                if (ee.getCause() instanceof LoggerTooFarBehindException) break block3;
                throw ee;
            }
        }
        delayer.proceed();
        GenericTestUtils.waitFor(new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                return TestIPCLoggerChannel.this.ch.getQueuedEditsSize() == 0;
            }
        }, 10, 1000);
    }

    @Test
    public void testStopSendingEditsWhenOutOfSync() throws Exception {
        ((QJournalProtocol)Mockito.doThrow((Throwable)new IOException("injected error")).when((Object)this.mockProxy)).journal((RequestInfo)Mockito.any(), Mockito.eq((long)1L), Mockito.eq((long)1L), Mockito.eq((int)1), (byte[])Mockito.same((Object)FAKE_DATA));
        try {
            this.ch.sendEdits(1L, 1L, 1, FAKE_DATA).get();
            Assert.fail((String)"Injected JOOSE did not cause sendEdits() to throw");
        }
        catch (ExecutionException ee) {
            GenericTestUtils.assertExceptionContains("injected", ee);
        }
        ((QJournalProtocol)Mockito.verify((Object)this.mockProxy)).journal((RequestInfo)Mockito.any(), Mockito.eq((long)1L), Mockito.eq((long)1L), Mockito.eq((int)1), (byte[])Mockito.same((Object)FAKE_DATA));
        Assert.assertTrue((boolean)this.ch.isOutOfSync());
        try {
            this.ch.sendEdits(1L, 2L, 1, FAKE_DATA).get();
            Assert.fail((String)"sendEdits() should throw until next roll");
        }
        catch (ExecutionException ee) {
            GenericTestUtils.assertExceptionContains("disabled until next roll", ee.getCause());
        }
        ((QJournalProtocol)Mockito.verify((Object)this.mockProxy, (VerificationMode)Mockito.never())).journal((RequestInfo)Mockito.any(), Mockito.eq((long)1L), Mockito.eq((long)2L), Mockito.eq((int)1), (byte[])Mockito.same((Object)FAKE_DATA));
        ((QJournalProtocol)Mockito.verify((Object)this.mockProxy)).heartbeat((RequestInfo)Mockito.any());
        this.ch.startLogSegment(3L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        Assert.assertFalse((boolean)this.ch.isOutOfSync());
        this.ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();
    }
}

