package org.ballerinalang.net.websub.broker;

import io.ballerina.messaging.broker.common.StartupContext;
import io.ballerina.messaging.broker.common.ValidationException;
import io.ballerina.messaging.broker.common.config.BrokerCommonConfiguration;
import io.ballerina.messaging.broker.common.config.BrokerConfigProvider;
import io.ballerina.messaging.broker.common.data.types.FieldTable;
import io.ballerina.messaging.broker.core.Broker;
import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.BrokerImpl;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.ContentChunk;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.Metadata;
import io.ballerina.messaging.broker.core.configuration.BrokerCoreConfiguration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/net/websub/broker/BallerinaBroker.class */
public class BallerinaBroker {
    private static final Logger logger = LoggerFactory.getLogger(BallerinaBroker.class);
    private static BallerinaBroker instance = null;
    private BrokerImpl broker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/ballerinalang/net/websub/broker/BallerinaBroker$BallerinaBrokerConfigProvider.class */
    public static class BallerinaBrokerConfigProvider implements BrokerConfigProvider {
        private Map<String, Object> configMap;

        private BallerinaBrokerConfigProvider() {
            this.configMap = new HashMap();
        }

        public <T> T getConfigurationObject(String str, Class<T> cls) throws Exception {
            return cls.cast(this.configMap.get(str));
        }

        void registerConfigurationObject(String str, Object obj) {
            this.configMap.put(str, obj);
        }
    }

    private BallerinaBroker(BrokerImpl brokerImpl) {
        this.broker = null;
        this.broker = brokerImpl;
    }

    public static BallerinaBroker getBrokerInstance() throws Exception {
        if (instance != null) {
            return instance;
        }
        synchronized (BallerinaBroker.class) {
            if (instance == null) {
                startInternalBroker();
            }
        }
        return instance;
    }

    public void addSubscription(String str, Consumer consumer) {
        String queueName = consumer.getQueueName();
        try {
            this.broker.createQueue(queueName, false, false, true);
            this.broker.bind(queueName, "amq.topic", str, FieldTable.EMPTY_TABLE);
            this.broker.addConsumer(consumer);
        } catch (BrokerException | ValidationException e) {
            logger.error("Error adding subscription: ", e);
        }
    }

    public void removeSubscription(Consumer consumer) {
        this.broker.removeConsumer(consumer);
    }

    public void publish(String str, byte[] bArr) {
        Message message = new Message(Broker.getNextMessageId(), new Metadata(str, "amq.topic", bArr.length));
        message.addChunk(new ContentChunk(0L, Unpooled.copiedBuffer(bArr)));
        try {
            this.broker.publish(message);
        } catch (BrokerException e) {
            logger.error("Error publishing to topic: ", e);
        }
    }

    public void publish(String str, ByteBuf byteBuf) {
        Message message = new Message(Broker.getNextMessageId(), new Metadata(str, "amq.topic", byteBuf.array().length));
        message.addChunk(new ContentChunk(0L, byteBuf));
        try {
            this.broker.publish(message);
        } catch (BrokerException e) {
            logger.error("Error publishing to topic: ", e);
        }
    }

    private static void startInternalBroker() throws Exception {
        BallerinaBrokerConfigProvider ballerinaBrokerConfigProvider = new BallerinaBrokerConfigProvider();
        BrokerCommonConfiguration brokerCommonConfiguration = new BrokerCommonConfiguration();
        brokerCommonConfiguration.setEnableInMemoryMode(true);
        ballerinaBrokerConfigProvider.registerConfigurationObject("ballerina.broker", brokerCommonConfiguration);
        ballerinaBrokerConfigProvider.registerConfigurationObject("ballerina.broker.core", new BrokerCoreConfiguration());
        StartupContext startupContext = new StartupContext();
        startupContext.registerService(BrokerConfigProvider.class, ballerinaBrokerConfigProvider);
        BrokerImpl brokerImpl = new BrokerImpl(startupContext);
        brokerImpl.startMessageDelivery();
        instance = new BallerinaBroker(brokerImpl);
    }
}
