/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.resteasy.reactive.server.handlers;

import io.smallrye.mutiny.Multi;
import java.util.function.BiFunction;
import javax.ws.rs.core.MediaType;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.core.SseUtil;
import org.jboss.resteasy.reactive.server.core.StreamingUtil;
import org.jboss.resteasy.reactive.server.jaxrs.OutboundSseEventImpl;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MultiResponseHandler
implements ServerRestHandler {
    private static final Logger log = Logger.getLogger(MultiResponseHandler.class);
    private static final ServerRestHandler[] AWOL = new ServerRestHandler[]{new ServerRestHandler(){

        @Override
        public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
            throw new IllegalStateException("FAILURE: should never be restarted");
        }
    }};

    @Override
    public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
        if (requestContext.getResult() instanceof Multi) {
            Multi result = (Multi)requestContext.getResult();
            MediaType[] mediaTypes = requestContext.getTarget().getProduces().getSortedMediaTypes();
            if (mediaTypes.length != 1) {
                throw new IllegalStateException("Negotiation or dynamic media type not supported yet for Multi: please use a single @Produces annotation");
            }
            requestContext.setResponseContentType(mediaTypes[0]);
            requestContext.setGenericReturnType(requestContext.getTarget().getReturnType());
            requestContext.suspend();
            if (requestContext.getResponseContentType().getMediaType().isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) {
                this.handleSse(requestContext, result);
            } else {
                this.handleStreaming(requestContext, result);
            }
        }
    }

    private void handleStreaming(ResteasyReactiveRequestContext requestContext, Multi<?> result) {
        result.subscribe().withSubscriber((Subscriber)new StreamingMultiSubscriber(requestContext));
    }

    private void handleSse(ResteasyReactiveRequestContext requestContext, Multi<?> result) {
        result.subscribe().withSubscriber((Subscriber)new SseMultiSubscriber(requestContext));
    }

    static abstract class AbstractMultiSubscriber
    implements Subscriber<Object> {
        protected Subscription subscription;
        protected ResteasyReactiveRequestContext requestContext;

        AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext) {
            this.requestContext = requestContext;
            requestContext.restart(AWOL, true);
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1L);
        }

        public void onComplete() {
            this.requestContext.serverResponse().end();
            this.requestContext.serverRequest().closeConnection();
            this.requestContext.close();
        }

        public void onError(Throwable t) {
            this.handleException(this.requestContext, t);
        }

        protected void handleException(ResteasyReactiveRequestContext requestContext, Throwable t) {
            if (requestContext.serverResponse().headWritten()) {
                log.error((Object)"Exception in SSE server handling, impossible to send it to client", t);
            } else {
                requestContext.resume(t);
            }
        }
    }

    private static class StreamingMultiSubscriber
    extends AbstractMultiSubscriber {
        StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext) {
            super(requestContext);
        }

        public void onNext(Object item) {
            StreamingUtil.send(this.requestContext, item).handle(new BiFunction<Object, Throwable, Object>(){

                @Override
                public Object apply(Object v, Throwable t) {
                    if (t != null) {
                        try {
                            subscription.cancel();
                        }
                        catch (Throwable t2) {
                            t2.printStackTrace();
                        }
                        this.handleException(requestContext, t);
                    } else {
                        subscription.request(1L);
                    }
                    return null;
                }
            });
        }
    }

    private static class SseMultiSubscriber
    extends AbstractMultiSubscriber {
        SseMultiSubscriber(ResteasyReactiveRequestContext requestContext) {
            super(requestContext);
        }

        public void onNext(Object item) {
            OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
            SseUtil.send(this.requestContext, event).handle(new BiFunction<Object, Throwable, Object>(){

                @Override
                public Object apply(Object v, Throwable t) {
                    if (t != null) {
                        subscription.cancel();
                        this.handleException(requestContext, t);
                    } else {
                        subscription.request(1L);
                    }
                    return null;
                }
            });
        }
    }
}

