package org.apache.hama.ipc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:org/apache/hama/ipc/AsyncServer.class */
public abstract class AsyncServer {
    private SaslRpcServer.AuthMethod authMethod;
    UserGroupInformation user;
    static final byte CURRENT_VERSION = 4;
    static final int HEADER_LENGTH = 10;
    private Configuration conf;
    private final boolean tcpNoDelay;
    private int backlogLength;
    InetSocketAddress address;
    private final int maxRespSize;
    static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
    static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1048576;
    private int port;
    private Class<? extends Writable> paramClass;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ExceptionsHandler exceptionsHandler;
    static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
    static int INITIAL_RESP_BUF_SIZE = 1024;
    private static final Log LOG = LogFactory.getLog(AsyncServer.class);
    private static int NIO_BUFFER_LIMIT = 8192;
    private static final ThreadLocal<AsyncServer> SERVER = new ThreadLocal<>();
    private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/ipc/AsyncServer$Call.class */
    public static class Call {
        private int id;
        private Writable param;
        private ChannelInboundHandlerAdapter connection;
        private long timestamp = System.currentTimeMillis();
        private ByteBuffer response = null;

        public Call(int i, Writable writable, ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
            this.id = i;
            this.param = writable;
            this.connection = channelInboundHandlerAdapter;
        }

        public String toString() {
            return this.param.toString() + " from " + this.connection.toString();
        }

