package org.springframework.cloud.stream.binder.kafka.streams;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsMessageConversionDelegate.class */
public class KafkaStreamsMessageConversionDelegate {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsMessageConversionDelegate.class);
    private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
    private final CompositeMessageConverter compositeMessageConverter;
    private final SendToDlqAndContinue sendToDlqAndContinue;
    private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;
    private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;
    Exception[] failedWithDeserException = new Exception[1];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsMessageConversionDelegate$PerRecordContentTypeHolder.class */
    public static class PerRecordContentTypeHolder {
        String contentType;

        private PerRecordContentTypeHolder() {
        }

        void setContentType(String str) {
            this.contentType = str;
        }

        void unsetContentType() {
            this.contentType = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsMessageConversionDelegate(CompositeMessageConverter compositeMessageConverter, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        this.compositeMessageConverter = compositeMessageConverter;
        this.sendToDlqAndContinue = sendToDlqAndContinue;
        this.kstreamBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kstreamBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
    }

    public KStream serializeOnOutbound(KStream<?, ?> kStream) {
        String contentType = this.kstreamBindingInformationCatalogue.getContentType(kStream);
        CompositeMessageConverter compositeMessageConverter = this.compositeMessageConverter;
        ThreadLocal withInitial = ThreadLocal.withInitial(PerRecordContentTypeHolder::new);
        KStream mapValues = kStream.filter((obj, obj2) -> {
            return obj2 != null;
        }).mapValues(obj3 -> {
            Message build = obj3 instanceof Message ? (Message) obj3 : MessageBuilder.withPayload(obj3).build();
            HashMap hashMap = new HashMap((Map) build.getHeaders());
            if (StringUtils.hasText(contentType)) {
                hashMap.put("contentType", contentType);
            }
            MessageHeaders messageHeaders = new MessageHeaders(hashMap);
            Message message = compositeMessageConverter.toMessage(build.getPayload(), messageHeaders);
            ((PerRecordContentTypeHolder) withInitial.get()).setContentType((String) messageHeaders.get("contentType"));
            return ((Message) Objects.requireNonNull(message)).getPayload();
        });
        mapValues.process(() -> {
            return new Processor() { // from class: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.1
                ProcessorContext context;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                public void process(Record record) {
                    if (((PerRecordContentTypeHolder) withInitial.get()).contentType != null) {
                        record.headers().remove("contentType");
                        try {
                            record.headers().add(new RecordHeader("contentType", new ObjectMapper().writeValueAsBytes(((PerRecordContentTypeHolder) withInitial.get()).contentType)));
                        } catch (Exception e) {
                            if (KafkaStreamsMessageConversionDelegate.LOG.isDebugEnabled()) {
                                KafkaStreamsMessageConversionDelegate.LOG.debug("Could not add content type header");
                            }
                        }
                        ((PerRecordContentTypeHolder) withInitial.get()).unsetContentType();
                    }
                }

                public void close() {
                }
            };
        }, new String[0]);
        return mapValues;
    }

    public KStream deserializeOnInbound(Class<?> cls, KStream<?, ?> kStream) {
        CompositeMessageConverter compositeMessageConverter = this.compositeMessageConverter;
        PerRecordContentTypeHolder perRecordContentTypeHolder = new PerRecordContentTypeHolder();
        resolvePerRecordContentType(kStream, perRecordContentTypeHolder);
        KStream<?, ?>[] kStreamArr = (KStream[]) kStream.split().branch((obj, obj2) -> {
            Message<?> build;
            boolean z = false;
            try {
                if (obj2 != null) {
                    if ((obj2 instanceof Message) || (obj2 instanceof String) || (obj2 instanceof byte[])) {
                        if (obj2 instanceof Message) {
                            Message<?> message = (Message) obj2;
                            build = perRecordContentTypeHolder.contentType != null ? MessageBuilder.fromMessage(message).setHeader("contentType", perRecordContentTypeHolder.contentType).build() : message;
                        } else {
                            build = perRecordContentTypeHolder.contentType != null ? MessageBuilder.withPayload(obj2).setHeader("contentType", perRecordContentTypeHolder.contentType).build() : MessageBuilder.withPayload(obj2).build();
                        }
                        convertAndSetMessage(obj, cls, compositeMessageConverter, build);
                    } else {
                        keyValueThreadLocal.set(new KeyValue<>(obj, obj2));
                    }
                    z = true;
                } else {
                    LOG.info("Received a tombstone record. This will be skipped from further processing.");
                }
            } catch (Exception e) {
                LOG.warn("Deserialization has failed. This will be skipped from further processing.", e);
                this.failedWithDeserException[0] = e;
            }
            return z;
        }).branch((obj3, obj4) -> {
            return true;
        }).noDefaultBranch().values().toArray(new KStream[0]);
        processErrorFromDeserialization(kStream, kStreamArr[1], this.failedWithDeserException);
        return kStreamArr[0].mapValues(obj5 -> {
            Object obj5 = keyValueThreadLocal.get().value;
            keyValueThreadLocal.remove();
            return obj5;
        });
    }

    private void resolvePerRecordContentType(KStream<?, ?> kStream, PerRecordContentTypeHolder perRecordContentTypeHolder) {
        kStream.process(() -> {
            return new Processor() { // from class: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.2
                ProcessorContext context;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                public void process(Record record) {
                    Iterable headers = record.headers().headers("contentType");
                    if (headers == null || !headers.iterator().hasNext()) {
                        return;
                    }
                    perRecordContentTypeHolder.setContentType(StringUtils.replace(new String(((Header) headers.iterator().next()).value()), "\"", ""));
                }

                public void close() {
                }
            };
        }, new String[0]);
    }

    private void convertAndSetMessage(Object obj, Class<?> cls, MessageConverter messageConverter, Message<?> message) {
        Object payload = cls.isAssignableFrom(message.getPayload().getClass()) ? message.getPayload() : messageConverter.fromMessage(message, cls);
        Assert.notNull(payload, "Failed to convert message " + message);
        keyValueThreadLocal.set(new KeyValue<>(obj, payload));
    }

    private void processErrorFromDeserialization(KStream<?, ?> kStream, KStream<?, ?> kStream2, Exception[] excArr) {
        kStream2.process(() -> {
            return new Processor() { // from class: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.3
                ProcessorContext context;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                public void process(Record record) {
                    Object key = record.key();
                    Object value = record.value();
                    if (value != null) {
                        if (!KafkaStreamsMessageConversionDelegate.this.kstreamBindingInformationCatalogue.isDlqEnabled(kStream)) {
                            if (KafkaStreamsMessageConversionDelegate.this.kstreamBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
                                throw new IllegalStateException("Inbound deserialization failed. Stopping further processing of records.");
                            }
                            if (KafkaStreamsMessageConversionDelegate.this.kstreamBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
                                KafkaStreamsMessageConversionDelegate.LOG.error("Inbound deserialization failed. Skipping this record and continuing.");
                                return;
                            }
                            return;
                        }
                        if (!(value instanceof Message)) {
                            RecordMetadata recordMetadata = (RecordMetadata) this.context.recordMetadata().get();
                            KafkaStreamsMessageConversionDelegate.this.sendToDlqAndContinue.sendToDlq(new ConsumerRecord<>(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), key, value), excArr[0]);
                            return;
                        }
                        Message message = (Message) value;
                        byte[] serialize = KafkaStreamsMessageConversionDelegate.this.kstreamBindingInformationCatalogue.getKeySerde(kStream).serializer().serialize((String) null, key);
                        if (this.context.recordMetadata().isPresent()) {
                            RecordMetadata recordMetadata2 = (RecordMetadata) this.context.recordMetadata().get();
                            KafkaStreamsMessageConversionDelegate.this.sendToDlqAndContinue.sendToDlq(new ConsumerRecord<>(recordMetadata2.topic(), recordMetadata2.partition(), recordMetadata2.offset(), serialize, message.getPayload()), excArr[0]);
                        }
                    }
                }

                public void close() {
                }
            };
        }, new String[0]);
    }
}
