package org.apache.samza.drain;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import joptsimple.internal.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/drain/DrainUtils.class */
public class DrainUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DrainUtils.class);
    private static final Integer VERSION = 1;
    public static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" + VERSION;

    private DrainUtils() {
    }

    public static UUID writeDrainNotification(MetadataStore metadataStore) {
        Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
        String runId = new ApplicationConfig(CoordinatorStreamUtil.readConfigFromCoordinatorStream(metadataStore)).getRunId();
        if (Strings.isNullOrEmpty(runId)) {
            throw new SamzaException("Unable to retrieve runId from metadata store. DrainNotification will not be written to the metadata store.");
        }
        LOG.info("Received runId {}", runId);
        LOG.info("Attempting to write DrainNotification to metadata-store for the deployment ID {}", runId);
        return writeDrainNotification(metadataStore, DrainNotification.create(UUID.randomUUID(), runId));
    }

    @VisibleForTesting
    static UUID writeDrainNotification(MetadataStore metadataStore, DrainNotification drainNotification) {
        Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
        Preconditions.checkArgument(drainNotification != null, "DrainNotification cannot be null.");
        NamespaceAwareCoordinatorStreamStore namespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
        try {
            namespaceAwareCoordinatorStreamStore.put(drainNotification.getUuid().toString(), DrainNotificationObjectMapper.getObjectMapper().writeValueAsBytes(drainNotification));
            namespaceAwareCoordinatorStreamStore.flush();
            LOG.info("DrainNotification with id {} written to metadata-store for the deployment ID {}", drainNotification.getUuid(), drainNotification.getUuid());
            return drainNotification.getUuid();
        } catch (Exception e) {
            throw new SamzaException(String.format("DrainNotification might have been not written to metastore %s", drainNotification), e);
        }
    }

    public static void cleanup(MetadataStore metadataStore, Config config) {
        Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
        Preconditions.checkNotNull(config, "Config parameter cannot be null.");
        String runId = new ApplicationConfig(config).getRunId();
        ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
        NamespaceAwareCoordinatorStreamStore namespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
        if (!DrainMonitor.shouldDrain(namespaceAwareCoordinatorStreamStore, runId)) {
            LOG.info("No DrainNotification found in the metadata-store for the current deployment {}. No need to cleanup.", runId);
            return;
        }
        LOG.info("Attempting to clean up DrainNotifications from the metadata-store for the current deployment {}", runId);
        namespaceAwareCoordinatorStreamStore.all().values().stream().map(bArr -> {
            try {
                return (DrainNotification) objectMapper.readValue(bArr, DrainNotification.class);
            } catch (IOException e) {
                LOG.error("Unable to deserialize DrainNotification from the metadata store", e);
                throw new SamzaException(e);
            }
        }).filter(drainNotification -> {
            return runId.equals(drainNotification.getRunId());
        }).forEach(drainNotification2 -> {
            namespaceAwareCoordinatorStreamStore.delete(drainNotification2.getUuid().toString());
        });
        namespaceAwareCoordinatorStreamStore.flush();
        LOG.info("Successfully cleaned up DrainNotifications from the metadata-store for the current deployment {}", runId);
    }

    public static void cleanupAll(MetadataStore metadataStore) {
        Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
        NamespaceAwareCoordinatorStreamStore namespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
        LOG.info("Attempting to cleanup all DrainNotifications from the metadata-store.");
        Set<String> keySet = namespaceAwareCoordinatorStreamStore.all().keySet();
        namespaceAwareCoordinatorStreamStore.getClass();
        keySet.forEach(namespaceAwareCoordinatorStreamStore::delete);
        namespaceAwareCoordinatorStreamStore.flush();
        LOG.info("Successfully cleaned up all DrainNotifications from the metadata-store.");
    }
}
