package com.netflix.turbine.streaming;

import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.streaming.RelevanceKey;
import com.netflix.turbine.streaming.servlet.TurbineStreamServlet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/turbine/streaming/TurbineStreamingConnection.class */
public class TurbineStreamingConnection<T extends TurbineData> implements TurbineDataHandler<T> {
    /** Logger shared by the connection and its nested classes. */
    private static final Logger logger = LoggerFactory.getLogger(TurbineStreamingConnection.class);
    /** Attribute key that writeToStream removes from a datum's attribute map before serializing it. */
    private static final Object CurrentTime = "currentTime";
    /** Sink that receives the writeData / deleteData / noData calls made by this connection. */
    protected final StreamingDataHandler streamHandler;
    /**
     * Minimum number of milliseconds between two outputs for the same data key, as enforced by
     * isDataWithinStreamingWindow; the value -1 disables the time check there.
     */
    protected int streamingDelay;
    /** Prefix values collected from the filter criteria; writeToStream tests them against the data name with String.contains. */
    protected final Set<String> filterPrefixes;
    /** Data types accepted by writeToStream; an empty set accepts every type. */
    private final Set<String> statsTypeFilter;
    /** Data names accepted by writeToStream; an empty set accepts every name. */
    private final Set<String> dataNames;
    /** Jackson writer used to turn attribute maps (and TurbineData objects) into the strings handed to streamHandler. */
    private final ObjectWriter objectWriter;
    /** Once true, waitOnConnection leaves its loop and handleData drops incoming data. */
    private volatile boolean stopMonitoring = false;
    /** Wall-clock time (ms) of the most recent writeToStream call; -1 until the first call. */
    private volatile long lastEvent = -1;
    // NOTE(review): not referenced anywhere in this class; presumably used by subclasses or callers -- confirm.
    protected ConcurrentHashMap<String, Object> streamingConnectionSession = new ConcurrentHashMap<>();
    /** Last serialized payload per data key, used by writeToStream to skip unchanged data. */
    private final ConcurrentHashMap<String, String> dataHash = new ConcurrentHashMap<>();
    /** Time (ms) of the last accepted output per data key, maintained by isDataWithinStreamingWindow. */
    private final ConcurrentHashMap<String, AtomicLong> lastOutputForTypeCache = new ConcurrentHashMap<>();
    /** Handler name returned by getName(): a fixed prefix plus a random UUID. */
    protected final String name = "StreamingHandler_" + UUID.randomUUID().toString();
    /** Criteria object returned by getCriteria(). */
    private final PerformanceCriteria perfCriteria = new BroweserPerfCriteria();
    /** Relevance (top-N) bookkeeping keyed by the type named in each RelevanceConfig; populated once in the constructor. */
    private final Map<String, TurbineStreamingConnection<T>.RelevantMetrics> relevantMetrics = new HashMap();

    /* loaded from: input_file:com/netflix/turbine/streaming/TurbineStreamingConnection$BroweserPerfCriteria.class */
    private class BroweserPerfCriteria implements PerformanceCriteria {
        private BroweserPerfCriteria() {
        }

        @Override // com.netflix.turbine.handler.PerformanceCriteria
        public boolean isCritical() {
            return false;
        }

        @Override // com.netflix.turbine.handler.PerformanceCriteria
        public int getMaxQueueSize() {
            return 1000;
        }

