package org.ballerinalang.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import org.ballerinalang.model.values.BFunctionPointer;
import org.ballerinalang.model.values.BStream;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.util.exceptions.BallerinaException;

/* loaded from: input_file:org/ballerinalang/streams/StreamSubscriptionManager.class */
/**
 * Singleton registry that maps a stream's topic name to the subscriptions registered for it
 * and forwards published values to those subscriptions.
 *
 * <p>Registration and lookup of subscriptions are both guarded by this instance's monitor.
 * Delivery to subscribers happens outside the monitor, on a snapshot of the subscriber list.
 */
public class StreamSubscriptionManager implements Observer {
    private static final StreamSubscriptionManager streamSubscriptionManager = new StreamSubscriptionManager();

    /** Subscriptions keyed by {@code BStream.topicName}; guarded by {@code this}. */
    private final Map<String, List<StreamSubscription>> processors = new HashMap<>();

    private StreamSubscriptionManager() {
    }

    /**
     * Returns the single shared manager instance.
     *
     * @return the shared {@code StreamSubscriptionManager}
     */
    public static StreamSubscriptionManager getInstance() {
        return streamSubscriptionManager;
    }

    /**
     * Registers a new subscription for the given stream's topic.
     *
     * <p>A {@code DefaultStreamSubscription} is created for the stream and function pointer,
     * with this manager passed as its third constructor argument, and appended to the list
     * kept for {@code bStream.topicName}.
     *
     * @param bStream          the stream being subscribed to
     * @param bFunctionPointer the function to associate with the subscription
     */
    public void registerMessageProcessor(BStream bStream, BFunctionPointer bFunctionPointer) {
        synchronized (this) {
            this.processors.computeIfAbsent(bStream.topicName, topic -> new ArrayList<>())
                    .add(new DefaultStreamSubscription(bStream, bFunctionPointer, this));
        }
    }

    /**
     * Sends a value to every subscription currently registered for the stream's topic.
     * Does nothing when the topic has no subscriptions.
     *
     * @param bStream the stream the value is published on
     * @param bValue  the value to hand to each subscription
     */
    public void sendMessage(BStream bStream, BValue bValue) {
        List<StreamSubscription> snapshot;
        // Copy the subscriber list under the same lock used by registerMessageProcessor so a
        // concurrent registration cannot corrupt the map read or break the iteration below.
        synchronized (this) {
            List<StreamSubscription> subscriptions = this.processors.get(bStream.topicName);
            if (subscriptions == null) {
                return;
            }
            snapshot = new ArrayList<>(subscriptions);
        }
        // Deliver outside the lock: subscriber code is not under our control and must not
        // run while the registry monitor is held.
        for (StreamSubscription subscription : snapshot) {
            subscription.send(bValue);
        }
    }

    /**
     * Observer callback: passes the notified value to the notifying subscription's
     * {@code execute} method.
     *
     * @param observable the notifying object; must be a {@code StreamSubscription}
     * @param obj        the notified payload; must be a {@code BValue}
     * @throws BallerinaException if {@code observable} is not a {@code StreamSubscription}
     *                            or {@code obj} is not a {@code BValue}
     */
    @Override // java.util.Observer
    public void update(Observable observable, Object obj) {
        if (!(observable instanceof StreamSubscription)) {
            throw new BallerinaException("Invalid subscription. Expected a subscription to a stream");
        }
        StreamSubscription streamSubscription = (StreamSubscription) observable;
        BStream stream = streamSubscription.getStream();
        if (!(obj instanceof BValue)) {
            throw new BallerinaException("Data received to stream: " + stream.getStreamId() + " is not supported");
        }
        streamSubscription.execute((BValue) obj);
    }
}
