/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener;
import org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ProcessorSupplier} whose processors turn incoming {@code (key, value)} records into
 * {@link Change} records.
 *
 * <p>The supplier is considered <em>materialized</em> when {@code queryableName} is non-null. In
 * that case each processor looks up the state store registered under that name, writes every
 * record into it, and forwards the update together with the previously stored value through a
 * {@link TimestampedTupleForwarder}. When not materialized, records are forwarded directly with
 * a {@code null} old value.
 *
 * <p>NOTE: a processor only acquires its store in {@code init} if the supplier is materialized at
 * that moment, while {@code process} re-reads the flag on every record. {@link #materialize()} and
 * {@link #enableSendingOldValues()} therefore need to be called before the processors returned by
 * {@link #get()} are initialized.
 *
 * @param <KIn> key type of the input records (and of the forwarded records)
 * @param <VIn> value type of the input records
 */
public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn, Change<VIn>> {
    private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);

    private final String storeName;
    private String queryableName;
    private boolean sendOldValues;

    /**
     * @param storeName     name used for the state store once this source is materialized; must not be null
     * @param queryableName name of the state store to use right away, or {@code null} if the source
     *                      starts out non-materialized
     * @throws NullPointerException if {@code storeName} is null
     */
    public KTableSource(final String storeName, final String queryableName) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        this.storeName = storeName;
        this.queryableName = queryableName;
        this.sendOldValues = false;
    }

    /**
     * @return the name of the state store the processors read from and write to, or {@code null}
     *         if this source is not materialized
     */
    public String queryableName() {
        return queryableName;
    }

    /**
     * @return a new processor bound to this supplier's current configuration
     */
    @Override
    public Processor<KIn, VIn, KIn, Change<VIn>> get() {
        return new KTableSourceProcessor();
    }

    /**
     * Turns on sending of old values and materializes this source under {@code storeName}.
     */
    public void enableSendingOldValues() {
        sendOldValues = true;
        // Old values are read back from the store, so the source has to be materialized.
        queryableName = storeName;
    }

    /**
     * Materializes this source by using {@code storeName} as the queryable store name.
     */
    public void materialize() {
        queryableName = storeName;
    }

    /**
     * @return {@code true} if a queryable store name is set
     */
    public boolean materialized() {
        return queryableName != null;
    }

    /**
     * Processor that drops null-key records, optionally maintains the backing store, and
     * forwards each accepted record as a {@link Change}.
     */
    private class KTableSourceProcessor implements Processor<KIn, VIn, KIn, Change<VIn>> {
        private ProcessorContext<KIn, Change<VIn>> context;
        private TimestampedKeyValueStore<KIn, VIn> store;
        private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
        private Sensor droppedRecordsSensor;

        /**
         * Captures the context, obtains the dropped-records sensor and, if the enclosing source
         * is materialized, the state store and tuple forwarder.
         */
        @Override
        public void init(final ProcessorContext<KIn, Change<VIn>> context) {
            this.context = context;
            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                context.taskId().toString(),
                metrics);
            if (queryableName != null) {
                // getStateStore is generic in its return type, so no raw cast is required.
                store = context.getStateStore(queryableName);
                tupleForwarder = new TimestampedTupleForwarder<>(
                    store,
                    context,
                    new TimestampedCacheFlushListener<>(context),
                    sendOldValues);
            }
        }

        /**
         * Handles a single input record.
         *
         * <p>Records with a null key are logged, counted on the dropped-records sensor and
         * skipped. Otherwise the record is written to the store (when materialized) and
         * forwarded as a {@link Change} carrying the new and the previously stored value.
         */
        @Override
        public void process(final Record<KIn, VIn> record) {
            if (record.key() == null) {
                if (context.recordMetadata().isPresent()) {
                    final RecordMetadata recordMetadata = context.recordMetadata().get();
                    LOG.warn(
                        "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                } else {
                    LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
                }
                droppedRecordsSensor.record();
                return;
            }

            if (queryableName != null) {
                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
                final VIn oldValue;
                if (oldValueAndTimestamp != null) {
                    oldValue = oldValueAndTimestamp.value();
                    // An older timestamp than the stored one is only reported; the update is
                    // still applied below.
                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
                        if (context.recordMetadata().isPresent()) {
                            final RecordMetadata recordMetadata = context.recordMetadata().get();
                            LOG.warn(
                                "Detected out-of-order KTable update for {}, old timestamp=[{}] new timestamp=[{}]. topic=[{}] partition=[{}] offset=[{}].",
                                store.name(),
                                oldValueAndTimestamp.timestamp(), record.timestamp(),
                                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                        } else {
                            LOG.warn(
                                "Detected out-of-order KTable update for {}, old timestamp=[{}] new timestamp=[{}]. Topic, partition and offset not known.",
                                store.name(),
                                oldValueAndTimestamp.timestamp(), record.timestamp());
                        }
                    }
                } else {
                    oldValue = null;
                }
                store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
                tupleForwarder.maybeForward(record.withValue(new Change<>(record.value(), oldValue)));
            } else {
                // No store to consult, so the old value is unknown and sent as null.
                context.forward(record.withValue(new Change<>(record.value(), null)));
            }
        }
    }
}

