package org.springframework.shell.component.view.event;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.shell.component.message.ShellMessageHeaderAccessor;
import org.springframework.shell.component.message.StaticShellMessageHeaderAccessor;
import org.springframework.shell.component.view.control.View;
import org.springframework.shell.component.view.control.ViewEvent;
import org.springframework.shell.component.view.event.EventLoop;
import org.springframework.shell.component.view.event.processor.AnimationEventLoopProcessor;
import org.springframework.shell.component.view.event.processor.TaskEventLoopProcessor;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/springframework/shell/component/view/event/DefaultEventLoop.class */
public class DefaultEventLoop implements EventLoop {
    /** Logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(DefaultEventLoop.class);
    /** Buffer handed to the unicast sink; the constructor creates it as a priority queue ordered by MessageComparator. */
    private final Queue<Message<?>> messageQueue;
    /** Unicast sink that dispatched messages are emitted into. */
    private final Sinks.Many<Message<?>> many;
    /** Shared flux derived from the sink in init(); this is what events() returns. */
    private Flux<Message<?>> sink;
    /** Disposables registered through onDestroy() and subscribeTo(); disposed in destroy(). */
    private final Disposable.Composite disposables;
    /** Scheduler on which publishers handed to subscribeTo() are published. */
    private final Scheduler scheduler;
    /** Set to true by the constructor and to false by destroy(); doSend() checks it before and while emitting. */
    private volatile boolean active;
    /** Processors consulted in list order for each message; caller-supplied ones precede the built-in ones. */
    private final List<EventLoop.EventLoopProcessor> processors;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.shell.component.view.event.DefaultEventLoop$1, reason: invalid class name */
    /* loaded from: input_file:org/springframework/shell/component/view/event/DefaultEventLoop$1.class */
    /**
     * Synthetic lookup table for switching over {@link Sinks.EmitResult}: the array is indexed by
     * the enum ordinal and stores a 1-based case label for each constant assigned below
     * (OK=1, FAIL_NON_SERIALIZED=2, FAIL_OVERFLOW=3, FAIL_ZERO_SUBSCRIBER=4, FAIL_TERMINATED=5,
     * FAIL_CANCELLED=6).
     */
    public static /* synthetic */ class AnonymousClass1 {
        /** Sized to the number of EmitResult constants; a slot that is never assigned stays 0. */
        static final /* synthetic */ int[] $SwitchMap$reactor$core$publisher$Sinks$EmitResult = new int[Sinks.EmitResult.values().length];

        static {
            // Every constant is assigned inside its own try block, so a NoSuchFieldError raised for
            // one constant is swallowed and only that constant's slot is left unset.
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_NON_SERIALIZED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_OVERFLOW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_TERMINATED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_CANCELLED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* loaded from: input_file:org/springframework/shell/component/view/event/DefaultEventLoop$MessageComparator.class */
    /**
     * Orders messages by ascending priority header so that a priority queue hands out the
     * numerically smallest priority first.
     *
     * <p>A message without a priority header is ranked as {@link #DEFAULT_PRIORITY}. Treating a
     * missing priority as "equal to everything" would make the ordering non-transitive
     * (missing == 1, missing == 2, but 1 &lt; 2), which violates the {@link Comparator} contract
     * that {@code PriorityQueue} relies on. Pairs where both priorities are present compare
     * exactly as before.
     */
    private static class MessageComparator implements Comparator<Message<?>> {
        /** Priority assumed for messages that carry no priority header. */
        private static final int DEFAULT_PRIORITY = 0;

        private MessageComparator() {
        }

        /**
         * Compares two messages by their effective priority.
         *
         * @param message the first message
         * @param message2 the second message
         * @return negative, zero or positive as the first priority is less than, equal to or
         *     greater than the second
         */
        @Override
        public int compare(Message<?> message, Message<?> message2) {
            return Integer.compare(effectivePriority(message), effectivePriority(message2));
        }

        /** Reads the priority header, substituting the default when the header is absent. */
        private static int effectivePriority(Message<?> message) {
            Integer priority = StaticShellMessageHeaderAccessor.getPriority(message);
            return priority != null ? priority : DEFAULT_PRIORITY;
        }

        /**
         * Factory for the priority comparator.
         *
         * @return a new comparator instance
         */
        static Comparator<Message<?>> comparingPriority() {
            return new MessageComparator();
        }
    }

    /**
     * Creates an event loop without any caller-supplied processors by delegating to the
     * list-taking constructor with {@code null}.
     */
    public DefaultEventLoop() {
        this((List<EventLoop.EventLoopProcessor>) null);
    }

    /**
     * Creates an event loop with optional additional processors.
     *
     * <p>The given processors are copied and placed ahead of the built-in animation and task
     * processors, so they get the first chance to claim a message.
     *
     * @param list extra processors to consult first; may be {@code null} for none
     */
    public DefaultEventLoop(List<EventLoop.EventLoopProcessor> list) {
        // Parameterized (diamond) instantiation instead of raw types keeps these assignments checked.
        this.messageQueue = new PriorityQueue<>(MessageComparator.comparingPriority());
        // The priority queue is the sink's backpressure buffer, so buffered messages drain by priority.
        this.many = Sinks.many().unicast().onBackpressureBuffer(this.messageQueue);
        this.disposables = Disposables.composite();
        this.scheduler = Schedulers.boundedElastic();
        this.active = true;
        this.processors = new ArrayList<>();
        if (list != null) {
            this.processors.addAll(list);
        }
        this.processors.add(new AnimationEventLoopProcessor());
        this.processors.add(new TaskEventLoopProcessor());
        init();
    }

    /**
     * Builds the shared event flux from the sink.
     *
     * <p>Each emitted message is offered to the processors in list order. The first processor
     * whose {@code canProcess} accepts the message supplies the resulting flux; if no processor
     * accepts it, or the accepting processor returns {@code null}, the message itself is passed
     * downstream.
     */
    private void init() {
        this.sink = this.many.asFlux().flatMap(incoming -> {
            Flux<? extends Message<?>> processed = null;
            for (EventLoop.EventLoopProcessor processor : this.processors) {
                if (processor.canProcess(incoming)) {
                    // Only the first matching processor is used.
                    processed = processor.process(incoming);
                    break;
                }
            }
            return processed != null ? processed : Mono.just(incoming);
        }).share();
    }

    /**
     * Sends a single message into the loop, passing a 1000 ms timeout to {@code doSend}, and
     * logs a warning when the send is reported as failed.
     *
     * @param message the message to dispatch
     */
    @Override
    public void dispatch(Message<?> message) {
        log.debug("dispatch {}", message);
        boolean sent = doSend(message, 1000L);
        if (!sent) {
            log.warn("Failed to send message: {}", message);
        }
    }

    /**
     * Dispatches every message produced by the given publisher by subscribing to it.
     *
     * @param publisher source of messages to dispatch
     */
    @Override
    public void dispatch(Publisher<? extends Message<?>> publisher) {
        this.subscribeTo(publisher);
    }

    /**
     * Returns the flux stored in the {@code sink} field.
     *
     * @return the current value of {@code sink}
     */
    @Override
    public Flux<Message<?>> events() {
        Flux<Message<?>> shared = this.sink;
        return shared;
    }

    /**
     * Returns the payloads of messages with the given event type, restricted to payloads that
     * are instances of the given class.
     *
     * @param type event type a message must carry
     * @param cls payload class to keep
     * @param <T> payload type
     * @return flux of matching payloads
     */
    @Override
    public <T> Flux<T> events(EventLoop.Type type, Class<T> cls) {
        Flux<Message<?>> matching = events()
                .filter(candidate -> type.equals(StaticShellMessageHeaderAccessor.getEventType(candidate)));
        return matching
                .map(candidate -> candidate.getPayload())
                .ofType(cls);
    }

    /**
     * Returns the payloads of messages with the given event type, restricted to payloads that
     * are instances of the raw class of the given type reference.
     *
     * <p>Only the erased class can be checked at runtime, so the generic arguments of
     * {@code T} are not verified; the cast to {@code Flux<T>} is therefore unchecked.
     *
     * @param type event type a message must carry
     * @param parameterizedTypeReference reference describing the payload type
     * @param <T> payload type
     * @return flux of matching payloads
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Flux<T> events(EventLoop.Type type, ParameterizedTypeReference<T> parameterizedTypeReference) {
        Class<?> rawClass = ResolvableType.forType(parameterizedTypeReference).getRawClass();
        // ofType(Class<?>) yields Flux<capture>, which is not assignable to Flux<T> without a cast.
        return (Flux<T>) events()
                .filter(message -> type.equals(StaticShellMessageHeaderAccessor.getEventType(message)))
                .map(message -> message.getPayload())
                .ofType(rawClass);
    }

    /**
     * Returns the {@code KeyEvent} payloads of messages whose event type is {@code KEY}.
     *
     * @return flux of key events
     */
    @Override
    public Flux<KeyEvent> keyEvents() {
        Flux<Message<?>> keyMessages = events()
                .filter(candidate -> EventLoop.Type.KEY.equals(StaticShellMessageHeaderAccessor.getEventType(candidate)));
        return keyMessages
                .map(candidate -> candidate.getPayload())
                .ofType(KeyEvent.class);
    }

    /**
     * Returns the {@code MouseEvent} payloads of messages whose event type is {@code MOUSE}.
     *
     * @return flux of mouse events
     */
    @Override
    public Flux<MouseEvent> mouseEvents() {
        Flux<Message<?>> mouseMessages = events()
                .filter(candidate -> EventLoop.Type.MOUSE.equals(StaticShellMessageHeaderAccessor.getEventType(candidate)));
        return mouseMessages
                .map(candidate -> candidate.getPayload())
                .ofType(MouseEvent.class);
    }

    /**
     * Returns the {@code String} payloads of messages whose event type is {@code SYSTEM}.
     *
     * @return flux of system event payloads
     */
    @Override
    public Flux<String> systemEvents() {
        Flux<Message<?>> systemMessages = events()
                .filter(candidate -> EventLoop.Type.SYSTEM.equals(StaticShellMessageHeaderAccessor.getEventType(candidate)));
        return systemMessages
                .map(candidate -> candidate.getPayload())
                .ofType(String.class);
    }

    /**
     * Returns the {@code String} payloads of messages whose event type is {@code SIGNAL}.
     *
     * @return flux of signal event payloads
     */
    @Override
    public Flux<String> signalEvents() {
        Flux<Message<?>> signalMessages = events()
                .filter(candidate -> EventLoop.Type.SIGNAL.equals(StaticShellMessageHeaderAccessor.getEventType(candidate)));
        return signalMessages
                .map(candidate -> candidate.getPayload())
                .ofType(String.class);
    }

    /**
     * Returns {@code VIEW} events whose payload is an instance of the given class.
     *
     * @param cls view event class to keep
     * @param <T> view event type
     * @return flux of matching view events
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(Class<T> cls) {
        Flux<T> typedViewEvents = events(EventLoop.Type.VIEW, cls);
        return typedViewEvents;
    }

    /**
     * Returns {@code VIEW} events matching the given type reference.
     *
     * @param parameterizedTypeReference reference describing the view event type
     * @param <T> view event type
     * @return flux of matching view events
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(ParameterizedTypeReference<T> parameterizedTypeReference) {
        Flux<T> typedViewEvents = events(EventLoop.Type.VIEW, parameterizedTypeReference);
        return typedViewEvents;
    }

    /**
     * Returns {@code VIEW} events of the given class that were raised by one specific view.
     * The view is matched by reference identity.
     *
     * @param cls view event class to keep
     * @param view the view whose events are wanted
     * @param <T> view event type
     * @return flux of matching view events
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(Class<T> cls, View view) {
        Flux<T> typedViewEvents = events(EventLoop.Type.VIEW, cls);
        return typedViewEvents.filter(candidate -> candidate.view() == view);
    }

    /**
     * Returns {@code VIEW} events matching the given type reference that were raised by one
     * specific view. The view is matched by reference identity.
     *
     * @param parameterizedTypeReference reference describing the view event type
     * @param view the view whose events are wanted
     * @param <T> view event type
     * @return flux of matching view events
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(ParameterizedTypeReference<T> parameterizedTypeReference, View view) {
        Flux<T> typedViewEvents = events(EventLoop.Type.VIEW, parameterizedTypeReference);
        return typedViewEvents.filter(candidate -> candidate.view() == view);
    }

    /**
     * Registers a disposable with this loop's composite, which {@code destroy()} disposes.
     *
     * @param disposable resource to add to the composite
     */
    @Override
    public void onDestroy(Disposable disposable) {
        Disposable.Composite registry = this.disposables;
        registry.add(disposable);
    }

    /**
     * Tries to emit a message into the sink, retrying every 10 ms while the sink reports a
     * transient failure.
     *
     * <p>Nothing is attempted when the loop is inactive or the sink has no subscriber. A
     * negative timeout disables the time limit, so retries continue until the message is
     * emitted or the loop is deactivated.
     *
     * @param message the message to emit
     * @param j timeout in milliseconds; values of zero or more bound the retry time
     * @return {@code true} only if the message was actually emitted
     * @throws IllegalStateException if the sink rejects the message permanently
     */
    private boolean doSend(Message<?> message, long j) {
        if (!this.active || this.many.currentSubscriberCount() == 0) {
            return false;
        }
        final long parkMillis = 10L;
        final long parkNanos = TimeUnit.MILLISECONDS.toNanos(parkMillis);
        long remainingMillis = Math.max(j, 0L);
        while (this.active) {
            if (tryEmitMessage(message)) {
                return true;
            }
            remainingMillis -= parkMillis;
            if (j >= 0 && remainingMillis <= 0) {
                return false;
            }
            LockSupport.parkNanos(parkNanos);
        }
        // The loop was deactivated while waiting, so the message was never emitted; reporting
        // success here would hide the drop from the caller.
        return false;
    }

    /**
     * Makes one attempt to emit a message into the sink and classifies the outcome.
     *
     * @param message the message to emit
     * @return {@code true} if the sink accepted the message, {@code false} if the failure is
     *     transient (non-serialized access or overflow) and the caller may retry
     * @throws IllegalStateException if the sink has no subscriber, is terminated or cancelled,
     *     or reports a result this method does not know
     */
    private boolean tryEmitMessage(Message<?> message) {
        Sinks.EmitResult result = this.many.tryEmitNext(message);
        // Switch on the enum itself rather than on a hand-carried ordinal lookup table.
        switch (result) {
            case OK:
                return true;
            case FAIL_NON_SERIALIZED:
            case FAIL_OVERFLOW:
                return false;
            case FAIL_ZERO_SUBSCRIBER:
                throw new IllegalStateException("The [" + this + "] doesn't have subscribers to accept messages");
            case FAIL_TERMINATED:
            case FAIL_CANCELLED:
                throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: " + this.many);
            default:
                // Not reachable with the six results handled above; name the value if that ever changes.
                throw new IllegalStateException("Unexpected emit result: " + result);
        }
    }

    /**
     * Subscribes to a publisher and forwards each of its messages into this loop.
     *
     * <p>Messages are published on this loop's scheduler, and each one is handled with the
     * Reactor context taken from that message written into its own inner pipeline. The
     * resulting subscription is added to the loop's disposables.
     *
     * @param publisher source of messages to forward
     */
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        Disposable subscription = Flux.from(publisher)
                .publishOn(this.scheduler)
                // The inner lambda parameter needs its own name: a lambda parameter may not
                // redeclare the enclosing lambda's parameter.
                .flatMap(message -> Mono.just(message)
                        .handle((messageToHandle, synchronousSink) -> sendReactiveMessage(messageToHandle))
                        .contextWrite(StaticShellMessageHeaderAccessor.getReactorContext(message)))
                .contextCapture()
                .subscribe();
        this.disposables.add(subscription);
    }

    /**
     * Dispatches a message that arrived through a reactive subscription.
     *
     * <p>If the message carries the Reactor context header, a copy without that header is
     * dispatched instead. Exceptions from dispatching are logged with their stack trace and
     * not rethrown.
     *
     * @param message the message received from the publisher
     */
    private void sendReactiveMessage(Message<?> message) {
        Message<?> messageToSend = message;
        if (message.getHeaders().containsKey(ShellMessageHeaderAccessor.REACTOR_CONTEXT)) {
            messageToSend = MessageBuilder.fromMessage(message)
                    .removeHeader(ShellMessageHeaderAccessor.REACTOR_CONTEXT)
                    .build();
        }
        try {
            dispatch(messageToSend);
        } catch (Exception e) {
            // A trailing Throwable argument is logged as the exception, preserving the cause.
            log.warn("Error during processing event: {}", messageToSend, e);
        }
    }

    /**
     * Shuts the loop down: marks it inactive, disposes registered disposables, completes the
     * sink and disposes the scheduler.
     *
     * <p>The scheduler is disposed in a {@code finally} block so that it is not skipped when
     * completing the sink throws. The fail-fast handler does not retry, so a failed completion
     * may surface as an exception; that exception still propagates to the caller.
     */
    public void destroy() {
        this.active = false;
        this.disposables.dispose();
        try {
            this.many.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        } finally {
            this.scheduler.dispose();
        }
    }
}
