package io.advantageous.qbit.metrics;

import io.advantageous.boon.core.Pair;
import io.advantageous.boon.core.Str;
import io.advantageous.qbit.GlobalConstants;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.service.ServiceProxyUtils;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.qbit.service.discovery.ServiceChangedEventChannel;
import io.advantageous.qbit.service.discovery.ServiceDiscovery;
import io.advantageous.qbit.service.discovery.ServicePool;
import io.advantageous.qbit.service.discovery.ServicePoolListener;
import io.advantageous.qbit.util.Timer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/advantageous/qbit/metrics/ClusteredStatReplicator.class */
public class ClusteredStatReplicator implements StatReplicator, ServiceChangedEventChannel {
    private final ServiceDiscovery serviceDiscovery;
    private final StatReplicatorProvider statReplicatorProvider;
    private final ConcurrentHashMap<String, Pair<EndpointDefinition, StatReplicator>> replicatorsMap = new ConcurrentHashMap<>();
    private final Logger logger = LoggerFactory.getLogger(ClusteredStatReplicator.class);
    private final boolean debug;
    private final boolean trace;
    private final String serviceName;
    private final ServicePool servicePool;
    private final String localServiceId;
    private final Timer timer;
    private final int tallyInterval;
    private final int flushInterval;
    private final ConcurrentHashMap<String, LocalCount> countMap;
    private long currentTime;
    private long lastReconnectTime;
    private long lastSendTime;
    private long lastReplicatorFlush;
    private List<Pair<EndpointDefinition, StatReplicator>> statReplicators;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/advantageous/qbit/metrics/ClusteredStatReplicator$LocalCount.class */
    public static final class LocalCount {
        long count;
        String name;

        LocalCount() {
        }
    }

    public ClusteredStatReplicator(String str, ServiceDiscovery serviceDiscovery, StatReplicatorProvider statReplicatorProvider, String str2, Timer timer, int i, int i2) {
        this.debug = GlobalConstants.DEBUG || this.logger.isDebugEnabled();
        this.trace = this.logger.isTraceEnabled();
        this.countMap = new ConcurrentHashMap<>();
        this.lastReplicatorFlush = 0L;
        this.statReplicators = new ArrayList();
        this.serviceDiscovery = serviceDiscovery;
        this.statReplicatorProvider = statReplicatorProvider;
        this.serviceName = str;
        this.localServiceId = str2;
        this.servicePool = new ServicePool(str, (ServicePoolListener) null);
        this.timer = timer;
        this.tallyInterval = i;
        this.flushInterval = i2;
    }

    @Override // io.advantageous.qbit.metrics.StatReplicator
    public void replicateCount(String str, long j, long j2) {
        if (this.trace) {
            this.logger.trace(Str.sputs(new Object[]{"ClusteredStatReplicator::replicateCount()", this.serviceName, str, Long.valueOf(j), Long.valueOf(j2)}));
        }
        if (this.debug && this.statReplicators.size() == 0) {
            this.logger.debug(Str.sputs(new Object[]{"ClusteredStatReplicator::replicateCount", str, Long.valueOf(j), Long.valueOf(j2)}));
        }
        LocalCount localCount = this.countMap.get(str);
        if (localCount == null) {
            localCount = new LocalCount();
            localCount.name = str;
            this.countMap.put(str, localCount);
        }
        localCount.count += j;
    }

    @Override // io.advantageous.qbit.metrics.StatReplicator
    public void replicateLevel(String str, long j, long j2) {
        LocalCount localCount = this.countMap.get(str);
        if (localCount == null) {
            localCount = new LocalCount();
            localCount.name = str;
            this.countMap.put(str, localCount);
        }
        localCount.count = j;
    }

    @Override // io.advantageous.qbit.metrics.StatReplicator
    public void replicateTiming(String str, long j, long j2) {
        LocalCount localCount = this.countMap.get(str);
        if (localCount == null) {
            localCount = new LocalCount();
            localCount.name = str;
            this.countMap.put(str, localCount);
        }
        localCount.count = j;
    }

    private void doRecordCount(Pair<EndpointDefinition, StatReplicator> pair, String str, long j, long j2) {
        try {
            ((StatReplicator) pair.getSecond()).replicateCount(str, j, j2);
        } catch (Exception e) {
            this.logger.error(Str.sputs(new Object[]{"ClusteredStatReplicator::Replicator failed", pair}), e);
        }
    }

    @QueueCallback({QueueCallbackType.IDLE, QueueCallbackType.EMPTY, QueueCallbackType.LIMIT})
    void process() {
        this.currentTime = this.timer.now();
        sendIfNeeded();
        checkForReconnect();
    }

    private void sendIfNeeded() {
        if (this.currentTime - this.lastSendTime > this.tallyInterval) {
            this.lastSendTime = this.currentTime;
            for (LocalCount localCount : this.countMap.values()) {
                if (localCount.count > 0) {
                    this.statReplicators.forEach(pair -> {
                        doRecordCount(pair, localCount.name, localCount.count, this.currentTime);
                    });
                }
                localCount.count = 0L;
            }
            if (this.countMap.size() > 10000000) {
                this.countMap.clear();
            }
            flushReplicatorsAll();
        }
    }

