package darabonba.core.sse;

import darabonba.core.TeaResponseHandler;
import darabonba.core.async.AsyncResponseHandler;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:darabonba/core/sse/SSEResponseHandler.class */
public class SSEResponseHandler implements AsyncResponseHandler<Object, Object> {
    protected volatile AsyncResponseHandler<?, ?> handler;
    private static final Pattern pattern = Pattern.compile("\\r\\n\\r\\n|\\r\\r|\\n\\n");
    private volatile SSEResponseIterator<?> iterator;

    /* loaded from: input_file:darabonba/core/sse/SSEResponseHandler$SSEProcessor.class */
    static class SSEProcessor implements Processor<ByteBuffer, ByteBuffer> {
        protected volatile Subscriber<? super ByteBuffer> subscriber;
        private volatile StringBuilder stringBuilder = new StringBuilder();
        private volatile SSEResponseIterator<?> iterator;

        SSEProcessor(SSEResponseIterator<?> sSEResponseIterator) {
            this.iterator = sSEResponseIterator;
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriber.onSubscribe(subscription);
        }

        public void onNext(ByteBuffer byteBuffer) {
            try {
                if (byteBuffer.hasRemaining()) {
                    String str = new String(byteBuffer.array(), StandardCharsets.UTF_8);
                    if (SSEResponseHandler.pattern.matcher(str).find()) {
                        this.iterator.addEvent(Event.parse(this.stringBuilder.append(str).toString()));
                        this.stringBuilder = new StringBuilder();
                    } else {
                        this.stringBuilder.append(str);
                    }
                }
                this.subscriber.onNext(byteBuffer);
            } catch (Exception e) {
                this.subscriber.onError(e);
            }
        }

        public void onError(Throwable th) {
            this.iterator.endOfFailure(th);
            this.subscriber.onError(th);
        }

        public void onComplete() {
            this.iterator.endOfEvent();
            this.subscriber.onComplete();
        }
    }

    public SSEResponseHandler() {
    }

    public SSEResponseHandler(TeaResponseHandler teaResponseHandler, SSEResponseIterator<?> sSEResponseIterator) {
        this.handler = (AsyncResponseHandler) teaResponseHandler;
        this.iterator = sSEResponseIterator;
    }

    @Override // darabonba.core.async.AsyncResponseHandler
    public void onStream(Publisher<ByteBuffer> publisher) {
        Publisher<ByteBuffer> sSEProcessor = new SSEProcessor(this.iterator);
        this.handler.onStream(sSEProcessor);
        publisher.subscribe(sSEProcessor);
    }

    @Override // darabonba.core.async.AsyncResponseHandler
    public void onError(Throwable th) {
        this.iterator.endOfFailure(th);
    }

    @Override // darabonba.core.async.AsyncResponseHandler
    public Object transform(Object obj) {
        return null;
    }
}
