package org.apache.samza.diagnostics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.samza.config.Config;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/diagnostics/DiagnosticsManager.class */
/**
 * Collects diagnostics for a container (exception events and processor-stop events) and
 * periodically publishes them, serialized as a metrics snapshot, to a diagnostics
 * {@link SystemStream} through a {@link SystemProducer}.
 *
 * <p>Job-level parameters (container memory, cores, store count, container models, max heap,
 * thread-pool size, autosizing flag and config) are attached to every message until one
 * message has been sent and flushed successfully; afterwards only events are attached.
 *
 * <p>Events that were part of a successfully flushed message are removed from the pending
 * collections; if publishing fails they stay pending and are retried on the next run.
 */
public class DiagnosticsManager {
    private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsManager.class);

    /** Delay between the end of one publish run and the start of the next. */
    private static final Duration DEFAULT_PUBLISH_PERIOD = Duration.ofSeconds(60);

    private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d";

    /**
     * Source name used for every interaction with the {@link SystemProducer}. Registration,
     * send and flush must all use the same source, so it is defined once here.
     */
    private static final String PRODUCER_SOURCE = DiagnosticsManager.class.getName();

    private final String jobName;
    private final String jobId;
    private final String containerId;
    private final String executionEnvContainerId;
    private final String samzaEpochId;
    private final String taskClassVersion;
    private final String samzaVersion;
    private final String hostname;
    /** Captured from the clock once, at construction time. */
    private final Instant resetTime;
    private final int containerMemoryMb;
    private final int containerNumCores;
    private final int numPersistentStores;
    private final long maxHeapSizeBytes;
    private final int containerThreadPoolSize;
    private final Map<String, ContainerModel> containerModels;
    private final boolean autosizingEnabled;
    private final Config config;
    private final Clock clock;
    /** Becomes true once a message carrying the job-level parameters was sent and flushed. */
    private boolean jobParamsEmitted;
    private final SystemProducer systemProducer;
    // NOTE(review): BoundedList's thread-safety is not visible from this file; events may be
    // added while the publisher thread reads them - confirm it supports concurrent use.
    private final BoundedList<DiagnosticsExceptionEvent> exceptions;
    private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents;
    private final ScheduledExecutorService scheduler;
    /** Maximum time {@link #stop()} waits for the scheduler to terminate. */
    private final Duration terminationDuration;
    private final SystemStream diagnosticSystemStream;

    /**
     * Periodic task that builds a {@link DiagnosticsStreamMessage} from the pending state and,
     * if the message is not empty, sends and flushes it on the diagnostics stream.
     */
    private class DiagnosticsStreamPublisher implements Runnable {

        /**
         * Runs one publish cycle. Any exception is logged and swallowed so that a failed cycle
         * does not cancel the subsequent scheduled executions.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): Optional.of throws if samzaEpochId is null; that failure would be
                // caught and logged below on every run - confirm callers never pass null.
                DiagnosticsStreamMessage message = new DiagnosticsStreamMessage(
                        jobName,
                        jobId,
                        "samza-container-" + containerId,
                        executionEnvContainerId,
                        Optional.of(samzaEpochId),
                        taskClassVersion,
                        samzaVersion,
                        hostname,
                        clock.currentTimeMillis(),
                        resetTime.toEpochMilli());

                if (!jobParamsEmitted) {
                    message.addContainerMb(containerMemoryMb);
                    message.addContainerNumCores(containerNumCores);
                    message.addNumPersistentStores(numPersistentStores);
                    message.addContainerModels(containerModels);
                    message.addMaxHeapSize(maxHeapSizeBytes);
                    message.addContainerThreadPoolSize(containerThreadPoolSize);
                    message.addAutosizingEnabled(autosizingEnabled);
                    message.addConfig(config);
                }

                // Snapshot the queue so that exactly the events carried by this message are the
                // ones removed after a successful flush; later arrivals stay pending.
                message.addProcessorStopEvents(new ArrayList<>(processorStopEvents));
                message.addDiagnosticsExceptionEvents(exceptions.getValues());

                if (message.isEmpty()) {
                    return;
                }

                byte[] payload = new MetricsSnapshotSerdeV2().toBytes(message.convertToMetricsSnapshot());
                systemProducer.send(PRODUCER_SOURCE,
                        new OutgoingMessageEnvelope(diagnosticSystemStream, (Object) null, (Object) null, payload));
                systemProducer.flush(PRODUCER_SOURCE);

                // Only reached when send and flush did not throw: drop what was just published.
                if (message.getProcessorStopEvents() != null) {
                    processorStopEvents.removeAll(message.getProcessorStopEvents());
                }
                if (message.getExceptionEvents() != null) {
                    exceptions.remove(message.getExceptionEvents());
                }
                jobParamsEmitted = true;
            } catch (Exception e) {
                LOG.error("Exception when flushing diagnosticsStreamMessage", e);
            }
        }
    }

    /**
     * Creates a manager that publishes on its own single daemon thread and uses the system clock.
     *
     * @param jobName name of the job
     * @param jobId id of the job
     * @param containerModels container models reported with the job-level parameters
     * @param containerMemoryMb container memory, in MB
     * @param containerNumCores number of cores of the container
     * @param numPersistentStores number of persistent stores
     * @param maxHeapSizeBytes maximum heap size, in bytes
     * @param containerThreadPoolSize size of the container thread pool
     * @param containerId id of the container; reported prefixed with {@code "samza-container-"}
     * @param executionEnvContainerId container id assigned by the execution environment
     * @param samzaEpochId epoch id reported in the message header
     * @param taskClassVersion version of the task class
     * @param samzaVersion version of Samza
     * @param hostname host this container runs on
     * @param diagnosticSystemStream stream the diagnostics messages are sent to
     * @param systemProducer producer used to send the messages
     * @param terminationDuration maximum time {@link #stop()} waits for the publisher to finish
     * @param autosizingEnabled whether autosizing is enabled
     * @param config job config reported with the job-level parameters
     */
    public DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels, int containerMemoryMb, int containerNumCores, int numPersistentStores, long maxHeapSizeBytes, int containerThreadPoolSize, String containerId, String executionEnvContainerId, String samzaEpochId, String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration, boolean autosizingEnabled, Config config) {
        this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize, containerId, executionEnvContainerId, samzaEpochId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, terminationDuration,
                Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()),
                autosizingEnabled, config, SystemClock.instance());
    }

    /**
     * Same as the public constructor, but with an injectable scheduler and clock for tests.
     *
     * <p>Besides storing its arguments, the constructor registers this manager with the
     * producer and tries to instantiate a diagnostics log appender (log4j first, then log4j2).
     *
     * @param scheduler executor on which the periodic publisher is scheduled
     * @param clock time source for the reset time and message timestamps
     */
    @VisibleForTesting
    DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels, int containerMemoryMb, int containerNumCores, int numPersistentStores, long maxHeapSizeBytes, int containerThreadPoolSize, String containerId, String executionEnvContainerId, String samzaEpochId, String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration, ScheduledExecutorService scheduler, boolean autosizingEnabled, Config config, Clock clock) {
        this.jobParamsEmitted = false;
        this.jobName = jobName;
        this.jobId = jobId;
        this.containerModels = containerModels;
        this.containerMemoryMb = containerMemoryMb;
        this.containerNumCores = containerNumCores;
        this.numPersistentStores = numPersistentStores;
        this.maxHeapSizeBytes = maxHeapSizeBytes;
        this.containerThreadPoolSize = containerThreadPoolSize;
        this.containerId = containerId;
        this.executionEnvContainerId = executionEnvContainerId;
        this.samzaEpochId = samzaEpochId;
        this.taskClassVersion = taskClassVersion;
        this.samzaVersion = samzaVersion;
        this.hostname = hostname;
        this.diagnosticSystemStream = diagnosticSystemStream;
        this.systemProducer = systemProducer;
        this.terminationDuration = terminationDuration;
        this.processorStopEvents = new ConcurrentLinkedQueue<>();
        this.exceptions = new BoundedList<>("exceptions");
        this.scheduler = scheduler;
        this.autosizingEnabled = autosizingEnabled;
        this.config = config;
        this.clock = clock;
        this.resetTime = Instant.ofEpochMilli(this.clock.currentTimeMillis());

        // Register under the very source name later used for send() and flush().
        this.systemProducer.register(PRODUCER_SOURCE);

        // The created appender object is not kept here; it receives this manager as its
        // constructor argument (presumably to forward logged errors - confirm in the appender).
        try {
            ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class));
            LOG.info("Attached log4j diagnostics appender.");
        } catch (Exception log4jFailure) {
            try {
                ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class));
                LOG.info("Attached log4j2 diagnostics appender.");
            } catch (Exception log4j2Failure) {
                // Keep the first failure visible in the logged stack trace instead of dropping it.
                if (log4j2Failure != log4jFailure) {
                    log4j2Failure.addSuppressed(log4jFailure);
                }
                LOG.warn("Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.", log4j2Failure);
            }
        }
    }

    /**
     * Starts the producer and schedules the publisher: first run immediately, then with a fixed
     * delay of {@link #DEFAULT_PUBLISH_PERIOD} between runs.
     */
    public void start() {
        this.systemProducer.start();
        this.scheduler.scheduleWithFixedDelay(new DiagnosticsStreamPublisher(), 0L, DEFAULT_PUBLISH_PERIOD.getSeconds(), TimeUnit.SECONDS);
    }

    /**
     * Shuts the scheduler down, waiting up to the configured termination duration for a running
     * publish to finish, and then stops the producer. If the scheduler has not terminated by
     * then it is shut down forcibly. The producer is stopped even when the wait is interrupted.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws InterruptedException {
        try {
            this.scheduler.shutdown();
            this.scheduler.awaitTermination(this.terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            if (!this.scheduler.isTerminated()) {
                LOG.warn("Unable to terminate scheduler");
                this.scheduler.shutdownNow();
            }
            this.systemProducer.stop();
        }
    }

    /**
     * Queues an exception event for the next publish run.
     *
     * @param diagnosticsExceptionEvent the event to publish
     */
    public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) {
        this.exceptions.add(diagnosticsExceptionEvent);
    }

    /**
     * Queues a processor-stop event for the next publish run and logs it.
     *
     * @param stoppedContainerId id of the container that stopped
     * @param resourceId id of the resource the container was running on
     * @param host host the container was running on
     * @param exitStatus exit status of the container
     */
    public void addProcessorStopEvent(String stoppedContainerId, String resourceId, String host, int exitStatus) {
        this.processorStopEvents.add(new ProcessorStopEvent(stoppedContainerId, resourceId, host, exitStatus));
        LOG.info("Added stop event for Container Id: {}, resource Id: {}, host: {}, exitStatus: {}", stoppedContainerId, resourceId, host, exitStatus);
    }
}
