/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.statemachine.support;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineException;
import org.springframework.statemachine.state.JoinPseudoState;
import org.springframework.statemachine.state.PseudoStateKind;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.DefaultStateContext;
import org.springframework.statemachine.support.LifecycleObjectSupport;
import org.springframework.statemachine.support.StateMachineExecutor;
import org.springframework.statemachine.support.StateMachineInterceptor;
import org.springframework.statemachine.support.StateMachineInterceptorList;
import org.springframework.statemachine.support.StateMachineReactiveLifecycle;
import org.springframework.statemachine.support.StateMachineUtils;
import org.springframework.statemachine.support.TransitionComparator;
import org.springframework.statemachine.transition.AbstractTransition;
import org.springframework.statemachine.transition.Transition;
import org.springframework.statemachine.transition.TransitionConflictPolicy;
import org.springframework.statemachine.trigger.DefaultTriggerContext;
import org.springframework.statemachine.trigger.TimerTrigger;
import org.springframework.statemachine.trigger.Trigger;
import org.springframework.statemachine.trigger.TriggerListener;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

public class ReactiveStateMachineExecutor<S, E>
extends LifecycleObjectSupport
implements StateMachineExecutor<S, E> {
    private static final Log log = LogFactory.getLog(ReactiveStateMachineExecutor.class);
    private static final String REACTOR_CONTEXT_TRIGGER_ERRORS = "stateMachineTriggerErrors";
    private final StateMachine<S, E> stateMachine;
    private final StateMachine<S, E> relayStateMachine;
    private final Map<Trigger<S, E>, Transition<S, E>> triggerToTransitionMap;
    private final List<Transition<S, E>> triggerlessTransitions;
    private final Collection<Transition<S, E>> transitions;
    private final Transition<S, E> initialTransition;
    private final Message<E> initialEvent;
    private final TransitionComparator<S, E> transitionComparator;
    private final TransitionConflictPolicy transitionConflictPolicy;
    private final Queue<Message<E>> deferList = new ConcurrentLinkedQueue<Message<E>>();
    private final AtomicBoolean initialHandled = new AtomicBoolean(false);
    private final StateMachineInterceptorList<S, E> interceptors = new StateMachineInterceptorList();
    private volatile Message<E> forwardedInitialEvent;
    private volatile Message<E> queuedMessage = null;
    private StateMachineExecutor.StateMachineExecutorTransit<S, E> stateMachineExecutorTransit;
    private Sinks.Many<TriggerQueueItem> triggerSink;
    private Flux<Void> triggerFlux;
    private Disposable triggerDisposable;
    private final Set<Transition<S, E>> joinSyncTransitions = new HashSet<Transition<S, E>>();
    private final Set<State<S, E>> joinSyncStates = new HashSet<State<S, E>>();

    public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachine<S, E> relayStateMachine, Collection<Transition<S, E>> transitions, Map<Trigger<S, E>, Transition<S, E>> triggerToTransitionMap, List<Transition<S, E>> triggerlessTransitions, Transition<S, E> initialTransition, Message<E> initialEvent, TransitionConflictPolicy transitionConflictPolicy) {
        this.stateMachine = stateMachine;
        this.relayStateMachine = relayStateMachine;
        this.triggerToTransitionMap = triggerToTransitionMap;
        this.triggerlessTransitions = triggerlessTransitions;
        this.transitions = transitions;
        this.initialTransition = initialTransition;
        this.initialEvent = initialEvent;
        this.transitionComparator = new TransitionComparator(transitionConflictPolicy);
        this.transitionConflictPolicy = transitionConflictPolicy;
        this.triggerlessTransitions.sort(this.transitionComparator);
        this.registerTriggerListener();
    }

    @Override
    protected void onInit() throws Exception {
        this.triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.triggerFlux = this.triggerSink.asFlux().flatMap(trigger -> this.handleTrigger((TriggerQueueItem)trigger), 1);
    }

    @Override
    protected Mono<Void> doPreStartReactively() {
        return Mono.defer(() -> {
            Mono mono = this.startTriggers();
            if (this.triggerDisposable == null) {
                this.triggerDisposable = this.triggerFlux.subscribe();
            }
            if (!this.initialHandled.getAndSet(true)) {
                ArrayList<Transition<S, E>> trans = new ArrayList<Transition<S, E>>();
                trans.add(this.initialTransition);
                mono = this.initialEvent != null ? mono.then(this.handleInitialTrans(this.initialTransition, this.initialEvent)) : mono.then(this.handleInitialTrans(this.initialTransition, this.forwardedInitialEvent));
            }
            mono = mono.then(this.handleTriggerlessTransitions(null, null));
            return mono;
        });
    }

    @Override
    protected Mono<Void> doPreStopReactively() {
        Mono mono = Mono.fromRunnable(() -> {
            if (this.triggerDisposable != null) {
                this.triggerDisposable.dispose();
                this.triggerDisposable = null;
            }
            this.initialHandled.set(false);
        });
        return this.stopTriggers().and((Publisher)mono);
    }

    @Override
    public void queueDeferredEvent(Message<E> message) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Deferring message " + message));
        }
        this.deferList.add(message);
    }

    @Override
    public Mono<Void> executeTriggerlessTransitions(StateContext<S, E> context, State<S, E> state) {
        if (this.stateMachine.getState() != null) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"About to handleTriggerlessTransitions");
            }
            return this.handleTriggerlessTransitions(context, state);
        }
        return Mono.empty();
    }

    @Override
    public void setInitialEnabled(boolean enabled) {
        this.initialHandled.set(!enabled);
    }

    @Override
    public void setForwardedInitialEvent(Message<E> message) {
        this.forwardedInitialEvent = message;
    }

    @Override
    public void setStateMachineExecutorTransit(StateMachineExecutor.StateMachineExecutorTransit<S, E> stateMachineExecutorTransit) {
        this.stateMachineExecutorTransit = stateMachineExecutorTransit;
    }

    @Override
    public void addStateMachineInterceptor(StateMachineInterceptor<S, E> interceptor) {
        this.interceptors.add(interceptor);
    }

    @Override
    public Mono<Void> queueEvent(Mono<Message<E>> message, StateMachineExecutor.StateMachineExecutorCallback callback) {
        Flux messages = Flux.merge((Publisher[])new Publisher[]{message, Flux.fromIterable(this.deferList)});
        StateMachineExecutor.MonoSinkStateMachineExecutorCallback triggerCallback = new StateMachineExecutor.MonoSinkStateMachineExecutorCallback();
        Mono triggerCallbackSink = Mono.create((Consumer)triggerCallback);
        return messages.flatMap(m -> this.handleEvent((Message<E>)m, callback, triggerCallback)).flatMap(tqi -> Mono.fromRunnable(() -> this.triggerSink.emitNext(tqi, Sinks.EmitFailureHandler.FAIL_FAST)).retryWhen((Retry)Retry.fixedDelay((long)10L, (Duration)Duration.ofMillis(10L)))).then().and((Publisher)triggerCallbackSink);
    }

    private Mono<TriggerQueueItem> handleEvent(Message<E> queuedEvent, StateMachineExecutor.StateMachineExecutorCallback callback, StateMachineExecutor.StateMachineExecutorCallback triggerCallback) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("Handling message " + queuedEvent));
        }
        return Mono.defer(() -> {
            State currentState = this.stateMachine.getState();
            if (currentState != null && currentState.shouldDefer(queuedEvent)) {
                log.info((Object)("Current state " + currentState + " deferred event " + queuedEvent));
                return Mono.just((Object)new TriggerQueueItem(null, queuedEvent, callback, triggerCallback));
            }
            DefaultTriggerContext triggerContext = new DefaultTriggerContext(queuedEvent.getPayload());
            return Flux.fromIterable(this.transitions).filter(transition -> transition.getTrigger() != null).filter(transition -> StateMachineUtils.containsAtleastOne(transition.getSource().getIds(), currentState.getIds())).flatMap(transition -> Mono.from(transition.getTrigger().evaluate(triggerContext)).flatMap(e -> {
                if (e.booleanValue()) {
                    return Mono.just(transition.getTrigger());
                }
                return Mono.empty();
            })).next().doOnNext(trigger -> this.deferList.remove(queuedEvent)).map(trigger -> new TriggerQueueItem(trigger, queuedEvent, callback, triggerCallback));
        });
    }

    private Mono<Void> handleTrigger(TriggerQueueItem queueItem) {
        return Mono.defer(() -> {
            Mono ret = null;
            State currentState = this.stateMachine.getState();
            if (queueItem != null && currentState != null) {
                Transition<S, E> t;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Process trigger item " + queueItem + " " + this));
                }
                this.queuedMessage = queueItem.message;
                Object event = this.queuedMessage != null ? this.queuedMessage.getPayload() : null;
                ArrayList<Transition<S, Transition<S, E>>> trans = new ArrayList<Transition<S, Transition<S, E>>>();
                if (event != null) {
                    ArrayList ids = new ArrayList(currentState.getIds());
                    Collections.reverse(ids);
                    for (Object id : ids) {
                        for (Map.Entry<Trigger<S, E>, Transition<S, E>> e : this.triggerToTransitionMap.entrySet()) {
                            Trigger<S, E> tri = e.getKey();
                            E ee = tri.getEvent();
                            Transition<S, E> tra = e.getValue();
                            if (!event.equals(ee) || !tra.getSource().getId().equals(id) || trans.contains(tra)) continue;
                            trans.add(tra);
                        }
                    }
                }
                if (trans.isEmpty() && (t = this.triggerToTransitionMap.get(queueItem.trigger)) != null) {
                    trans.add(t);
                }
                trans.sort(this.transitionComparator);
                ret = this.handleTriggerTrans(trans, this.queuedMessage).then();
            }
            ArrayList<Transition<S, E>> transWithGuards = new ArrayList<Transition<S, E>>();
            for (Transition<S, E> t : this.triggerlessTransitions) {
                if (((AbstractTransition)t).getGuard() == null) continue;
                transWithGuards.add(t);
            }
            if (ret == null) {
                ret = Mono.empty();
            }
            return ret;
        }).onErrorResume(ReactiveStateMachineExecutor.resumeTriggerErrorToContext()).and((Publisher)Mono.deferContextual(Mono::just).doOnNext(ctx -> {
            Optional holder;
            if (queueItem.callback != null) {
                holder = ctx.getOrEmpty((Object)"stateMachineErrors");
                holder.ifPresent(h -> {
                    if (h.getError() != null) {
                        queueItem.callback.error((Throwable)((Object)new StateMachineException("Execution error", h.getError())));
                    } else {
                        queueItem.callback.complete();
                    }
                });
            }
            if (queueItem.triggerCallback != null) {
                holder = ctx.getOrEmpty((Object)REACTOR_CONTEXT_TRIGGER_ERRORS);
                holder.ifPresent(h -> {
                    if (h.getError() != null) {
                        queueItem.triggerCallback.error((Throwable)((Object)new StateMachineException("Execution error", h.getError())));
                    } else {
                        queueItem.triggerCallback.complete();
                    }
                });
            }
        })).contextWrite((ContextView)Context.of((Object)"stateMachineErrors", (Object)new StateMachineExecutor.ExecutorExceptionHolder(), (Object)REACTOR_CONTEXT_TRIGGER_ERRORS, (Object)new StateMachineExecutor.ExecutorExceptionHolder()));
    }

    private Mono<Void> handleInitialTrans(Transition<S, E> tran, Message<E> queuedMessage) {
        return Mono.defer(() -> {
            StateContext<S, E> stateContext = this.buildStateContext(queuedMessage, tran, this.relayStateMachine);
            return tran.transit(stateContext).then(this.stateMachineExecutorTransit.transit(tran, stateContext, queuedMessage));
        });
    }

    private Mono<Void> handleTriggerlessTransitions(StateContext<S, E> context, State<S, E> state) {
        Flux monoFlux = Flux.generate(sink -> sink.next(this.handleTriggerTrans(this.triggerlessTransitions, context != null ? context.getMessage() : null, state)));
        Flux flux = Flux.concat((Publisher)monoFlux);
        return flux.takeUntil(b -> b == false).then();
    }

    private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> trans, Message<E> queuedMessage) {
        return this.handleTriggerTrans(trans, queuedMessage, null);
    }

    private Mono<Boolean> handleTriggerTrans(List<Transition<S, E>> trans, Message<E> queuedMessage, State<S, E> completion) {
        return Flux.fromIterable(trans).filter(t -> {
            State source = t.getSource();
            if (source == null) {
                return false;
            }
            State currentState = this.stateMachine.getState();
            if (currentState == null) {
                return false;
            }
            if (!StateMachineUtils.containsAtleastOne(source.getIds(), currentState.getIds())) {
                return false;
            }
            if (this.transitionConflictPolicy != TransitionConflictPolicy.PARENT && completion != null && !source.getId().equals(completion.getId())) {
                if (source.isOrthogonal()) {
                    return false;
                }
                if (!StateMachineUtils.isSubstate(source, completion)) {
                    return false;
                }
            }
            return true;
        }).flatMap(t -> {
            if (StateMachineUtils.isPseudoState(t.getTarget(), PseudoStateKind.JOIN)) {
                if (this.joinSyncStates.isEmpty()) {
                    List joins = ((JoinPseudoState)t.getTarget().getPseudoState()).getJoins();
                    for (List j : joins) {
                        this.joinSyncStates.addAll(j);
                    }
                }
                this.joinSyncTransitions.add((Transition<S, Transition>)t);
                boolean removed = this.joinSyncStates.remove(t.getSource());
                boolean joincomplete = removed & this.joinSyncStates.isEmpty();
                if (joincomplete) {
                    return Flux.fromIterable(this.joinSyncTransitions).flatMap(tt -> {
                        StateContext<S, E> stateContext = this.buildStateContext(queuedMessage, (Transition<S, E>)tt, this.relayStateMachine);
                        return tt.transit(stateContext).then(this.stateMachineExecutorTransit.transit((Transition<S, E>)tt, stateContext, queuedMessage));
                    }).doFinally(s -> this.joinSyncTransitions.clear()).then(Mono.just((Object)true));
                }
                return Mono.just((Object)false);
            }
            StateContext<S, E> stateContext = this.buildStateContext(queuedMessage, (Transition<S, E>)t, this.relayStateMachine);
            return Mono.just(stateContext).map(context -> this.interceptors.preTransition(stateContext)).then(t.transit(stateContext).flatMap(at -> {
                if (at.booleanValue()) {
                    return this.stateMachineExecutorTransit.transit((Transition<S, E>)t, stateContext, queuedMessage).thenReturn((Object)true).doOnNext(a -> this.interceptors.postTransition(stateContext));
                }
                return Mono.just((Object)false);
            }));
        }).takeUntil(transit -> transit).last((Object)false);
    }

    private StateContext<S, E> buildStateContext(Message<E> message, Transition<S, E> transition, StateMachine<S, E> stateMachine) {
        MessageHeaders messageHeaders = message != null ? message.getHeaders() : new MessageHeaders(new HashMap());
        HashMap<String, UUID> map = new HashMap<String, UUID>((Map<String, UUID>)messageHeaders);
        if (!map.containsKey("_sm_id_")) {
            map.put("_sm_id_", stateMachine.getUuid());
        }
        return new DefaultStateContext<S, E>(StateContext.Stage.TRANSITION, message, new MessageHeaders(map), stateMachine.getExtendedState(), transition, stateMachine, null, null, null);
    }

    private void registerTriggerListener() {
        for (final Trigger<S, E> trigger : this.triggerToTransitionMap.keySet()) {
            if (!(trigger instanceof TimerTrigger)) continue;
            ((TimerTrigger)trigger).addTriggerListener(new TriggerListener(){

                @Override
                public void triggered() {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("TimedTrigger triggered " + trigger));
                    }
                    Mono.just((Object)new TriggerQueueItem(trigger, null, null, null)).flatMap(tqi -> Mono.fromCallable(() -> {
                        ReactiveStateMachineExecutor.this.triggerSink.emitNext(tqi, Sinks.EmitFailureHandler.FAIL_FAST);
                        return null;
                    }).retryWhen((Retry)Retry.fixedDelay((long)10L, (Duration)Duration.ofNanos(10L)))).subscribe();
                }
            });
        }
    }

    private Mono<Void> startTriggers() {
        List smrl = this.triggerToTransitionMap.keySet().stream().filter(StateMachineReactiveLifecycle.class::isInstance).map(StateMachineReactiveLifecycle.class::cast).collect(Collectors.toList());
        return Flux.fromIterable(smrl).flatMap(StateMachineReactiveLifecycle::startReactively).then();
    }

    private Mono<Void> stopTriggers() {
        List smrl = this.triggerToTransitionMap.keySet().stream().filter(StateMachineReactiveLifecycle.class::isInstance).map(StateMachineReactiveLifecycle.class::cast).collect(Collectors.toList());
        return Flux.fromIterable(smrl).flatMap(StateMachineReactiveLifecycle::stopReactively).then();
    }

    private static Function<? super Throwable, Mono<Void>> resumeTriggerErrorToContext() {
        return t -> Mono.deferContextual(Mono::just).doOnNext(ctx -> {
            Optional holder = ctx.getOrEmpty((Object)REACTOR_CONTEXT_TRIGGER_ERRORS);
            holder.ifPresent(h -> h.setError((Throwable)t));
        }).then();
    }

    private class TriggerQueueItem {
        Trigger<S, E> trigger;
        Message<E> message;
        StateMachineExecutor.StateMachineExecutorCallback callback;
        StateMachineExecutor.StateMachineExecutorCallback triggerCallback;

        public TriggerQueueItem(Trigger<S, E> trigger, Message<E> message, StateMachineExecutor.StateMachineExecutorCallback callback, StateMachineExecutor.StateMachineExecutorCallback triggerCallback) {
            this.trigger = trigger;
            this.message = message;
            this.callback = callback;
            this.triggerCallback = triggerCallback;
        }

        public String toString() {
            return "TriggerQueueItem [message=" + this.message + ", trigger=" + this.trigger + "]";
        }
    }
}

