package org.apache.samza.drain;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metadatastore.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/drain/DrainMonitor.class */
/**
 * Polls the drain metadata store for {@link DrainNotification} messages whose run id matches the run id of
 * this application and invokes the registered {@link DrainCallback} once such a message is found.
 *
 * <p>Lifecycle: a monitor is created in {@link State#INIT}, moves to {@link State#RUNNING} when
 * {@link #start()} schedules the periodic poll, and moves to {@link State#STOPPED} when {@link #stop()}
 * shuts the scheduler down. State transitions and callback registration are serialized on an internal lock.
 */
public class DrainMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(DrainMonitor.class);

    /** Delay, in milliseconds, before the first scheduled poll runs. */
    private static final int INITIAL_POLL_DELAY_MILLIS = 0;

    private final ScheduledExecutorService schedulerService;
    private final String appRunId;
    private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private State state = State.INIT;

    // Written only under the lock while in INIT; the polling task reads it after being scheduled by start().
    private DrainCallback callback;
    private long pollingIntervalMillis;

    /** Callback invoked when a drain notification for this application run is observed. */
    public interface DrainCallback {
        void onDrain();
    }

    /** Lifecycle states of the monitor. */
    public enum State {
        INIT,
        RUNNING,
        STOPPED
    }

    /**
     * Creates a monitor that polls at the interval configured in {@link JobConfig}.
     *
     * @param metadataStore store wrapped with the drain namespace to read drain notifications from
     * @param config        configuration supplying the application run id and the polling interval
     * @throws NullPointerException if either argument is null
     */
    public DrainMonitor(MetadataStore metadataStore, Config config) {
        // Validate before allocating the executor so a bad argument never leaves a half-built monitor behind.
        Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot be null.");
        Preconditions.checkNotNull(config, "Config parameter cannot be null.");
        this.schedulerService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("Samza DrainMonitor Thread-%d").setDaemon(true).build());
        this.drainMetadataStore =
            new NamespaceAwareCoordinatorStreamStore(metadataStore, DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
        this.appRunId = new ApplicationConfig(config).getRunId();
        this.pollingIntervalMillis = new JobConfig(config).getDrainMonitorPollIntervalMillis();
    }

    /**
     * Creates a monitor that polls at an explicitly supplied interval, overriding the configured one.
     *
     * @param metadataStore         store wrapped with the drain namespace to read drain notifications from
     * @param config                configuration supplying the application run id
     * @param pollingIntervalMillis polling interval in milliseconds; must be greater than 0
     * @throws NullPointerException     if {@code metadataStore} or {@code config} is null
     * @throws IllegalArgumentException if {@code pollingIntervalMillis} is not greater than 0
     */
    public DrainMonitor(MetadataStore metadataStore, Config config, long pollingIntervalMillis) {
        this(metadataStore, config);
        Preconditions.checkArgument(pollingIntervalMillis > 0,
            String.format("Polling interval specified is %d ms. It should be greater than 0.", pollingIntervalMillis));
        this.pollingIntervalMillis = pollingIntervalMillis;
    }

    /**
     * Starts the monitor if it is in {@link State#INIT}.
     *
     * <p>If a matching drain notification is already present, the callback is invoked immediately (while the
     * internal lock is held), no polling is scheduled and the state stays {@link State#INIT}. Otherwise a
     * periodic poll is scheduled and the state becomes {@link State#RUNNING}. Calling this method in any other
     * state only logs a message.
     *
     * @throws IllegalStateException if no callback has been registered
     */
    public void start() {
        synchronized (this.lock) {
            Preconditions.checkState(this.callback != null,
                "Drain Callback needs to be set using registerDrainCallback(callback) prior to starting the DrainMonitor.");
            switch (this.state) {
                case INIT:
                    if (shouldDrain(this.drainMetadataStore, this.appRunId)) {
                        LOG.info("Found DrainNotification message on container start. Skipping poll of DrainNotifications.");
                        this.callback.onDrain();
                    } else {
                        this.schedulerService.scheduleAtFixedRate(this::pollForDrainNotification,
                            INITIAL_POLL_DELAY_MILLIS, this.pollingIntervalMillis, TimeUnit.MILLISECONDS);
                        // Flip to RUNNING only once scheduling succeeded, so a rejected schedule
                        // (e.g. a non-positive period) does not leave the monitor claiming to run.
                        this.state = State.RUNNING;
                        LOG.info("Started DrainMonitor.");
                    }
                    break;
                case RUNNING:
                case STOPPED:
                    LOG.info("Cannot call start() on the DrainMonitor when it is in {} state.", this.state);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Stops the monitor if it is {@link State#RUNNING} by shutting the scheduler down and moving to
     * {@link State#STOPPED}. In any other state this only logs a message.
     */
    public void stop() {
        synchronized (this.lock) {
            switch (this.state) {
                case RUNNING:
                    this.schedulerService.shutdownNow();
                    this.state = State.STOPPED;
                    LOG.info("Stopped DrainMonitor.");
                    break;
                case INIT:
                case STOPPED:
                    LOG.info("Cannot stop DrainMonitor as it is not running. State: {}.", this.state);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Registers the callback to invoke when a drain notification is observed. Registration succeeds only
     * while the monitor is in {@link State#INIT} and no callback has been registered yet.
     *
     * @param drainCallback callback to register
     * @return {@code true} if the callback was registered, {@code false} otherwise
     * @throws NullPointerException if {@code drainCallback} is null
     */
    public boolean registerDrainCallback(DrainCallback drainCallback) {
        Preconditions.checkNotNull(drainCallback);
        synchronized (this.lock) {
            if (this.state != State.INIT) {
                LOG.warn("Cannot register callback when it is in {} state. Please register callback before calling start on DrainMonitor.", this.state);
                return false;
            }
            if (this.callback != null) {
                LOG.warn("Cannot register callback as a callback is already registered.");
                return false;
            }
            this.callback = drainCallback;
            return true;
        }
    }

    /** Returns the current lifecycle state. */
    @VisibleForTesting
    State getState() {
        synchronized (this.lock) {
            return this.state;
        }
    }

    /**
     * Tells whether the given store holds at least one drain notification for the given run id.
     *
     * @param drainMetadataStore store to read drain notifications from
     * @param runId              run id to match against each notification's run id
     * @return {@code true} if at least one stored notification carries {@code runId}; {@code false} if the
     *         store holds no notifications or none of them match
     */
    public static boolean shouldDrain(NamespaceAwareCoordinatorStreamStore drainMetadataStore, String runId) {
        return readDrainNotificationMessages(drainMetadataStore)
            .map(notifications -> notifications.stream()
                .anyMatch(notification -> runId.equals(notification.getRunId())))
            .orElse(false);
    }

    /**
     * Body of the periodic poll: on a matching drain notification, stops the monitor and then invokes the
     * callback.
     */
    private void pollForDrainNotification() {
        final boolean drainRequested;
        try {
            drainRequested = shouldDrain(this.drainMetadataStore, this.appRunId);
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task suppresses every later execution, which would
            // silently disable drain detection. Log the failed read and let the next scheduled poll retry.
            LOG.error("Failed to poll for DrainNotifications. Will retry at the next polling interval.", e);
            return;
        }
        if (drainRequested) {
            LOG.info("Received Drain Notification for deployment: {}", this.appRunId);
            stop();
            this.callback.onDrain();
        }
    }

    /**
     * Reads and deserializes every value in the store as a {@link DrainNotification}.
     *
     * @return the deserialized notifications, or an empty {@link Optional} when the store holds none
     * @throws SamzaException if a stored value cannot be deserialized
     */
    private static Optional<List<DrainNotification>> readDrainNotificationMessages(
        NamespaceAwareCoordinatorStreamStore drainMetadataStore) {
        final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
        final ImmutableList<DrainNotification> notifications = drainMetadataStore.all().values().stream()
            .map(serialized -> deserialize(objectMapper, serialized))
            .collect(ImmutableList.toImmutableList());
        if (notifications.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(notifications);
    }

    /** Deserializes one stored value, wrapping any {@link IOException} in a {@link SamzaException}. */
    private static DrainNotification deserialize(ObjectMapper objectMapper, byte[] serialized) {
        try {
            return objectMapper.readValue(serialized, DrainNotification.class);
        } catch (IOException e) {
            LOG.error("Unable to deserialize DrainNotification from the metadata store", e);
            throw new SamzaException(e);
        }
    }
}
