/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsRebalanceListener;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TaskMetadataImpl;
import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class StreamThread
extends Thread {
    // --- Clock and logging -------------------------------------------------------------------
    private final Time time;
    private final Logger log;
    private final String logPrefix;
    // Monitor guarding state transitions; setState() notifies on it and waitOnThreadState() waits on it.
    public final Object stateLock;
    // --- Timing configuration, read once in the constructor -----------------------------------
    // "poll.ms"
    private final Duration pollTime;
    // "commit.interval.ms"
    private final long commitTimeMs;
    // "repartition.purge.interval.ms"
    private final long purgeTimeMs;
    // "max.poll.interval.ms" of the main consumer configuration
    private final int maxPollTimeMs;
    // The "auto.offset.reset" value of the main consumer before create() possibly overrides it with "none".
    private final String originalReset;
    private final TaskManager taskManager;
    // --- Metrics ------------------------------------------------------------------------------
    private final StreamsMetricsImpl streamsMetrics;
    private final Sensor commitSensor;
    private final Sensor pollSensor;
    private final Sensor pollRecordsSensor;
    private final Sensor punctuateSensor;
    private final Sensor processRecordsSensor;
    private final Sensor processLatencySensor;
    private final Sensor processRateSensor;
    private final Sensor pollRatioSensor;
    private final Sensor processRatioSensor;
    private final Sensor punctuateRatioSensor;
    private final Sensor commitRatioSensor;
    private final Sensor failedStreamThreadSensor;
    // --- Periodic processing summary (logged by runOnce()) --------------------------------------
    private static final long LOG_SUMMARY_INTERVAL_MS = 120000L;
    private long lastLogSummaryMs = -1L;
    private long totalRecordsProcessedSinceLastSummary = 0L;
    private long totalPunctuatorsSinceLastSummary = 0L;
    private long totalCommittedSinceLastSummary = 0L;
    // --- Per-iteration bookkeeping --------------------------------------------------------------
    // Cached clock reading; refreshed at the start and end of runOnce().
    private long now;
    // Set to `now` right before each poll of the main consumer (see pollRequests()).
    private long lastPollMs;
    // NOTE(review): lastCommitMs / lastPurgeMs are presumably maintained by the commit path, which is
    // not visible in this part of the file -- confirm.
    private long lastCommitMs;
    private long lastPurgeMs;
    private long lastPartitionAssignedMs = -1L;
    // Number of iterations handed to TaskManager#process per call; adapted by runOnce().
    private int numIterations;
    // --- Thread state ---------------------------------------------------------------------------
    private volatile State state = State.CREATED;
    private volatile ThreadMetadata threadMetadata;
    private StateListener stateListener;
    private final Optional<String> getGroupInstanceID;
    // --- Clients and collaborators --------------------------------------------------------------
    private final ChangelogReader changelogReader;
    private final ConsumerRebalanceListener rebalanceListener;
    private final Consumer<byte[], byte[]> mainConsumer;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Admin adminClient;
    private final TopologyMetadata topologyMetadata;
    // Applies a pending cache resize; invoked from runLoop() with the value taken from cacheResizeSize.
    private final java.util.function.Consumer<Long> cacheResizer;
    private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
    private final Runnable shutdownErrorHook;
    // Compared against AssignorError.SHUTDOWN_REQUESTED in maybeSendShutdown().
    private final AtomicInteger assignmentErrorCode;
    // Time of the next follow-up rebalance; runLoop() resets it to Long.MAX_VALUE after triggering one.
    private final AtomicLong nextProbingRebalanceMs;
    // Drained in pollPhase() and passed to the uncaught exception handler.
    private final Queue<StreamsException> nonFatalExceptionsToHandle;
    // Requested cache size; -1 means "no resize pending" (see resizeCache() and runLoop()).
    private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
    // NOTE(review): presumably set by requestLeaveGroupDuringShutdown(), which is outside this view -- confirm.
    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
    private final boolean eosEnabled;

    /**
     * Registers the listener that {@code setState} calls back after a successful state transition.
     *
     * @param listener the listener to notify; {@code null} disables notification
     */
    public void setStateListener(StateListener listener) {
        stateListener = listener;
    }

    /**
     * Returns the current thread state. This is a plain read of the volatile field and does not
     * take {@code stateLock}.
     *
     * @return the current state
     */
    public State state() {
        return state;
    }

    /**
     * Records the time at which partitions were last assigned. The value is later used by
     * {@code initializeAndRestorePhase} to log how long restoration took.
     *
     * @param lastPartitionAssignedMs the assignment time in milliseconds
     */
    void setPartitionAssignedTime(long lastPartitionAssignedMs) {
        this.lastPartitionAssignedMs = lastPartitionAssignedMs;
    }

    /**
     * Attempts to move this thread to {@code newState}.
     *
     * <p>The transition is performed while holding {@code stateLock}; all threads waiting on the
     * lock are notified once the new state is set. The state listener, if any, is invoked after
     * the lock has been released.
     *
     * @param newState the state to transition to
     * @return the state before the transition, or {@code null} if the request was ignored because
     *         the thread is in {@code PENDING_SHUTDOWN} (and {@code newState} is not {@code DEAD})
     *         or already {@code DEAD}
     * @throws StreamsException if the transition is not valid for the current state
     */
    State setState(State newState) {
        final State previousState;
        synchronized (stateLock) {
            previousState = state;

            // Once shutdown has begun, only the final DEAD transition is accepted.
            if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: only DEAD state is a valid next state", newState);
                return null;
            }
            // DEAD is terminal.
            if (state == State.DEAD) {
                log.debug("Ignoring request to transit from DEAD to {}: no valid next state after DEAD", newState);
                return null;
            }
            if (!state.isValidTransition(newState)) {
                log.error("Unexpected state transition from {} to {}", previousState, newState);
                throw new StreamsException(logPrefix + "Unexpected state transition from " + previousState + " to " + newState);
            }

            log.info("State transition from {} to {}", previousState, newState);
            state = newState;
            if (newState == State.RUNNING) {
                updateThreadMetadata(taskManager.activeTaskMap(), taskManager.standbyTaskMap());
            }
            stateLock.notifyAll();
        }

        // The listener is called outside the lock.
        if (stateListener != null) {
            stateListener.onChange(this, state, previousState);
        }
        return previousState;
    }

    /**
     * Reports whether the current state counts as alive, as decided by {@code State#isAlive()}.
     * The state is read while holding {@code stateLock}.
     *
     * @return {@code true} if the current state is alive
     */
    public boolean isRunning() {
        synchronized (stateLock) {
            return state.isAlive();
        }
    }

    /**
     * Builds a fully wired {@code StreamThread}: creates the restore consumer, changelog reader,
     * record cache, task creators, task manager and main consumer, then constructs the thread and
     * updates its metadata with the shared admin client id.
     *
     * <p>If the topology defines per-topic offset reset overrides, the main consumer is configured
     * with {@code auto.offset.reset=none}; the originally configured value is handed to the thread.
     *
     * @return the value returned by {@code updateThreadMetadata} on the newly constructed thread
     */
    public static StreamThread create(TopologyMetadata topologyMetadata, StreamsConfig config, KafkaClientSupplier clientSupplier, Admin adminClient, UUID processId, String clientId, StreamsMetricsImpl streamsMetrics, Time time, StreamsMetadataState streamsMetadataState, long cacheSizeBytes, StateDirectory stateDirectory, StateRestoreListener userStateRestoreListener, int threadIdx, Runnable shutdownErrorHook, BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
        final String threadId = clientId + "-StreamThread-" + threadIdx;
        final String logPrefix = "stream-thread [" + threadId + "] ";
        final LogContext logContext = new LogContext(logPrefix);
        final Logger log = logContext.logger(StreamThread.class);

        // Container handed to the main consumer configuration below.
        final ReferenceContainer references = new ReferenceContainer();
        references.adminClient = adminClient;
        references.streamsMetadataState = streamsMetadataState;
        references.time = time;
        references.clientTags = config.getClientTags();

        log.info("Creating restore consumer client");
        final String restoreClientId = ClientUtils.getRestoreConsumerClientId(threadId);
        final Map<String, Object> restoreConfigs = config.getRestoreConsumerConfigs(restoreClientId);
        final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConfigs);

        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, restoreConsumer, userStateRestoreListener);
        final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
        final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(topologyMetadata, config, streamsMetrics, stateDirectory, changelogReader, cache, time, clientSupplier, threadId, processId, log);
        final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(topologyMetadata, config, streamsMetrics, stateDirectory, changelogReader, threadId, log);
        final TaskManager taskManager = new TaskManager(time, changelogReader, processId, logPrefix, streamsMetrics, activeTaskCreator, standbyTaskCreator, topologyMetadata, adminClient, stateDirectory);
        references.taskManager = taskManager;

        log.info("Creating consumer client");
        final Map<String, Object> mainConfigs = config.getMainConsumerConfigs(config.getString("application.id"), ClientUtils.getConsumerClientId(threadId), threadIdx);
        mainConfigs.put("__reference.container.instance__", references);

        // Remember the configured reset policy before it is possibly replaced with "none".
        final String originalReset = (String) mainConfigs.get("auto.offset.reset");
        if (topologyMetadata.hasOffsetResetOverrides()) {
            mainConfigs.put("auto.offset.reset", "none");
        }

        final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(mainConfigs);
        taskManager.setMainConsumer(mainConsumer);
        references.mainConsumer = mainConsumer;

        final StreamThread thread = new StreamThread(time, config, adminClient, mainConsumer, restoreConsumer, changelogReader, originalReset, taskManager, streamsMetrics, topologyMetadata, threadId, logContext, references.assignmentErrorCode, references.nextScheduledRebalanceMs, references.nonFatalExceptionsToHandle, shutdownErrorHook, streamsUncaughtExceptionHandler, cache::resize);
        return thread.updateThreadMetadata(ClientUtils.getSharedAdminClientId(clientId));
    }

    /**
     * Creates a stream thread named {@code threadId}.
     *
     * <p>Besides storing its collaborators, the constructor registers the thread-level sensors and
     * metrics, registers the thread with the topology metadata, creates the rebalance listener and
     * reads the timing configuration ({@code poll.ms}, {@code commit.interval.ms},
     * {@code repartition.purge.interval.ms} and the main consumer's {@code max.poll.interval.ms}).
     *
     * @param originalReset the {@code auto.offset.reset} value to fall back to for topics without a
     *                      topic-specific reset policy (see {@code resetOffsets})
     * @param cacheResizer  callback applied by the run loop when a cache resize has been requested
     */
    public StreamThread(Time time, StreamsConfig config, Admin adminClient, Consumer<byte[], byte[]> mainConsumer, Consumer<byte[], byte[]> restoreConsumer, ChangelogReader changelogReader, String originalReset, TaskManager taskManager, StreamsMetricsImpl streamsMetrics, TopologyMetadata topologyMetadata, String threadId, LogContext logContext, AtomicInteger assignmentErrorCode, AtomicLong nextProbingRebalanceMs, Queue<StreamsException> nonFatalExceptionsToHandle, Runnable shutdownErrorHook, BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler, java.util.function.Consumer<Long> cacheResizer) {
        super(threadId);
        this.stateLock = new Object();
        this.adminClient = adminClient;
        this.streamsMetrics = streamsMetrics;

        // Thread-level sensors.
        this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics);
        this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics);
        this.pollRecordsSensor = ThreadMetrics.pollRecordsSensor(threadId, streamsMetrics);
        this.pollRatioSensor = ThreadMetrics.pollRatioSensor(threadId, streamsMetrics);
        this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics);
        this.processRecordsSensor = ThreadMetrics.processRecordsSensor(threadId, streamsMetrics);
        this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics);
        this.processRatioSensor = ThreadMetrics.processRatioSensor(threadId, streamsMetrics);
        this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics);
        this.punctuateRatioSensor = ThreadMetrics.punctuateRatioSensor(threadId, streamsMetrics);
        this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId, streamsMetrics);
        this.failedStreamThreadSensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics);

        this.assignmentErrorCode = assignmentErrorCode;
        this.shutdownErrorHook = shutdownErrorHook;
        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
        this.cacheResizer = cacheResizer;

        // These calls are made for their registration side effect; the returned values are not kept.
        ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
        ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
        ThreadMetrics.addThreadStartTimeMetric(threadId, streamsMetrics, time.milliseconds());
        ThreadMetrics.addThreadBlockedTimeMetric(threadId, new StreamThreadTotalBlockedTime(mainConsumer, restoreConsumer, taskManager::totalProducerBlockedTime), streamsMetrics);

        this.time = time;
        this.topologyMetadata = topologyMetadata;
        this.topologyMetadata.registerThread(this.getName());
        this.logPrefix = logContext.logPrefix();
        this.log = logContext.logger(StreamThread.class);
        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode);
        this.taskManager = taskManager;
        this.restoreConsumer = restoreConsumer;
        this.mainConsumer = mainConsumer;
        this.changelogReader = changelogReader;
        this.originalReset = originalReset;
        this.nextProbingRebalanceMs = nextProbingRebalanceMs;
        this.nonFatalExceptionsToHandle = nonFatalExceptionsToHandle;
        this.getGroupInstanceID = mainConsumer.groupMetadata().groupInstanceId();

        this.pollTime = Duration.ofMillis(config.getLong("poll.ms"));
        // Only "max.poll.interval.ms" is read from this config, so placeholder group id, client id
        // and thread index are passed.
        final int dummyThreadIdx = 1;
        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx)).getInt("max.poll.interval.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.purgeTimeMs = config.getLong("repartition.purge.interval.ms");
        this.numIterations = 1;
        this.eosEnabled = StreamsConfigUtils.eosEnabled(config);
    }

    /**
     * Thread entry point. Transitions to {@code STARTING}, executes the run loop and always
     * finishes with {@code completeShutdown}.
     *
     * <p>If the transition to {@code STARTING} is ignored (the thread was already shut down), the
     * method returns without running. Any {@code Throwable} escaping the run loop is recorded on
     * the failed-thread sensor and passed to the uncaught exception handler.
     */
    @Override
    public void run() {
        log.info("Starting");
        final State previousState = setState(State.STARTING);
        if (previousState == null) {
            log.info("StreamThread already shutdown. Not running");
            return;
        }

        boolean cleanRun = false;
        try {
            cleanRun = runLoop();
        } catch (Throwable e) {
            failedStreamThreadSensor.record();
            requestLeaveGroupDuringShutdown();
            streamsUncaughtExceptionHandler.accept(e, false);
        } finally {
            // cleanRun stays false if runLoop() threw.
            completeShutdown(cleanRun);
        }
    }

    /**
     * Main loop of the thread. Subscribes the main consumer and then repeatedly calls
     * {@code runOnce()} while the thread is running or a rebalance is in progress.
     *
     * <p>Each iteration also checks for topology updates, forwards a requested shutdown, applies a
     * pending cache resize and triggers the scheduled follow-up rebalance once its time has passed.
     * Corrupted tasks and task migration are handled inside the loop; other exceptions end it.
     *
     * @return {@code true} if the loop ended normally, {@code false} if it was aborted because of
     *         an {@code UnsupportedVersionException}
     * @throws StreamsException for any other failure; non-Streams exceptions are wrapped
     */
    boolean runLoop() {
        subscribeConsumer();

        while (isRunning() || taskManager.isRebalanceInProgress()) {
            try {
                checkForTopologyUpdates();
                if (!isRunning() && topologyMetadata.isEmpty()) {
                    log.info("Shutting down thread with empty topology.");
                    break;
                }

                maybeSendShutdown();

                // -1 is the "nothing pending" marker; reading also clears the request.
                final long pendingCacheSize = cacheResizeSize.getAndSet(-1L);
                if (pendingCacheSize != -1L) {
                    cacheResizer.accept(pendingCacheSize);
                }

                runOnce();

                if (!taskManager.isRebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {
                    log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                    mainConsumer.enforceRebalance("triggered followup rebalance scheduled for " + nextProbingRebalanceMs.get());
                    nextProbingRebalanceMs.set(Long.MAX_VALUE);
                }
            } catch (TaskCorruptedException corrupted) {
                log.warn("Detected the states of tasks " + corrupted.corruptedTasks() + " are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.", corrupted);
                try {
                    final boolean activeTasksCorrupted = taskManager.handleCorruption(corrupted.corruptedTasks());
                    if (activeTasksCorrupted && eosEnabled) {
                        log.info("Active task(s) got corrupted. Triggering a rebalance.");
                        mainConsumer.enforceRebalance("Active tasks corrupted");
                    }
                } catch (TaskMigratedException migratedWhileHandling) {
                    handleTaskMigrated(migratedWhileHandling);
                }
            } catch (TaskMigratedException migrated) {
                handleTaskMigrated(migrated);
            } catch (UnsupportedVersionException unsupported) {
                final String errorMessage = unsupported.getMessage();
                final boolean requireStableUnsupported = errorMessage != null
                    && errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ");
                if (requireStableUnsupported) {
                    log.error("Shutting down because the Kafka cluster seems to be on a too old version. Setting {}=\"{}\"/\"{}\" requires broker version 2.5 or higher.", "processing.guarantee", "exactly_once_v2", "exactly_once_beta");
                }
                failedStreamThreadSensor.record();
                streamsUncaughtExceptionHandler.accept(new StreamsException(unsupported), false);
                return false;
            } catch (StreamsException e) {
                throw e;
            } catch (Exception e) {
                throw new StreamsException(e);
            }
        }
        return true;
    }

    /**
     * Replaces the handler that receives exceptions from this thread. Within this class the handler
     * is called with {@code false} as second argument for exceptions that end the run loop and with
     * {@code true} for the non-fatal exceptions drained in {@code pollPhase}.
     *
     * @param streamsUncaughtExceptionHandler the new handler
     */
    public void setStreamsUncaughtExceptionHandler(BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
    }

    /**
     * Enforces a rebalance on the main consumer if the assignment error code currently equals
     * {@code AssignorError.SHUTDOWN_REQUESTED}; otherwise does nothing.
     */
    public void maybeSendShutdown() {
        if (assignmentErrorCode.get() != AssignorError.SHUTDOWN_REQUESTED.code()) {
            return;
        }
        log.warn("Detected that shutdown was requested. All clients in this app will now begin to shutdown");
        mainConsumer.enforceRebalance("Shutdown requested");
    }

    /**
     * Blocks the calling thread until this stream thread reaches {@code targetState} or the timeout
     * has elapsed, whichever comes first.
     *
     * <p>An interrupt received while waiting does not abort the wait; it is remembered and the
     * caller's interrupt flag is restored before returning.
     *
     * @param targetState the state to wait for
     * @param timeoutMs   the maximum time to wait in milliseconds; with a value of zero or less the
     *                    method only checks the current state and returns immediately
     * @return {@code true} if {@code targetState} was reached, {@code false} on timeout
     */
    public boolean waitOnThreadState(State targetState, long timeoutMs) {
        final long begin = time.milliseconds();
        synchronized (stateLock) {
            boolean interrupted = false;
            long elapsedMs = 0L;
            try {
                while (state != targetState) {
                    // "<=" rather than "<": if exactly timeoutMs has elapsed, the remaining time would
                    // be 0, and Object.wait(0) means "wait without timeout" instead of "do not wait".
                    if (timeoutMs <= elapsedMs) {
                        log.debug("Cannot transit to {} within {}ms", targetState, timeoutMs);
                        return false;
                    }
                    final long remainingMs = timeoutMs - elapsedMs;
                    try {
                        stateLock.wait(remainingMs);
                    } catch (InterruptedException e) {
                        // Keep waiting; the flag is restored in the finally block.
                        interrupted = true;
                    }
                    elapsedMs = time.milliseconds() - begin;
                }
                return true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs the shutdown-error hook that was supplied at construction time.
     */
    public void shutdownToError() {
        shutdownErrorHook.run();
    }

    /**
     * Stores the code of the given assignor error as the current assignment error code.
     * {@code maybeSendShutdown} later compares that code against {@code SHUTDOWN_REQUESTED}.
     *
     * @param assignorError the error whose code is stored
     */
    public void sendShutdownRequest(AssignorError assignorError) {
        final int errorCode = assignorError.code();
        assignmentErrorCode.set(errorCode);
    }

    /**
     * Reacts to a {@code TaskMigratedException}: logs a warning, lets the task manager handle the
     * loss of all tasks, and re-subscribes the main consumer (unsubscribe followed by subscribe).
     *
     * @param e the exception that signalled the migration; logged as the cause
     */
    private void handleTaskMigrated(TaskMigratedException e) {
        log.warn("Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.", e);

        taskManager.handleLostAll();
        mainConsumer.unsubscribe();
        subscribeConsumer();
    }

    /**
     * Subscribes the main consumer with this thread's rebalance listener, using the topology's
     * source topic pattern if it uses pattern subscription, or its full list of source topic names
     * otherwise.
     */
    private void subscribeConsumer() {
        if (!topologyMetadata.usesPatternSubscription()) {
            mainConsumer.subscribe(topologyMetadata.allFullSourceTopicNames(), rebalanceListener);
            return;
        }
        mainConsumer.subscribe(topologyMetadata.sourceTopicPattern(), rebalanceListener);
    }

    /**
     * Requests a cache resize. The value is only recorded here; the run loop picks it up on its
     * next iteration and passes it to the cache resizer.
     *
     * @param size the requested size; {@code -1} is the internal "no request pending" marker
     */
    public void resizeCache(long size) {
        cacheResizeSize.set(size);
    }

    /**
     * Executes one iteration of the thread's work: poll, initialize/restore, and (only in state
     * {@code RUNNING}) repeated rounds of process / punctuate / commit.
     *
     * <p>The process loop ends when a round processes no records, or when more than half of the
     * maximum poll interval has passed since the last poll. {@code numIterations} is adapted
     * between rounds: halved (never below 1) when the loop ends because of the poll interval or
     * when a round punctuated or committed, incremented otherwise.
     *
     * <p>Afterwards the per-iteration ratio sensors are recorded and, at most once every
     * {@code LOG_SUMMARY_INTERVAL_MS}, a processing summary is logged and its counters are reset.
     */
    void runOnce() {
        final long startMs = time.milliseconds();
        now = startMs;

        final long pollLatency = pollPhase();

        // The poll may have triggered a rebalance callback or shutdown that changed the state.
        if (!isRunning()) {
            log.debug("Thread state is already {}, skipping the run once call after poll request", state);
            return;
        }

        initializeAndRestorePhase();
        advanceNowAndComputeLatency();

        int totalProcessed = 0;
        long totalCommitLatency = 0L;
        long totalProcessLatency = 0L;
        long totalPunctuateLatency = 0L;

        if (state == State.RUNNING) {
            while (true) {
                log.debug("Processing tasks with {} iterations.", numIterations);
                final int processed = taskManager.process(numIterations, time);
                final long processLatency = advanceNowAndComputeLatency();
                totalProcessLatency += processLatency;
                if (processed > 0) {
                    processRateSensor.record((double) processed, now);
                    // Average latency per processed record.
                    processLatencySensor.record((double) processLatency / (double) processed, now);
                    totalProcessed += processed;
                    totalRecordsProcessedSinceLastSummary += processed;
                }

                log.debug("Processed {} records with {} iterations; invoking punctuators if necessary", processed, numIterations);
                final int punctuated = taskManager.punctuate();
                totalPunctuatorsSinceLastSummary += punctuated;
                final long punctuateLatency = advanceNowAndComputeLatency();
                totalPunctuateLatency += punctuateLatency;
                if (punctuated > 0) {
                    punctuateSensor.record((double) punctuateLatency / (double) punctuated, now);
                }
                log.debug("{} punctuators ran.", punctuated);

                // NOTE(review): the latency below relies on maybeCommit() advancing `now`; that
                // method is outside this part of the file -- confirm.
                final long beforeCommitMs = now;
                final int committed = maybeCommit();
                final long commitLatency = Math.max(now - beforeCommitMs, 0L);
                totalCommitLatency += commitLatency;
                if (committed > 0) {
                    totalCommittedSinceLastSummary += committed;
                    commitSensor.record((double) commitLatency / (double) committed, now);
                    if (log.isDebugEnabled()) {
                        log.debug("Committed all active tasks {} and standby tasks {} in {}ms", taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency);
                    }
                }

                if (processed == 0) {
                    // Nothing left to process in this iteration.
                    break;
                }
                if (Math.max(now - lastPollMs, 0L) > (long) (maxPollTimeMs / 2)) {
                    // More than half of the max poll interval has passed since the last poll:
                    // back off and return so that poll is called again.
                    numIterations = numIterations > 1 ? numIterations / 2 : numIterations;
                    break;
                }
                if (punctuated > 0 || committed > 0) {
                    numIterations = numIterations > 1 ? numIterations / 2 : numIterations;
                    continue;
                }
                ++numIterations;
            }
            taskManager.recordTaskProcessRatio(totalProcessLatency, now);
        }

        now = time.milliseconds();
        final long runOnceLatency = now - startMs;
        processRecordsSensor.record((double) totalProcessed, now);
        processRatioSensor.record((double) totalProcessLatency / (double) runOnceLatency, now);
        punctuateRatioSensor.record((double) totalPunctuateLatency / (double) runOnceLatency, now);
        pollRatioSensor.record((double) pollLatency / (double) runOnceLatency, now);
        commitRatioSensor.record((double) totalCommitLatency / (double) runOnceLatency, now);

        final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS;
        if (logProcessingSummary) {
            log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update", totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
            totalRecordsProcessedSinceLastSummary = 0L;
            totalPunctuatorsSinceLastSummary = 0L;
            totalCommittedSinceLastSummary = 0L;
            lastLogSummaryMs = now;
        }
    }

    /**
     * Initializes tasks if needed and then runs one round of changelog restoration.
     *
     * <p>Initialization is attempted when the state is {@code PARTITIONS_ASSIGNED}, or when it is
     * {@code RUNNING} and the task manager reports tasks needing initialization or restoration.
     * If the task manager reports restoration as complete, the changelog reader is switched to
     * updating standbys and the thread transitions to {@code RUNNING}. The final
     * {@code changelogReader.restore} call is made regardless of the state.
     */
    private void initializeAndRestorePhase() {
        final State stateSnapshot = state;
        final boolean shouldInitialize = stateSnapshot == State.PARTITIONS_ASSIGNED
            || (stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration());

        if (shouldInitialize) {
            log.debug("State is {}; initializing tasks if necessary", stateSnapshot);
            changelogReader.enforceRestoreActive();

            final boolean restorationComplete = taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets((Set<TopicPartition>)partitions, null));
            if (restorationComplete) {
                changelogReader.transitToUpdateStandby();
                log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, taskManager.tasks().keySet());
                setState(State.RUNNING);
            }

            if (log.isDebugEnabled()) {
                log.debug("Initialization call done. State is {}", state);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("Idempotently invoking restoration logic in state {}", state);
        }
        changelogReader.restore(taskManager.notPausedTasks());
        log.debug("Idempotent restore call done. Thread state has not changed.");
    }

    /**
     * If the topology is empty or has an update pending for this thread, lets the task manager
     * handle the update, waits via {@code maybeWaitForNonEmptyTopology}, and re-subscribes the
     * main consumer. Does nothing otherwise.
     */
    private void checkForTopologyUpdates() {
        final boolean updateDetected = topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName());
        if (!updateDetected) {
            return;
        }

        log.info("StreamThread has detected an update to the topology");
        taskManager.handleTopologyUpdates();
        topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

        log.info("Updating consumer subscription following topology update");
        subscribeConsumer();
    }

    /**
     * Polls the main consumer and hands the fetched records to the task manager.
     *
     * <p>The poll timeout depends on the state: the configured poll time in {@code RUNNING} and
     * {@code STARTING}, zero in {@code PARTITIONS_ASSIGNED}, {@code PARTITIONS_REVOKED} and
     * {@code PENDING_SHUTDOWN}. For each polled partition the highest record offset is reported
     * to the task manager. Finally, all queued non-fatal exceptions are passed to the uncaught
     * exception handler.
     *
     * @return the poll latency as computed by {@code advanceNowAndComputeLatency}
     * @throws StreamsException if the thread is in any other state
     */
    private long pollPhase() {
        log.debug("Invoking poll on main Consumer");

        final ConsumerRecords<byte[], byte[]> polled;
        if (state == State.PARTITIONS_ASSIGNED || state == State.PARTITIONS_REVOKED) {
            polled = pollRequests(Duration.ZERO);
        } else if (state == State.RUNNING || state == State.STARTING) {
            polled = pollRequests(pollTime);
        } else if (state == State.PENDING_SHUTDOWN) {
            polled = pollRequests(Duration.ZERO);
        } else {
            log.error("Unexpected state {} during normal iteration", state);
            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
        }

        final long pollLatency = advanceNowAndComputeLatency();
        final int numRecords = polled.count();

        // Report the highest offset seen per partition.
        for (final TopicPartition partition : polled.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> partitionRecords = polled.records(partition);
            partitionRecords.stream()
                .max(Comparator.comparing(ConsumerRecord::offset))
                .ifPresent(latest -> taskManager.updateTaskEndMetadata(partition, latest.offset()));
        }

        log.debug("Main Consumer poll completed in {} ms and fetched {} records from partitions {}", pollLatency, numRecords, polled.partitions());
        pollSensor.record((double) pollLatency, now);

        if (!polled.isEmpty()) {
            pollRecordsSensor.record((double) numRecords, now);
            taskManager.addRecordsToTasks(polled);
        }

        while (!nonFatalExceptionsToHandle.isEmpty()) {
            streamsUncaughtExceptionHandler.accept(nonFatalExceptionsToHandle.poll(), true);
        }
        return pollLatency;
    }

    /**
     * Polls the main consumer once, recording the current {@code now} as the time of the last poll.
     *
     * <p>If the consumer reports invalid offsets, the affected partitions are reset via
     * {@code resetOffsets} and an empty batch is returned for this call.
     *
     * @param pollTime the maximum time the poll may block
     * @return the polled records, or an empty batch if offsets had to be reset
     * @throws StreamsException propagated from {@code resetOffsets} when no reset policy applies
     */
    private ConsumerRecords<byte[], byte[]> pollRequests(Duration pollTime) {
        lastPollMs = now;
        try {
            return mainConsumer.poll(pollTime);
        } catch (InvalidOffsetException e) {
            resetOffsets(e.partitions(), e);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Resets the main consumer's position for the given partitions according to the reset policy
     * of each partition's topic.
     *
     * <p>Topics with policy {@code EARLIEST} / {@code LATEST} are sought to the beginning / end.
     * Topics with policy {@code NONE} fall back to the original {@code auto.offset.reset} value;
     * if that is neither "earliest" nor "latest" the partition cannot be reset. Partitions whose
     * topic has no policy ({@code null}) are skipped. No seek is performed if any partition
     * cannot be reset.
     *
     * @param partitions the partitions to reset
     * @param cause      the exception that triggered the reset, or {@code null}; attached as the
     *                   cause of the thrown exception when present
     * @throws StreamsException      if at least one partition has no applicable reset policy
     * @throws IllegalStateException if a topic's policy is none of the handled enum values
     */
    private void resetOffsets(Set<TopicPartition> partitions, Exception cause) {
        final Set<String> loggedTopics = new HashSet<>();
        final Set<TopicPartition> seekToBeginning = new HashSet<>();
        final Set<TopicPartition> seekToEnd = new HashSet<>();
        final Set<TopicPartition> notReset = new HashSet<>();

        for (final TopicPartition partition : partitions) {
            final OffsetResetStrategy strategy = topologyMetadata.offsetResetStrategy(partition.topic());
            if (strategy == null) {
                continue;
            }
            switch (strategy) {
                case EARLIEST:
                    addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
                    break;
                case LATEST:
                    addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
                    break;
                case NONE:
                    // No topic-specific policy: fall back to the originally configured value.
                    if ("earliest".equals(originalReset)) {
                        addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                    } else if ("latest".equals(originalReset)) {
                        addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
                    } else {
                        notReset.add(partition);
                    }
                    break;
                default:
                    throw new IllegalStateException("Unable to locate topic " + partition.topic() + " in the topology");
            }
        }

        if (!notReset.isEmpty()) {
            final String notResetString = notReset.stream()
                .map(TopicPartition::topic)
                .distinct()
                .collect(Collectors.joining(","));
            final String format = String.format("No valid committed offset found for input [%s] and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", notResetString);
            if (cause == null) {
                throw new StreamsException(format);
            }
            throw new StreamsException(format, cause);
        }

        if (!seekToBeginning.isEmpty()) {
            mainConsumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            mainConsumer.seekToEnd(seekToEnd);
        }
    }

    /**
     * Adds {@code partition} to the given reset set and logs the chosen policy once per topic.
     *
     * @param partition the partition to schedule for a reset
     * @param partitions the set collecting partitions for this reset direction
     * @param logMessage log format taking the topic name and the reset policy, in that order
     * @param resetPolicy the policy name inserted into the log message
     * @param loggedTopics topics already logged; the partition's topic is added to it
     */
    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        final String topicName = partition.topic();
        // Set#add returns true only on first insertion, which limits the log line to one per topic.
        final boolean firstTimeSeen = loggedTopics.add(topicName);
        if (firstTimeSeen) {
            this.log.info(logMessage, topicName, resetPolicy);
        }
        partitions.add(partition);
    }

    /**
     * Commits tasks if the commit interval has elapsed, otherwise only commits what the task
     * manager reports as user-requested.
     *
     * <p>When the interval has elapsed, all tasks in state RUNNING or RESTORING are committed.
     * If that commit returned a positive count and the purge interval has elapsed as well, the
     * task manager is asked to purge committed records. A commit result of {@code -1} is logged
     * as "in the middle of a rebalance" and leaves the last-commit time unchanged so the commit
     * is retried; any other result refreshes {@code now} from the clock and records it as the
     * last commit time.
     *
     * @return the value returned by the task manager's commit call
     */
    int maybeCommit() {
        final long sinceLastCommitMs = this.now - this.lastCommitMs;
        if (sinceLastCommitMs <= this.commitTimeMs) {
            return this.taskManager.maybeCommitActiveTasksPerUserRequested();
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), sinceLastCommitMs, this.commitTimeMs);
        }

        final Set<Task> tasksToCommit = this.taskManager.tasks()
            .values()
            .stream()
            .filter(task -> task.state() == Task.State.RUNNING || task.state() == Task.State.RESTORING)
            .collect(Collectors.toSet());
        final int committed = this.taskManager.commit(tasksToCommit);

        final boolean purgeDue = this.now - this.lastPurgeMs > this.purgeTimeMs;
        if (committed > 0 && purgeDue) {
            this.taskManager.maybePurgeCommittedRecords();
            this.lastPurgeMs = this.now;
        }

        if (committed == -1) {
            this.log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
        } else {
            this.now = this.time.milliseconds();
            this.lastCommitMs = this.now;
        }
        return committed;
    }

    /**
     * Refreshes the cached {@code now} timestamp from the clock and reports how far it moved.
     *
     * @return the difference between the new and the previous {@code now}, never negative
     */
    private long advanceNowAndComputeLatency() {
        final long before = this.now;
        this.now = this.time.milliseconds();
        final long elapsed = this.now - before;
        // Clamp at zero in case the clock value went backwards.
        return elapsed < 0L ? 0L : elapsed;
    }

    /**
     * Requests shutdown of this thread by moving it to {@code PENDING_SHUTDOWN}.
     *
     * <p>If {@code setState} returns {@code CREATED} (presumably the state held before the
     * transition, i.e. the thread was never started), the shutdown is completed right here
     * as a clean run.
     */
    public void shutdown() {
        this.log.info("Informed to shut down");
        final State previous = this.setState(State.PENDING_SHUTDOWN);
        if (previous != State.CREATED) {
            return;
        }
        this.completeShutdown(true);
    }

    /**
     * Releases everything this thread owns and moves it to {@code DEAD}.
     *
     * <p>The steps run in a fixed order: shut down the task manager, unregister the thread from
     * the topology metadata, clear the changelog reader, optionally unsubscribe the main
     * consumer (when leaving the group was requested), close the main and restore consumers,
     * and remove the thread-level sensors and metrics. Each client-facing step is best effort:
     * a failure is logged and the remaining steps still run, so the consumers are always closed
     * and the thread always reaches {@code DEAD}.
     *
     * @param cleanRun forwarded to the task manager's {@code shutdown} call
     */
    private void completeShutdown(boolean cleanRun) {
        this.setState(State.PENDING_SHUTDOWN);
        this.log.info("Shutting down");
        try {
            this.taskManager.shutdown(cleanRun);
        }
        catch (Throwable e) {
            this.log.error("Failed to close task manager due to the following error:", e);
        }
        try {
            this.topologyMetadata.unregisterThread(this.threadMetadata.threadName());
        }
        catch (Throwable e) {
            this.log.error("Failed to unregister thread due to the following error:", e);
        }
        try {
            this.changelogReader.clear();
        }
        catch (Throwable e) {
            this.log.error("Failed to close changelog reader due to the following error:", e);
        }
        if (this.leaveGroupRequested.get()) {
            // Guarded like every other step: a failing unsubscribe must not prevent the
            // consumers from being closed or the thread from reaching DEAD.
            try {
                this.mainConsumer.unsubscribe();
            }
            catch (Throwable e) {
                this.log.error("Failed to unsubscribe due to the following error:", e);
            }
        }
        try {
            this.mainConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close consumer due to the following error:", e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close restore consumer due to the following error:", e);
        }
        this.streamsMetrics.removeAllThreadLevelSensors(this.getName());
        this.streamsMetrics.removeAllThreadLevelMetrics(this.getName());
        this.setState(State.DEAD);
        this.log.info("Shutdown complete");
    }

    /**
     * Returns the thread metadata object currently held by this thread.
     *
     * @return the current {@link ThreadMetadata} reference
     */
    public final ThreadMetadata threadMetadata() {
        return threadMetadata;
    }

    /**
     * Replaces the thread metadata with a fresh snapshot that carries the given admin client id
     * and empty active and standby task sets.
     *
     * @param adminClientId the admin client id to store in the metadata
     * @return this thread, to allow call chaining
     */
    StreamThread updateThreadMetadata(String adminClientId) {
        final String threadName = getName();
        final String stateName = state().name();
        this.threadMetadata = new ThreadMetadataImpl(
            threadName,
            stateName,
            ClientUtils.getConsumerClientId(threadName),
            ClientUtils.getRestoreConsumerClientId(threadName),
            this.taskManager.producerClientIds(),
            adminClientId,
            Collections.emptySet(),
            Collections.emptySet());
        return this;
    }

    /**
     * Rebuilds the thread metadata from the given task maps, keeping the admin client id of the
     * metadata being replaced.
     *
     * <p>For every task a {@code TaskMetadataImpl} is created from its id, input partitions,
     * committed offsets, high water mark and idling start time.
     *
     * @param activeTasks the active tasks, keyed by task id
     * @param standbyTasks the standby tasks, keyed by task id
     */
    private void updateThreadMetadata(Map<TaskId, Task> activeTasks, Map<TaskId, Task> standbyTasks) {
        final Set<TaskMetadata> activeSnapshot = new HashSet<>();
        for (final Task task : activeTasks.values()) {
            activeSnapshot.add(new TaskMetadataImpl(task.id(), task.inputPartitions(), task.committedOffsets(), task.highWaterMark(), task.timeCurrentIdlingStarted()));
        }

        final Set<TaskMetadata> standbySnapshot = new HashSet<>();
        for (final Task task : standbyTasks.values()) {
            standbySnapshot.add(new TaskMetadataImpl(task.id(), task.inputPartitions(), task.committedOffsets(), task.highWaterMark(), task.timeCurrentIdlingStarted()));
        }

        // Read the admin client id before the old metadata object is replaced.
        final String adminClientId = this.threadMetadata.adminClientId();
        this.threadMetadata = new ThreadMetadataImpl(
            getName(),
            state().name(),
            ClientUtils.getConsumerClientId(getName()),
            ClientUtils.getRestoreConsumerClientId(getName()),
            this.taskManager.producerClientIds(),
            adminClientId,
            activeSnapshot,
            standbySnapshot);
    }

    /**
     * Returns the task manager's map of active tasks.
     *
     * @return whatever {@code taskManager.activeTaskMap()} returns, keyed by task id
     */
    public Map<TaskId, Task> activeTaskMap() {
        return taskManager.activeTaskMap();
    }

    /**
     * Returns the task manager's active tasks as a list.
     *
     * @return whatever {@code taskManager.activeTaskIterable()} returns
     */
    public List<Task> activeTasks() {
        return taskManager.activeTaskIterable();
    }

    /**
     * Returns all tasks known to the task manager.
     *
     * @return whatever {@code taskManager.tasks()} returns, keyed by task id
     */
    public Map<TaskId, Task> allTasks() {
        return taskManager.tasks();
    }

    /**
     * Describes this thread without any leading indentation.
     *
     * @return the result of {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        final String noIndent = "";
        return toString(noIndent);
    }

    /**
     * Describes this thread and its task manager.
     *
     * @param indent prefix placed before the thread line and passed on to the task manager's
     *        own {@code toString(indent)}
     * @return the thread header line followed by the task manager description
     */
    public String toString(String indent) {
        return new StringBuilder()
            .append(indent)
            .append("\tStreamsThread threadId: ")
            .append(getName())
            .append("\n")
            .append(taskManager.toString(indent))
            .toString();
    }

    /**
     * Returns the value of the {@code getGroupInstanceID} field.
     *
     * @return the stored optional group instance id
     */
    public Optional<String> getGroupInstanceID() {
        return getGroupInstanceID;
    }

    /**
     * Sets the leave-group flag. {@code completeShutdown} checks this flag and, when it is set,
     * unsubscribes the main consumer before closing it.
     */
    public void requestLeaveGroupDuringShutdown() {
        leaveGroupRequested.set(true);
    }

    /**
     * Returns the producer metrics reported by the task manager.
     *
     * @return whatever {@code taskManager.producerMetrics()} returns
     */
    public Map<MetricName, Metric> producerMetrics() {
        return taskManager.producerMetrics();
    }

    /**
     * Returns the consumer metrics that {@code ClientUtils.consumerMetrics} derives from the
     * main consumer and the restore consumer.
     *
     * @return the metrics map produced by {@code ClientUtils}
     */
    public Map<MetricName, Metric> consumerMetrics() {
        return ClientUtils.consumerMetrics(mainConsumer, restoreConsumer);
    }

    /**
     * Returns the metrics that {@code ClientUtils.adminClientMetrics} derives from this thread's
     * admin client.
     *
     * @return the metrics map produced by {@code ClientUtils}
     */
    public Map<MetricName, Metric> adminClientMetrics() {
        return ClientUtils.adminClientMetrics(adminClient);
    }

    /**
     * Returns the object stored in the {@code stateLock} field.
     *
     * @return the state lock object
     */
    public Object getStateLock() {
        return stateLock;
    }

    /**
     * Overwrites the cached {@code now} timestamp that the commit and latency computations read.
     *
     * @param now the new timestamp value
     */
    void setNow(final long now) {
        this.now = now;
    }

    /**
     * Returns the task manager used by this thread.
     *
     * @return the {@code taskManager} field
     */
    TaskManager taskManager() {
        return taskManager;
    }

    /**
     * Returns the current value of the {@code numIterations} field.
     *
     * @return the stored iteration count
     */
    int currentNumIterations() {
        return numIterations;
    }

    /**
     * Returns the rebalance listener held by this thread.
     *
     * @return the {@code rebalanceListener} field
     */
    ConsumerRebalanceListener rebalanceListener() {
        return rebalanceListener;
    }

    /**
     * Returns the main consumer, the one this thread polls, seeks and closes.
     *
     * @return the {@code mainConsumer} field
     */
    Consumer<byte[], byte[]> mainConsumer() {
        return mainConsumer;
    }

    /**
     * Returns the restore consumer held by this thread.
     *
     * @return the {@code restoreConsumer} field
     */
    Consumer<byte[], byte[]> restoreConsumer() {
        return restoreConsumer;
    }

    /**
     * Returns the admin client held by this thread.
     *
     * @return the {@code adminClient} field
     */
    Admin adminClient() {
        return adminClient;
    }

    /**
     * Consumer configuration whose key and value deserializers are both set to
     * {@link ByteArrayDeserializer} before the properties reach {@link ConsumerConfig}.
     */
    private static final class InternalConsumerConfig extends ConsumerConfig {
        /**
         * @param props the consumer properties to wrap
         */
        private InternalConsumerConfig(Map<String, Object> props) {
            // NOTE(review): the trailing `false` is presumably ConsumerConfig's "log the config" switch - confirm.
            super(
                ConsumerConfig.appendDeserializerToConfig(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()),
                false);
        }
    }

    /**
     * Callback interface for thread state changes.
     *
     * <p>NOTE(review): the original parameter names are not available in this source; the two
     * validator arguments are presumably the states involved in the transition - confirm their
     * order against the caller before relying on it.
     */
    public interface StateListener {
        /**
         * Invoked on a state change.
         *
         * @param var1 the thread concerned
         * @param var2 first state argument
         * @param var3 second state argument
         */
        void onChange(Thread var1, ThreadStateTransitionValidator var2, ThreadStateTransitionValidator var3);
    }

    /**
     * Lifecycle states of the thread. Each constant lists the ordinals of the states it may
     * move to; ordinals follow declaration order (CREATED = 0 ... DEAD = 6), so the declaration
     * order of the constants must not be changed.
     */
    public enum State implements ThreadStateTransitionValidator {
        // -> STARTING, PENDING_SHUTDOWN
        CREATED(1, 5),
        // -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN
        STARTING(2, 3, 5),
        // -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN
        PARTITIONS_REVOKED(2, 3, 5),
        // -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN
        PARTITIONS_ASSIGNED(2, 3, 4, 5),
        // -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN
        RUNNING(2, 3, 4, 5),
        // -> DEAD
        PENDING_SHUTDOWN(6),
        // terminal: no outgoing transitions
        DEAD();

        /** Ordinals of the states that may follow this one. */
        private final Set<Integer> validTransitions;

        private State(Integer ... validTransitions) {
            this.validTransitions = new HashSet<>(Arrays.asList(validTransitions));
        }

        /**
         * @return {@code true} for STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED and RUNNING;
         *         {@code false} for CREATED, PENDING_SHUTDOWN and DEAD
         */
        public boolean isAlive() {
            switch (this) {
                case RUNNING:
                case STARTING:
                case PARTITIONS_REVOKED:
                case PARTITIONS_ASSIGNED:
                    return true;
                default:
                    return false;
            }
        }

        /**
         * Checks whether this state may be followed by {@code newState}.
         *
         * @param newState the candidate next state; cast to {@code State} without a type check
         * @return {@code true} if the ordinal of {@code newState} is among this state's valid transitions
         */
        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator newState) {
            final State target = (State) newState;
            return validTransitions.contains(target.ordinal());
        }
    }
}

