package org.wso2.carbon.stream.processor.core.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.core.Response;
import org.apache.log4j.Logger;
import org.wso2.carbon.stream.processor.core.api.HaApiService;
import org.wso2.carbon.stream.processor.core.api.NotFoundException;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationRecordTableHandler;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationSinkHandler;
import org.wso2.carbon.stream.processor.core.internal.SiddhiAppData;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.model.OutputSyncTimestampCollection;
import org.wso2.carbon.stream.processor.core.model.OutputSyncTimestamps;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.util.snapshot.PersistenceReference;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/impl/HaApiServiceImpl.class */
/**
 * Implementation of the HA (high availability) REST operations declared by {@link HaApiService}.
 *
 * <p>The operations report the last output timestamps held by the registered sink and record
 * table handlers, and trigger persistence of Siddhi application state. Per the log messages in
 * this class, these requests are served on the active node on request of the passive node.
 */
public class HaApiServiceImpl extends HaApiService {
    private static final Logger log = Logger.getLogger(HaApiServiceImpl.class);

    /**
     * Per-future wait limit, in milliseconds, used when a single Siddhi app is persisted by name.
     * NOTE(review): the persist-all variant reads its timeout from the deployment configuration
     * instead; confirm whether this fixed value is intentional.
     */
    private static final long SINGLE_APP_PERSIST_TIMEOUT_MILLIS = 60000L;

    /**
     * Collects one timestamp entry per registered sink handler and per registered record table
     * handler and returns them in a single {@link OutputSyncTimestampCollection}.
     *
     * @return a 200 response whose entity is the collected timestamps
     * @throws NotFoundException declared by the API contract; not thrown by this implementation
     */
    @Override
    public Response haOutputSyncTimestampGet() throws NotFoundException {
        Map<?, ?> sinkHandlers = StreamProcessorDataHolder.getSinkHandlerManager().getRegisteredSinkHandlers();
        ArrayList<OutputSyncTimestamps> sinkTimestamps = new ArrayList<>();
        for (Map.Entry<?, ?> entry : sinkHandlers.entrySet()) {
            HACoordinationSinkHandler handler = (HACoordinationSinkHandler) entry.getValue();
            sinkTimestamps.add(new OutputSyncTimestamps((String) entry.getKey(),
                    Long.toString(handler.getActiveNodeLastPublishedTimestamp())));
        }

        Map<?, ?> recordTableHandlers =
                StreamProcessorDataHolder.getRecordTableHandlerManager().getRegisteredRecordTableHandlers();
        ArrayList<OutputSyncTimestamps> recordTableTimestamps = new ArrayList<>();
        for (Map.Entry<?, ?> entry : recordTableHandlers.entrySet()) {
            HACoordinationRecordTableHandler handler = (HACoordinationRecordTableHandler) entry.getValue();
            recordTableTimestamps.add(new OutputSyncTimestamps((String) entry.getKey(),
                    Long.toString(handler.getActiveNodeLastOperationTimestamp())));
        }

        if (log.isDebugEnabled()) {
            log.debug("Active Node: Sending back last published event's timestamp of " + sinkHandlers.size() + " sinks and timestamps of last operation's of " + recordTableHandlers.size() + " record tables");
        }
        return Response.ok().entity(new OutputSyncTimestampCollection(sinkTimestamps, recordTableTimestamps)).build();
    }

    /**
     * Persists every Siddhi application in the app map that has a runtime, waiting for each
     * persistence future with the state-sync timeout taken from the deployment configuration.
     * Apps without a runtime are logged as errors and skipped.
     *
     * @return a 200 response when no exception occurred, otherwise a 500 response
     * @throws NotFoundException declared by the API contract; not thrown by this implementation
     * @throws IOException declared by the API contract; not thrown by this implementation
     */
    @Override
    public Response haStateGet() throws NotFoundException, IOException {
        try {
            Map<String, SiddhiAppData> siddhiApps =
                    StreamProcessorDataHolder.getStreamProcessorService().getSiddhiAppMap();
            for (Map.Entry<String, SiddhiAppData> entry : siddhiApps.entrySet()) {
                SiddhiAppRuntime siddhiAppRuntime = entry.getValue().getSiddhiAppRuntime();
                if (siddhiAppRuntime == null) {
                    // Log the app name (map key); the value is the SiddhiAppData holder object.
                    log.error("Active Node: Persisting of Siddhi app " + entry.getKey() + " not successful. Check if app deployed properly");
                    continue;
                }
                PersistenceReference persistenceReference = siddhiAppRuntime.persist();
                long timeoutMillis =
                        StreamProcessorDataHolder.getDeploymentConfig().getLiveSync().getStateSyncTimeout();
                awaitPersistence(persistenceReference, timeoutMillis);
            }
            log.info("Active Node: Persisting of all Siddhi Applications on request of passive node successful");
            return Response.ok().entity(Response.Status.OK).build();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error while persisting siddhi applications. " + e.getMessage(), e);
            return Response.serverError().build();
        }
    }

    /**
     * Persists the named Siddhi application and waits for its persistence future(s).
     *
     * @param str name of the Siddhi application, used as the key into the app map
     * @return a 404 response when no app is registered under {@code str}; a 500 response when the
     *         app has no runtime or persisting fails; otherwise a 200 response
     * @throws NotFoundException declared by the API contract; not thrown by this implementation
     * @throws IOException declared by the API contract; not thrown by this implementation
     */
    @Override
    public Response haStateGet(String str) throws NotFoundException, IOException {
        try {
            SiddhiAppData siddhiAppData = StreamProcessorDataHolder.getStreamProcessorService().getSiddhiAppMap().get(str);
            if (siddhiAppData == null) {
                log.warn("Siddhi application " + str + " may not be deployed in active node yet but requested to be persisted from passive node");
                return Response.status(Response.Status.NOT_FOUND).build();
            }
            SiddhiAppRuntime siddhiAppRuntime = siddhiAppData.getSiddhiAppRuntime();
            if (siddhiAppRuntime == null) {
                // Previously this surfaced as an opaque NullPointerException in the generic handler.
                log.error("Active Node: Persisting of Siddhi app " + str + " not successful. Check if app deployed properly");
                return Response.serverError().build();
            }
            awaitPersistence(siddhiAppRuntime.persist(), SINGLE_APP_PERSIST_TIMEOUT_MILLIS);
            log.info("Active Node: Persisting of " + str + " on request of passive node successfull");
            return Response.ok().entity(Response.Status.OK).build();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error while persisting " + str + ". " + e.getMessage(), e);
            return Response.serverError().build();
        }
    }

    /**
     * Blocks until the persistence described by {@code reference} completes: waits on the full
     * state future when one is present, otherwise on each incremental state future in turn.
     *
     * @param reference handle returned by {@code SiddhiAppRuntime.persist()}
     * @param timeoutMillis maximum wait, in milliseconds, applied to each future individually
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a persistence task failed
     * @throws TimeoutException if a future did not complete within {@code timeoutMillis}
     */
    private static void awaitPersistence(PersistenceReference reference, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<?> fullStateFuture = reference.getFullStateFuture();
        if (fullStateFuture != null) {
            fullStateFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return;
        }
        Iterable<?> incrementalFutures = reference.getIncrementalStateFuture();
        if (incrementalFutures == null) {
            // Nothing to wait on; avoids a NullPointerException when neither kind of future is set.
            return;
        }
        for (Object future : incrementalFutures) {
            ((Future<?>) future).get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Re-asserts the thread's interrupt flag when {@code e} is an {@link InterruptedException}. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
