package org.apache.samza.job.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/job/metadata/JobCoordinatorMetadataManager.class */
public class JobCoordinatorMetadataManager {
    private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
    /** Name of the environment variable the epoch id is derived from when the cluster type is YARN. */
    static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
    /** Separator used to split the {@code CONTAINER_ID} value and to re-join the parts that form the epoch id. */
    static final String CONTAINER_ID_DELIMITER = "_";
    /** Gauges recording generation/read/write failures and the changes detected between attempts. */
    private final JobCoordinatorMetadataManagerMetrics metrics;
    /** Store the metadata is read from and written to; the entry is keyed by the cluster type's name. */
    private final MetadataStore metadataStore;
    /** Jackson mapper obtained from {@code SamzaObjectMapper}; converts the metadata to and from a string. */
    private final ObjectMapper metadataMapper;
    /** Converts the serialized metadata string to and from the {@code byte[]} kept in the metadata store. */
    private final Serde<String> valueSerde;
    /** Never null (checked in the constructor); selects the store key and the source of the epoch id. */
    private final ClusterType clusterType;

    /**
     * Kind of cluster the job coordinator runs on. The constant's name is used as the key of the
     * metadata entry in the metadata store, and the constant decides where the epoch id is read from.
     */
    public enum ClusterType {
        /** The epoch id is derived from the {@code CONTAINER_ID} environment variable. */
        YARN,
        /** The epoch id is read from the environment variable named by {@code EnvironmentVariables.SAMZA_EPOCH_ID}. */
        NON_YARN
    }

    /* loaded from: input_file:org/apache/samza/job/metadata/JobCoordinatorMetadataManager$ConfigHashFunnel.class */
    private static class ConfigHashFunnel implements Funnel<Config> {
        private static final Logger LOG = LoggerFactory.getLogger(ConfigHashFunnel.class);
        private static final SortedSet<String> ALLOWED_PREFIXES = ImmutableSortedSet.of("job.autosizing");

        private ConfigHashFunnel() {
        }

        public void funnel(Config config, PrimitiveSink primitiveSink) {
            TreeMap treeMap = new TreeMap();
            ALLOWED_PREFIXES.forEach(str -> {
                treeMap.putAll(config.subset(str, false));
            });
            LOG.info("Using the config {} to generate hash", treeMap);
            treeMap.forEach((str2, str3) -> {
                primitiveSink.putUnencodedChars(str2);
                primitiveSink.putUnencodedChars(str3);
            });
        }
    }

    /**
     * Gauges published under the {@code JobCoordinatorMetadataManager} group. Every gauge is
     * registered with an initial value of zero. The counters are advanced by reading the current
     * value and setting value + 1, which is a plain read-then-write and not an atomic increment.
     */
    static class JobCoordinatorMetadataManagerMetrics {
        private static final String GROUP = "JobCoordinatorMetadataManager";
        private static final String APPLICATION_ATTEMPT_COUNT = "application-attempt-count";
        private static final String CONFIG_CHANGED = "config-changed";
        private static final String JOB_MODEL_CHANGED = "job-model-changed";
        private static final String METADATA_GENERATION_FAILED_COUNT = "metadata-generation-failed-count";
        private static final String METADATA_READ_FAILED_COUNT = "metadata-read-failed-count";
        private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
        private static final String NEW_DEPLOYMENT = "new-deployment";

        private final Gauge<Integer> applicationAttemptCount;
        private final Gauge<Integer> configChangedAcrossApplicationAttempt;
        private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
        private final Gauge<Integer> metadataGenerationFailedCount;
        private final Gauge<Integer> metadataReadFailedCount;
        private final Gauge<Integer> metadataWriteFailedCount;
        private final Gauge<Integer> newDeployment;

