package org.wso2.carbon.databridge.core;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.config.ConfigProviderFactory;
import org.wso2.carbon.config.ConfigurationException;
import org.wso2.carbon.config.provider.ConfigProvider;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.SessionTimeoutException;
import org.wso2.carbon.databridge.commons.exception.UndefinedEventTypeException;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.core.conf.DataBridgeConfiguration;
import org.wso2.carbon.databridge.core.conf.DatabridgeConfigurationFileResolver;
import org.wso2.carbon.databridge.core.definitionstore.AbstractStreamDefinitionStore;
import org.wso2.carbon.databridge.core.definitionstore.StreamAddRemoveListener;
import org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeConfigurationException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionNotFoundException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.core.internal.EventDispatcher;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.core.internal.authentication.Authenticator;
import org.wso2.carbon.databridge.core.internal.utils.DataBridgeConstants;
import org.wso2.carbon.databridge.core.utils.AgentSession;
import org.wso2.carbon.utils.Utils;

/* loaded from: input_file:org/wso2/carbon/databridge/core/DataBridge.class */
public class DataBridge implements DataBridgeSubscriberService, DataBridgeReceiverService {
    private static final Logger log = Logger.getLogger(DataBridge.class);
    private StreamDefinitionStore streamDefinitionStore;
    private EventDispatcher eventDispatcher;
    private Authenticator authenticator;
    private AuthenticationHandler authenticatorHandler;
    private List<StreamAddRemoveListener> streamAddRemoveListenerList = new ArrayList();
    private DataBridgeConfiguration dataBridgeConfiguration;
    private AtomicInteger eventsReceived;
    private AtomicInteger totalEventCounter;
    private long startTime;
    private boolean isProfileReceiver;
    private int cutoff;

    public DataBridge(AuthenticationHandler authenticationHandler, AbstractStreamDefinitionStore abstractStreamDefinitionStore, DataBridgeConfiguration dataBridgeConfiguration) {
        this.cutoff = 100000;
        setInitialConfig(dataBridgeConfiguration);
        this.eventDispatcher = new EventDispatcher(abstractStreamDefinitionStore, dataBridgeConfiguration);
        this.streamDefinitionStore = abstractStreamDefinitionStore;
        this.authenticatorHandler = authenticationHandler;
        this.authenticator = new Authenticator(authenticationHandler, dataBridgeConfiguration);
        String property = System.getProperty("profileReceiver");
        String property2 = System.getProperty("receiverStatsCutoff");
        if (property == null || !property.equalsIgnoreCase("true")) {
            return;
        }
        this.isProfileReceiver = true;
        this.eventsReceived = new AtomicInteger();
        this.totalEventCounter = new AtomicInteger();
        this.startTime = 0L;
        if (property2 == null || !StringUtils.isNumeric(property2)) {
            return;
        }
        this.cutoff = Integer.parseInt(property2);
    }

