package com.oracle.coherence.patterns.messaging;

import com.oracle.coherence.common.finitestatemachines.AnnotationDrivenModel;
import com.oracle.coherence.common.finitestatemachines.Event;
import com.oracle.coherence.common.finitestatemachines.Instruction;
import com.oracle.coherence.common.finitestatemachines.NonBlockingFiniteStateMachine;
import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.patterns.messaging.AbstractEngine;
import com.oracle.coherence.patterns.messaging.entryprocessors.ExposeMessageToQueueProcessor;
import com.oracle.coherence.patterns.messaging.entryprocessors.RegisterSubscriptionsWithMessageProcessor;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.ContinuousQueryCache;
import com.tangosol.util.extractor.ChainedExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.UpdaterProcessor;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:com/oracle/coherence/patterns/messaging/MessageEngine.class */
public class MessageEngine extends AbstractEngine {
    private static volatile ContinuousQueryCache cqcTopicCache = null;

    /* loaded from: input_file:com/oracle/coherence/patterns/messaging/MessageEngine$MessageEvent.class */
    public static class MessageEvent<S extends Enum<S>> extends NonBlockingFiniteStateMachine.CoalescedEvent<S> {
        private MessagePublisher publisher;

        public MessageEvent(Event<S> event, NonBlockingFiniteStateMachine.CoalescedEvent.Process process, Object obj, MessagePublisher messagePublisher) {
            super(event, process, obj);
            this.publisher = messagePublisher;
        }

        public MessagePublisher getMessagePublisher() {
            return this.publisher;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageEngine(Identifier identifier) {
        super(identifier);
        this.m_fsm = new NonBlockingFiniteStateMachine<>("Message", new AnnotationDrivenModel(AbstractEngine.State.class, new AbstractEngine.Model(this)), AbstractEngine.State.IDLE, ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories.newThreadFactory(true, "MessagingEngine", (ThreadGroup) null)), false);
    }

    public void processRunEvent(MessagePublisher messagePublisher) {
        this.m_fsm.process(new MessageEvent(new AbstractEngine.StateEvent(AbstractEngine.State.RUNNING), NonBlockingFiniteStateMachine.CoalescedEvent.Process.MOST_RECENT, messagePublisher, messagePublisher));
    }

    @Override // com.oracle.coherence.patterns.messaging.AbstractEngine
    Instruction onRunning(Event<AbstractEngine.State> event) {
        MessageTracker trackerSnapShot = MessagesToExpose.getInstance().getTrackerSnapShot(getDestinationIdentifier());
        MessagePublisher messagePublisher = ((MessageEvent) event).getMessagePublisher();
        if (trackerSnapShot != null && !trackerSnapShot.isEmpty()) {
            if (messagePublisher.isQueue()) {
                exposeQueueMessageBatch(getDestinationIdentifier(), trackerSnapShot);
            } else if (messagePublisher.isTopic()) {
                exposeTopicMessageBatch(getDestinationIdentifier(), trackerSnapShot);
            }
        }
        return new Instruction.TransitionTo(AbstractEngine.State.IDLE);
    }

    private void exposeQueueMessageBatch(Identifier identifier, MessageTracker messageTracker) {
        CacheFactory.getCache(Destination.CACHENAME).invoke(identifier, new ExposeMessageToQueueProcessor(messageTracker));
        CacheFactory.getCache(Message.CACHENAME).invokeAll(messageTracker.getMessageKeys(identifier), new UpdaterProcessor("setVisible", Boolean.TRUE));
    }

    private void exposeTopicMessageBatch(Identifier identifier, MessageTracker messageTracker) {
        NamedCache cache = CacheFactory.getCache(Message.CACHENAME);
        Set<SubscriptionIdentifier> subscriptionIdentifiers = ((Destination) getDestinationCache().get(identifier)).getSubscriptionIdentifiers();
        if (subscriptionIdentifiers.isEmpty()) {
            Iterator<MessageIdentifier> it = messageTracker.iterator();
            while (it.hasNext()) {
                cache.remove(Message.getKey(identifier, it.next()));
            }
            return;
        }
        Map invokeAll = CacheFactory.getCache(Subscription.CACHENAME).invokeAll(subscriptionIdentifiers, new UpdaterProcessor("onAcceptMessage", messageTracker));
        HashSet hashSet = new HashSet();
        for (SubscriptionIdentifier subscriptionIdentifier : invokeAll.keySet()) {
            if (((Boolean) invokeAll.get(subscriptionIdentifier)).booleanValue()) {
                hashSet.add(subscriptionIdentifier);
            }
        }
        cache.invokeAll(messageTracker.getMessageKeys(identifier), new RegisterSubscriptionsWithMessageProcessor(new SubscriptionIdentifierSet(hashSet)));
    }

    private ContinuousQueryCache getDestinationCache() {
        if (cqcTopicCache == null) {
            cqcTopicCache = new ContinuousQueryCache(CacheFactory.getCache(Destination.CACHENAME), new EqualsFilter(new ChainedExtractor("getClass.getName"), Topic.class.getName()));
        }
        return cqcTopicCache;
    }

    @Override // com.oracle.coherence.patterns.messaging.AbstractEngine
    public /* bridge */ /* synthetic */ void dispose() {
        super.dispose();
    }

    @Override // com.oracle.coherence.patterns.messaging.AbstractEngine
    public /* bridge */ /* synthetic */ Identifier getDestinationIdentifier() {
        return super.getDestinationIdentifier();
    }
}