        public void setResponse(ByteBuffer byteBuffer) {
            this.response = byteBuffer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/ipc/AsyncServer$ExceptionsHandler.class */
    public static class ExceptionsHandler {
        private volatile Set<String> terseExceptions = new HashSet();

        ExceptionsHandler() {
        }

        void addTerseExceptions(Class<?>... clsArr) {
            HashSet hashSet = new HashSet(this.terseExceptions);
            for (Class<?> cls : clsArr) {
                hashSet.add(cls.toString());
            }
            this.terseExceptions = Collections.unmodifiableSet(hashSet);
        }

        boolean isTerse(Class<?> cls) {
            return this.terseExceptions.contains(cls.toString());
        }
    }

    /* loaded from: input_file:org/apache/hama/ipc/AsyncServer$NioFrameDecoder.class */
    public class NioFrameDecoder extends LengthFieldBasedFrameDecoder {
        public NioFrameDecoder(int i, int i2, int i3, int i4, int i5) {
            super(i, i2, i3, i4, i5);
        }

        protected Object decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
            ByteBuf byteBuf2 = (ByteBuf) super.decode(channelHandlerContext, byteBuf);
            if (byteBuf2 == null) {
                return null;
            }
            return byteBuf2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/ipc/AsyncServer$NioServerInboundHandler.class */
    public class NioServerInboundHandler extends ChannelInboundHandlerAdapter {
        ConnectionHeader header;
        Class<?> protocol;
        private String errorClass;
        private String error;
        private boolean rpcHeaderRead;
        private boolean headerRead;

        private NioServerInboundHandler() {
            this.header = new ConnectionHeader();
            this.errorClass = null;
            this.error = null;
            this.rpcHeaderRead = false;
            this.headerRead = false;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            AsyncServer.SERVER.set(AsyncServer.this);
        }

        /* JADX WARN: Code restructure failed: missing block: B:63:0x00ed, code lost:
        
            org.apache.hama.ipc.AsyncServer.LOG.warn("Incorrect header or version mismatch from " + r7.this$0.address.getHostName() + ":" + r7.this$0.address.getPort() + " got version " + ((int) r0) + " expected version 4");
         */
        /* JADX WARN: Code restructure failed: missing block: B:64:0x0137, code lost:
        
            io.netty.util.ReferenceCountUtil.release(r9);
         */
        /* JADX WARN: Code restructure failed: missing block: B:65:0x013b, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:76:0x0049, code lost:
        
            io.netty.util.ReferenceCountUtil.release(r9);
         */
        /* JADX WARN: Code restructure failed: missing block: B:77:0x004e, code lost:
        
            return;
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void channelRead(io.netty.channel.ChannelHandlerContext r8, java.lang.Object r9) {
            /*
                Method dump skipped, instructions count: 607
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hama.ipc.AsyncServer.NioServerInboundHandler.channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object):void");
        }

        private void sendResponse(ChannelHandlerContext channelHandlerContext, Call call) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(AsyncServer.INITIAL_RESP_BUF_SIZE);
            Writable writable = null;
            try {
                writable = AsyncServer.this.call(this.protocol, call.param, call.timestamp);
            } catch (Throwable th) {
                String str = getClass().getName() + ", call " + call + ": error: " + th;
                if ((th instanceof RuntimeException) || (th instanceof Error)) {
                    AsyncServer.LOG.warn(str, th);
                } else if (AsyncServer.this.exceptionsHandler.isTerse(th.getClass())) {
                    AsyncServer.LOG.info(str);
                } else {
                    AsyncServer.LOG.info(str, th);
                }
                this.errorClass = th.getClass().getName();
                this.error = StringUtils.stringifyException(th);
            }
            try {
                try {
                    AsyncServer.this.setupResponse(byteArrayOutputStream, call, this.error == null ? Status.SUCCESS : Status.ERROR, writable, this.errorClass, this.error);
                    if (byteArrayOutputStream.size() > AsyncServer.this.maxRespSize) {
                        AsyncServer.LOG.warn("Large response size " + byteArrayOutputStream.size() + " for call " + call.toString());
                        byteArrayOutputStream = new ByteArrayOutputStream(AsyncServer.INITIAL_RESP_BUF_SIZE);
                    }
                    AsyncServer.this.channelWrite(channelHandlerContext, call.response);
                    IOUtils.closeStream(byteArrayOutputStream);
                } catch (Exception e) {
                    AsyncServer.LOG.info(getClass().getName() + " caught: " + StringUtils.stringifyException(e));
                    this.error = null;
                    IOUtils.closeStream(byteArrayOutputStream);
                }
            } catch (Throwable th2) {
                IOUtils.closeStream(byteArrayOutputStream);
                throw th2;
            }
        }

        private Call processOneRpc(byte[] bArr) throws IOException {
            if (this.headerRead) {
                return processData(bArr);
            }
            processHeader(bArr);
            this.headerRead = true;
            return null;
        }

        private void processHeader(byte[] bArr) {
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr));
            try {
                try {
                    this.header.readFields(dataInputStream);
                    if (this.header.getProtocol() != null) {
                        this.protocol = AsyncServer.getProtocolClass(this.header.getProtocol(), AsyncServer.this.conf);
                    }
                    AsyncServer.this.user = this.header.getUgi();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } finally {
                IOUtils.closeStream(dataInputStream);
            }
        }

        private Call processData(byte[] bArr) {
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr));
            try {
                try {
                    int readInt = dataInputStream.readInt();
                    if (AsyncServer.LOG.isDebugEnabled()) {
                        AsyncServer.LOG.debug(" got #" + readInt);
                    }
                    Writable writable = (Writable) ReflectionUtils.newInstance(AsyncServer.this.paramClass, AsyncServer.this.conf);
                    writable.readFields(dataInputStream);
                    Call call = new Call(readInt, writable, this);
                    IOUtils.closeStream(dataInputStream);
                    return call;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                IOUtils.closeStream(dataInputStream);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/ipc/AsyncServer$NioServerListener.class */
    public class NioServerListener implements Callable<ChannelFuture> {
        private NioServerListener() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public ChannelFuture call() throws Exception {
            AsyncServer.SERVER.set(AsyncServer.this);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(AsyncServer.this.bossGroup, AsyncServer.this.workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, Integer.valueOf(AsyncServer.this.backlogLength)).childOption(ChannelOption.MAX_MESSAGES_PER_READ, Integer.valueOf(AsyncServer.NIO_BUFFER_LIMIT)).childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(AsyncServer.this.tcpNoDelay)).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_RCVBUF, 31457280).childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(102400)).childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.hama.ipc.AsyncServer.NioServerListener.1
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new ChannelHandler[]{new NioFrameDecoder(104857600, 0, 4, 0, 0)});
                    pipeline.addLast(new ChannelHandler[]{new NioServerInboundHandler()});
                }
            });
            ChannelFuture sync = serverBootstrap.bind(AsyncServer.this.port).sync();
            AsyncServer.LOG.info("AsyncServer startup");
            return sync.channel().closeFuture();
        }
    }

    static Class<?> getProtocolClass(String str, Configuration configuration) throws ClassNotFoundException {
        Class<?> cls = PROTOCOL_CACHE.get(str);
        if (cls == null) {
            cls = configuration.getClassByName(str);
            PROTOCOL_CACHE.put(str, cls);
        }
        return cls;
    }

    public InetSocketAddress getAddress() {
        return this.address;
    }

    public static AsyncServer get() {
        return SERVER.get();
    }

    protected AsyncServer(String str, int i, Class<? extends Writable> cls, int i2, Configuration configuration) throws IOException {
        this(str, i, cls, i2, configuration, Integer.toString(i), null);
    }

    protected AsyncServer(String str, int i, Class<? extends Writable> cls, int i2, Configuration configuration, String str2) throws IOException {
        this(str, i, cls, i2, configuration, str2, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AsyncServer(String str, int i, Class<? extends Writable> cls, int i2, Configuration configuration, String str2, SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
        this.user = null;
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup();
        this.exceptionsHandler = new ExceptionsHandler();
        this.conf = configuration;
        this.port = i;
        this.address = new InetSocketAddress(str, i);
        this.paramClass = cls;
        this.maxRespSize = configuration.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
        this.tcpNoDelay = configuration.getBoolean("ipc.server.tcpnodelay", true);
        this.backlogLength = configuration.getInt("ipc.server.listen.queue.size", 100);
    }

    public void start() throws ExecutionException, InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            ((ChannelFuture) newSingleThreadExecutor.submit(new NioServerListener()).get()).addListener(new GenericFutureListener<Future<Void>>() { // from class: org.apache.hama.ipc.AsyncServer.1
                public void operationComplete(Future<Void> future) throws Exception {
                    AsyncServer.this.stop();
                }
            });
            newSingleThreadExecutor.shutdown();
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            throw th;
        }
    }

    public void stop() {
        if (this.bossGroup != null && !this.bossGroup.isTerminated()) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null && !this.workerGroup.isTerminated()) {
            this.workerGroup.shutdownGracefully();
        }
        LOG.info("AsyncServer gracefully shutdown");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupResponse(ByteArrayOutputStream byteArrayOutputStream, Call call, Status status, Writable writable, String str, String str2) throws IOException {
        byteArrayOutputStream.reset();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeInt(call.id);
        dataOutputStream.writeInt(status.state);
        if (status == Status.SUCCESS) {
            writable.write(dataOutputStream);
        } else {
            WritableUtils.writeString(dataOutputStream, str);
            WritableUtils.writeString(dataOutputStream, str2);
        }
        call.setResponse(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
        IOUtils.closeStream(dataOutputStream);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void channelWrite(ChannelHandlerContext channelHandlerContext, ByteBuffer byteBuffer) {
        try {
            ByteBuf buffer = channelHandlerContext.alloc().buffer();
            buffer.writeBytes(byteBuffer.array());
            channelHandlerContext.writeAndFlush(buffer);
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    public abstract Writable call(Class<?> cls, Writable writable, long j) throws IOException;
}
