/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.arrow.driver.jdbc.shaded.com.google.common.base.Strings;
import org.apache.arrow.driver.jdbc.shaded.io.grpc.stub.ServerCallStreamObserver;
import org.apache.arrow.driver.jdbc.shaded.io.grpc.stub.StreamObserver;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.Action;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.ActionType;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.CallStatus;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.Criteria;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightServerMiddleware;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.FlightStream;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.OutboundStreamListenerImpl;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.PollInfo;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.PutResult;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.RequestContext;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.Result;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.StreamPipe;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.Ticket;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.auth.AuthConstants;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.ContextPropagatingExecutorService;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.RequestContextAdapter;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.driver.jdbc.shaded.org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.driver.jdbc.shaded.org.slf4j.Logger;
import org.apache.arrow.driver.jdbc.shaded.org.slf4j.LoggerFactory;

/**
 * gRPC-facing Flight service that forwards every RPC to a {@link FlightProducer}.
 *
 * <p>Each handler builds a {@link CallContext} for the current call, adapts the gRPC response
 * observer to the listener type the producer expects, and reports failures either to the client
 * (through the observer/listener) or to {@link #handleExceptionWithMiddleware(Throwable)}.
 */
class FlightService
extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(FlightService.class);

    /**
     * Pending-request target handed to every {@link FlightStream} created for inbound data
     * (exact semantics are defined by {@code FlightStream}).
     */
    private static final int PENDING_REQUESTS = 5;

    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;
    /** Executor used for producer work that must not run on the gRPC thread; wrapped in the constructor. */
    private final ExecutorService executors;

    /**
     * Creates the service.
     *
     * @param allocator allocator passed to the {@link FlightStream}s created for uploads/exchanges
     * @param producer application logic that every RPC is delegated to
     * @param authHandler handler used for the handshake RPC
     * @param executors executor for asynchronous producer calls; it is wrapped in a
     *     {@link ContextPropagatingExecutorService} before use
     */
    FlightService(BufferAllocator allocator, FlightProducer producer, ServerAuthHandler authHandler, ExecutorService executors) {
        this.allocator = allocator;
        this.producer = producer;
        this.authHandler = authHandler;
        this.executors = new ContextPropagatingExecutorService(executors);
    }

    /**
     * Builds the per-call context given to the producer.
     *
     * <p>The peer identity is read from the current {@link RequestContext} first; when that yields
     * {@code null} or an empty string, the value of {@link AuthConstants#PEER_IDENTITY_KEY} is used.
     *
     * @param responseObserver observer of the current call; only its cancellation state is consulted
     * @return a context exposing the peer identity and a live cancellation check
     */
    private CallContext makeContext(ServerCallStreamObserver<?> responseObserver) {
        RequestContext context = RequestContextAdapter.REQUEST_CONTEXT_KEY.get();
        String peerIdentity = null;
        if (context != null) {
            peerIdentity = context.get("arrow-flight-peer-identity");
        }
        if (Strings.isNullOrEmpty(peerIdentity)) {
            // Nothing usable in the request context: fall back to the auth-constants key.
            peerIdentity = AuthConstants.PEER_IDENTITY_KEY.get();
        }
        return new CallContext(peerIdentity, responseObserver::isCancelled);
    }

    /**
     * Handles the handshake RPC by delegating to {@link ServerAuthWrapper#wrapHandshake}.
     *
     * @param responseObserver observer for handshake responses
     * @return observer that receives the client's handshake requests
     */
    @Override
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> responseObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, responseObserver, this.executors);
    }

    /**
     * Streams the flights matching {@code criteria}, as produced by the {@link FlightProducer}.
     *
     * @param criteria protocol-level filter supplied by the client
     * @param responseObserver observer receiving the protocol-level flight descriptions
     */
    @Override
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> responseObserver) {
        StreamPipe<FlightInfo, Flight.FlightInfo> listener = StreamPipe.wrap(responseObserver, FlightInfo::toProtocol, this::handleExceptionWithMiddleware);
        try {
            CallContext context = this.makeContext((ServerCallStreamObserver<?>) responseObserver);
            this.producer.listFlights(context, new Criteria(criteria), listener);
        } catch (Exception ex) {
            listener.onError(ex);
        }
    }

    /**
     * Handles a DoGet call by asking the producer to stream the data identified by {@code ticket}.
     *
     * @param ticket protocol-level ticket supplied by the client
     * @param responseObserverSimple response observer; must be a {@link ServerCallStreamObserver}
     */
    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
        ServerCallStreamObserver<ArrowMessage> responseObserver = (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
        GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
        try {
            this.producer.getStream(this.makeContext(responseObserver), new Ticket(ticket), listener);
        } catch (Exception ex) {
            listener.error(ex);
        }
    }

    /**
     * Runs a producer-defined action and streams its results back.
     *
     * @param request protocol-level action supplied by the client
     * @param responseObserver observer receiving the protocol-level results
     */
    @Override
    public void doAction(Flight.Action request, StreamObserver<Flight.Result> responseObserver) {
        StreamPipe<Result, Flight.Result> listener = StreamPipe.wrap(responseObserver, Result::toProtocol, this::handleExceptionWithMiddleware);
        try {
            CallContext context = this.makeContext((ServerCallStreamObserver<?>) responseObserver);
            this.producer.doAction(context, new Action(request), listener);
        } catch (Exception ex) {
            listener.onError(ex);
        }
    }

    /**
     * Streams the action types the producer reports.
     *
     * @param request empty request message (unused)
     * @param responseObserver observer receiving the protocol-level action types
     */
    @Override
    public void listActions(Flight.Empty request, StreamObserver<Flight.ActionType> responseObserver) {
        StreamPipe<ActionType, Flight.ActionType> listener = StreamPipe.wrap(responseObserver, ActionType::toProtocol, this::handleExceptionWithMiddleware);
        try {
            CallContext context = this.makeContext((ServerCallStreamObserver<?>) responseObserver);
            this.producer.listActions(context, listener);
        } catch (Exception ex) {
            listener.onError(ex);
        }
    }

    /**
     * Handles a DoPut call: incoming messages feed a {@link FlightStream} that the producer
     * consumes on the executor, while acknowledgements flow back through a {@link StreamPipe}.
     *
     * @param responseObserverSimple observer for acknowledgements; must be a
     *     {@link ServerCallStreamObserver}
     * @return observer that receives the client's uploaded messages
     */
    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> responseObserverSimple) {
        ServerCallStreamObserver<Flight.PutResult> responseObserver = (ServerCallStreamObserver<Flight.PutResult>) responseObserverSimple;
        // Take over inbound flow control and ask for the first message explicitly.
        responseObserver.disableAutoInboundFlowControl();
        responseObserver.request(1);
        StreamPipe<PutResult, Flight.PutResult> ackStream = StreamPipe.wrap(responseObserver, PutResult::toProtocol, this::handleExceptionWithMiddleware);
        FlightStream fs = new FlightStream(this.allocator, PENDING_REQUESTS, null, responseObserver::request);
        // Register the stream with the ack pipe so the pipe can release it.
        ackStream.setAutoCloseable(fs);
        StreamObserver<ArrowMessage> observer = fs.asObserver();
        try {
            Future<?> unused = this.executors.submit(() -> {
                try {
                    this.producer.acceptPut(this.makeContext(responseObserver), fs, ackStream).run();
                } catch (Throwable ex) {
                    ackStream.onError(ex);
                } finally {
                    // Complete the ack stream in case the producer did not do so itself.
                    ackStream.ensureCompleted();
                }
            });
        } catch (Exception ex) {
            // The task could not be scheduled (e.g. the executor rejected it). Report it through
            // the ack stream, mirroring doExchangeCustom, instead of letting it escape the handler.
            ackStream.onError(ex);
        }
        return observer;
    }

    /**
     * Returns the producer's description of the flight identified by {@code request}.
     *
     * @param request protocol-level descriptor supplied by the client
     * @param responseObserver observer receiving the single response or the error
     */
    @Override
    public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight.FlightInfo> responseObserver) {
        FlightInfo info;
        try {
            info = this.producer.getFlightInfo(this.makeContext((ServerCallStreamObserver<?>) responseObserver), new FlightDescriptor(request));
        } catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
            return;
        }
        // Emitted outside the try block so a failure here is never followed by onError.
        responseObserver.onNext(info.toProtocol());
        responseObserver.onCompleted();
    }

    /**
     * Returns the producer's poll result for the flight identified by {@code request}.
     *
     * @param request protocol-level descriptor supplied by the client
     * @param responseObserver observer receiving the single response or the error
     */
    @Override
    public void pollFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight.PollInfo> responseObserver) {
        PollInfo info;
        try {
            info = this.producer.pollFlightInfo(this.makeContext((ServerCallStreamObserver<?>) responseObserver), new FlightDescriptor(request));
        } catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
            return;
        }
        // Emitted outside the try block so a failure here is never followed by onError.
        responseObserver.onNext(info.toProtocol());
        responseObserver.onCompleted();
    }

    /**
     * Error sink for failures that can no longer be reported to the client.
     *
     * <p>If middleware is registered for the current call, every instance is notified through
     * {@code onCallErrored}; otherwise the throwable is logged.
     *
     * @param t the failure to report
     */
    private void handleExceptionWithMiddleware(Throwable t) {
        Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
        if (middleware == null || middleware.isEmpty()) {
            logger.error("Uncaught exception in Flight method body", t);
            return;
        }
        middleware.forEach((k, v) -> v.onCallErrored(t));
    }

    /**
     * Returns the producer's schema for the flight identified by {@code request}.
     *
     * <p>Producer and conversion failures are sent to the client via {@code onError}. The response
     * itself is emitted outside the {@code try} block so that a failure in {@code onNext} or
     * {@code onCompleted} is not followed by an {@code onError} call on the same observer.
     *
     * @param request protocol-level descriptor supplied by the client
     * @param responseObserver observer receiving the single response or the error
     */
    @Override
    public void getSchema(Flight.FlightDescriptor request, StreamObserver<Flight.SchemaResult> responseObserver) {
        Flight.SchemaResult response;
        try {
            SchemaResult result = this.producer.getSchema(this.makeContext((ServerCallStreamObserver<?>) responseObserver), new FlightDescriptor(request));
            response = result.toProtocol();
        } catch (Exception ex) {
            responseObserver.onError(StatusUtils.toGrpcException(ex));
            return;
        }
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Handles a DoExchange call: incoming messages feed a {@link FlightStream}, outgoing messages
     * go through an {@link ExchangeListener}, and the producer runs on the executor.
     *
     * @param responseObserverSimple observer for outgoing messages; must be a
     *     {@link ServerCallStreamObserver}
     * @return observer that receives the client's messages
     */
    public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage> responseObserverSimple) {
        ServerCallStreamObserver<ArrowMessage> responseObserver = (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
        ExchangeListener listener = new ExchangeListener(responseObserver, this::handleExceptionWithMiddleware);
        FlightStream fs = new FlightStream(this.allocator, PENDING_REQUESTS, null, responseObserver::request);
        // The listener closes this stream when the call completes, errors, or is cancelled.
        listener.resource = fs;
        // Take over inbound flow control and ask for the first message explicitly.
        responseObserver.disableAutoInboundFlowControl();
        responseObserver.request(1);
        StreamObserver<ArrowMessage> observer = fs.asObserver();
        try {
            Future<?> unused = this.executors.submit(() -> {
                try {
                    this.producer.doExchange(this.makeContext(responseObserver), fs, listener);
                } catch (Exception ex) {
                    listener.error(ex);
                }
            });
        } catch (Exception ex) {
            // Scheduling itself failed; report it to the client through the listener.
            listener.error(ex);
        }
        return observer;
    }

    /**
     * {@link FlightProducer.CallContext} backed by a fixed peer identity, a cancellation probe, and
     * the middleware map published under {@link ServerInterceptorAdapter#SERVER_MIDDLEWARE_KEY}.
     */
    static class CallContext
    implements FlightProducer.CallContext {
        private final String peerIdentity;
        private final BooleanSupplier isCancelled;

        /**
         * @param peerIdentity identity reported by {@link #peerIdentity()}
         * @param isCancelled probe evaluated on every {@link #isCancelled()} call
         */
        CallContext(String peerIdentity, BooleanSupplier isCancelled) {
            this.peerIdentity = peerIdentity;
            this.isCancelled = isCancelled;
        }

        /** Returns the peer identity captured when the context was created. */
        @Override
        public String peerIdentity() {
            return this.peerIdentity;
        }

        /** Returns the current result of the cancellation probe. */
        @Override
        public boolean isCancelled() {
            return this.isCancelled.getAsBoolean();
        }

        /**
         * Looks up the middleware registered under {@code key}.
         *
         * @return the middleware instance, or {@code null} when no middleware map is available or
         *     the key has no entry
         */
        @Override
        public <T extends FlightServerMiddleware> T getMiddleware(FlightServerMiddleware.Key<T> key) {
            Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
            if (middleware == null) {
                return null;
            }
            // The map is keyed by Key<?>, so the compiler cannot prove the entry's type.
            // NOTE(review): assumes entries are registered under a key of the matching type.
            @SuppressWarnings("unchecked")
            T result = (T) middleware.get(key);
            return result;
        }

        /**
         * Returns the middleware map for the current call, or an empty map when none is available.
         */
        @Override
        public Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> getMiddleware() {
            Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
            if (middleware == null) {
                return Collections.emptyMap();
            }
            return middleware;
        }
    }

    /**
     * Server-side stream listener used for DoGet.
     *
     * <p>It forwards the observer's cancel/ready callbacks to handlers installed by the producer and
     * allows only the first terminal call ({@link #error(Throwable)} or {@link #completed()}) to
     * reach the superclass; later ones are routed to the error handler.
     */
    private static class GetListener
    extends OutboundStreamListenerImpl
    implements FlightProducer.ServerStreamListener {
        private final ServerCallStreamObserver<ArrowMessage> serverCallResponseObserver;
        private final Consumer<Throwable> errorHandler;
        private Runnable onCancelHandler = null;
        private Runnable onReadyHandler = null;
        /** Set once the first terminal call has been forwarded to the superclass. */
        private boolean completed;

        /**
         * @param responseObserver observer of the call this listener writes to
         * @param errorHandler receives failures that arrive after the call was already terminated
         */
        public GetListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler) {
            super(null, responseObserver);
            this.errorHandler = errorHandler;
            this.completed = false;
            this.serverCallResponseObserver = responseObserver;
            this.serverCallResponseObserver.setOnCancelHandler(this::onCancel);
            this.serverCallResponseObserver.setOnReadyHandler(this::onReady);
            this.serverCallResponseObserver.disableAutoInboundFlowControl();
        }

        /** Observer cancel callback: logs, then runs the installed cancel handler, if any. */
        private void onCancel() {
            logger.debug("Stream cancelled by client.");
            if (this.onCancelHandler != null) {
                this.onCancelHandler.run();
            }
        }

        /** Observer ready callback: runs the installed ready handler, if any. */
        private void onReady() {
            if (this.onReadyHandler != null) {
                this.onReadyHandler.run();
            }
        }

        @Override
        public void setOnCancelHandler(Runnable handler) {
            this.onCancelHandler = handler;
        }

        @Override
        public void setOnReadyHandler(Runnable handler) {
            this.onReadyHandler = handler;
        }

        @Override
        public boolean isCancelled() {
            return this.serverCallResponseObserver.isCancelled();
        }

        /** Intentionally a no-op in this listener. */
        @Override
        protected void waitUntilStreamReady() {
        }

        /**
         * Terminates the call with {@code ex}, or hands {@code ex} to the error handler when the
         * call was already terminated.
         */
        @Override
        public void error(Throwable ex) {
            if (!this.completed) {
                this.completed = true;
                super.error(ex);
            } else {
                this.errorHandler.accept(ex);
            }
        }

        /**
         * Completes the call, or reports an {@link IllegalStateException} to the error handler when
         * the call was already terminated.
         */
        @Override
        public void completed() {
            if (!this.completed) {
                this.completed = true;
                super.completed();
            } else {
                this.errorHandler.accept(new IllegalStateException("Tried to complete already-completed call"));
            }
        }
    }

    /**
     * Listener used for DoExchange: a {@link GetListener} that additionally owns a resource which
     * is closed exactly once when the call is cancelled, errors, or completes.
     */
    private static class ExchangeListener
    extends GetListener {
        /** Resource to release on termination; assigned by the enclosing service after construction. */
        private AutoCloseable resource = null;
        private boolean closed = false;
        /** Producer-supplied cancel handler; kept separately so cleanup always runs after it. */
        private Runnable onCancelHandler = null;

        /**
         * @param responseObserver observer of the call this listener writes to
         * @param errorHandler receives failures that arrive after the call was already terminated
         */
        public ExchangeListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler) {
            super(responseObserver, errorHandler);
            // Install a wrapper on the parent so cancellation runs the producer's handler (if any)
            // and then always releases the resource.
            super.setOnCancelHandler(() -> {
                try {
                    if (this.onCancelHandler != null) {
                        this.onCancelHandler.run();
                    }
                } finally {
                    this.cleanup();
                }
            });
        }

        /**
         * Closes the resource on first invocation; later invocations do nothing.
         *
         * @throws RuntimeException built from {@link CallStatus#INTERNAL} when closing fails
         */
        private void cleanup() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                AutoCloseables.close(this.resource);
            } catch (Exception e) {
                throw CallStatus.INTERNAL.withCause(e).withDescription("Server internal error cleaning up resources").toRuntimeException();
            }
        }

        /** Releases the resource, then terminates the call with {@code ex} even if cleanup failed. */
        @Override
        public void error(Throwable ex) {
            try {
                this.cleanup();
            } finally {
                super.error(ex);
            }
        }

        /** Releases the resource, then completes the call even if cleanup failed. */
        @Override
        public void completed() {
            try {
                this.cleanup();
            } finally {
                super.completed();
            }
        }

        /** Stores the producer's cancel handler; the wrapper installed in the constructor invokes it. */
        @Override
        public void setOnCancelHandler(Runnable handler) {
            this.onCancelHandler = handler;
        }
    }
}

