package org.apache.samza.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/**
 * Periodically looks up all streams of each monitored system, keeps those whose stream name
 * matches the system's configured regex, and notifies a {@link Callback} when any matching
 * stream is not already contained in the initially supplied set of streams.
 *
 * <p>Lifecycle: an instance starts in {@code INIT}, moves to {@code RUNNING} on {@link #start()}
 * and to {@code STOPPED} on {@link #stop()}. A stopped monitor cannot be restarted. State
 * transitions are guarded by an internal lock.
 */
public class StreamRegexMonitor {
    private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class);
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName()).build();
    private final Set<SystemStream> streamsToMonitor;
    private final Map<String, Pattern> systemRegexesToMonitor;
    private final StreamMetadataCache metadataCache;
    private final int inputRegexMonitorPeriodMs;
    // One gauge per monitored system, registered in the constructor. Not read or updated by this class.
    private final Map<String, Gauge<Integer>> gauges;
    private final Callback callbackMethod;
    private final Object lock = new Object();
    private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
    private volatile State state = State.INIT;

    /** Receives a notification when streams matching a monitored regex are newly discovered. */
    public interface Callback {
        /**
         * Invoked from the monitor's scheduler thread when at least one matching stream is not
         * part of the initially supplied stream set.
         *
         * @param initialInputSet the stream set that was handed to the monitor's constructor
         * @param newInputStreams the matching streams that are not in {@code initialInputSet}
         * @param regexesMonitored the system-name to regex map that was handed to the constructor
         */
        void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams, Map<String, Pattern> regexesMonitored);
    }

    /** Lifecycle states of the monitor. */
    private enum State {
        INIT,
        RUNNING,
        STOPPED
    }

    /**
     * Creates a monitor and registers one {@code "<system>-new-input-streams"} gauge (initial
     * value 0) in the {@code "job-coordinator"} group for every system in
     * {@code systemRegexesToMonitor}. Nothing is scheduled until {@link #start()} is called.
     *
     * @param streamsToMonitor the already-known streams; matches outside this set trigger the callback
     * @param systemRegexesToMonitor system name to the regex its stream names are matched against
     * @param metadataCache source of the current streams of each system
     * @param metricsRegistry registry in which the per-system gauges are created
     * @param inputRegexMonitorPeriodMs period between checks in milliseconds; a value {@code <= 0}
     *     means no periodic check is scheduled
     * @param callback invoked when new matching streams are discovered
     */
    public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor, StreamMetadataCache metadataCache, MetricsRegistry metricsRegistry, int inputRegexMonitorPeriodMs, Callback callback) {
        this.streamsToMonitor = streamsToMonitor;
        this.systemRegexesToMonitor = systemRegexesToMonitor;
        this.metadataCache = metadataCache;
        this.callbackMethod = callback;
        this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs;
        Map<String, Gauge<Integer>> gaugesBySystem = new HashMap<>();
        for (String systemName : systemRegexesToMonitor.keySet()) {
            gaugesBySystem.put(systemName, metricsRegistry.newGauge("job-coordinator", String.format("%s-new-input-streams", systemName), 0));
        }
        this.gauges = Collections.unmodifiableMap(gaugesBySystem);
        log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", getClass().getName(), this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor);
    }

    /**
     * Starts the periodic regex check (first run immediately, then at a fixed rate) if the
     * configured period is positive, and marks the monitor as running. Calling this on an
     * already running monitor is a no-op.
     *
     * @throws IllegalStateException if the monitor has already been stopped
     */
    public void start() {
        synchronized (this.lock) {
            // Switch on the enum itself rather than on ordinals or unrelated int constants.
            switch (this.state) {
                case INIT:
                    if (this.inputRegexMonitorPeriodMs > 0) {
                        this.schedulerService.scheduleAtFixedRate(this::monitorInputRegexes, 0L, this.inputRegexMonitorPeriodMs, TimeUnit.MILLISECONDS);
                    }
                    // The monitor counts as running even when no periodic task was scheduled.
                    this.state = State.RUNNING;
                    break;
                case RUNNING:
                    return;
                case STOPPED:
                    throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted.");
            }
        }
    }

    /** Shuts the scheduler down immediately and marks the monitor as stopped. */
    public void stop() {
        synchronized (this.lock) {
            this.schedulerService.shutdownNow();
            this.state = State.STOPPED;
        }
    }

    /**
     * Performs a single check: collects, for every monitored system, the streams whose name
     * fully matches that system's regex, and invokes the callback if any of them is missing
     * from {@code streamsToMonitor}.
     *
     * <p>Run by the scheduler. All exceptions are logged and swallowed so that one failing run
     * does not cancel the subsequent scheduled runs. An {@link UnsupportedOperationException}
     * for a single system is logged and only skips that system.
     */
    public void monitorInputRegexes() {
        log.debug("Running monitorInputRegexes");
        try {
            Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>();
            for (Map.Entry<String, Pattern> systemRegex : this.systemRegexesToMonitor.entrySet()) {
                String systemName = systemRegex.getKey();
                Pattern streamPattern = systemRegex.getValue();
                try {
                    // Use the pre-compiled Pattern directly: avoids recompiling the regex for every
                    // stream and honours any flags the Pattern was compiled with.
                    Set<SystemStream> matchesForSystem = JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName)).asJava()
                            .stream()
                            .filter(systemStream -> streamPattern.matcher(systemStream.getStream()).matches())
                            .collect(Collectors.toSet());
                    inputStreamsMatchingPattern.addAll(matchesForSystem);
                } catch (UnsupportedOperationException e) {
                    log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e);
                }
            }
            if (this.streamsToMonitor.containsAll(inputStreamsMatchingPattern)) {
                log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}", this.streamsToMonitor, inputStreamsMatchingPattern);
            } else {
                log.info("New input system-streams discovered. InputStreamsMatchingPattern: {} but streamsToMonitor: {} ", inputStreamsMatchingPattern, this.streamsToMonitor);
                this.callbackMethod.onInputStreamsChanged(this.streamsToMonitor, Sets.difference(inputStreamsMatchingPattern, this.streamsToMonitor), this.systemRegexesToMonitor);
            }
        } catch (Exception e) {
            log.error("Exception while monitoring input regexes.", e);
        }
    }

    /** Returns whether the monitor is currently in the running state. */
    @VisibleForTesting
    boolean isRunning() {
        return this.state == State.RUNNING;
    }

    /**
     * Waits for the scheduler to terminate after a shutdown request.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if the scheduler terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    @VisibleForTesting
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return this.schedulerService.awaitTermination(timeout, unit);
    }
}
