package org.apache.drill.exec.rpc.data;

import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataProtobufLengthDecoder;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/rpc/data/DataServer.class */
public class DataServer extends BasicServer<BitData.RpcType, BitServerConnection> {
    static final Logger logger;
    private volatile ProxyCloseHandler proxyCloseHandler;
    private final BootStrapContext context;
    private final WorkEventBus workBus;
    private final DataResponseHandler dataHandler;
    private static final BitData.FragmentRecordBatch OOM_FRAGMENT;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/rpc/data/DataServer$ProxyCloseHandler.class */
    public class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
        private volatile GenericFutureListener<ChannelFuture> handler;

        public ProxyCloseHandler(GenericFutureListener<ChannelFuture> genericFutureListener) {
            this.handler = genericFutureListener;
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            this.handler.operationComplete(channelFuture);
        }
    }

    public DataServer(BootStrapContext bootStrapContext, WorkEventBus workEventBus, DataResponseHandler dataResponseHandler) {
        super(DataRpcConfig.getMapping(bootStrapContext.getConfig()), bootStrapContext.getAllocator().getUnderlyingAllocator(), bootStrapContext.getBitLoopGroup());
        this.context = bootStrapContext;
        this.workBus = workEventBus;
        this.dataHandler = dataResponseHandler;
    }

    @Override // org.apache.drill.exec.rpc.BasicServer, org.apache.drill.exec.rpc.RpcBus
    public MessageLite getResponseDefaultInstance(int i) throws RpcException {
        return DataDefaultInstanceHandler.getResponseDefaultInstanceServer(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.drill.exec.rpc.RpcBus
    public GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel socketChannel, BitServerConnection bitServerConnection) {
        this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(socketChannel, (SocketChannel) bitServerConnection));
        return this.proxyCloseHandler;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.drill.exec.rpc.BasicServer
    public BitServerConnection initRemoteConnection(SocketChannel socketChannel) {
        super.initRemoteConnection(socketChannel);
        return new BitServerConnection(socketChannel, this.context.getAllocator());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.drill.exec.rpc.BasicServer
    public BasicServer.ServerHandshakeHandler<BitData.BitClientHandshake> getHandshakeHandler(BitServerConnection bitServerConnection) {
        return new BasicServer.ServerHandshakeHandler<BitData.BitClientHandshake>(BitData.RpcType.HANDSHAKE, BitData.BitClientHandshake.PARSER) { // from class: org.apache.drill.exec.rpc.data.DataServer.1
            @Override // org.apache.drill.exec.rpc.BasicServer.ServerHandshakeHandler
            public MessageLite getHandshakeResponse(BitData.BitClientHandshake bitClientHandshake) throws Exception {
                if (bitClientHandshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) {
                    throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", Integer.valueOf(bitClientHandshake.getRpcVersion()), Integer.valueOf(DataRpcConfig.RPC_VERSION)));
                }
                if (bitClientHandshake.getChannel() != UserBitShared.RpcChannel.BIT_DATA) {
                    throw new RpcException(String.format("Invalid NodeMode.  Expected BIT_DATA but received %s.", bitClientHandshake.getChannel()));
                }
                return BitData.BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build();
            }
        };
    }

    private static ExecProtos.FragmentHandle getHandle(BitData.FragmentRecordBatch fragmentRecordBatch, int i) {
        return ExecProtos.FragmentHandle.newBuilder().setQueryId(fragmentRecordBatch.getQueryId()).setMajorFragmentId(fragmentRecordBatch.getReceivingMajorFragmentId()).setMinorFragmentId(fragmentRecordBatch.getReceivingMinorFragmentId(i)).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.drill.exec.rpc.RpcBus
    public void handle(BitServerConnection bitServerConnection, int i, ByteBuf byteBuf, ByteBuf byteBuf2, ResponseSender responseSender) throws RpcException {
        if (!$assertionsDisabled && i != 3) {
            throw new AssertionError();
        }
        BitData.FragmentRecordBatch fragmentRecordBatch = (BitData.FragmentRecordBatch) get(byteBuf, BitData.FragmentRecordBatch.PARSER);
        int receivingMinorFragmentIdCount = fragmentRecordBatch.getReceivingMinorFragmentIdCount();
        AckSender ackSender = new AckSender(responseSender);
        ackSender.increment();
        try {
            try {
                if (byteBuf2 == null) {
                    for (int i2 = 0; i2 < receivingMinorFragmentIdCount; i2++) {
                        FragmentManager fragmentManager = this.workBus.getFragmentManager(getHandle(fragmentRecordBatch, i2));
                        if (fragmentManager != null) {
                            ackSender.increment();
                            this.dataHandler.handle(fragmentManager, fragmentRecordBatch, null, ackSender);
                        }
                    }
                } else {
                    for (int i3 = 0; i3 < receivingMinorFragmentIdCount; i3++) {
                        send(fragmentRecordBatch, (DrillBuf) byteBuf2, i3, ackSender);
                    }
                }
                ackSender.sendOk();
            } catch (IOException | FragmentSetupException e) {
                logger.error("Failure while getting fragment manager. {}", QueryIdHelper.getQueryIdentifiers(fragmentRecordBatch.getQueryId(), fragmentRecordBatch.getReceivingMajorFragmentId(), fragmentRecordBatch.getReceivingMinorFragmentIdList()), e);
                ackSender.clear();
                responseSender.send(new Response(BitData.RpcType.ACK, Acks.FAIL, new ByteBuf[0]));
                ackSender.sendOk();
            }
        } catch (Throwable th) {
            ackSender.sendOk();
            throw th;
        }
    }

    private void send(BitData.FragmentRecordBatch fragmentRecordBatch, DrillBuf drillBuf, int i, AckSender ackSender) throws FragmentSetupException, IOException {
        FragmentManager fragmentManager = this.workBus.getFragmentManager(getHandle(fragmentRecordBatch, i));
        if (fragmentManager == null) {
            return;
        }
        BufferAllocator allocator = fragmentManager.getFragmentContext().getAllocator();
        Pointer<DrillBuf> pointer = new Pointer<>();
        if (!allocator.takeOwnership(drillBuf, pointer)) {
            this.dataHandler.handle(fragmentManager, OOM_FRAGMENT, null, null);
        }
        ackSender.increment();
        this.dataHandler.handle(fragmentManager, fragmentRecordBatch, (DrillBuf) pointer.value, ackSender);
        ((DrillBuf) pointer.value).release();
    }

    @Override // org.apache.drill.exec.rpc.BasicServer
    public OutOfMemoryHandler getOutOfMemoryHandler() {
        return new OutOfMemoryHandler() { // from class: org.apache.drill.exec.rpc.data.DataServer.2
            @Override // org.apache.drill.exec.rpc.OutOfMemoryHandler
            public void handle() {
                DataServer.this.dataHandler.informOutOfMemory();
            }
        };
    }

    @Override // org.apache.drill.exec.rpc.BasicServer
    public ProtobufLengthDecoder getDecoder(BufferAllocator bufferAllocator, OutOfMemoryHandler outOfMemoryHandler) {
        return new DataProtobufLengthDecoder.Server(bufferAllocator, outOfMemoryHandler);
    }

    static {
        $assertionsDisabled = !DataServer.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(DataServer.class);
        OOM_FRAGMENT = BitData.FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
    }
}