        /**
         * Registers all gauges with the given registry.
         *
         * @param metricsRegistry registry the gauges are created in
         */
        public JobCoordinatorMetadataManagerMetrics(MetricsRegistry metricsRegistry) {
            applicationAttemptCount = registerZeroGauge(metricsRegistry, APPLICATION_ATTEMPT_COUNT);
            configChangedAcrossApplicationAttempt = registerZeroGauge(metricsRegistry, CONFIG_CHANGED);
            jobModelChangedAcrossApplicationAttempt = registerZeroGauge(metricsRegistry, JOB_MODEL_CHANGED);
            metadataGenerationFailedCount = registerZeroGauge(metricsRegistry, METADATA_GENERATION_FAILED_COUNT);
            metadataReadFailedCount = registerZeroGauge(metricsRegistry, METADATA_READ_FAILED_COUNT);
            metadataWriteFailedCount = registerZeroGauge(metricsRegistry, METADATA_WRITE_FAILED_COUNT);
            newDeployment = registerZeroGauge(metricsRegistry, NEW_DEPLOYMENT);
        }

        /** Creates a gauge named {@code name} in this class's group, starting at zero. */
        private static Gauge<Integer> registerZeroGauge(MetricsRegistry registry, String name) {
            return registry.newGauge(GROUP, name, 0);
        }

        /** Sets the gauge to its current value plus one. */
        private static void addOne(Gauge<Integer> gauge) {
            gauge.set(gauge.getValue() + 1);
        }

        @VisibleForTesting
        Gauge<Integer> getApplicationAttemptCount() {
            return applicationAttemptCount;
        }

        @VisibleForTesting
        Gauge<Integer> getMetadataGenerationFailedCount() {
            return metadataGenerationFailedCount;
        }

        @VisibleForTesting
        Gauge<Integer> getMetadataReadFailedCount() {
            return metadataReadFailedCount;
        }

        @VisibleForTesting
        Gauge<Integer> getMetadataWriteFailedCount() {
            return metadataWriteFailedCount;
        }

        @VisibleForTesting
        Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
            return jobModelChangedAcrossApplicationAttempt;
        }

