package org.apache.sysds.runtime.controlprogram.federated.monitoring.services;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.WorkerController;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventStageModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.TrafficModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.WorkerModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.Constants;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.DerbyRepository;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.IRepository;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.class */
/**
 * Service for creating, updating, removing and reading {@link WorkerModel} entities, which also
 * keeps an in-memory cache of known workers (address plus an "online" flag) and periodically polls
 * every cached worker for statistics that are then written to the repository.
 *
 * <p>Thread-safety: the worker cache is read and written from request threads, from the scheduled
 * polling thread and from asynchronous callbacks. It is therefore a {@link ConcurrentHashMap} whose
 * values are immutable pairs that are replaced atomically instead of being mutated in place.
 */
public class WorkerService {
    // NOTE(review): the logger is registered under WorkerController's name rather than this
    // class's name; kept as-is because log configuration may depend on it — confirm intent.
    protected static final Log LOG = LogFactory.getLog(WorkerController.class.getName());
    private static final IRepository entityRepository = new DerbyRepository();

    /** Worker id -> (worker address, online flag). Values are never mutated, only replaced. */
    private static final Map<Long, Pair<String, Boolean>> cachedWorkers = new ConcurrentHashMap<>();

    /** Lazily created by the first constructor call; guarded by the class lock. */
    private static ScheduledExecutorService executorService;

    /**
     * Creates the service and, on the first instantiation only, starts the background statistics
     * collection using the configured {@code DMLConfig.FEDERATED_MONITOR_FREQUENCY}.
     */
    public WorkerService() {
        startStatsCollectionProcess(1, ConfigurationManager.getDMLConfig().getDoubleValue(DMLConfig.FEDERATED_MONITOR_FREQUENCY));
    }

    /**
     * Persists a new worker, stores the generated id on the model and adds the worker to the cache.
     *
     * @param workerModel the worker to persist; its {@code id} field is overwritten
     * @return the id returned by the repository
     */
    public Long create(WorkerModel workerModel) {
        long newId = entityRepository.createEntity(workerModel).longValue();
        workerModel.id = Long.valueOf(newId);
        updateCachedWorkers(List.of(workerModel), false);
        return Long.valueOf(newId);
    }

    /**
     * Persists changes to an existing worker and refreshes its cached address.
     *
     * @param workerModel the worker to update
     */
    public void update(WorkerModel workerModel) {
        entityRepository.updateEntity(workerModel);
        updateCachedWorkers(List.of(workerModel), false);
    }

    /**
     * Removes a worker from the repository and from the cache.
     *
     * @param l id of the worker to remove
     */
    public void remove(Long l) {
        entityRepository.removeEntity(l, WorkerModel.class);
        updateCachedWorkers(List.of(new WorkerModel(l)), true);
    }

    /**
     * Loads a single worker from the repository and refreshes its cache entry.
     *
     * @param l id of the worker
     * @return whatever the repository returns for that id; the cache is only touched when the
     *         result is non-null (previously a null result raised a NullPointerException here)
     */
    public WorkerModel get(Long l) {
        WorkerModel workerModel = (WorkerModel) entityRepository.getEntity(l, WorkerModel.class);
        if (workerModel != null) {
            updateCachedWorkers(List.of(workerModel), false);
        }
        return workerModel;
    }

    /**
     * Loads all workers from the repository and refreshes their cache entries.
     *
     * @return the list returned by the repository
     */
    public List<WorkerModel> getAll() {
        List<WorkerModel> allEntities = entityRepository.getAllEntities(WorkerModel.class);
        updateCachedWorkers(allEntities, false);
        return allEntities;
    }

    /**
     * Returns the cached online flag of a worker.
     *
     * @param l id of the worker
     * @return the cached flag, or {@code false} when the id is null or the worker is not cached
     */
    public Boolean getWorkerOnlineStatus(Long l) {
        if (l == null) {
            return Boolean.FALSE;
        }
        Pair<String, Boolean> cached = cachedWorkers.get(l);
        return Boolean.valueOf(cached != null && Boolean.TRUE.equals(cached.getRight()));
    }

    /**
     * Adds/refreshes or removes cache entries for the given workers.
     *
     * <p>On refresh the address is taken from the model while an already cached online flag is
     * preserved; workers that were not cached yet start out as offline. Null models and models
     * without an id are skipped because the concurrent map does not accept null keys.
     *
     * @param list workers to apply
     * @param z    {@code true} to remove the workers from the cache, {@code false} to add/refresh
     */
    private static void updateCachedWorkers(List<WorkerModel> list, boolean z) {
        for (WorkerModel workerModel : list) {
            if (workerModel == null || workerModel.id == null) {
                continue;
            }
            if (z) {
                cachedWorkers.remove(workerModel.id);
                continue;
            }
            final String address = workerModel.address;
            // compute() is atomic per key, so a concurrent status update cannot be lost.
            cachedWorkers.compute(workerModel.id, (id, cached) -> {
                boolean online = cached != null && Boolean.TRUE.equals(cached.getRight());
                return Pair.of(address, Boolean.valueOf(online));
            });
        }
    }

