package org.ballerinalang.jvm.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import org.ballerinalang.jvm.scheduling.Strand;
import org.ballerinalang.jvm.util.exceptions.BallerinaException;
import org.ballerinalang.jvm.values.CloneUtils;
import org.ballerinalang.jvm.values.FPValue;
import org.ballerinalang.jvm.values.StreamValue;

/**
 * Singleton registry that associates a stream id with the subscriptions registered for that
 * stream, and fans published values out to those subscriptions.
 *
 * <p>The class is marked {@code @Deprecated}; this file does not name a replacement.
 *
 * <p>Thread-safety: every access to the internal registry happens while holding this instance's
 * monitor. Delivery to subscriptions is performed outside the monitor on a snapshot of the
 * subscriber list, so subscription code is never invoked while the lock is held.
 */
@Deprecated
/* loaded from: input_file:org/ballerinalang/jvm/streams/StreamSubscriptionManager.class */
public class StreamSubscriptionManager implements Observer {
    /** Subscriptions keyed by stream id. Guarded by {@code this}. */
    private final Map<String, List<StreamSubscription>> processors = new HashMap<>();
    /** The single shared instance handed out by {@link #getInstance()}. */
    private static final StreamSubscriptionManager streamSubscriptionManager = new StreamSubscriptionManager();

    /** Private so that the only instance is the one returned by {@link #getInstance()}. */
    private StreamSubscriptionManager() {
    }

    /**
     * Returns the shared manager instance.
     *
     * @return the singleton {@code StreamSubscriptionManager}
     */
    public static StreamSubscriptionManager getInstance() {
        return streamSubscriptionManager;
    }

    /**
     * Registers a new {@link DefaultStreamSubscription} for the given stream, creating the
     * subscriber list for the stream's id on first use.
     *
     * @param streamValue the stream being subscribed to; its {@code streamId} is the registry key
     * @param fPValue     the function pointer handed to the new subscription
     */
    public void registerMessageProcessor(StreamValue streamValue, FPValue<Object[], Object> fPValue) {
        synchronized (this) {
            // This manager is passed to the subscription; presumably it is registered there as
            // the Observer that receives update(...) callbacks -- confirm in DefaultStreamSubscription.
            this.processors.computeIfAbsent(streamValue.streamId, streamId -> new ArrayList<>())
                    .add(new DefaultStreamSubscription(streamValue, fPValue, this));
        }
    }

    /**
     * Sends a value to every subscription currently registered for the given stream. Each
     * subscription receives its own copy produced by {@link CloneUtils#cloneValue}. Does nothing
     * when the stream has no subscriptions.
     *
     * @param streamValue the stream the value is published on
     * @param strand      the strand passed through to each subscription's {@code send}
     * @param obj         the value to publish
     */
    public void sendMessage(StreamValue streamValue, Strand strand, Object obj) {
        final List<StreamSubscription> snapshot;
        // Read the registry under the same monitor used for registration. Without this, a
        // concurrent registerMessageProcessor call could mutate the HashMap/ArrayList while they
        // are being read here, risking ConcurrentModificationException or a corrupted lookup.
        synchronized (this) {
            List<StreamSubscription> registered = this.processors.get(streamValue.streamId);
            if (registered == null) {
                return;
            }
            snapshot = new ArrayList<>(registered);
        }
        // Deliver outside the lock so subscription code cannot block registrations or re-enter
        // this manager while the monitor is held.
        for (StreamSubscription subscription : snapshot) {
            subscription.send(strand, CloneUtils.cloneValue(obj));
        }
    }

    /**
     * Observer callback: validates the notification and runs the subscription with the supplied
     * arguments.
     *
     * @param observable the notifying object; must be a {@link StreamSubscription}
     * @param obj        the notification payload; must be an {@code Object[]}
     * @throws BallerinaException if {@code observable} is not a {@code StreamSubscription} or
     *                            {@code obj} is not an {@code Object[]}
     */
    @Override // java.util.Observer
    public void update(Observable observable, Object obj) {
        if (!(observable instanceof StreamSubscription)) {
            throw new BallerinaException("Invalid subscription. Expected a subscription to a stream");
        }
        StreamSubscription streamSubscription = (StreamSubscription) observable;
        // The stream is only needed to name the offending stream in the error message below.
        StreamValue stream = streamSubscription.getStream();
        if (!(obj instanceof Object[])) {
            throw new BallerinaException("Invalid data parameters received to stream: " + stream.getStreamId());
        }
        streamSubscription.execute((Object[]) obj);
    }
}