        @VisibleForTesting
        Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
            return configChangedAcrossApplicationAttempt;
        }

        @VisibleForTesting
        Gauge<Integer> getNewDeployment() {
            return newDeployment;
        }

        void incrementApplicationAttemptCount() {
            addOne(applicationAttemptCount);
        }

        void incrementMetadataGenerationFailedCount() {
            addOne(metadataGenerationFailedCount);
        }

        void incrementMetadataReadFailedCount() {
            addOne(metadataReadFailedCount);
        }

        void incrementMetadataWriteFailedCount() {
            addOne(metadataWriteFailedCount);
        }

        void setConfigChangedAcrossApplicationAttempt(int i) {
            configChangedAcrossApplicationAttempt.set(i);
        }

        void setJobModelChangedAcrossApplicationAttempt(int i) {
            jobModelChangedAcrossApplicationAttempt.set(i);
        }

        void setNewDeployment(int i) {
            newDeployment.set(i);
        }
    }

    /**
     * Creates a manager that serializes values with a {@code CoordinatorStreamValueSerde} built for
     * {@code SetJobCoordinatorMetadataMessage.TYPE}.
     *
     * @param metadataStore store the metadata is read from and written to
     * @param clusterType cluster type; must not be null
     * @param metricsRegistry registry in which this manager's gauges are created
     */
    public JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry) {
        this(metadataStore, clusterType, metricsRegistry, new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
    }

    /**
     * Creates a manager with an explicit value serde; package-private so tests can supply their own.
     *
     * @param metadataStore store the metadata is read from and written to
     * @param clusterType cluster type; a null value is rejected with a {@link NullPointerException}
     * @param metricsRegistry registry in which this manager's gauges are created
     * @param serde converts the serialized metadata string to and from bytes
     */
    @VisibleForTesting
    JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry, Serde<String> serde) {
        this.metadataMapper = SamzaObjectMapper.getObjectMapper();
        // checkNotNull returns its argument, so validation and assignment happen in one step.
        this.clusterType = Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
        this.metrics = new JobCoordinatorMetadataManagerMetrics(metricsRegistry);
        this.valueSerde = serde;
        this.metadataStore = metadataStore;
    }

    /**
     * Builds the metadata describing the current attempt. The job model id is the CRC32C of the
     * serialized containers of {@code jobModel}; the config id is the CRC32 of the config entries
     * selected by {@code ConfigHashFunnel}; the epoch id comes from
     * {@link #fetchEpochIdForJobCoordinator()}.
     *
     * @param jobModel job model whose containers are hashed
     * @param config config that is hashed through {@code ConfigHashFunnel}
     * @return metadata built from the epoch id, the config id and the job model id
     * @throws SamzaException if any step fails; the generation-failed gauge is incremented first
     */
    public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
        try {
            byte[] serializedContainers = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers());
            int jobModelId = Hashing.crc32c().hashBytes(serializedContainers).asInt();
            int configId = Hashing.crc32().hashObject(config, new ConfigHashFunnel()).asInt();
            LOG.info("Generated job model id {} and config id {}", jobModelId, configId);

            String epochId = fetchEpochIdForJobCoordinator();
            // Argument order is epoch id, config id, job model id.
            return new JobCoordinatorMetadata(epochId, String.valueOf(configId), String.valueOf(jobModelId));
        } catch (Exception e) {
            this.metrics.incrementMetadataGenerationFailedCount();
            LOG.error("Failed to generate metadata for the current attempt due to ", e);
            throw new SamzaException("Failed to generate the metadata for the current attempt due to ", e);
        }
    }

    /**
     * Compares the metadata of the current attempt with the previously stored metadata and reports
     * what differs. When there is no previous metadata, all three changes are reported.
     *
     * <p>Gauge side effects: the new-deployment gauge is set to 1 when the epoch id differs or there
     * is no previous metadata; the job-model-changed and config-changed gauges are set to 1 only
     * when the difference is found without a new deployment; the application-attempt counter is
     * incremented only when nothing changed.
     *
     * @param jobCoordinatorMetadata metadata of the current attempt; dereferenced whenever
     *        {@code jobCoordinatorMetadata2} is non-null
     * @param jobCoordinatorMetadata2 previously stored metadata, or {@code null} if there is none
     * @return the detected changes; empty when both metadata carry the same three ids
     */
    public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata jobCoordinatorMetadata, JobCoordinatorMetadata jobCoordinatorMetadata2) {
        Set<JobMetadataChange> changes = new HashSet<>();
        boolean noPrevious = jobCoordinatorMetadata2 == null;

        boolean newDeployment = noPrevious
            || !jobCoordinatorMetadata2.getEpochId().equals(jobCoordinatorMetadata.getEpochId());
        if (newDeployment) {
            this.metrics.setNewDeployment(1);
            changes.add(JobMetadataChange.NEW_DEPLOYMENT);
        }

        if (noPrevious || !jobCoordinatorMetadata2.getJobModelId().equals(jobCoordinatorMetadata.getJobModelId())) {
            // The "across application attempt" gauges only apply within the same deployment.
            if (!newDeployment) {
                this.metrics.setJobModelChangedAcrossApplicationAttempt(1);
            }
            changes.add(JobMetadataChange.JOB_MODEL);
        }

        if (noPrevious || !jobCoordinatorMetadata2.getConfigId().equals(jobCoordinatorMetadata.getConfigId())) {
            if (!newDeployment) {
                this.metrics.setConfigChangedAcrossApplicationAttempt(1);
            }
            changes.add(JobMetadataChange.CONFIG);
        }

        if (changes.isEmpty()) {
            LOG.info("Job coordinator metadata {} unchanged.", jobCoordinatorMetadata);
            this.metrics.incrementApplicationAttemptCount();
        } else {
            LOG.info("Job coordinator metadata changed from: {} to: {}", jobCoordinatorMetadata2, jobCoordinatorMetadata);
        }
        return changes;
    }

    /**
     * Reads the metadata stored under this manager's cluster type name.
     *
     * <p>All entries of the metadata store are scanned for the matching key. A failure while
     * deserializing the matching entry is logged and counted in the read-failed gauge instead of
     * being thrown. Exceptions raised by the store's {@code all()} call itself are not caught here.
     *
     * @return the stored metadata, or {@code null} when no entry exists for the cluster type or the
     *         entry could not be deserialized
     */
    public JobCoordinatorMetadata readJobCoordinatorMetadata() {
        final String storeKey = this.clusterType.name();
        JobCoordinatorMetadata metadata = null;
        for (Map.Entry<String, byte[]> entry : this.metadataStore.all().entrySet()) {
            if (!storeKey.equals(entry.getKey())) {
                continue;
            }
            try {
                String serialized = this.valueSerde.fromBytes(entry.getValue());
                metadata = this.metadataMapper.readValue(serialized, JobCoordinatorMetadata.class);
                break;
            } catch (Exception e) {
                this.metrics.incrementMetadataReadFailedCount();
                LOG.error("Failed to read job coordinator metadata due to ", e);
            }
        }
        LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", this.clusterType, metadata);
        return metadata;
    }

    /**
     * Serializes the metadata and stores it under this manager's cluster type name.
     *
     * @param jobCoordinatorMetadata metadata to persist; must not be null
     * @throws NullPointerException if {@code jobCoordinatorMetadata} is null
     * @throws SamzaException if serialization or the store write fails; the write-failed gauge is
     *         incremented first
     */
    public void writeJobCoordinatorMetadata(JobCoordinatorMetadata jobCoordinatorMetadata) {
        Preconditions.checkNotNull(jobCoordinatorMetadata, "Job coordinator metadata cannot be null");
        try {
            String serialized = this.metadataMapper.writeValueAsString(jobCoordinatorMetadata);
            byte[] payload = this.valueSerde.toBytes(serialized);
            this.metadataStore.put(this.clusterType.name(), payload);
            LOG.info("Successfully written job coordinator metadata: {} for cluster {}.", jobCoordinatorMetadata, this.clusterType);
        } catch (Exception e) {
            this.metrics.incrementMetadataWriteFailedCount();
            LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);
            throw new SamzaException("Failed to write the job coordinator metadata.", e);
        }
    }

    /**
     * Resolves the epoch id of the current job coordinator from the environment.
     *
     * <p>For every cluster type other than YARN the value of the environment variable named by
     * {@code EnvironmentVariables.SAMZA_EPOCH_ID} is returned as is (possibly {@code null}). For
     * YARN the {@code CONTAINER_ID} value is split on {@code "_"} and the second and third parts
     * are joined with {@code "_"}.
     *
     * @return the epoch id
     * @throws NullPointerException on YARN when {@code CONTAINER_ID} is not set
     * @throws IllegalStateException on YARN when {@code CONTAINER_ID} has fewer than three parts
     */
    @VisibleForTesting
    String fetchEpochIdForJobCoordinator() {
        if (this.clusterType != ClusterType.YARN) {
            return getEnvProperty(EnvironmentVariables.SAMZA_EPOCH_ID);
        }

        String containerId = getEnvProperty(CONTAINER_ID_PROPERTY);
        // Fail with a message that names the missing variable instead of a bare NPE from split().
        Preconditions.checkNotNull(containerId,
            "Environment variable %s is not set; cannot derive the epoch id", CONTAINER_ID_PROPERTY);

        String[] parts = containerId.split(CONTAINER_ID_DELIMITER);
        // Indices 1 and 2 are read below, so at least three parts are required.
        Preconditions.checkState(parts.length >= 3,
            "Container id '%s' does not have at least three '%s'-separated parts", containerId, CONTAINER_ID_DELIMITER);
        return parts[1] + CONTAINER_ID_DELIMITER + parts[2];
    }

    /**
     * Looks up an environment variable via {@link System#getenv(String)}; package-private so tests
     * can override the lookup.
     *
     * @param str name of the environment variable
     * @return the variable's value, or {@code null} if it is not defined
     */
    @VisibleForTesting
    String getEnvProperty(String str) {
        return System.getenv(str);
    }

    /**
     * Exposes the metrics holder so tests can inspect the gauges.
     *
     * @return the metrics instance created in the constructor
     */
    @VisibleForTesting
    JobCoordinatorMetadataManagerMetrics getMetrics() {
        return this.metrics;
    }
}
