/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

public class StandbyTask
extends AbstractTask
implements Task {
    private final Sensor closeTaskSensor;
    private final boolean eosEnabled;
    private final InternalProcessorContext processorContext;
    private final StreamsMetricsImpl streamsMetrics;

    StandbyTask(TaskId id, Set<TopicPartition> inputPartitions, ProcessorTopology topology, StreamsConfig config, StreamsMetricsImpl streamsMetrics, ProcessorStateManager stateMgr, StateDirectory stateDirectory, ThreadCache cache, InternalProcessorContext processorContext) {
        super(id, topology, stateDirectory, stateMgr, inputPartitions, config.getLong("task.timeout.ms"), "standby-task", StandbyTask.class);
        this.processorContext = processorContext;
        this.streamsMetrics = streamsMetrics;
        processorContext.transitionToStandby(cache);
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
        this.eosEnabled = StreamThread.eosEnabled(config);
    }

    @Override
    public boolean isActive() {
        return false;
    }

    @Override
    public void initializeIfNeeded() {
        if (this.state() == Task.State.CREATED) {
            StateManagerUtil.registerStateStores(this.log, this.logPrefix, this.topology, this.stateMgr, this.stateDirectory, this.processorContext);
            this.offsetSnapshotSinceLastFlush = Collections.emptyMap();
            this.transitionTo(Task.State.RESTORING);
            this.transitionTo(Task.State.RUNNING);
            this.processorContext.initialize();
            this.log.info("Initialized");
        } else if (this.state() == Task.State.RESTORING) {
            throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while initializing standby task " + this.id);
        }
    }

    @Override
    public void completeRestoration(Consumer<Set<TopicPartition>> offsetResetter) {
        throw new IllegalStateException("Standby task " + this.id + " should never be completing restoration");
    }

    @Override
    public void suspend() {
        switch (this.state()) {
            case CREATED: {
                this.log.info("Suspended created");
                this.transitionTo(Task.State.SUSPENDED);
                break;
            }
            case RUNNING: {
                this.log.info("Suspended running");
                this.transitionTo(Task.State.SUSPENDED);
                break;
            }
            case SUSPENDED: {
                this.log.info("Skip suspending since state is {}", (Object)this.state());
                break;
            }
            case RESTORING: 
            case CLOSED: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while suspending standby task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while suspending standby task " + this.id);
            }
        }
    }

    @Override
    public void resume() {
        if (this.state() == Task.State.RESTORING) {
            throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while resuming standby task " + this.id);
        }
        this.log.trace("No-op resume with state {}", (Object)this.state());
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (this.state()) {
            case CREATED: {
                this.log.debug("Skipped preparing created task for commit");
                break;
            }
            case RUNNING: 
            case SUSPENDED: {
                this.log.debug("Prepared {} task for committing", (Object)this.state());
                break;
            }
            default: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while preparing standby task " + this.id + " for committing ");
            }
        }
        return Collections.emptyMap();
    }

    @Override
    public void postCommit(boolean enforceCheckpoint) {
        switch (this.state()) {
            case CREATED: {
                this.log.debug("Skipped writing checkpoint for created task");
                break;
            }
            case RUNNING: 
            case SUSPENDED: {
                this.maybeWriteCheckpoint(enforceCheckpoint);
                this.log.debug("Finalized commit for {} task", (Object)this.state());
                break;
            }
            default: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while post committing standby task " + this.id);
            }
        }
    }

    @Override
    public void closeClean() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        this.close(true);
        this.log.info("Closed clean");
    }

    @Override
    public void closeDirty() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        this.close(false);
        this.log.info("Closed dirty");
    }

    @Override
    public void closeCleanAndRecycleState() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        if (this.state() != Task.State.SUSPENDED) {
            throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while closing standby task " + this.id);
        }
        this.stateMgr.recycle();
        this.closeTaskSensor.record();
        this.transitionTo(Task.State.CLOSED);
        this.log.info("Closed clean and recycled state");
    }

    private void close(boolean clean) {
        switch (this.state()) {
            case SUSPENDED: {
                TaskManager.executeAndMaybeSwallow(clean, () -> StateManagerUtil.closeStateManager(this.log, this.logPrefix, clean, this.eosEnabled, this.stateMgr, this.stateDirectory, Task.TaskType.STANDBY), "state manager close", this.log);
                break;
            }
            case CLOSED: {
                this.log.trace("Skip closing since state is {}", (Object)this.state());
                return;
            }
            case CREATED: 
            case RUNNING: 
            case RESTORING: {
                throw new IllegalStateException("Illegal state " + (Object)((Object)this.state()) + " while closing standby task " + this.id);
            }
            default: {
                throw new IllegalStateException("Unknown state " + (Object)((Object)this.state()) + " while closing standby task " + this.id);
            }
        }
        this.closeTaskSensor.record();
        this.transitionTo(Task.State.CLOSED);
    }

    @Override
    public boolean commitNeeded() {
        return StateManagerUtil.checkpointNeeded(false, this.offsetSnapshotSinceLastFlush, this.stateMgr.changelogOffsets());
    }

    @Override
    public Map<TopicPartition, Long> changelogOffsets() {
        return Collections.unmodifiableMap(this.stateMgr.changelogOffsets());
    }

    @Override
    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        throw new IllegalStateException("Attempted to add records to task " + this.id() + " for invalid input partition " + partition);
    }

    InternalProcessorContext processorContext() {
        return this.processorContext;
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder();
        sb.append(indent);
        sb.append("TaskId: ");
        sb.append(this.id);
        sb.append("\n");
        if (this.topology != null) {
            sb.append(indent).append(this.topology.toString(indent + "\t"));
        }
        return sb.toString();
    }
}

