/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.am.integration.test.utils.thrift;

import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.am.integration.test.utils.thrift.DataPublisherTestUtil;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridge;
import org.wso2.carbon.databridge.core.DataBridgeReceiverService;
import org.wso2.carbon.databridge.core.Utils.AgentSession;
import org.wso2.carbon.databridge.core.definitionstore.AbstractStreamDefinitionStore;
import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.receiver.thrift.ThriftDataReceiver;
import org.wso2.carbon.user.api.UserStoreException;

public class DASThriftTestServer {
    private Log log = LogFactory.getLog(DASThriftTestServer.class);
    private ThriftDataReceiver thriftDataReceiver;
    private InMemoryStreamDefinitionStore streamDefinitionStore;
    private AtomicInteger numberOfEventsReceived;
    private ReStarterThread reStarterThread;
    private Map<String, List<Event>> dataTables = new HashMap<String, List<Event>>();
    private int tenantId;
    private String tenantDomain;

    public void setTenantId(int tenantId) {
        this.tenantId = tenantId;
    }

    public void setTenantDomain(String tenantDomain) {
        this.tenantDomain = tenantDomain;
    }

    public void clearTables() {
        this.dataTables.clear();
    }

    public Map<String, List<Event>> getDataTables() {
        return this.dataTables;
    }

    public void resetTables() {
        this.dataTables.clear();
    }

    public void addStreamDefinition(StreamDefinition streamDefinition, int tenantId) throws StreamDefinitionStoreException {
        this.streamDefinitionStore.saveStreamDefinitionToStore(streamDefinition, tenantId);
    }

    public void addStreamDefinition(String streamDefinitionStr, int tenantId) throws StreamDefinitionStoreException, MalformedStreamDefinitionException {
        StreamDefinition streamDefinition = EventDefinitionConverterUtils.convertFromJson((String)streamDefinitionStr);
        this.getStreamDefinitionStore().saveStreamDefinitionToStore(streamDefinition, tenantId);
    }

    private InMemoryStreamDefinitionStore getStreamDefinitionStore() {
        if (this.streamDefinitionStore == null) {
            this.streamDefinitionStore = new InMemoryStreamDefinitionStore();
        }
        return this.streamDefinitionStore;
    }

    public void start(int receiverPort) throws DataBridgeException {
        DataPublisherTestUtil.setKeyStoreParams();
        this.streamDefinitionStore = this.getStreamDefinitionStore();
        this.numberOfEventsReceived = new AtomicInteger(0);
        DataBridge databridge = new DataBridge(new AuthenticationHandler(){

            public boolean authenticate(String userName, String password) {
                return true;
            }

            public String getTenantDomain(String userName) {
                return DASThriftTestServer.this.tenantDomain;
            }

            public int getTenantId(String tenantDomain) throws UserStoreException {
                return DASThriftTestServer.this.tenantId;
            }

            public void initContext(AgentSession agentSession) {
            }

            public void destroyContext(AgentSession agentSession) {
            }
        }, (AbstractStreamDefinitionStore)this.streamDefinitionStore, DataPublisherTestUtil.getDataBridgeConfigPath());
        this.thriftDataReceiver = new ThriftDataReceiver(receiverPort, (DataBridgeReceiverService)databridge);
        databridge.subscribe(new AgentCallback(){
            int totalSize = 0;

            public void definedStream(StreamDefinition streamDefinition, int tenantId) {
            }

            public void removeStream(StreamDefinition streamDefinition, int tenantId) {
                DASThriftTestServer.this.log.info((Object)("StreamDefinition remove " + streamDefinition));
            }

            public void receive(List<Event> eventList, Credentials credentials) {
                for (Event event : eventList) {
                    String streamKey = event.getStreamId();
                    if (!DASThriftTestServer.this.dataTables.containsKey(streamKey)) {
                        DASThriftTestServer.this.dataTables.put(streamKey, new ArrayList());
                    }
                    ((List)DASThriftTestServer.this.dataTables.get(streamKey)).add(event);
                    DASThriftTestServer.this.log.info((Object)("===  " + event.toString()));
                }
                DASThriftTestServer.this.numberOfEventsReceived.addAndGet(eventList.size());
                DASThriftTestServer.this.log.info((Object)("Received events : " + DASThriftTestServer.this.numberOfEventsReceived));
            }
        });
        String address = "localhost";
        this.log.info((Object)("DAS Test Thrift Server starting on " + address));
        this.thriftDataReceiver.start(address);
        this.log.info((Object)"DAS Test Thrift Server Started");
    }

    public int getNumberOfEventsReceived() {
        if (this.numberOfEventsReceived != null) {
            return this.numberOfEventsReceived.get();
        }
        return 0;
    }

    public void resetReceivedEvents() {
        this.numberOfEventsReceived.set(0);
    }

    public void stop() {
        this.thriftDataReceiver.stop();
        this.log.info((Object)"DAS Test Thrift Server Stopped");
    }

    public void stopAndStartDuration(int port, long stopAfterTimeMilliSeconds, long startAfterTimeMS) throws SocketException, DataBridgeException {
        this.reStarterThread = new ReStarterThread(port, stopAfterTimeMilliSeconds, startAfterTimeMS);
        Thread thread = new Thread(this.reStarterThread);
        thread.start();
    }

    public int getEventsReceivedBeforeLastRestart() {
        return this.reStarterThread.eventReceived;
    }

    class ReStarterThread
    implements Runnable {
        int eventReceived;
        int port;
        long stopAfterTimeMilliSeconds;
        long startAfterTimeMS;

        ReStarterThread(int port, long stopAfterTime, long startAfterTime) {
            this.port = port;
            this.stopAfterTimeMilliSeconds = stopAfterTime;
            this.startAfterTimeMS = startAfterTime;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(this.stopAfterTimeMilliSeconds);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            if (DASThriftTestServer.this.thriftDataReceiver != null) {
                DASThriftTestServer.this.thriftDataReceiver.stop();
            }
            this.eventReceived = DASThriftTestServer.this.getNumberOfEventsReceived();
            DASThriftTestServer.this.log.info((Object)("Number of events received in server shutdown :" + this.eventReceived));
            try {
                Thread.sleep(this.startAfterTimeMS);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            try {
                if (DASThriftTestServer.this.thriftDataReceiver != null) {
                    DASThriftTestServer.this.thriftDataReceiver.start("localhost");
                } else {
                    DASThriftTestServer.this.start(this.port);
                }
            }
            catch (DataBridgeException e) {
                DASThriftTestServer.this.log.error((Object)e);
            }
        }
    }
}