    private void flushReplicatorsAll() {
        if (this.currentTime - this.lastReplicatorFlush > this.flushInterval) {
            this.lastReplicatorFlush = this.currentTime;
            ArrayList arrayList = new ArrayList();
            this.statReplicators.forEach(pair -> {
                flushReplicator(pair, arrayList);
            });
            arrayList.forEach(pair2 -> {
                try {
                    ((StatReplicator) pair2.getSecond()).stop();
                } catch (Exception e) {
                    this.logger.info("Failed to stop failed node", e);
                }
                this.statReplicators.remove(pair2);
                this.replicatorsMap.remove(((EndpointDefinition) pair2.getFirst()).getId());
            });
            if (this.trace) {
                this.logger.trace(Str.sputs(new Object[]{"ClusteredStatReplicator::flushReplicatorsAll()", Integer.valueOf(arrayList.size())}));
                arrayList.forEach(pair3 -> {
                    this.logger.debug(Str.sputs(new Object[]{pair3}));
                });
            }
        }
    }

    private void checkForReconnect() {
        if (this.currentTime - this.lastReconnectTime > 60000) {
            doCheckReconnect();
        }
    }

    public void doCheckReconnect() {
        this.lastReconnectTime = this.currentTime;
        List services = this.servicePool.services();
        if (services.size() - 1 != this.statReplicators.size()) {
            this.logger.info(Str.sputs(new Object[]{"DOING RECONNECT", Integer.valueOf(services.size() - 1), Integer.valueOf(this.statReplicators.size())}));
            shutDownReplicators();
            services.forEach(this::addService);
        }
    }

    private void shutDownReplicators() {
        if (this.debug) {
            this.logger.debug("Shutting down replicators");
        }
        Iterator<Pair<EndpointDefinition, StatReplicator>> it = this.statReplicators.iterator();
        while (it.hasNext()) {
            try {
                ((StatReplicator) it.next().getSecond()).stop();
            } catch (Exception e) {
                this.logger.debug("Shutdown replicator failed", e);
            }
            if (this.debug) {
                this.logger.debug("Shutting down replicator");
            }
        }
        this.statReplicators.clear();
        this.replicatorsMap.clear();
    }

    private void flushReplicator(Pair<EndpointDefinition, StatReplicator> pair, List<Pair<EndpointDefinition, StatReplicator>> list) {
        try {
            ServiceProxyUtils.flushServiceProxy(pair.getSecond());
        } catch (Exception e) {
            list.add(pair);
            this.logger.info("Replicator failed " + pair, e);
        }
    }

    public void servicePoolChanged(String str) {
        if (this.serviceName.equals(str)) {
            this.logger.info("ClusteredStatReplicator::servicePoolChanged({})", str);
            updateServicePool(str);
        } else if (this.debug) {
            this.logger.debug("ClusteredStatReplicator::servicePoolChanged({})", "got event for another service", str);
        }
    }

    private void updateServicePool(String str) {
        try {
            this.servicePool.setHealthyNodes(this.serviceDiscovery.loadServices(str), new ServicePoolListener() { // from class: io.advantageous.qbit.metrics.ClusteredStatReplicator.1
                public void servicePoolChanged(String str2) {
                }

                public void serviceAdded(String str2, EndpointDefinition endpointDefinition) {
                    ClusteredStatReplicator.this.addService(endpointDefinition);
                }

                public void serviceRemoved(String str2, EndpointDefinition endpointDefinition) {
                    ClusteredStatReplicator.this.removeService(endpointDefinition);
                }
            });
        } catch (Exception e) {
            this.logger.error("Error updating service pool");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeService(EndpointDefinition endpointDefinition) {
        this.logger.info(Str.sputs(new Object[]{"ClusteredStatReplicator::removeService()", this.serviceName, endpointDefinition, " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
        Pair<EndpointDefinition, StatReplicator> pair = this.replicatorsMap.get(endpointDefinition.getId());
        if (pair == null) {
            this.logger.error(Str.sputs(new Object[]{"ClusteredStatReplicator::removeService() Trying to remove a service that we are not managing", this.serviceName, "END POINT ID", endpointDefinition.getId(), " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
            return;
        }
        if (pair.getSecond() == null) {
            this.logger.error(Str.sputs(new Object[]{"ClusteredStatReplicator::removeService() Trying to remove a service that we are nto managingand the getSecond() is null", this.serviceName, "END POINT ID", endpointDefinition.getId(), " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
            return;
        }
        try {
            ((StatReplicator) pair.getSecond()).stop();
        } catch (Exception e) {
            this.logger.error("Unable to stop service endpoint that was removed " + endpointDefinition, e);
        }
        this.replicatorsMap.remove(endpointDefinition.getId());
        this.statReplicators = new ArrayList(this.replicatorsMap.values());
        this.logger.info(Str.sputs(new Object[]{"ClusteredStatReplicator::removeService() removed", this.serviceName, endpointDefinition, " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addService(EndpointDefinition endpointDefinition) {
        if (endpointDefinition.getId().equals(this.localServiceId)) {
            return;
        }
        this.logger.info(Str.sputs(new Object[]{"ClusteredStatReplicator::addService()", endpointDefinition, " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
        this.replicatorsMap.put(endpointDefinition.getId(), Pair.pair(endpointDefinition, this.statReplicatorProvider.provide(endpointDefinition)));
        this.statReplicators = new ArrayList(this.replicatorsMap.values());
        this.logger.info(Str.sputs(new Object[]{"ClusteredStatReplicator::addService() added", this.serviceName, endpointDefinition, " replicator count ", Integer.valueOf(this.replicatorsMap.size())}));
    }

    public void flush() {
        process();
    }
}
