/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@Category(value={SmallTests.class})
public class TestIPC {
    /** Logger used by the tests and by {@link #main(String[])}. */
    public static final Log LOG = LogFactory.getLog(TestIPC.class);

    /** Bytes of the string "xyz"; supplied for every byte[] argument of {@link #CELL}. */
    static byte[] CELL_BYTES = Bytes.toBytes("xyz");

    /** Small cell built entirely from {@link #CELL_BYTES}. */
    static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

    /** Zero-filled buffer of 10240 bytes, used as the last constructor argument of {@link #BIG_CELL}. */
    static byte[] BIG_CELL_BYTES = new byte[10240];

    /** Like {@link #CELL}, but with {@link #BIG_CELL_BYTES} as the fourth constructor argument. */
    static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);

    /** Configuration shared by the nested test servers and by the tests that do not create their own. */
    private static final Configuration CONF = HBaseConfiguration.create();
    /**
     * Blocking service served by {@link TestRpcServer}. {@code ping} and {@code error} return
     * {@code null}; {@code echo} answers with the request message and, when the controller is a
     * {@link PayloadCarryingRpcController}, sets a scanner over the cells it received (or
     * whatever {@code CellUtil.createCellScanner} yields for {@code null} when none were sent).
     */
    private static final BlockingService SERVICE = TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
        new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {

            @Override
            public TestProtos.EmptyResponseProto ping(RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException {
                return null;
            }

            @Override
            public TestProtos.EmptyResponseProto error(RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException {
                return null;
            }

            @Override
            public TestProtos.EchoResponseProto echo(RpcController controller, TestProtos.EchoRequestProto request) throws ServiceException {
                if (controller instanceof PayloadCarryingRpcController) {
                    echoCells((PayloadCarryingRpcController) controller);
                }
                return TestProtos.EchoResponseProto.newBuilder()
                    .setMessage(request.getMessage())
                    .build();
            }

            /**
             * Drains the controller's incoming cell scanner (if any) and installs a scanner over
             * the collected cells on the same controller.
             */
            private void echoCells(PayloadCarryingRpcController payloadController) throws ServiceException {
                CellScanner incoming = payloadController.cellScanner();
                // Stays null when the request carried no cells.
                List<Cell> received = null;
                if (incoming != null) {
                    received = new ArrayList<Cell>();
                    try {
                        while (incoming.advance()) {
                            received.add(incoming.current());
                        }
                    } catch (IOException e) {
                        throw new ServiceException(e);
                    }
                }
                payloadController.setCellScanner(CellUtil.createCellScanner(received));
            }
        });

    /**
     * Issues an echo call from a client whose {@code getCodec()} returns {@code null} and checks
     * that the response message contains the request text and that no cell scanner is returned.
     *
     * @throws IOException if the server's listener address is unavailable or the call fails
     * @throws InterruptedException propagated from the RPC client/server
     */
    @Test
    public void testNoCodec() throws InterruptedException, IOException {
        Configuration conf = HBaseConfiguration.create();
        RpcClient client = new RpcClient(conf, "default-cluster") {
            // The point of the test: run the client without a codec. @Override makes the
            // compiler reject this if the superclass hook is ever renamed or removed.
            @Override
            Codec getCodec() {
                return null;
            }
        };
        TestRpcServer rpcServer = new TestRpcServer();
        try {
            rpcServer.start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            // Same guard the other callers in this class use; fail with a clear message
            // instead of passing a null address to the client.
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            final String message = "hello";
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage(message).build();
            Pair<Message, CellScanner> r = client.call(md, param, null, md.getOutputType().toProto(), User.getCurrent(), address, 0);
            Assert.assertNull(r.getSecond());
            Assert.assertTrue(r.getFirst().toString().contains(message));
        }
        finally {
            client.stop();
            rpcServer.stop();
        }
    }

    /**
     * Runs {@link #doSimpleTest} with {@code hbase.client.rpc.compressor} set to the canonical
     * class name of {@link GzipCodec}.
     */
    @Test
    public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
        Configuration gzipConf = new Configuration(HBaseConfiguration.create());
        String compressorClassName = GzipCodec.class.getCanonicalName();
        gzipConf.set("hbase.client.rpc.compressor", compressorClassName);
        RpcClient gzipClient = new RpcClient(gzipConf, "default-cluster");
        this.doSimpleTest(gzipConf, gzipClient);
    }

    /**
     * Starts a {@link TestRpcServer}, sends an echo call carrying three copies of {@link #CELL}
     * through {@code client}, and asserts that the returned scanner yields exactly three cells,
     * each equal to {@link #CELL}. The client and the server are stopped before returning.
     *
     * @param conf   not read by this method
     * @param client client used for the call; stopped in the {@code finally} block
     * @throws IOException if the listener address is {@code null} or the call fails
     */
    private void doSimpleTest(Configuration conf, RpcClient client) throws InterruptedException, IOException {
        TestRpcServer rpcServer = new TestRpcServer();
        final int count = 3;
        List<Cell> cells = new ArrayList<Cell>();
        while (cells.size() < count) {
            cells.add(CELL);
        }
        try {
            rpcServer.start();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells), md.getOutputType().toProto(), User.getCurrent(), address, 0);
            CellScanner echoed = r.getSecond();
            int index = 0;
            for (; echoed.advance(); ++index) {
                Assert.assertTrue(CELL.equals(echoed.current()));
            }
            Assert.assertEquals((long) count, (long) index);
        }
        finally {
            client.stop();
            rpcServer.stop();
        }
    }

    /**
     * Injects a {@link RuntimeException} into {@code Socket.setSoTimeout} for every socket the
     * client's factory creates, then asserts that an echo call fails with an exception whose
     * stringified form contains "Injected fault".
     */
    @Test
    public void testRTEDuringConnectionSetup() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        SocketFactory spyFactory = Mockito.spy(NetUtils.getDefaultSocketFactory(conf));
        // Each real socket is wrapped in a spy whose setSoTimeout(..) throws.
        Answer<Socket> faultySocket = new Answer<Socket>() {

            @Override
            public Socket answer(InvocationOnMock invocation) throws Throwable {
                Socket socket = Mockito.spy((Socket) invocation.callRealMethod());
                Mockito.doThrow(new RuntimeException("Injected fault")).when(socket).setSoTimeout(Matchers.anyInt());
                return socket;
            }
        };
        Mockito.doAnswer(faultySocket).when(spyFactory).createSocket();
        TestRpcServer rpcServer = new TestRpcServer();
        RpcClient client = new RpcClient(conf, "default-cluster", spyFactory);
        try {
            rpcServer.start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            client.call(md, param, null, null, User.getCurrent(), address, 0);
            // Assert.fail raises an AssertionError, which the catch (Exception) below does not intercept.
            Assert.fail("Expected an exception to have been thrown!");
        }
        catch (Exception e) {
            LOG.info("Caught expected exception: " + e.toString());
            String stackText = StringUtils.stringifyException(e);
            Assert.assertTrue(stackText.contains("Injected fault"));
        }
        finally {
            client.stop();
            rpcServer.stop();
        }
    }

    /**
     * Verifies the interaction between the RPC server and its scheduler: {@code init} is invoked
     * when the server is constructed, {@code start} when the server starts, {@code dispatch} once
     * per call (ten echo calls are made), and {@code stop} when the server stops.
     *
     * @throws IOException if the listener address is unavailable or a call fails
     * @throws InterruptedException propagated from the RPC client/server
     */
    @Test
    public void testRpcScheduler() throws IOException, InterruptedException {
        RpcScheduler scheduler = Mockito.spy(new FifoRpcScheduler(CONF, 1));
        TestRpcServer rpcServer = new TestRpcServer(scheduler);
        Mockito.verify(scheduler).init((RpcScheduler.Context) Matchers.anyObject());
        RpcClient client = new RpcClient(CONF, "default-cluster");
        try {
            rpcServer.start();
            Mockito.verify(scheduler).start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            // Same guard the other callers in this class use.
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            for (int i = 0; i < 10; ++i) {
                // Typed as Iterable<Cell> so the Iterable overload of createCellScanner is chosen.
                Iterable<Cell> cells = ImmutableList.of(CELL);
                client.call(md, param, CellUtil.createCellScanner(cells), md.getOutputType().toProto(), User.getCurrent(), address, 0);
            }
            Mockito.verify(scheduler, VerificationModeFactory.times(10)).dispatch((CallRunner) Matchers.anyObject());
        }
        finally {
            // The client was previously never stopped; release it like every other test here does.
            client.stop();
            rpcServer.stop();
            Mockito.verify(scheduler).stop();
        }
    }

    /**
     * Calls {@code echo} on a {@link TestRpcServer1}, whose implementation replies with the host
     * address returned by {@code getRemoteAddress()} (or "NULL"), and asserts that the reply
     * equals the host address of the local address the client was constructed with.
     */
    @Test
    public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, ServiceException {
        RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
        TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
        InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
        RpcClient client = new RpcClient(CONF, "default-cluster", (SocketAddress) localAddr, null);
        try {
            rpcServer.start();
            InetSocketAddress isa = rpcServer.getListenerAddress();
            if (isa == null) {
                throw new IOException("Listener channel is closed");
            }
            ServerName serverName = ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis());
            BlockingRpcChannel channel = client.createBlockingRpcChannel(serverName, User.getCurrent(), 0);
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
                TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
            TestProtos.EchoRequestProto echoRequest =
                TestProtos.EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
            TestProtos.EchoResponseProto echoResponse = stub.echo(null, echoRequest);
            String expectedHostAddress = localAddr.getAddress().getHostAddress();
            Assert.assertEquals(expectedHostAddress, echoResponse.getMessage());
        }
        finally {
            client.stop();
            rpcServer.stop();
        }
    }

    /**
     * Command-line driver. Expects exactly two arguments, {@code <CYCLES>} and
     * {@code <CELLS_PER_CYCLE>}; otherwise prints a usage line and returns. Builds a
     * {@link RowMutations} holding a {@link Put} with {@code CELLS_PER_CYCLE} copies of
     * {@link #BIG_CELL}, sends it {@code CYCLES} times to a local {@link TestRpcServer}, and logs
     * the elapsed wall-clock time.
     *
     * @param args {@code [cycles, cellsPerCycle]}, both parsed with {@link Integer#parseInt(String)}
     */
    public static void main(String[] args) throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
        if (args.length != 2) {
            System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
            return;
        }
        final int cycles = Integer.parseInt(args[0]);
        final int cellcount = Integer.parseInt(args[1]);
        Configuration conf = HBaseConfiguration.create();
        TestRpcServer rpcServer = new TestRpcServer();
        Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
        TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        RpcClient client = new RpcClient(conf, "default-cluster");

        // One Put carrying `cellcount` copies of the big cell, wrapped in a RowMutations.
        KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
        Put put = new Put(kv.getRow());
        int added = 0;
        while (added < cellcount) {
            put.add((Cell) kv);
            ++added;
        }
        RowMutations rm = new RowMutations(kv.getRow());
        rm.add(put);

        try {
            rpcServer.start();
            long startTime = System.currentTimeMillis();
            User user = User.getCurrent();
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            for (int cycle = 0; cycle < cycles; ++cycle) {
                // Filled by buildNoDataRegionAction; sent alongside the protobuf request.
                ArrayList cells = new ArrayList();
                ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(
                    HConstants.EMPTY_BYTE_ARRAY,
                    rm,
                    cells,
                    ClientProtos.RegionAction.newBuilder(),
                    ClientProtos.Action.newBuilder(),
                    ClientProtos.MutationProto.newBuilder());
                HBaseProtos.RegionSpecifier.Builder region = HBaseProtos.RegionSpecifier.newBuilder()
                    .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
                    .setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()));
                builder.setRegion(region);
                if (cycle % 100000 == 0) {
                    LOG.info(Integer.toString(cycle));
                }
                CellScanner cellScanner = CellUtil.createCellScanner(cells);
                // The response is intentionally discarded; only the round trip is being timed.
                client.call(md, builder.build(), cellScanner, param, user, address, 0, 0, new MetricsConnection.CallStats());
            }
            long elapsedMs = System.currentTimeMillis() - startTime;
            LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + elapsedMs + "ms");
        }
        finally {
            client.stop();
            rpcServer.stop();
        }
    }

    static class TestRpcServer1
    extends RpcServer {
        private static TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface SERVICE1 = new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface(){

            @Override
            public TestProtos.EmptyResponseProto ping(RpcController unused, TestProtos.EmptyRequestProto request) throws ServiceException {
                return TestProtos.EmptyResponseProto.newBuilder().build();
            }

            @Override
            public TestProtos.EchoResponseProto echo(RpcController unused, TestProtos.EchoRequestProto request) throws ServiceException {
                InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
                String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
                return TestProtos.EchoResponseProto.newBuilder().setMessage(message).build();
            }

            @Override
            public TestProtos.EmptyResponseProto error(RpcController unused, TestProtos.EmptyRequestProto request) throws ServiceException {
                throw new ServiceException("error", (Throwable)new IOException("error"));
            }
        };

        TestRpcServer1() throws IOException {
            this((RpcScheduler)new FifoRpcScheduler(CONF, 1));
        }

        TestRpcServer1(RpcScheduler scheduler) throws IOException {
            super(null, "testRemoteAddressInCallObject", (List)Lists.newArrayList((Object[])new RpcServer.BlockingServiceAndInterface[]{new RpcServer.BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(SERVICE1), null)}), new InetSocketAddress("localhost", 0), CONF, scheduler);
        }
    }

    /**
     * RPC server named "testRpcServer", bound to {@code localhost} port 0, that serves
     * {@link #SERVICE}.
     */
    private static class TestRpcServer
    extends RpcServer {
        /** Creates the server with a {@link FifoRpcScheduler} built from {@code CONF} and 1. */
        TestRpcServer() throws IOException {
            this(new FifoRpcScheduler(CONF, 1));
        }

        /**
         * Creates the server with the given scheduler.
         *
         * @param scheduler scheduler handed to the {@link RpcServer} constructor
         */
        TestRpcServer(RpcScheduler scheduler) throws IOException {
            super(
                null,
                "testRpcServer",
                Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
                new InetSocketAddress("localhost", 0),
                CONF,
                scheduler);
        }

        /** Forwards to the superclass implementation with the arguments unchanged. */
        public Pair<Message, CellScanner> call(BlockingService service, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException {
            Pair<Message, CellScanner> result = super.call(service, md, param, cellScanner, receiveTime, status);
            return result;
        }
    }
}

