/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BStream;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.streams.DefaultStreamSubscription;
import org.ballerinalang.streams.StreamSubscription;
import org.ballerinalang.util.exceptions.BallerinaException;

/**
 * Process-wide registry that maps a stream's topic name to the subscriptions
 * registered against it and fans published values out to those subscriptions.
 *
 * <p>The registry is a singleton obtained through {@link #getInstance()}. All access
 * to the internal topic map is guarded by this instance's monitor, so registration
 * and publishing may happen from different threads.
 */
public class StreamSubscriptionManager
implements Observer {
    /** The single shared instance. */
    private static final StreamSubscriptionManager INSTANCE = new StreamSubscriptionManager();

    /** Subscriptions keyed by {@code BStream.topicName}; guarded by {@code this}. */
    private final Map<String, List<StreamSubscription>> processors = new HashMap<>();

    private StreamSubscriptionManager() {
    }

    /**
     * Returns the shared manager instance.
     *
     * @return the singleton {@code StreamSubscriptionManager}
     */
    public static StreamSubscriptionManager getInstance() {
        return INSTANCE;
    }

    /**
     * Registers a new subscription for the given stream's topic.
     *
     * <p>A {@link DefaultStreamSubscription} is created for the stream and function
     * pointer, with this manager passed as its third constructor argument, and is
     * appended to the list of subscriptions for {@code stream.topicName}.
     *
     * @param stream          the stream being subscribed to
     * @param functionPointer the function associated with the new subscription
     */
    public void registerMessageProcessor(BStream stream, BFunctionPointer functionPointer) {
        synchronized (this) {
            this.processors
                    .computeIfAbsent(stream.topicName, key -> new ArrayList<>())
                    .add(new DefaultStreamSubscription(stream, functionPointer, this));
        }
    }

    /**
     * Sends a value to every subscription currently registered for the stream's topic.
     *
     * <p>The subscription list is copied while holding the lock and the copy is
     * iterated after the lock is released. This keeps a concurrent (or re-entrant)
     * {@link #registerMessageProcessor} call from modifying the list mid-iteration,
     * and avoids invoking subscription code while holding the manager's monitor.
     * Subscriptions registered after the copy is taken do not receive this value.
     *
     * @param stream the stream the value was published to
     * @param value  the value to deliver
     */
    public void sendMessage(BStream stream, BValue value) {
        List<StreamSubscription> snapshot;
        synchronized (this) {
            List<StreamSubscription> registered = this.processors.get(stream.topicName);
            if (registered == null) {
                // Nobody has subscribed to this topic; nothing to deliver.
                return;
            }
            snapshot = new ArrayList<>(registered);
        }
        for (StreamSubscription subscription : snapshot) {
            subscription.send(value);
        }
    }

    /**
     * Observer callback: executes the notifying subscription with the received value.
     *
     * @param o   the notifying observable; must be a {@link StreamSubscription}
     * @param arg the payload; must be a {@link BValue}
     * @throws BallerinaException if {@code o} is not a {@code StreamSubscription} or
     *                            {@code arg} is not a {@code BValue}
     */
    @Override
    public void update(Observable o, Object arg) {
        if (!(o instanceof StreamSubscription)) {
            throw new BallerinaException("Invalid subscription. Expected a subscription to a stream");
        }
        StreamSubscription subscription = (StreamSubscription)o;
        if (!(arg instanceof BValue)) {
            BStream stream = subscription.getStream();
            // Leading space before "is" keeps the stream id from running into the text.
            throw new BallerinaException("Data received to stream: " + stream.getStreamId() + " is not supported");
        }
        subscription.execute((BValue)arg);
    }
}

