/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.streams.processor.internals.Punctuator;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamTask
extends AbstractTask
implements Punctuator {
    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord("__null_topic__", -1, -1L, null, null);
    private final int maxBufferedSize;
    private final PartitionGroup partitionGroup;
    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
    private final PunctuationQueue punctuationQueue = new PunctuationQueue();
    private final Map<TopicPartition, Long> consumedOffsets;
    private final RecordCollector recordCollector;
    private boolean commitRequested = false;
    private boolean commitOffsetNeeded = false;
    private StampedRecord currRecord = null;
    private ProcessorNode currNode = null;
    private boolean requiresPoll = true;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StreamsMetrics metrics) {
        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, false);
        this.maxBufferedSize = config.getInt("buffered.records.per.partition");
        HashMap<TopicPartition, RecordQueue> partitionQueues = new HashMap<TopicPartition, RecordQueue>();
        for (TopicPartition partition : partitions) {
            SourceNode source = topology.source(partition.topic());
            RecordQueue queue = this.createRecordQueue(partition, source);
            partitionQueues.put(partition, queue);
        }
        TimestampExtractor timestampExtractor = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
        this.consumedOffsets = new HashMap<TopicPartition, Long>();
        this.recordCollector = new RecordCollector(producer);
        log.info("Creating restoration consumer client for stream task #" + this.id());
        this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, this.stateMgr, metrics);
        this.initializeStateStores();
        Iterator<ProcessorNode> i$ = this.topology.processors().iterator();
        while (i$.hasNext()) {
            ProcessorNode node;
            this.currNode = node = i$.next();
            try {
                node.init(this.processorContext);
            }
            finally {
                this.currNode = null;
            }
        }
        ((ProcessorContextImpl)this.processorContext).initialized();
    }

    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        int queueSize = this.partitionGroup.addRawRecords(partition, records);
        if (queueSize > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(partition));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int process() {
        StreamTask streamTask = this;
        synchronized (streamTask) {
            StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo);
            if (record == null) {
                this.requiresPoll = true;
                return 0;
            }
            this.requiresPoll = false;
            try {
                this.currRecord = record;
                this.currNode = this.recordInfo.node();
                TopicPartition partition = this.recordInfo.partition();
                log.debug("Start processing one record [{}]", (Object)this.currRecord);
                this.currNode.process(this.currRecord.key(), this.currRecord.value());
                log.debug("Completed processing one record [{}]", (Object)this.currRecord);
                this.consumedOffsets.put(partition, this.currRecord.offset());
                this.commitOffsetNeeded = true;
                if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                    this.consumer.resume(Collections.singleton(partition));
                    this.requiresPoll = true;
                }
                if (this.partitionGroup.topQueueSize() <= this.maxBufferedSize) {
                    this.requiresPoll = true;
                }
            }
            finally {
                this.currRecord = null;
                this.currNode = null;
            }
            return this.partitionGroup.numBuffered();
        }
    }

    public boolean requiresPoll() {
        return this.requiresPoll;
    }

    public boolean maybePunctuate() {
        long timestamp = this.partitionGroup.timestamp();
        if (timestamp == -1L) {
            return false;
        }
        return this.punctuationQueue.mayPunctuate(timestamp, this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void punctuate(ProcessorNode node, long timestamp) {
        if (this.currNode != null) {
            throw new IllegalStateException("Current node is not null");
        }
        this.currNode = node;
        this.currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
        try {
            node.processor().punctuate(timestamp);
        }
        finally {
            this.currNode = null;
            this.currRecord = null;
        }
    }

    public StampedRecord record() {
        return this.currRecord;
    }

    public ProcessorNode node() {
        return this.currNode;
    }

    @Override
    public void commit() {
        this.stateMgr.flush();
        this.recordCollector.flush();
        if (this.commitOffsetNeeded) {
            HashMap<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<TopicPartition, OffsetAndMetadata>(this.consumedOffsets.size());
            for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                long offset = entry.getValue() + 1L;
                consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
                this.stateMgr.putOffsetLimit(partition, offset);
            }
            this.consumer.commitSync(consumedOffsetsAndMetadata);
            this.commitOffsetNeeded = false;
        }
        this.commitRequested = false;
    }

    public boolean commitNeeded() {
        return this.commitRequested;
    }

    public void needCommit() {
        this.commitRequested = true;
    }

    public void schedule(long interval) {
        if (this.currNode == null) {
            throw new IllegalStateException("Current node is null");
        }
        this.punctuationQueue.schedule(new PunctuationSchedule(this.currNode, interval));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        this.partitionGroup.close();
        this.consumedOffsets.clear();
        RuntimeException exception = null;
        Iterator<ProcessorNode> i$ = this.topology.processors().iterator();
        while (i$.hasNext()) {
            ProcessorNode node;
            this.currNode = node = i$.next();
            try {
                node.close();
            }
            catch (RuntimeException e) {
                exception = e;
            }
            finally {
                this.currNode = null;
            }
        }
        super.close();
        if (exception != null) {
            throw exception;
        }
    }

    @Override
    protected Map<TopicPartition, Long> recordCollectorOffsets() {
        return this.recordCollector.offsets();
    }

    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
        return new RecordQueue(partition, source);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value) {
        ProcessorNode thisNode = this.currNode;
        try {
            Iterator<ProcessorNode<?, ?>> i$ = thisNode.children().iterator();
            while (i$.hasNext()) {
                ProcessorNode<?, ?> childNode;
                this.currNode = childNode = i$.next();
                childNode.process(key, value);
            }
        }
        finally {
            this.currNode = thisNode;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value, int childIndex) {
        ProcessorNode<?, ?> childNode;
        ProcessorNode thisNode = this.currNode;
        this.currNode = childNode = thisNode.children().get(childIndex);
        try {
            childNode.process(key, value);
        }
        finally {
            this.currNode = thisNode;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> void forward(K key, V value, String childName) {
        ProcessorNode thisNode = this.currNode;
        for (ProcessorNode<?, ?> childNode : thisNode.children()) {
            if (!childNode.name().equals(childName)) continue;
            this.currNode = childNode;
            try {
                childNode.process(key, value);
                break;
            }
            finally {
                this.currNode = thisNode;
            }
        }
    }
}