    public DataBridge(AuthenticationHandler authenticationHandler, AbstractStreamDefinitionStore abstractStreamDefinitionStore, String str) {
        this.cutoff = 100000;
        DataBridgeConfiguration dataBridgeConfiguration = null;
        try {
            dataBridgeConfiguration = createDataBridgeConfiguration(str);
        } catch (DataBridgeConfigurationException e) {
            log.error("Error while loading the data bridge configuration file : " + str, e);
        }
        setInitialConfig(dataBridgeConfiguration);
        this.eventDispatcher = new EventDispatcher(abstractStreamDefinitionStore, dataBridgeConfiguration);
        this.streamDefinitionStore = abstractStreamDefinitionStore;
        this.authenticatorHandler = authenticationHandler;
        this.authenticator = new Authenticator(authenticationHandler, dataBridgeConfiguration);
        String property = System.getProperty("profileReceiver");
        String property2 = System.getProperty("receiverStatsCutoff");
        if (property == null || !property.equalsIgnoreCase("true")) {
            return;
        }
        this.isProfileReceiver = true;
        this.eventsReceived = new AtomicInteger();
        this.totalEventCounter = new AtomicInteger();
        this.startTime = 0L;
        if (property2 == null || !StringUtils.isNumeric(property2)) {
            return;
        }
        this.cutoff = Integer.parseInt(property2);
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public String defineStream(String str, String str2) throws DifferentStreamDefinitionAlreadyDefinedException, MalformedStreamDefinitionException, SessionTimeoutException {
        AgentSession session = this.authenticator.getSession(str);
        try {
            if (session.getCredentials() == null) {
                if (log.isDebugEnabled()) {
                    log.debug("session " + str + " expired ");
                }
                throw new SessionTimeoutException(str + " expired");
            }
            try {
                this.authenticatorHandler.initContext(session);
                String defineStream = this.eventDispatcher.defineStream(str2, session);
                if (defineStream != null) {
                    Iterator<StreamAddRemoveListener> it = this.streamAddRemoveListenerList.iterator();
                    while (it.hasNext()) {
                        it.next().streamAdded(defineStream);
                    }
                }
                return defineStream;
            } catch (DifferentStreamDefinitionAlreadyDefinedException e) {
                throw new DifferentStreamDefinitionAlreadyDefinedException(e.getErrorMessage(), e);
            } catch (MalformedStreamDefinitionException e2) {
                throw new MalformedStreamDefinitionException(e2.getErrorMessage(), e2);
            } catch (StreamDefinitionStoreException e3) {
                throw new MalformedStreamDefinitionException(e3.getErrorMessage(), e3);
            }
        } finally {
            this.authenticatorHandler.destroyContext(session);
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public String defineStream(String str, String str2, String str3) throws DifferentStreamDefinitionAlreadyDefinedException, MalformedStreamDefinitionException, SessionTimeoutException {
        AgentSession session = this.authenticator.getSession(str);
        try {
            if (session.getCredentials() == null) {
                if (log.isDebugEnabled()) {
                    log.debug("session " + str + " expired ");
                }
                throw new SessionTimeoutException(str + " expired");
            }
            try {
                try {
                    this.authenticatorHandler.initContext(session);
                    String defineStream = this.eventDispatcher.defineStream(str2, session, str3);
                    if (defineStream != null) {
                        Iterator<StreamAddRemoveListener> it = this.streamAddRemoveListenerList.iterator();
                        while (it.hasNext()) {
                            it.next().streamAdded(defineStream);
                        }
                    }
                    return defineStream;
                } catch (MalformedStreamDefinitionException e) {
                    throw new MalformedStreamDefinitionException(e.getErrorMessage(), e);
                }
            } catch (StreamDefinitionStoreException e2) {
                throw new MalformedStreamDefinitionException(e2.getErrorMessage(), e2);
            } catch (DifferentStreamDefinitionAlreadyDefinedException e3) {
                throw new DifferentStreamDefinitionAlreadyDefinedException(e3.getErrorMessage(), e3);
            }
        } finally {
            this.authenticatorHandler.destroyContext(session);
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public String findStreamId(String str, String str2, String str3) throws SessionTimeoutException {
        AgentSession session = this.authenticator.getSession(str);
        try {
            if (session.getCredentials() == null) {
                if (log.isDebugEnabled()) {
                    log.debug("session " + str + " expired ");
                }
                throw new SessionTimeoutException(str + " expired");
            }
            try {
                this.authenticatorHandler.initContext(session);
                String findStreamId = this.eventDispatcher.findStreamId(str2, str3, session);
                this.authenticatorHandler.destroyContext(session);
                return findStreamId;
            } catch (StreamDefinitionStoreException e) {
                log.warn("Cannot find streamId for " + str2 + " " + str3, e);
                this.authenticatorHandler.destroyContext(session);
                return null;
            }
        } catch (Throwable th) {
            this.authenticatorHandler.destroyContext(session);
            throw th;
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public boolean deleteStream(String str, String str2) throws SessionTimeoutException {
        AgentSession session = this.authenticator.getSession(str);
        if (session.getCredentials() == null) {
            if (log.isDebugEnabled()) {
                log.debug("session " + str + " expired ");
            }
            throw new SessionTimeoutException(str + " expired");
        }
        try {
            this.authenticatorHandler.initContext(session);
            boolean deleteStream = this.eventDispatcher.deleteStream(DataBridgeCommonsUtils.getStreamNameFromStreamId(str2), DataBridgeCommonsUtils.getStreamVersionFromStreamId(str2), session);
            if (deleteStream) {
                Iterator<StreamAddRemoveListener> it = this.streamAddRemoveListenerList.iterator();
                while (it.hasNext()) {
                    it.next().streamRemoved(str2);
                }
            }
            return deleteStream;
        } finally {
            this.authenticatorHandler.destroyContext(session);
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public boolean deleteStream(String str, String str2, String str3) throws SessionTimeoutException {
        return deleteStream(str, DataBridgeCommonsUtils.generateStreamId(str2, str3));
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public void publish(Object obj, String str, EventConverter eventConverter) throws UndefinedEventTypeException, SessionTimeoutException {
        startTimeMeasurement();
        AgentSession session = this.authenticator.getSession(str);
        if (session.getCredentials() == null) {
            if (log.isDebugEnabled()) {
                log.debug("session " + str + " expired ");
            }
            throw new SessionTimeoutException(str + " expired");
        }
        try {
            this.authenticatorHandler.initContext(session);
            this.eventDispatcher.publish(obj, session, eventConverter);
            endTimeMeasurement(eventConverter.getNumberOfEvents(obj));
            this.authenticatorHandler.destroyContext(session);
        } catch (Throwable th) {
            this.authenticatorHandler.destroyContext(session);
            throw th;
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public Boolean isQueueEmpty() {
        return this.eventDispatcher.isQueueEmpty();
    }

    private void endTimeMeasurement(int i) {
        if (this.isProfileReceiver) {
            this.eventsReceived.addAndGet(i);
            if (this.eventsReceived.get() > this.cutoff) {
                synchronized (this) {
                    if (this.eventsReceived.get() > this.cutoff) {
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                        Date date = new Date();
                        long currentTimeMillis = System.currentTimeMillis();
                        int andSet = this.eventsReceived.getAndSet(0);
                        this.totalEventCounter.addAndGet(andSet);
                        String str = "[" + simpleDateFormat.format(date) + "] # of events : " + andSet + " start timestamp : " + this.startTime + " end time stamp : " + currentTimeMillis + " Throughput is (events / sec) : " + ((andSet * 1000) / (currentTimeMillis - this.startTime)) + " Total Event Count : " + this.totalEventCounter + " \n";
                        File file = new File(Utils.getCarbonHome() + File.separator + "receiver-perf.txt");
                        if (!file.exists()) {
                            log.info("Creating the performance measurement file at : " + file.getAbsolutePath());
                        }
                        try {
                            appendToFile(IOUtils.toInputStream(str), file);
                        } catch (IOException e) {
                            log.error(e.getMessage(), e);
                        }
                        this.startTime = 0L;
                    }
                }
            }
        }
    }

    public void appendToFile(InputStream inputStream, File file) throws IOException {
        BufferedOutputStream bufferedOutputStream = null;
        try {
            bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file, true));
            IOUtils.copy(inputStream, bufferedOutputStream);
            IOUtils.closeQuietly(bufferedOutputStream);
        } catch (Throwable th) {
            IOUtils.closeQuietly(bufferedOutputStream);
            throw th;
        }
    }

    private void startTimeMeasurement() {
        if (this.isProfileReceiver && this.startTime == 0) {
            this.startTime = System.currentTimeMillis();
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public String login(String str, String str2) throws AuthenticationException {
        log.info("user " + str + " connected");
        return this.authenticator.authenticate(str, str2);
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public void logout(String str) throws Exception {
        AgentSession session = this.authenticator.getSession(str);
        this.authenticator.logout(str);
        if (session != null) {
            log.info("user " + session.getUsername() + " disconnected");
        } else {
            log.info("session " + str + " disconnected");
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public DataBridgeConfiguration getInitialConfig() {
        return this.dataBridgeConfiguration;
    }

    public void setInitialConfig(DataBridgeConfiguration dataBridgeConfiguration) {
        this.dataBridgeConfiguration = dataBridgeConfiguration;
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeSubscriberService
    public void subscribe(AgentCallback agentCallback) {
        this.eventDispatcher.addCallback(agentCallback);
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeSubscriberService
    public void subscribe(RawDataAgentCallback rawDataAgentCallback) {
        this.eventDispatcher.addCallback(rawDataAgentCallback);
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public StreamDefinition getStreamDefinition(String str, String str2, String str3) throws SessionTimeoutException, StreamDefinitionNotFoundException, StreamDefinitionStoreException {
        AgentSession session = this.authenticator.getSession(str);
        if (session.getUsername() == null) {
            if (log.isDebugEnabled()) {
                log.debug("session " + str + " expired ");
            }
            throw new SessionTimeoutException(str + " expired");
        }
        try {
            this.authenticatorHandler.initContext(session);
            StreamDefinition streamDefinition = getStreamDefinition(str2, str3);
            this.authenticatorHandler.destroyContext(session);
            return streamDefinition;
        } catch (Throwable th) {
            this.authenticatorHandler.destroyContext(session);
            throw th;
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public List<StreamDefinition> getAllStreamDefinitions(String str) throws SessionTimeoutException {
        AgentSession session = this.authenticator.getSession(str);
        if (session.getUsername() == null) {
            if (log.isDebugEnabled()) {
                log.debug("session " + str + " expired ");
            }
            throw new SessionTimeoutException(str + " expired");
        }
        try {
            this.authenticatorHandler.initContext(session);
            List<StreamDefinition> allStreamDefinitions = getAllStreamDefinitions();
            this.authenticatorHandler.destroyContext(session);
            return allStreamDefinitions;
        } catch (Throwable th) {
            this.authenticatorHandler.destroyContext(session);
            throw th;
        }
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public void saveStreamDefinition(String str, StreamDefinition streamDefinition) throws SessionTimeoutException, StreamDefinitionStoreException, DifferentStreamDefinitionAlreadyDefinedException {
        AgentSession session = this.authenticator.getSession(str);
        if (session.getUsername() == null) {
            if (log.isDebugEnabled()) {
                log.debug("session " + str + " expired ");
            }
            throw new SessionTimeoutException(str + " expired");
        }
        try {
            this.authenticatorHandler.initContext(session);
            saveStreamDefinition(streamDefinition);
            this.eventDispatcher.updateStreamDefinitionHolder(session);
            this.authenticatorHandler.destroyContext(session);
        } catch (Throwable th) {
            this.authenticatorHandler.destroyContext(session);
            throw th;
        }
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore
    public StreamDefinition getStreamDefinition(String str, String str2) throws StreamDefinitionNotFoundException, StreamDefinitionStoreException {
        return this.streamDefinitionStore.getStreamDefinition(str, str2);
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore
    public StreamDefinition getStreamDefinition(String str) throws StreamDefinitionNotFoundException, StreamDefinitionStoreException {
        return this.streamDefinitionStore.getStreamDefinition(str);
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore
    public List<StreamDefinition> getAllStreamDefinitions() {
        return new ArrayList(this.streamDefinitionStore.getAllStreamDefinitions());
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore
    public void saveStreamDefinition(StreamDefinition streamDefinition) throws DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionStoreException {
        this.streamDefinitionStore.saveStreamDefinition(streamDefinition);
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore
    public boolean deleteStreamDefinition(String str, String str2) {
        return this.streamDefinitionStore.deleteStreamDefinition(str, str2);
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeSubscriberService
    public List<AgentCallback> getSubscribers() {
        return this.eventDispatcher.getSubscribers();
    }

    @Override // org.wso2.carbon.databridge.core.DataBridgeSubscriberService
    public List<RawDataAgentCallback> getRawDataSubscribers() {
        return this.eventDispatcher.getRawDataSubscribers();
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore, org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public void subscribe(StreamAddRemoveListener streamAddRemoveListener) {
        if (streamAddRemoveListener != null) {
            this.streamAddRemoveListenerList.add(streamAddRemoveListener);
        }
    }

    @Override // org.wso2.carbon.databridge.core.definitionstore.StreamDefinitionStore, org.wso2.carbon.databridge.core.DataBridgeReceiverService
    public void unsubscribe(StreamAddRemoveListener streamAddRemoveListener) {
        if (streamAddRemoveListener != null) {
            this.streamAddRemoveListenerList.remove(streamAddRemoveListener);
        }
    }

    private DataBridgeConfiguration createDataBridgeConfiguration(String str) throws DataBridgeConfigurationException {
        Path path = Paths.get(str, new String[0]);
        try {
            if (!Files.exists(path, new LinkOption[0])) {
                log.error("Cannot find data bridge configuration file : " + str);
                return null;
            }
            ConfigProvider configProvider = ConfigProviderFactory.getConfigProvider(path);
            LinkedHashMap linkedHashMap = (LinkedHashMap) configProvider.getConfigurationObject(DataBridgeConstants.TRANSPORTS_NAMESPACE);
            return linkedHashMap != null ? DatabridgeConfigurationFileResolver.resolveTransportsNamespaceConfiguration(linkedHashMap) : DatabridgeConfigurationFileResolver.resolveAndSetDatabridgeConfiguration((LinkedHashMap) configProvider.getConfigurationObject(DataBridgeConstants.DATABRIDGE_CONFIG_NAMESPACE));
        } catch (ConfigurationException e) {
            throw new DataBridgeConfigurationException("Error in when loading databridge configuration", e);
        }
    }
}
