/*
 * Decompiled with CFR 0.152.
 */
package darabonba.core.sse;

import darabonba.core.TeaResponseHandler;
import darabonba.core.async.AsyncResponseHandler;
import darabonba.core.sse.Event;
import darabonba.core.sse.SSEResponseIterator;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Response handler that taps a streamed HTTP body, extracts Server-Sent Events from it and
 * feeds them to an {@link SSEResponseIterator}, while forwarding the untouched byte stream to
 * a wrapped delegate handler.
 */
public class SSEResponseHandler
implements AsyncResponseHandler<Object, Object> {
    /** Delegate that receives the same byte stream after it passed through the SSE tap. */
    protected volatile AsyncResponseHandler<?, ?> handler;
    /** Blank-line event delimiter: CRLF CRLF, CR CR or LF LF. */
    private static final Pattern pattern = Pattern.compile("\\r\\n\\r\\n|\\r\\r|\\n\\n");
    /** Sink for parsed events and for the terminal success/failure signal. */
    private volatile SSEResponseIterator<?> iterator;

    /** Creates a handler with neither a delegate nor an iterator set. */
    public SSEResponseHandler() {
    }

    /**
     * Creates a handler wrapping {@code handler} and publishing events to {@code iterator}.
     *
     * @param handler  delegate handler; it is cast to {@link AsyncResponseHandler}, so it must
     *                 be an instance of that type
     * @param iterator iterator that receives every parsed event
     */
    public SSEResponseHandler(TeaResponseHandler handler, SSEResponseIterator<?> iterator) {
        this.handler = (AsyncResponseHandler<?, ?>) handler;
        this.iterator = iterator;
    }

    /**
     * Inserts an {@link SSEProcessor} between the body publisher and the delegate handler.
     *
     * @param publisher publisher of the raw response body chunks
     */
    @Override
    public void onStream(Publisher<ByteBuffer> publisher) {
        SSEProcessor proc = new SSEProcessor(this.iterator);
        // The delegate is handed the processor before the processor is subscribed upstream,
        // because SSEProcessor#onSubscribe forwards to the subscriber stored by subscribe().
        // NOTE(review): this relies on the delegate subscribing synchronously - confirm.
        this.handler.onStream(proc);
        publisher.subscribe(proc);
    }

    /**
     * Reports a failure to the iterator.
     *
     * @param throwable the failure cause
     */
    @Override
    public void onError(Throwable throwable) {
        this.iterator.endOfFailure(throwable);
    }

    /**
     * Always returns {@code null}; this handler does not produce a transformed response.
     *
     * @param response ignored
     * @return {@code null}
     */
    @Override
    public Object transform(Object response) {
        return null;
    }

    /**
     * Pass-through processor: every upstream signal is forwarded unchanged to the single
     * downstream subscriber, and the bytes seen on the way are decoded as UTF-8 and split into
     * events at blank-line delimiters.
     */
    static class SSEProcessor
    implements Processor<ByteBuffer, ByteBuffer> {
        /** Longest delimiter alternative ({@code \r\n\r\n}) is four characters long. */
        private static final int MAX_DELIMITER_LENGTH = 4;

        protected volatile Subscriber<? super ByteBuffer> subscriber;
        /** Decoded text that has not yet been terminated by a delimiter. */
        private final StringBuilder stringBuilder = new StringBuilder();
        private final SSEResponseIterator<?> iterator;
        /** Stateful decoder so that characters split across chunks are decoded correctly. */
        private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        /** Trailing bytes of the previous chunk that form an incomplete UTF-8 sequence. */
        private ByteBuffer undecoded = ByteBuffer.allocate(0);

        SSEProcessor(SSEResponseIterator<?> iterator) {
            this.iterator = iterator;
        }

        /** Stores the downstream subscriber; a later call replaces the earlier subscriber. */
        @Override
        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        /** Hands the upstream subscription straight to the downstream subscriber. */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscriber.onSubscribe(subscription);
        }

        /**
         * Extracts any events completed by this chunk, then forwards the chunk downstream.
         * Any exception raised while doing so is reported to the downstream subscriber.
         *
         * @param byteBuffer the next chunk of the response body; its position and limit are
         *                   left untouched
         */
        @Override
        public void onNext(ByteBuffer byteBuffer) {
            try {
                if (byteBuffer.hasRemaining()) {
                    // Read through a duplicate: only the readable window is decoded, direct and
                    // read-only buffers work, and the buffer forwarded below is not consumed.
                    String text = this.decode(byteBuffer.duplicate());
                    // A delimiter may straddle the chunk boundary, so re-scan the last few
                    // characters that were already buffered.
                    int scanFrom = Math.max(0, this.stringBuilder.length() - (MAX_DELIMITER_LENGTH - 1));
                    this.stringBuilder.append(text);
                    this.emitCompleteEvents(scanFrom);
                }
                this.subscriber.onNext(byteBuffer);
            }
            catch (Exception e) {
                this.subscriber.onError(e);
            }
        }

        /** Signals the failure to the iterator, then to the downstream subscriber. */
        @Override
        public void onError(Throwable throwable) {
            this.iterator.endOfFailure(throwable);
            this.subscriber.onError(throwable);
        }

        /**
         * Signals the end of the event stream to the iterator, then completes downstream.
         * Text still buffered without a terminating delimiter is not emitted as an event.
         */
        @Override
        public void onComplete() {
            this.iterator.endOfEvent();
            this.subscriber.onComplete();
        }

        /**
         * Decodes the remaining bytes of {@code chunk} as UTF-8, prefixed by any bytes carried
         * over from the previous chunk. Bytes of a trailing incomplete sequence are kept for
         * the next call; malformed input is replaced rather than raising an error.
         */
        private String decode(ByteBuffer chunk) {
            ByteBuffer input = chunk;
            if (this.undecoded.hasRemaining()) {
                input = ByteBuffer.allocate(this.undecoded.remaining() + chunk.remaining());
                input.put(this.undecoded);
                input.put(chunk);
                input.flip();
            }
            int capacity = (int) Math.ceil(input.remaining() * (double) this.decoder.maxCharsPerByte());
            CharBuffer chars = CharBuffer.allocate(capacity);
            // endOfInput=false: an incomplete trailing sequence is left unread in input.
            this.decoder.decode(input, chars, false);
            ByteBuffer rest = ByteBuffer.allocate(input.remaining());
            rest.put(input);
            rest.flip();
            this.undecoded = rest;
            chars.flip();
            return chars.toString();
        }

        /**
         * Parses and publishes every delimiter-terminated event in the buffer and removes the
         * consumed text. Each event string includes its terminating delimiter.
         *
         * @param scanFrom buffer index at which to start looking for a delimiter
         */
        private void emitCompleteEvents(int scanFrom) {
            Matcher delimiter = pattern.matcher(this.stringBuilder);
            int consumed = 0;
            try {
                boolean found = delimiter.find(scanFrom);
                while (found) {
                    int end = delimiter.end();
                    String rawEvent = this.stringBuilder.substring(consumed, end);
                    // Mark as consumed before parsing so a failing event is not parsed again.
                    consumed = end;
                    this.iterator.addEvent(Event.parse(rawEvent));
                    found = delimiter.find();
                }
            } finally {
                this.stringBuilder.delete(0, consumed);
            }
        }
    }
}