    /**
     * Starts the periodic statistics collection once; later calls are no-ops.
     *
     * @param i number of threads of the scheduled pool
     * @param d polling period in seconds (converted to milliseconds for the scheduler)
     */
    private static synchronized void startStatsCollectionProcess(int i, double d) {
        if (executorService != null) {
            return;
        }
        long periodMillis = Math.round(d * 1000.0d);
        executorService = Executors.newScheduledThreadPool(i);
        executorService.scheduleAtFixedRate(syncWorkerStatisticsRunnable(), 0L, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Updates the cached online flag of a worker and asynchronously persists the parts of the
     * statistics that are present.
     *
     * <p>A {@code null} statistics model marks the worker as offline. If the worker is not in the
     * cache (for example because it was removed while a poll was in flight) nothing is persisted;
     * previously this case raised a NullPointerException before any write as well.
     *
     * @param statisticsModel statistics fetched from the worker, or {@code null}
     * @param l               id of the worker the statistics belong to
     */
    public static void syncWorkerStatisticsWithDB(StatisticsModel statisticsModel, Long l) {
        if (l == null) {
            return;
        }
        final Boolean online = Boolean.valueOf(statisticsModel != null);
        Pair<String, Boolean> updated =
            cachedWorkers.computeIfPresent(l, (id, cached) -> Pair.of(cached.getLeft(), online));
        if (updated == null || statisticsModel == null) {
            return;
        }

        if (statisticsModel.utilization != null) {
            runAsyncLogged("utilization", l, () -> {
                // NOTE(review): assumes utilization is a List; only its first element is stored.
                if (!statisticsModel.utilization.isEmpty()) {
                    entityRepository.createEntity(statisticsModel.utilization.get(0));
                }
            });
        }
        if (statisticsModel.traffic != null) {
            runAsyncLogged("traffic", l, () -> {
                for (TrafficModel trafficModel : statisticsModel.traffic) {
                    if (trafficModel.coordinatorId.longValue() > 0) {
                        entityRepository.createEntity(trafficModel);
                    }
                }
            });
        }
        if (statisticsModel.events != null) {
            for (EventModel eventModel : statisticsModel.events) {
                if (eventModel.coordinatorId.longValue() > 0) {
                    // One task per event: the stages need the id generated for their event.
                    runAsyncLogged("event", l, () -> {
                        Long eventId = entityRepository.createEntity(eventModel);
                        for (EventStageModel eventStageModel : eventModel.stages) {
                            eventStageModel.eventId = eventId;
                            entityRepository.createEntity(eventStageModel);
                        }
                    });
                }
            }
        }
        if (statisticsModel.dataObjects != null) {
            runAsyncLogged("data objects", l, () -> {
                // Stored data objects of this worker are replaced by the new set.
                entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, l, DataObjectModel.class);
                for (DataObjectModel dataObjectModel : statisticsModel.dataObjects) {
                    entityRepository.createEntity(dataObjectModel);
                }
            });
        }
        if (statisticsModel.requests != null) {
            runAsyncLogged("requests", l, () -> {
                entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, l, RequestModel.class);
                for (RequestModel requestModel : statisticsModel.requests) {
                    if (requestModel.coordinatorId.longValue() > 0) {
                        entityRepository.createEntity(requestModel);
                    }
                }
            });
        }
        if (statisticsModel.heavyHitters != null) {
            runAsyncLogged("heavy hitters", l, () -> {
                // An empty list keeps the previously stored heavy hitters untouched.
                if (statisticsModel.heavyHitters.isEmpty()) {
                    return;
                }
                entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, l, HeavyHitterModel.class);
                for (HeavyHitterModel heavyHitterModel : statisticsModel.heavyHitters) {
                    entityRepository.createEntity(heavyHitterModel);
                }
            });
        }
    }

    /**
     * Runs a persistence task asynchronously and logs a failure instead of dropping it silently.
     *
     * @param what     short description of the data being persisted, used in the log message
     * @param workerId id of the worker the data belongs to, used in the log message
     * @param task     the work to run
     */
    private static void runAsyncLogged(String what, Long workerId, Runnable task) {
        CompletableFuture.runAsync(task).exceptionally(ex -> {
            LOG.error("Failed to persist " + what + " for worker " + workerId, ex);
            return null;
        });
    }

    /**
     * Builds the polling task: for every cached worker it asynchronously fetches statistics and
     * hands them to {@link #syncWorkerStatisticsWithDB(StatisticsModel, Long)}.
     *
     * @return the task scheduled by {@code startStatsCollectionProcess}
     */
    private static Runnable syncWorkerStatisticsRunnable() {
        return () -> {
            try {
                for (Map.Entry<Long, Pair<String, Boolean>> entry : cachedWorkers.entrySet()) {
                    final Long workerId = entry.getKey();
                    final String address = entry.getValue().getLeft();
                    CompletableFuture
                        .supplyAsync(() -> StatisticsService.getWorkerStatistics(workerId, address))
                        .thenAcceptAsync(statisticsModel -> syncWorkerStatisticsWithDB(statisticsModel, workerId))
                        .exceptionally(ex -> {
                            LOG.error("Failed to collect statistics for worker " + workerId, ex);
                            return null;
                        });
                }
            } catch (RuntimeException ex) {
                // scheduleAtFixedRate suppresses all further runs once a run throws,
                // so a failure here must not escape the task.
                LOG.error("Worker statistics polling round failed", ex);
            }
        };
    }
}
