/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.jvm.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.streams.DefaultStreamSubscription;
import org.ballerinalang.jvm.streams.StreamSubscription;
import org.ballerinalang.jvm.util.exceptions.BallerinaException;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.StreamValue;

/**
 * Singleton registry of {@link StreamSubscription}s, keyed by stream id.
 *
 * <p>Subscriptions are added through {@link #registerMessageProcessor} and
 * receive values through {@link #sendMessage}. The manager also implements
 * {@link Observer}: {@link #update} forwards the notified payload to the
 * subscription that raised it.
 *
 * <p>Thread-safety: every access to the registry happens while holding this
 * instance's monitor. Delivery to subscribers is performed on a snapshot and
 * outside the monitor.
 */
public class StreamSubscriptionManager
implements Observer {
    /** Stream id -> subscriptions registered for that stream. Guarded by {@code this}. */
    private final Map<String, List<StreamSubscription>> processors = new HashMap<>();
    private static final StreamSubscriptionManager streamSubscriptionManager = new StreamSubscriptionManager();

    private StreamSubscriptionManager() {
    }

    /**
     * Returns the single shared manager instance.
     *
     * @return the shared {@code StreamSubscriptionManager}
     */
    public static StreamSubscriptionManager getInstance() {
        return streamSubscriptionManager;
    }

    /**
     * Creates a {@link DefaultStreamSubscription} for the given stream and function
     * pointer and records it under the stream's id.
     *
     * <p>This manager is passed as the third constructor argument of the
     * subscription (presumably so it is attached as the subscription's observer;
     * confirm against {@code DefaultStreamSubscription}).
     *
     * @param stream2         stream being subscribed to; its {@code streamId} is the registry key
     * @param functionPointer function handed to the new subscription
     */
    public void registerMessageProcessor(StreamValue stream2, FPValue<Object[], Object> functionPointer) {
        synchronized (this) {
            this.processors
                    .computeIfAbsent(stream2.streamId, key -> new ArrayList<>())
                    .add(new DefaultStreamSubscription(stream2, functionPointer, this));
        }
    }

    /**
     * Sends {@code value2} to every subscription currently registered for the stream.
     * Does nothing when the stream has no subscriptions.
     *
     * @param stream2 stream whose subscribers should receive the value
     * @param strand  strand passed through to each subscription's {@code send}
     * @param value2  value to deliver
     */
    public void sendMessage(StreamValue stream2, Strand strand, Object value2) {
        List<StreamSubscription> snapshot;
        // Read the registry under the same monitor the writer uses: HashMap and
        // ArrayList are not safe for unsynchronized concurrent read/write.
        synchronized (this) {
            List<StreamSubscription> registered = this.processors.get(stream2.streamId);
            if (registered == null) {
                return;
            }
            snapshot = new ArrayList<>(registered);
        }
        // Dispatch outside the lock so subscriber code never runs while the monitor
        // is held, and so a registration made during delivery cannot trigger a
        // ConcurrentModificationException on the list being iterated.
        for (StreamSubscription subscription : snapshot) {
            subscription.send(strand, value2);
        }
    }

    /**
     * Observer callback: executes the notifying subscription with the supplied
     * argument array.
     *
     * @param o   the notifying object; must be a {@link StreamSubscription}
     * @param arg the notification payload; must be an {@code Object[]}
     * @throws BallerinaException if {@code o} is not a {@code StreamSubscription}
     *                            or {@code arg} is not an {@code Object[]}
     */
    @Override
    public void update(Observable o, Object arg) {
        if (!(o instanceof StreamSubscription)) {
            throw new BallerinaException("Invalid subscription. Expected a subscription to a stream");
        }
        StreamSubscription msgProcessor = (StreamSubscription)o;
        StreamValue stream2 = msgProcessor.getStream();
        if (!(arg instanceof Object[])) {
            throw new BallerinaException("Invalid data parameters received to stream: " + stream2.getStreamId());
        }
        msgProcessor.execute((Object[])arg);
    }
}