        @Override // com.netflix.turbine.handler.PerformanceCriteria
        public int numThreads() {
            return 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netflix/turbine/streaming/TurbineStreamingConnection$RelevantMetrics.class */
    /**
     * Bookkeeping for one relevance ("top N") configuration: the latest datum seen per name, the
     * names currently ranked in the top N, and the names that fell out of it on the last sort.
     *
     * <p>This stays a non-static inner class on purpose: the enclosing class refers to it through
     * the parameterized form {@code TurbineStreamingConnection<T>.RelevantMetrics}, which is only
     * legal for inner classes.
     */
    public class RelevantMetrics {
        /** Relevance settings (type, ranked items, N); may be null if the caller passed null. */
        private final RelevanceConfig config;
        /** Names currently in the top N; holds null until the first {@link #sort()}. */
        private final AtomicReference<Set<String>> topN = new AtomicReference<>();
        /** Names dropped by the most recent {@link #sort()}, or null when nothing was dropped. */
        private final AtomicReference<Set<String>> deletedSet = new AtomicReference<>();
        /** Sorting life cycle; starts at {@code DoNotSort}. */
        private final AtomicReference<State> state = new AtomicReference<>(State.DoNotSort);
        /** Most recent datum per name. */
        private final ConcurrentHashMap<String, TurbineData> dataMap = new ConcurrentHashMap<>();

        /**
         * @param relevanceConfig relevance settings for one data type; the holders are created
         *                        regardless, so {@link #isInTopN(String)} is safe even when this is null
         */
        public RelevantMetrics(RelevanceConfig relevanceConfig) {
            this.config = relevanceConfig;
            if (this.config != null) {
                TurbineStreamingConnection.logger.info("Relevance metrics are enabled: " + this.config.toString());
            }
        }

        /**
         * Records (or replaces) the latest datum for a name.
         *
         * @param str         data name used as the ranking key
         * @param turbineData latest datum for that name
         */
        public void put(String str, TurbineData turbineData) {
            this.dataMap.put(str, turbineData);
        }

        /**
         * Re-ranks every known name and publishes the new top-N set.
         *
         * <p>The names that were in the previous top N (or, on the first sort, every known name)
         * but are absent from the new one are stored in {@code deletedSet}.
         *
         * @return {@code true} when at least one name dropped out, {@code false} otherwise
         */
        public boolean sort() {
            ConcurrentSkipListSet<RelevanceKey> ranked =
                    new ConcurrentSkipListSet<RelevanceKey>(new RelevanceKey.RelevanceComparator());
            for (Map.Entry<String, TurbineData> entry : this.dataMap.entrySet()) {
                ranked.add(new RelevanceKey(entry.getKey(), this.config.items, entry.getValue().getNumericAttributes()));
            }

            // Walk from the highest-ranked key downwards and keep at most config.topN names.
            Set<String> newTopN = new HashSet<>();
            Iterator<RelevanceKey> best = ranked.descendingIterator();
            for (int taken = 0; best.hasNext() && taken < this.config.topN; taken++) {
                newTopN.add(best.next().getName());
            }

            Set<String> previous = this.topN.get();
            this.topN.set(newTopN);
            this.deletedSet.set(null);
            if (previous == null) {
                // First sort: everything seen so far that did not make the cut counts as dropped.
                previous = this.dataMap.keySet();
            }

            Set<String> dropped = new HashSet<>(previous);
            dropped.removeAll(newTopN);
            if (dropped.isEmpty()) {
                return false;
            }
            this.deletedSet.set(dropped);
            return true;
        }

        /**
         * @param str data name to test
         * @return {@code true} when no top-N set has been computed yet (or it is empty), otherwise
         *         whether the name is part of the current top-N set
         */
        public boolean isInTopN(String str) {
            Set<String> current = this.topN.get();
            if (current == null || current.isEmpty()) {
                return true;
            }
            return current.contains(str);
        }

        /** @return the string form of the relevance config ({@code "null"} when there is none). */
        @Override
        public String toString() {
            return String.valueOf(this.config);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/turbine/streaming/TurbineStreamingConnection$State.class */
    /** Sorting life cycle of a RelevantMetrics instance. */
    public enum State {
        /** Initial state of every RelevantMetrics; waitOnConnection does not sort while in it. */
        DoNotSort,
        /** Entered from DoNotSort in writeToStream once dataMap holds at least config.topN entries. */
        StartSorting,
        /** Set by waitOnConnection after it has sorted and written the initial top-N entries. */
        Sorted
    }

    /* loaded from: input_file:com/netflix/turbine/streaming/TurbineStreamingConnection$UnitTest.class */
    /** Embedded JUnit test for the streaming connection's filtering and de-duplication. */
    public static class UnitTest {
        /**
         * Feeds a connection filtered on type "testType" / name "testName" from three producer
         * tasks for about three seconds, then checks that exactly one write reached the handler,
         * that no deletes were issued, that at least one noData ping was sent, and that the
         * written payload contains the accepted type/name but neither "foo" nor "bar".
         */
        @Test
        public void testFilterStream() throws Exception {
            // Counters, in declaration order: writeData calls, noData calls, deleteData calls.
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            final AtomicInteger atomicInteger2 = new AtomicInteger(0);
            final AtomicInteger atomicInteger3 = new AtomicInteger(0);
            // Last payload passed to writeData.
            final AtomicReference atomicReference = new AtomicReference(null);
            // Shared (empty) attribute map handed to every generated datum.
            final HashMap hashMap = new HashMap();
            final Instance instance = new Instance("host", "cluster", true);
            // Stop flag: once set, producers exit and every handler callback throws.
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            StreamingDataHandler streamingDataHandler = new StreamingDataHandler() { // from class: com.netflix.turbine.streaming.TurbineStreamingConnection.UnitTest.1
                @Override // com.netflix.turbine.streaming.StreamingDataHandler
                public void writeData(String str) throws Exception {
                    System.out.println("Data: " + str);
                    if (atomicBoolean.get()) {
                        throw new RuntimeException("stop!");
                    }
                    atomicInteger.incrementAndGet();
                    atomicReference.set(str);
                }

                @Override // com.netflix.turbine.streaming.StreamingDataHandler
                public void deleteData(String str, Set<String> set) throws Exception {
                    if (atomicBoolean.get()) {
                        throw new RuntimeException("stop!");
                    }
                    atomicInteger3.incrementAndGet();
                }

                @Override // com.netflix.turbine.streaming.StreamingDataHandler
                public void noData() throws Exception {
                    if (atomicBoolean.get()) {
                        throw new RuntimeException("stop!");
                    }
                    atomicInteger2.incrementAndGet();
                }
            };
            ArrayList arrayList = new ArrayList();
            arrayList.add(TurbineStreamServlet.FilterCriteria.parseCriteria("type:testType|name:testName"));
            // Connection under test, constructed with a streaming delay of 10.
            final TurbineStreamingConnection turbineStreamingConnection = new TurbineStreamingConnection(streamingDataHandler, arrayList, 10);
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
            ArrayList arrayList2 = new ArrayList();
            // Three producers push random batches every 50 ms until the stop flag is set.
            for (int i = 0; i < 3; i++) {
                arrayList2.add(newFixedThreadPool.submit(new Callable<Integer>() { // from class: com.netflix.turbine.streaming.TurbineStreamingConnection.UnitTest.2
                    // Number of data items this producer has handed to the connection.
                    final AtomicInteger count = new AtomicInteger(0);
                    final Random random = new Random();

                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.util.concurrent.Callable
                    public Integer call() throws Exception {
                        while (!atomicBoolean.get()) {
                            Collection<TurbineData> randomData = getRandomData();
                            turbineStreamingConnection.handleData(randomData);
                            this.count.addAndGet(randomData.size());
                            Thread.sleep(50L);
                        }
                        return Integer.valueOf(this.count.get());
                    }

                    // Builds 0-9 pairs: one datum matching the filter and one ("foo"/"bar") that does not.
                    private Collection<TurbineData> getRandomData() {
                        int nextInt = this.random.nextInt(10);
                        ArrayList arrayList3 = new ArrayList();
                        for (int i2 = 0; i2 < nextInt; i2++) {
                            arrayList3.add(new DataFromSingleInstance(null, "testType", "testName", instance, hashMap, 0L));
                            arrayList3.add(new DataFromSingleInstance(null, "foo", "bar", instance, hashMap, 0L));
                        }
                        return arrayList3;
                    }
                }));
            }
            // After three seconds raise the stop flag; the next handler callback then throws.
            final Timer timer = new Timer();
            timer.schedule(new TimerTask() { // from class: com.netflix.turbine.streaming.TurbineStreamingConnection.UnitTest.3
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    atomicBoolean.set(true);
                    timer.cancel();
                }
            }, 3000L);
            // Blocks until the connection stops monitoring.
            turbineStreamingConnection.waitOnConnection();
            newFixedThreadPool.shutdownNow();
            // Total number of data items handed in by all producers.
            int i2 = 0;
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                i2 += ((Integer) ((Future) it.next()).get()).intValue();
            }
            // More items were produced than written; exactly one write, no deletes, at least one noData.
            Assert.assertTrue(i2 > atomicInteger.get());
            Assert.assertTrue(1 == atomicInteger.get());
            Assert.assertTrue(0 == atomicInteger3.get());
            Assert.assertTrue(atomicInteger2.get() >= 1);
            // The single written payload mentions the accepted type/name and none of the rejected ones.
            Assert.assertTrue((String) atomicReference.get(), ((String) atomicReference.get()).contains("testType"));
            Assert.assertTrue(((String) atomicReference.get()).contains("testName"));
            Assert.assertFalse(((String) atomicReference.get()).contains("foo"));
            Assert.assertFalse(((String) atomicReference.get()).contains("bar"));
        }
    }

    /**
     * Creates a streaming connection that forwards filtered data to the given handler.
     *
     * @param streamingDataHandler sink for written data, deletions and no-data pings
     * @param collection           filter criteria from which the name, type and prefix filters and
     *                             the relevance configurations are derived
     * @param i                    streaming delay in milliseconds between outputs for one data key
     * @throws Exception declared by the original signature; kept for caller compatibility
     */
    public TurbineStreamingConnection(StreamingDataHandler streamingDataHandler, Collection<TurbineStreamServlet.FilterCriteria> collection, int i) throws Exception {
        this.streamHandler = streamingDataHandler;

        // Derive the individual filters from the criteria list.
        this.dataNames = getNameFilters(collection);
        this.statsTypeFilter = getTypeFilters(collection);
        this.filterPrefixes = getFilterPrefixes(collection);
        this.streamingDelay = i;

        // One RelevantMetrics instance per configured relevance type.
        Set<RelevanceConfig> configs = getRelevanceConfig(collection);
        logger.info("Relevance config: " + configs);
        for (RelevanceConfig config : configs) {
            RelevantMetrics metrics = new RelevantMetrics(config);
            this.relevantMetrics.put(config.type, metrics);
        }
        logger.info("Relevance metrics config: " + this.relevantMetrics);

        ObjectMapper mapper = new ObjectMapper();
        this.objectWriter = mapper.prettyPrintingWriter(new MinimalPrettyPrinter());
    }

    /**
     * @return this handler's name: the prefix "StreamingHandler_" followed by a random UUID
     *         chosen when the connection was created
     */
    @Override // com.netflix.turbine.handler.TurbineDataHandler
    public String getName() {
        return name;
    }

    /**
     * Blocks the calling thread until the connection should be torn down.
     *
     * <p>Each pass of the loop:
     * <ol>
     *   <li>stops monitoring if data has been seen before but not within the last 10 seconds;</li>
     *   <li>sends a no-data ping to the stream handler;</li>
     *   <li>for every relevance configuration, waits (up to 3 seconds in total, polling every
     *       10 ms) for it to leave {@code DoNotSort}, then sorts it, writes the initial top-N
     *       entries the first time, and reports names that dropped out of the top N;</li>
     *   <li>sleeps 3 seconds.</li>
     * </ol>
     *
     * <p>Any exception from the stream handler stops monitoring. If the thread is interrupted,
     * monitoring stops and the interrupt status is restored so the caller can observe it. An
     * unexpected {@link Throwable} also stops monitoring, so {@code handleData} no longer writes
     * to a stream nobody is waiting on.
     */
    public void waitOnConnection() {
        while (!this.stopMonitoring) {
            try {
                if (this.lastEvent > -1 && System.currentTimeMillis() - this.lastEvent > 10000) {
                    logger.info("ERROR: We haven't heard from the monitor in a while so are killing the handler and will stop streaming to client.");
                    this.stopMonitoring = true;
                }
                try {
                    this.streamHandler.noData();
                    long waitStart = System.currentTimeMillis();
                    for (RelevantMetrics metrics : this.relevantMetrics.values()) {
                        // Give the data path a bounded amount of time to collect enough entries.
                        while (metrics.state.get() == State.DoNotSort && System.currentTimeMillis() - waitStart < 3000) {
                            Thread.sleep(10L);
                        }
                        if (metrics.state.get() == State.DoNotSort) {
                            continue;
                        }
                        boolean entriesDropped = metrics.sort();
                        if (metrics.state.get() == State.StartSorting) {
                            // First sort for this type: push the whole initial top-N set.
                            for (String key : metrics.topN.get()) {
                                this.streamHandler.writeData(this.objectWriter.writeValueAsString(metrics.dataMap.get(key)));
                            }
                            metrics.state.set(State.Sorted);
                        }
                        if (entriesDropped) {
                            Set<String> dropped = metrics.deletedSet.get();
                            if (dropped != null && !dropped.isEmpty()) {
                                this.streamHandler.deleteData(metrics.config.type, dropped);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up the stack can see the interruption.
                    Thread.currentThread().interrupt();
                    this.stopMonitoring = true;
                    logger.info("Got interrupted when waiting on connection", e);
                    return;
                } catch (Exception e) {
                    if ("Broken pipe".equals(e.getMessage())) {
                        logger.debug("Broken pipe (most likely client disconnected) when writing to response stream", e);
                    } else {
                        logger.error("Got exception when writing to response stream", e);
                    }
                    this.stopMonitoring = true;
                }
                try {
                    // Deliberately sleeps even when monitoring has just been stopped.
                    Thread.sleep(3000L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    this.stopMonitoring = true;
                    logger.info("Got interrupted when waiting on connection", e2);
                }
            } catch (Throwable th) {
                logger.error("Caught throwable when waiting on connection", th);
                this.stopMonitoring = true;
                return;
            }
        }
    }

    /**
     * Forwards a batch of data to the stream unless monitoring has already been stopped, in
     * which case the batch is silently dropped.
     *
     * @param collection batch of data delivered to this handler
     */
    @Override // com.netflix.turbine.handler.TurbineDataHandler
    public void handleData(Collection<T> collection) {
        if (!this.stopMonitoring) {
            writeToStream(collection);
        }
    }

    /**
     * Host-loss notification; this handler takes no action.
     *
     * @param instance the instance that was reported lost (unused)
     */
    @Override // com.netflix.turbine.handler.TurbineDataHandler
    public void handleHostLost(Instance instance) {
        // Intentionally empty.
    }

    /**
     * Filters, throttles, de-duplicates and writes a batch of data to the stream handler.
     *
     * <p>For every datum that passes the type/name/prefix filters and the per-key streaming
     * window, its attributes are serialized; the payload is delivered only when it differs from
     * the last payload recorded for the same key. Data whose type has a relevance configuration
     * is recorded there and delivered only once that configuration is sorted and the name is in
     * its top N.
     *
     * <p>Error handling: a non-I/O failure while processing one datum is logged and the loop
     * continues with the next one. An {@link IOException} (or any other throwable escaping the
     * loop) is treated as a lost client: it is logged and monitoring is stopped.
     *
     * @param collection batch of data to consider for streaming
     */
    protected void writeToStream(Collection<? extends TurbineData> collection) {
        this.lastEvent = System.currentTimeMillis();
        try {
            for (TurbineData turbineData : collection) {
                try {
                    if (!matchesFilters(turbineData)) {
                        continue;
                    }
                    String dataKey = getDataKey(turbineData);
                    if (!isDataWithinStreamingWindow(dataKey)) {
                        continue;
                    }

                    String payload = serializeAttributes(turbineData);
                    if (payload.equals(this.dataHash.get(dataKey))) {
                        logger.debug("We skipped delivering a message since it hadn't changed: " + dataKey);
                        continue;
                    }
                    this.dataHash.put(dataKey, payload);

                    RelevantMetrics metrics = this.relevantMetrics.get(turbineData.getType());
                    if (metrics == null) {
                        // No relevance ranking for this type: deliver straight away.
                        this.streamHandler.writeData(payload);
                        continue;
                    }

                    metrics.put(turbineData.getName(), turbineData);
                    if (metrics.state.get() == State.DoNotSort && metrics.dataMap.size() >= metrics.config.topN) {
                        // Enough entries collected; let waitOnConnection perform the first sort.
                        metrics.state.compareAndSet(State.DoNotSort, State.StartSorting);
                    }
                    if (metrics.state.get() == State.Sorted && metrics.isInTopN(turbineData.getName())) {
                        this.streamHandler.writeData(payload);
                    }
                } catch (IOException e) {
                    // Connection-level failure: let the outer handler stop the stream.
                    throw e;
                } catch (Exception e2) {
                    if (turbineData == null) {
                        logger.error("Failed to process data but will continue processing in loop.", e2);
                    } else {
                        logger.error("Failed to process data with type:" + turbineData.getType() + " but will continue processing in loop.", e2);
                    }
                }
            }
        } catch (IOException e3) {
            logger.debug("We lost the client connection. Will stop monitoring and streaming.", e3);
            this.stopMonitoring = true;
        } catch (Throwable th) {
            logger.error("An unknown error occurred trying to write data to client stream. Will stop monitoring and streaming.", th);
            this.stopMonitoring = true;
        }
    }

    /** Returns whether a datum passes the type, name and prefix filters (an empty filter accepts everything). */
    private boolean matchesFilters(TurbineData turbineData) {
        if (!this.statsTypeFilter.isEmpty() && !this.statsTypeFilter.contains(turbineData.getType())) {
            return false;
        }
        if (!this.dataNames.isEmpty() && !this.dataNames.contains(turbineData.getName())) {
            return false;
        }
        if (this.filterPrefixes.isEmpty()) {
            return true;
        }
        for (String prefix : this.filterPrefixes) {
            if (turbineData.getName().contains(prefix)) {
                return true;
            }
        }
        // Prefix filters are configured and none of them matched this name.
        return false;
    }

    /**
     * Serializes a datum's attributes, after removing the current-time entry and merging in the
     * non-null nested map attributes; the map returned by {@code getAttributes()} is modified.
     */
    private String serializeAttributes(TurbineData turbineData) throws IOException {
        Map<String, Object> attributes = turbineData.getAttributes();
        attributes.remove(CurrentTime);
        HashMap<String, Map<String, ? extends Number>> nestedMapAttributes = turbineData.getNestedMapAttributes();
        if (nestedMapAttributes != null) {
            for (Map.Entry<String, Map<String, ? extends Number>> nested : nestedMapAttributes.entrySet()) {
                if (nested.getValue() != null) {
                    attributes.put(nested.getKey(), nested.getValue());
                }
            }
        }
        return this.objectWriter.writeValueAsString(attributes);
    }

    /**
     * Collects the non-null {@code name} values of the given criteria.
     *
     * @param collection filter criteria to scan
     * @return a new mutable set of names; empty when no criterion names anything
     */
    private Set<String> getNameFilters(Collection<TurbineStreamServlet.FilterCriteria> collection) {
        Set<String> names = new HashSet<>();
        for (TurbineStreamServlet.FilterCriteria criteria : collection) {
            if (criteria.name == null) {
                continue;
            }
            names.add(criteria.name);
        }
        return names;
    }

    /**
     * Collects the data types named by the given criteria: each non-null {@code type}, plus the
     * type of each relevance configuration that has one.
     *
     * @param collection filter criteria to scan
     * @return a new mutable set of types; empty when no criterion names a type
     */
    private Set<String> getTypeFilters(Collection<TurbineStreamServlet.FilterCriteria> collection) {
        Set<String> types = new HashSet<>();
        for (TurbineStreamServlet.FilterCriteria criteria : collection) {
            if (criteria.type != null) {
                types.add(criteria.type);
            }
            boolean hasRelevanceType = criteria.relevanceConfig != null && criteria.relevanceConfig.type != null;
            if (hasRelevanceType) {
                types.add(criteria.relevanceConfig.type);
            }
        }
        return types;
    }

    /**
     * Collects the non-null {@code prefix} values of the given criteria.
     *
     * @param collection filter criteria to scan
     * @return a new mutable set of prefixes; empty when no criterion has one
     */
    private Set<String> getFilterPrefixes(Collection<TurbineStreamServlet.FilterCriteria> collection) {
        Set<String> prefixes = new HashSet<>();
        for (TurbineStreamServlet.FilterCriteria criteria : collection) {
            if (criteria.prefix == null) {
                continue;
            }
            prefixes.add(criteria.prefix);
        }
        return prefixes;
    }

    /**
     * Collects the non-null relevance configurations of the given criteria.
     *
     * @param collection filter criteria to scan
     * @return a new mutable set of relevance configurations; empty when none is present
     */
    private Set<RelevanceConfig> getRelevanceConfig(Collection<TurbineStreamServlet.FilterCriteria> collection) {
        Set<RelevanceConfig> configs = new HashSet<>();
        for (TurbineStreamServlet.FilterCriteria criteria : collection) {
            if (criteria.relevanceConfig == null) {
                continue;
            }
            configs.add(criteria.relevanceConfig);
        }
        return configs;
    }

    /**
     * Builds the key under which a datum is throttled and de-duplicated:
     * {@code <SimpleClassName>_<key>}, with {@code _<hostname>} appended for data that comes
     * from a single instance.
     *
     * @param turbineData datum to derive the key for
     * @return the composed key
     */
    private String getDataKey(TurbineData turbineData) {
        String baseKey = turbineData.getClass().getSimpleName() + "_" + turbineData.getKey();
        if (!(turbineData instanceof DataFromSingleInstance)) {
            return baseKey;
        }
        DataFromSingleInstance singleInstanceData = (DataFromSingleInstance) turbineData;
        return baseKey + "_" + singleInstanceData.getHost().getHostname();
    }

    /**
     * Decides whether data for the given key may be sent now, and records the send time if so.
     *
     * <p>The first caller to register a key wins. For a known key the data is allowed when the
     * streaming delay is -1 or at least {@code streamingDelay} milliseconds have passed since
     * the recorded time, and this thread is the one that manages to update that time.
     *
     * @param str data key, as produced by {@code getDataKey}
     * @return {@code true} when the caller should deliver the data
     */
    private boolean isDataWithinStreamingWindow(String str) {
        final long now = System.currentTimeMillis();
        final AtomicLong lastOutput = this.lastOutputForTypeCache.get(str);
        if (lastOutput == null) {
            // Unknown key: only the thread whose entry is actually installed may proceed.
            AtomicLong installedByOther = this.lastOutputForTypeCache.putIfAbsent(str, new AtomicLong(now));
            return installedByOther == null;
        }

        final long previous = lastOutput.get();
        if (this.streamingDelay != -1 && now < previous + ((long) this.streamingDelay)) {
            return false;
        }
        // Losing the compare-and-set means another thread claimed this window.
        return lastOutput.compareAndSet(previous, now);
    }

    /**
     * @return the performance criteria instance created together with this connection
     */
    @Override // com.netflix.turbine.handler.TurbineDataHandler
    public PerformanceCriteria getCriteria() {
        return perfCriteria;
    }
}
