/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.ConnectionResources;
import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.StatementCache;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.client.Binding;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
import io.r2dbc.postgresql.client.QueryLogger;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BindComplete;
import io.r2dbc.postgresql.message.backend.CloseComplete;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NoData;
import io.r2dbc.postgresql.message.backend.ParseComplete;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.Bind;
import io.r2dbc.postgresql.message.frontend.Close;
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
import io.r2dbc.postgresql.message.frontend.Describe;
import io.r2dbc.postgresql.message.frontend.Execute;
import io.r2dbc.postgresql.message.frontend.ExecutionType;
import io.r2dbc.postgresql.message.frontend.Flush;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Parse;
import io.r2dbc.postgresql.message.frontend.Sync;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.postgresql.util.PredicateUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

class ExtendedFlowDelegate {
    static final Predicate<BackendMessage> RESULT_FRAME_FILTER;

    ExtendedFlowDelegate() {
    }

    public static Flux<BackendMessage> runQuery(ConnectionResources resources, ExceptionFactory factory, String query, Binding binding, List<ByteBuf> values, int fetchSize) {
        StatementCache cache = resources.getStatementCache();
        Client client = resources.getClient();
        String portal = resources.getPortalNameSupplier().get();
        boolean compatibilityMode = resources.getConfiguration().isCompatibilityMode();
        boolean implicitTransactions = resources.getClient().getTransactionStatus() == TransactionStatus.IDLE;
        ExtendedFlowOperator operator = new ExtendedFlowOperator(query, binding, cache, values, portal, resources.getConfiguration().isForceBinary());
        Flux<BackendMessage> exchange = compatibilityMode ? (fetchSize == 0 || implicitTransactions ? ExtendedFlowDelegate.fetchAll(operator, client, portal) : ExtendedFlowDelegate.fetchCursoredWithSync(operator, client, portal, fetchSize)) : (fetchSize == 0 ? ExtendedFlowDelegate.fetchAll(operator, client, portal) : ExtendedFlowDelegate.fetchCursoredWithFlush(operator, client, portal, fetchSize));
        exchange = exchange.doOnNext(message -> {
            if (message == ParseComplete.INSTANCE) {
                operator.hydrateStatementCache();
            }
        });
        return exchange.doOnSubscribe(it -> QueryLogger.logQuery(client.getContext(), query)).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release).filter(RESULT_FRAME_FILTER).handle(factory::handleErrorResponse);
    }

    private static Flux<BackendMessage> fetchAll(ExtendedFlowOperator operator, Client client, String portal) {
        UnicastProcessor requestsProcessor = UnicastProcessor.create((Queue)((Queue)Queues.small().get()));
        FluxSink requestsSink = requestsProcessor.sink();
        MessageFactory factory = () -> operator.getMessages(Arrays.asList(new Execute(portal, 0), new Close(portal, ExecutionType.PORTAL), Sync.INSTANCE));
        return (Flux)client.exchange(operator.takeUntil(), (Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(factory.createMessages())).concatWith((Publisher)requestsProcessor)).handle(ExtendedFlowDelegate.handleReprepare((FluxSink<FrontendMessage>)requestsSink, operator, factory)).doFinally(ignore -> operator.close((FluxSink<FrontendMessage>)requestsSink)).as(Operators::discardOnCancel);
    }

    private static Flux<BackendMessage> fetchCursoredWithSync(ExtendedFlowOperator operator, Client client, String portal, int fetchSize) {
        UnicastProcessor requestsProcessor = UnicastProcessor.create((Queue)((Queue)Queues.small().get()));
        FluxSink requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);
        AtomicBoolean done = new AtomicBoolean(false);
        MessageFactory factory = () -> operator.getMessages(Arrays.asList(new Execute(portal, fetchSize), Sync.INSTANCE));
        Predicate<BackendMessage> takeUntil = operator.takeUntil();
        return (Flux)client.exchange(it -> done.get() && takeUntil.test((BackendMessage)it), (Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(factory.createMessages())).concatWith((Publisher)requestsProcessor)).handle(ExtendedFlowDelegate.handleReprepare((FluxSink<FrontendMessage>)requestsSink, operator, factory)).handle((message, sink) -> {
            if (message instanceof CommandComplete) {
                requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof CloseComplete) {
                requestsSink.complete();
                done.set(true);
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                done.set(true);
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                    requestsSink.next((Object)Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    requestsSink.next((Object)new Execute(portal, fetchSize));
                    requestsSink.next((Object)Sync.INSTANCE);
                }
            } else if (message instanceof NoData) {
                if (isCanceled.get()) {
                    requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                    requestsSink.next((Object)Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    done.set(true);
                }
            } else {
                sink.next(message);
            }
        }).doFinally(ignore -> operator.close((FluxSink<FrontendMessage>)requestsSink)).as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }

    private static Flux<BackendMessage> fetchCursoredWithFlush(ExtendedFlowOperator operator, Client client, String portal, int fetchSize) {
        UnicastProcessor requestsProcessor = UnicastProcessor.create((Queue)((Queue)Queues.small().get()));
        FluxSink requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);
        MessageFactory factory = () -> operator.getMessages(Arrays.asList(new Execute(portal, fetchSize), Flush.INSTANCE));
        return (Flux)client.exchange(operator.takeUntil(), (Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(factory.createMessages())).concatWith((Publisher)requestsProcessor)).handle(ExtendedFlowDelegate.handleReprepare((FluxSink<FrontendMessage>)requestsSink, operator, factory)).handle((message, sink) -> {
            if (message instanceof CommandComplete) {
                requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                    requestsSink.next((Object)Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    requestsSink.next((Object)new Execute(portal, fetchSize));
                    requestsSink.next((Object)Flush.INSTANCE);
                }
            } else {
                sink.next(message);
            }
        }).doFinally(ignore -> operator.close((FluxSink<FrontendMessage>)requestsSink)).as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }

    private static BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleReprepare(FluxSink<FrontendMessage> requests, ExtendedFlowOperator operator, MessageFactory messageFactory) {
        AtomicBoolean reprepared = new AtomicBoolean();
        return (message, sink) -> {
            if (message instanceof ErrorResponse && ExtendedFlowDelegate.requiresReprepare((ErrorResponse)message)) {
                operator.evictCachedStatement();
                if (reprepared.compareAndSet(false, true)) {
                    List<FrontendMessage.DirectEncoder> messages = messageFactory.createMessages();
                    if (!messages.contains(Sync.INSTANCE)) {
                        messages.add(0, Sync.INSTANCE);
                    }
                    requests.next((Object)new CompositeFrontendMessage(messages));
                    return;
                }
            }
            sink.next(message);
        };
    }

    private static boolean requiresReprepare(ErrorResponse errorResponse) {
        ErrorDetails details = new ErrorDetails(errorResponse.getFields());
        String code = details.getCode();
        if ("26000".equals(code)) {
            return true;
        }
        if (!"0A000".equals(code)) {
            return false;
        }
        String routine = details.getRoutine().orElse(null);
        return "RevalidateCachedQuery".equals(routine) || "RevalidateCachedPlan".equals(routine);
    }

    static {
        Predicate[] predicateArray = new Predicate[2];
        predicateArray[0] = BindComplete.class::isInstance;
        predicateArray[1] = NoData.class::isInstance;
        RESULT_FRAME_FILTER = PredicateUtils.not(PredicateUtils.or(predicateArray));
    }

    static class ExtendedFlowOperator
    extends AtomicInteger
    implements Predicate<BackendMessage> {
        private final String sql;
        private final Binding binding;
        @Nullable
        private volatile String name;
        private final StatementCache cache;
        private final List<ByteBuf> values;
        private final String portal;
        private final boolean forceBinary;

        public ExtendedFlowOperator(String sql, Binding binding, StatementCache cache, List<ByteBuf> values, String portal, boolean forceBinary) {
            this.sql = sql;
            this.binding = binding;
            this.cache = cache;
            this.values = values;
            this.portal = portal;
            this.forceBinary = forceBinary;
        }

        public void close(FluxSink<FrontendMessage> requests) {
            requests.complete();
            this.values.forEach(ReferenceCountUtil::release);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void evictCachedStatement() {
            ExtendedFlowOperator extendedFlowOperator = this;
            synchronized (extendedFlowOperator) {
                this.name = null;
            }
            this.cache.evict(this.sql);
        }

        public void hydrateStatementCache() {
            this.cache.put(this.binding, this.sql, this.getStatementName());
        }

        public Predicate<BackendMessage> takeUntil() {
            return this;
        }

        @Override
        public boolean test(BackendMessage backendMessage) {
            if (backendMessage instanceof ReadyForQuery) {
                return this.decrementAndGet() <= 0;
            }
            return false;
        }

        private boolean isPrepareRequired() {
            return this.cache.requiresPrepare(this.binding, this.sql);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String getStatementName() {
            ExtendedFlowOperator extendedFlowOperator = this;
            synchronized (extendedFlowOperator) {
                if (this.name == null) {
                    this.name = this.cache.getName(this.binding, this.sql);
                }
                return this.name;
            }
        }

        public List<FrontendMessage.DirectEncoder> getMessages(Collection<FrontendMessage.DirectEncoder> append) {
            this.incrementAndGet();
            ArrayList<FrontendMessage.DirectEncoder> messagesToSend = new ArrayList<FrontendMessage.DirectEncoder>(6);
            if (this.isPrepareRequired()) {
                messagesToSend.add(new Parse(this.getStatementName(), this.binding.getParameterTypes(), this.sql));
            }
            for (ByteBuf value : this.values) {
                value.readerIndex(0);
                value.touch((Object)"ExtendedFlowOperator").retain();
            }
            Bind bind = new Bind(this.portal, this.binding.getParameterFormats(), this.values, ExtendedQueryMessageFlow.resultFormat(this.forceBinary), this.getStatementName());
            messagesToSend.add(bind);
            messagesToSend.add(new Describe(this.portal, ExecutionType.PORTAL));
            messagesToSend.addAll(append);
            return messagesToSend;
        }
    }

    static interface MessageFactory {
        public List<FrontendMessage.DirectEncoder> createMessages();
    }
}

