package org.wso2.ppaas.python.cartridge.agent.integration.common;

import java.net.SocketException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridge;
import org.wso2.carbon.databridge.core.Utils.AgentSession;
import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.receiver.thrift.ThriftDataReceiver;
import org.wso2.carbon.user.api.UserStoreException;

/* loaded from: input_file:org/wso2/ppaas/python/cartridge/agent/integration/common/ThriftTestServer.class */
public class ThriftTestServer {
    private Logger log = Logger.getLogger(ThriftTestServer.class);
    private ThriftDataReceiver thriftDataReceiver;
    private InMemoryStreamDefinitionStore streamDefinitionStore;
    private AtomicInteger numberOfEventsReceived;
    private RestarterThread restarterThread;
    private DataBridge databridge;

    /* loaded from: input_file:org/wso2/ppaas/python/cartridge/agent/integration/common/ThriftTestServer$RestarterThread.class */
    class RestarterThread implements Runnable {
        int eventReceived;
        int port;
        long stopAfterTimeMilliSeconds;
        long startAfterTimeMS;

        RestarterThread(int i, long j, long j2) {
            this.port = i;
            this.stopAfterTimeMilliSeconds = j;
            this.startAfterTimeMS = j2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Thread.sleep(this.stopAfterTimeMilliSeconds);
            } catch (InterruptedException e) {
            }
            if (ThriftTestServer.this.thriftDataReceiver != null) {
                ThriftTestServer.this.thriftDataReceiver.stop();
            }
            this.eventReceived = ThriftTestServer.this.getNumberOfEventsReceived();
            ThriftTestServer.this.log.info("Number of events received in server shutdown :" + this.eventReceived);
            try {
                Thread.sleep(this.startAfterTimeMS);
            } catch (InterruptedException e2) {
            }
            try {
                if (ThriftTestServer.this.thriftDataReceiver != null) {
                    ThriftTestServer.this.thriftDataReceiver.start(DataPublisherTestUtil.LOCAL_HOST);
                } else {
                    ThriftTestServer.this.start(this.port);
                }
            } catch (DataBridgeException e3) {
                ThriftTestServer.this.log.error(e3);
            }
        }
    }

    public void startTestServer() throws DataBridgeException, InterruptedException {
        ThriftTestServer thriftTestServer = new ThriftTestServer();
        thriftTestServer.start(7611);
        Thread.sleep(100000000L);
        thriftTestServer.stop();
    }

    public void addStreamDefinition(StreamDefinition streamDefinition, int i) throws StreamDefinitionStoreException {
        this.streamDefinitionStore.saveStreamDefinitionToStore(streamDefinition, i);
    }

    public void addStreamDefinition(String str, int i) throws StreamDefinitionStoreException, MalformedStreamDefinitionException {
        getStreamDefinitionStore().saveStreamDefinitionToStore(EventDefinitionConverterUtils.convertFromJson(str), i);
    }

    private InMemoryStreamDefinitionStore getStreamDefinitionStore() {
        if (this.streamDefinitionStore == null) {
            this.streamDefinitionStore = new InMemoryStreamDefinitionStore();
        }
        return this.streamDefinitionStore;
    }

    public void start(int i) throws DataBridgeException {
        DataPublisherTestUtil.setKeyStoreParams();
        this.streamDefinitionStore = getStreamDefinitionStore();
        this.numberOfEventsReceived = new AtomicInteger(0);
        this.databridge = new DataBridge(new AuthenticationHandler() { // from class: org.wso2.ppaas.python.cartridge.agent.integration.common.ThriftTestServer.1
            public boolean authenticate(String str, String str2) {
                ThriftTestServer.this.log.info("Thrift authentication returning true");
                return true;
            }

            public String getTenantDomain(String str) {
                return "admin";
            }

            public int getTenantId(String str) throws UserStoreException {
                return -1234;
            }

            public void initContext(AgentSession agentSession) {
                ThriftTestServer.this.log.info("Initializing Thrift agent context");
            }

            public void destroyContext(AgentSession agentSession) {
            }
        }, this.streamDefinitionStore, DataPublisherTestUtil.getDataBridgeConfigPath());
        this.thriftDataReceiver = new ThriftDataReceiver(i, this.databridge);
        this.databridge.subscribe(new AgentCallback() { // from class: org.wso2.ppaas.python.cartridge.agent.integration.common.ThriftTestServer.2
            public void definedStream(StreamDefinition streamDefinition, int i2) {
                ThriftTestServer.this.log.info("StreamDefinition defined:" + streamDefinition);
            }

            public void removeStream(StreamDefinition streamDefinition, int i2) {
                ThriftTestServer.this.log.info("StreamDefinition removed: " + streamDefinition);
            }

            public void receive(List<Event> list, Credentials credentials) {
                ThriftTestServer.this.numberOfEventsReceived.addAndGet(list.size());
                ThriftTestServer.this.log.info("Number of received events: " + ThriftTestServer.this.numberOfEventsReceived);
            }
        });
        this.log.info("Test Server starting on " + DataPublisherTestUtil.LOCAL_HOST);
        this.thriftDataReceiver.start(DataPublisherTestUtil.LOCAL_HOST);
        this.log.info("Test Server Started");
    }

    public int getNumberOfEventsReceived() {
        if (this.numberOfEventsReceived != null) {
            return this.numberOfEventsReceived.get();
        }
        return 0;
    }

    public void resetReceivedEvents() {
        this.numberOfEventsReceived.set(0);
    }

    public void stop() {
        this.thriftDataReceiver.stop();
        this.log.info("Test Server Stopped");
    }

    public void stopAndStartDuration(int i, long j, long j2) throws SocketException, DataBridgeException {
        this.restarterThread = new RestarterThread(i, j, j2);
        new Thread(this.restarterThread).start();
    }

    public int getEventsReceivedBeforeLastRestart() {
        return this.restarterThread.eventReceived;
    }

    public ThriftDataReceiver getThriftDataReceiver() {
        return this.thriftDataReceiver;
    }

    public RestarterThread getRestarterThread() {
        return this.restarterThread;
    }

    public DataBridge getDatabridge() {
        return this.databridge;
    }
}
