/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Arrays;
import java.util.function.Function;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies processors that convert each incoming {@link Change} into one or two
 * {@link SubscriptionWrapper} records, forwarded under the foreign key that
 * {@code foreignKeyExtractor} derives from the change's old and/or new value.
 *
 * <p>The foreign-key and value serializers may be absent at construction time
 * ({@code foreignKeySerde} / {@code valueSerializer} passed as {@code null}); in that case the
 * first processor that is initialised fills them in from the {@link ProcessorContext}.
 *
 * @param <K>  key type of the processed records
 * @param <KO> foreign-key type produced by {@code foreignKeyExtractor}
 * @param <V>  value type carried inside the {@link Change}
 */
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);

    private final Function<V, KO> foreignKeyExtractor;
    private final String foreignKeySerdeTopic;
    private final String valueSerdeTopic;
    private final boolean leftJoin;
    // Not final: lazily populated from the processor context when not supplied up front.
    private Serializer<KO> foreignKeySerializer;
    private Serializer<V> valueSerializer;

    /**
     * @param foreignKeyExtractor  maps a value to its foreign key; a {@code null} result causes
     *                             the record to be logged and dropped
     * @param foreignKeySerdeTopic topic name handed to the foreign-key serializer
     * @param valueSerdeTopic      topic name handed to the value serializer
     * @param foreignKeySerde      serde whose serializer is used for foreign keys, or {@code null}
     *                             to take the context's key serde at {@code init} time
     * @param valueSerializer      serializer for values, or {@code null} to take the context's
     *                             value serde at {@code init} time
     * @param leftJoin             selects the instruction sent when a record has a new value but
     *                             no old value (see {@code process})
     */
    public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
                                                        final String foreignKeySerdeTopic,
                                                        final String valueSerdeTopic,
                                                        final Serde<KO> foreignKeySerde,
                                                        final Serializer<V> valueSerializer,
                                                        final boolean leftJoin) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.foreignKeySerdeTopic = foreignKeySerdeTopic;
        this.valueSerdeTopic = valueSerdeTopic;
        this.valueSerializer = valueSerializer;
        this.leftJoin = leftJoin;
        this.foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
    }

    /**
     * @return a new processor instance bound to this supplier's configuration
     */
    @Override
    public Processor<K, Change<V>> get() {
        return new UnbindChangeProcessor();
    }

    /**
     * Processor that hashes the new value, extracts the old/new foreign keys and forwards the
     * matching {@link SubscriptionWrapper} instruction(s).
     */
    private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
        private Sensor droppedRecordsSensor;

        /**
         * Stores the context, resolves any serializer that was not supplied at construction
         * time, and obtains the sensor used to count dropped records.
         */
        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // The context only exposes wildcard-typed serdes, so the casts below are unchecked;
            // the fallback is only taken when the caller did not supply a serializer.
            if (foreignKeySerializer == null) {
                foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
            }
            if (valueSerializer == null) {
                valueSerializer = (Serializer<V>) context.valueSerde().serializer();
            }
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                Thread.currentThread().getName(),
                context.taskId().toString(),
                (StreamsMetricsImpl) context.metrics()
            );
        }

        /**
         * Forwards subscription instructions for one change:
         * <ul>
         *   <li>old and new value present: forwards {@code DELETE_KEY_NO_PROPAGATE} to the old
         *       foreign key if the serialized foreign keys differ, then always forwards
         *       {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE} to the new foreign key;</li>
         *   <li>only old value present: forwards {@code DELETE_KEY_AND_PROPAGATE} to the old
         *       foreign key;</li>
         *   <li>only new value present: forwards {@code PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE}
         *       when {@code leftJoin} is set, otherwise
         *       {@code PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE};</li>
         *   <li>neither present: does nothing.</li>
         * </ul>
         * Whenever an extracted foreign key is {@code null}, the record is logged, counted on the
         * dropped-records sensor and nothing is forwarded.
         *
         * @param key    key of the changed record; embedded in every forwarded wrapper
         * @param change old/new value pair
         */
        @Override
        public void process(final K key, final Change<V> change) {
            // Hash of the serialized new value; null when the change is a delete.
            final long[] currentHash = change.newValue == null
                ? null
                : Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, change.newValue));

            if (change.oldValue != null) {
                final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
                if (oldForeignKey == null) {
                    dropRecordWithNullForeignKey(change.oldValue);
                    return;
                }
                if (change.newValue != null) {
                    final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
                    if (newForeignKey == null) {
                        dropRecordWithNullForeignKey(change.newValue);
                        return;
                    }

                    // Foreign keys are compared by their serialized bytes, not by equals().
                    final byte[] serialOldForeignKey =
                        foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
                    final byte[] serialNewForeignKey =
                        foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
                    if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
                        // Foreign key changed: tell the old key to drop this subscription.
                        context().forward(
                            oldForeignKey,
                            new SubscriptionWrapper<>(currentHash, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, key));
                    }
                    context().forward(
                        newForeignKey,
                        new SubscriptionWrapper<>(currentHash, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, key));
                } else {
                    // New value is null: currentHash is null here as well.
                    context().forward(
                        oldForeignKey,
                        new SubscriptionWrapper<>(currentHash, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, key));
                }
            } else if (change.newValue != null) {
                final SubscriptionWrapper.Instruction instruction = leftJoin
                    ? SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE
                    : SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
                final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
                if (newForeignKey == null) {
                    dropRecordWithNullForeignKey(change.newValue);
                } else {
                    context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, instruction, key));
                }
            }
        }

        /** Logs the value whose foreign key was null and bumps the dropped-records sensor. */
        private void dropRecordWithNullForeignKey(final V value) {
            LOG.warn(
                "Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                value, context().topic(), context().partition(), context().offset()
            );
            droppedRecordsSensor.record();
        }
    }
}

