/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.MethodParameter;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.TypeConverter;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.InvocationResult;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * Base class for listener adapters that delegate to a {@link HandlerAdapter}, converting
 * incoming {@link ConsumerRecord}s to {@link Message}s and optionally sending the value
 * returned by the handler to a reply topic through a {@link KafkaTemplate}.
 *
 * <p>{@link ConsumerSeekAware} callbacks are forwarded to the target bean when that bean
 * itself implements {@link ConsumerSeekAware}; otherwise they are ignored.
 *
 * @param <K> the record key type.
 * @param <V> the record value type.
 */
public abstract class MessagingMessageListenerAdapter<K, V>
implements ConsumerSeekAware {
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();
    /** Template delimiters ({@code !{...}}) used for runtime reply-topic expressions. */
    private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
    private final Object bean;
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final Type inferredType;
    private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
    private HandlerAdapter handlerMethod;
    private boolean isConsumerRecordList;
    private boolean isConsumerRecords;
    private boolean isMessageList;
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    private Type fallbackType = Object.class;
    private Expression replyTopicExpression;
    // Deliberately raw: replies of arbitrary key/value types are sent through it.
    @SuppressWarnings("rawtypes")
    private KafkaTemplate replyTemplate;
    private boolean hasAckParameter;
    private boolean messageReturnType;

    /**
     * Create an adapter for the given bean and listener method.
     *
     * @param bean the bean that owns the listener method; seek callbacks are forwarded
     * to it when it implements {@link ConsumerSeekAware}.
     * @param method the listener method used to infer the payload type; may be
     * {@code null}, in which case no type is inferred and the fallback type is used.
     */
    public MessagingMessageListenerAdapter(Object bean, Method method) {
        this.bean = bean;
        this.inferredType = this.determineInferredType(method);
    }

    /**
     * Set the converter used to turn records into messages.
     *
     * @param messageConverter the converter.
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * Return the converter used to turn records into messages.
     *
     * @return the converter; a {@link MessagingMessageConverter} unless replaced.
     */
    protected final RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Return the payload type passed to the message converter.
     *
     * @return the type inferred from the listener method, or the fallback type when
     * nothing could be inferred.
     */
    protected Type getType() {
        if (this.inferredType != null) {
            return this.inferredType;
        }
        return this.fallbackType;
    }

    /**
     * Set the type to use when no payload type can be inferred from the listener method.
     * Defaults to {@link Object}.
     *
     * @param fallbackType the fallback type.
     */
    public void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    /**
     * Set the handler that is invoked for each message (or batch).
     *
     * @param handlerMethod the handler adapter.
     */
    public void setHandlerMethod(HandlerAdapter handlerMethod) {
        this.handlerMethod = handlerMethod;
    }

    /**
     * @return {@code true} when the listener method's payload parameter was detected as
     * a {@code List} of {@link ConsumerRecord}.
     */
    protected boolean isConsumerRecordList() {
        return this.isConsumerRecordList;
    }

    /**
     * @return {@code true} when the listener method's payload parameter was detected as
     * a parameterized {@link ConsumerRecords}.
     */
    public boolean isConsumerRecords() {
        return this.isConsumerRecords;
    }

    /**
     * Set the topic to which replies are sent. A value containing the {@code !{}
     * prefix is parsed as a runtime SpEL template; anything else is treated as a literal
     * topic name. When the argument has no text, an expression reading the
     * {@code kafka_replyTopic} header of the source message is used instead.
     *
     * @param replyTopicParam the topic name or template expression; may be empty.
     */
    public void setReplyTopic(String replyTopicParam) {
        String replyTopic = replyTopicParam;
        if (!StringUtils.hasText(replyTopic)) {
            replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['" + "kafka_replyTopic" + "']" + PARSER_CONTEXT.getExpressionSuffix();
        }
        if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
            this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
        }
        else {
            this.replyTopicExpression = new LiteralExpression(replyTopic);
        }
    }

    /**
     * Set the template used to send replies.
     *
     * @param replyTemplate the template.
     */
    public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    /**
     * Configure the evaluation context used for reply-topic expressions: installs the
     * bean resolver, a {@link StandardTypeConverter} and a {@link MapAccessor}.
     *
     * @param beanResolver the resolver used for bean references in expressions.
     */
    public void setBeanResolver(BeanResolver beanResolver) {
        this.evaluationContext.setBeanResolver(beanResolver);
        this.evaluationContext.setTypeConverter(new StandardTypeConverter());
        this.evaluationContext.addPropertyAccessor(new MapAccessor());
    }

    /**
     * @return {@code true} when the listener method's payload parameter was detected as
     * a {@code List} of {@link Message}.
     */
    protected boolean isMessageList() {
        return this.isMessageList;
    }

    @Override
    public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) this.bean).registerSeekCallback(callback);
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) this.bean).onPartitionsAssigned(assignments, callback);
        }
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        if (this.bean instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) this.bean).onIdleContainer(assignments, callback);
        }
    }

    /**
     * Convert a record to a {@link Message} using the configured converter and the type
     * returned by {@link #getType()}.
     *
     * @param record the record to convert.
     * @param acknowledgment the acknowledgment, passed through to the converter.
     * @param consumer the consumer, passed through to the converter.
     * @return the converted message.
     */
    protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        return this.getMessageConverter().toMessage(record, acknowledgment, consumer, this.getType());
    }

    /**
     * Invoke the handler. When {@code data} is a {@code List} and the listener does not
     * take a list of {@link ConsumerRecord}, the raw data is not offered to the handler
     * as an argument; otherwise it is passed alongside the message.
     *
     * @param data the raw record(s).
     * @param acknowledgment the acknowledgment; may be {@code null}.
     * @param message the message built from the data.
     * @param consumer the consumer.
     * @return whatever the handler returned.
     * @throws ListenerExecutionFailedException wrapping any exception raised while
     * resolving arguments or running the handler.
     */
    protected final Object invokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message, Consumer<?, ?> consumer) {
        try {
            if (data instanceof List && !this.isConsumerRecordList) {
                return this.handlerMethod.invoke(message, acknowledgment, consumer);
            }
            return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
        }
        catch (MessageConversionException ex) {
            // A listener declaring an Acknowledgment parameter that received none is
            // reported as a configuration problem rather than as a conversion failure.
            if (this.hasAckParameter && acknowledgment == null) {
                throw new ListenerExecutionFailedException("invokeHandler Failed", new IllegalStateException("No Acknowledgment available as an argument, the listener container must have a MANUAL Ackmode to populate the Acknowledgment.", ex));
            }
            throw new ListenerExecutionFailedException(this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload()), new MessageConversionException("Cannot handle message", ex));
        }
        catch (MessagingException ex) {
            throw new ListenerExecutionFailedException(this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload()), ex);
        }
        catch (Exception ex) {
            throw new ListenerExecutionFailedException("Listener method '" + this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex);
        }
    }

    /**
     * Handle a value returned by the listener: resolve the reply topic and send the
     * reply. An {@link InvocationResult} is unwrapped; its own send-to expression and
     * message-return-type flag take precedence over the adapter-level settings.
     *
     * @param resultArg the value returned by the handler (possibly an
     * {@link InvocationResult}).
     * @param request the original request, exposed to the reply-topic expression.
     * @param source the source data, exposed to the reply-topic expression and used to
     * copy correlation/partition headers when it is a {@link Message}.
     * @throws IllegalStateException if a reply topic was resolved but no reply template
     * has been set.
     */
    protected void handleResult(Object resultArg, Object request, Object source) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listener method returned result [" + resultArg + "] - generating response message for it");
        }
        boolean isInvocationResult = resultArg instanceof InvocationResult;
        Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg;
        String replyTopic = this.evaluateReplyTopic(request, source, resultArg);
        Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies");
        boolean returnsMessage = isInvocationResult ? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType;
        this.sendResponse(result, replyTopic, source, returnsMessage);
    }

    /**
     * Resolve the reply topic from the invocation result's send-to expression or, when
     * the result is not an {@link InvocationResult}, from the adapter's own expression.
     */
    private String evaluateReplyTopic(Object request, Object source, Object result) {
        if (result instanceof InvocationResult) {
            return this.evaluateTopic(request, source, result, ((InvocationResult) result).getSendTo());
        }
        if (this.replyTopicExpression != null) {
            return this.evaluateTopic(request, source, result, this.replyTopicExpression);
        }
        return null;
    }

    /**
     * Evaluate a topic expression against a {@link ReplyExpressionRoot}. Literal
     * expressions are returned as-is; a {@code byte[]} value is decoded as UTF-8.
     *
     * @throws IllegalStateException if the expression yields anything other than
     * {@code null}, a {@code String} or a {@code byte[]}.
     */
    private String evaluateTopic(Object request, Object source, Object result, Expression sendTo) {
        if (sendTo instanceof LiteralExpression) {
            return sendTo.getValue(String.class);
        }
        if (sendTo == null) {
            return null;
        }
        Object value = sendTo.getValue(this.evaluationContext, new ReplyExpressionRoot(request, source, result));
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (value instanceof String) {
            return (String) value;
        }
        throw new IllegalStateException("replyTopic expression must evaluate to a String or byte[], it is: " + value.getClass().getName());
    }

    /**
     * Send the result to the topic, without a source and without treating the result as
     * a message return type.
     *
     * @param result the result.
     * @param topic the topic.
     * @deprecated in favor of {@link #sendResponse(Object, String, Object, boolean)}.
     */
    @Deprecated
    protected void sendResponse(Object result, String topic) {
        this.sendResponse(result, topic, null, false);
    }

    /**
     * Send the result as a reply.
     * <ul>
     * <li>Nothing is sent when there is no topic and the listener does not return
     * messages.</li>
     * <li>A {@link Message} result is sent as-is.</li>
     * <li>A {@link Collection} is sent element by element (messages as-is, other
     * elements to {@code topic}).</li>
     * <li>Any other result is sent to {@code topic}; when {@code source} is a
     * {@link Message}, its {@code kafka_correlationId} and {@code kafka_replyPartition}
     * headers are propagated to the reply.</li>
     * </ul>
     *
     * @param result the value to send.
     * @param topic the reply topic; may be {@code null}.
     * @param source the source data; may be {@code null}.
     * @param messageReturnType whether the listener is declared to return a message (or
     * a collection of messages).
     * @throws IllegalStateException if something has to be sent but no reply template
     * has been set.
     */
    @SuppressWarnings("unchecked")
    protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
        if (!messageReturnType && topic == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("No replyTopic to handle the reply: " + result);
            }
            return;
        }
        if (result instanceof Message) {
            this.requireReplyTemplate().send((Message<?>) result);
        }
        else if (result instanceof Collection) {
            // The template is checked per element, so an empty collection remains a no-op.
            ((Collection<?>) result).forEach(v -> {
                if (v instanceof Message) {
                    this.requireReplyTemplate().send((Message<?>) v);
                }
                else {
                    this.requireReplyTemplate().send(topic, v);
                }
            });
        }
        else if (source instanceof Message) {
            Message<?> sourceMessage = (Message<?>) source;
            // The typed lookup returns null when the header is absent.
            byte[] correlationId = sourceMessage.getHeaders().get("kafka_correlationId", byte[].class);
            MessageBuilder<Object> builder = MessageBuilder.withPayload(result).setHeader("kafka_topic", topic);
            if (correlationId != null) {
                builder.setHeader("kafka_correlationId", correlationId);
            }
            this.setPartition(builder, sourceMessage);
            this.requireReplyTemplate().send(builder.build());
        }
        else {
            this.requireReplyTemplate().send(topic, result);
        }
    }

    /**
     * Return the reply template, failing with a descriptive error instead of a
     * {@link NullPointerException} when none has been configured.
     */
    @SuppressWarnings("rawtypes")
    private KafkaTemplate requireReplyTemplate() {
        Assert.state(this.replyTemplate != null, "a KafkaTemplate is required to support replies");
        return this.replyTemplate;
    }

    /**
     * Copy the {@code kafka_replyPartition} header of the source (an int encoded in a
     * {@code byte[]}) to the reply's {@code kafka_partitionId} header, when present.
     */
    private void setPartition(MessageBuilder<Object> builder, Message<?> source) {
        byte[] partitionBytes = source.getHeaders().get("kafka_replyPartition", byte[].class);
        if (partitionBytes != null) {
            builder.setHeader("kafka_partitionId", ByteBuffer.wrap(partitionBytes).getInt());
        }
    }

    /**
     * Build an error message that appends the handler method and bean to a description.
     *
     * @param description the leading description.
     * @param payload the payload, used by the handler to describe the method.
     * @return the combined message.
     */
    protected final String createMessagingErrorMessage(String description, Object payload) {
        return description + "\nEndpoint handler details:\nMethod [" + this.handlerMethod.getMethodAsString(payload) + "]\nBean [" + this.handlerMethod.getBean() + "]";
    }

    /**
     * Inspect the listener method's parameters to infer the payload type and record
     * whether the method takes an {@link Acknowledgment}, a list of records/messages or
     * a {@link ConsumerRecords}, and whether it returns a message.
     *
     * <p>Note: this method is invoked from the constructor, so an override must not rely
     * on state initialized by the subclass.
     *
     * @param method the listener method; may be {@code null}.
     * @return the inferred payload type, or {@code null} when {@code method} is
     * {@code null} or has no payload candidate.
     * @throws IllegalStateException if a batch-style parameter is combined with
     * parameters other than an optional {@link Acknowledgment} and/or {@link Consumer}.
     */
    protected Type determineInferredType(Method method) {
        if (method == null) {
            return null;
        }
        Type genericParameterType = null;
        boolean hasConsumerParameter = false;
        for (int i = 0; i < method.getParameterCount(); i++) {
            MethodParameter methodParameter = new MethodParameter(method, i);
            Type parameterType = methodParameter.getGenericParameterType();
            boolean payloadCandidate = this.eligibleParameter(methodParameter)
                    && (methodParameter.getParameterAnnotations().length == 0 || methodParameter.hasParameterAnnotation(Payload.class));
            if (payloadCandidate) {
                if (genericParameterType != null) {
                    // A second candidate: stop scanning and keep the first one found.
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Ambiguous parameters for target payload for method " + method + "; no inferred type available");
                    }
                    break;
                }
                genericParameterType = this.resolvePayloadType(parameterType);
            }
            else if (parameterType.equals(Acknowledgment.class)) {
                this.hasAckParameter = true;
            }
            else if (parameterType.equals(Consumer.class) || isParameterizedAs(parameterType, Consumer.class)) {
                // Sticky: a later non-Consumer parameter must not reset the flag.
                hasConsumerParameter = true;
            }
        }
        if (!this.validParametersForBatch(method.getGenericParameterTypes().length, this.hasAckParameter, hasConsumerParameter)) {
            String stateMessage = "A parameter of type '%s' must be the only parameter (except for an optional 'Acknowledgment' and/or 'Consumer')";
            Assert.state(!this.isConsumerRecords, () -> String.format(stateMessage, "ConsumerRecords"));
            Assert.state(!this.isConsumerRecordList, () -> String.format(stateMessage, "List<ConsumerRecord>"));
            Assert.state(!this.isMessageList, () -> String.format(stateMessage, "List<Message<?>>"));
        }
        this.messageReturnType = KafkaUtils.returnTypeMessageOrCollectionOf(method);
        return genericParameterType;
    }

    /**
     * Derive the payload type from the first payload-candidate parameter, recording the
     * batch flags as a side effect: {@code Message<T>} yields {@code T};
     * {@code List<Message<T>>} yields {@code T}; every other type is returned unchanged.
     */
    private Type resolvePayloadType(Type parameterType) {
        if (!(parameterType instanceof ParameterizedType)) {
            return parameterType;
        }
        ParameterizedType parameterized = (ParameterizedType) parameterType;
        Type rawType = parameterized.getRawType();
        Type[] typeArguments = parameterized.getActualTypeArguments();
        if (rawType.equals(Message.class)) {
            return typeArguments[0];
        }
        if (rawType.equals(List.class) && typeArguments.length == 1) {
            Type elementType = typeArguments[0];
            this.isConsumerRecordList = isConsumerRecordElement(elementType);
            boolean messageHasGeneric = isParameterizedAs(elementType, Message.class);
            this.isMessageList = elementType.equals(Message.class) || messageHasGeneric;
            if (messageHasGeneric) {
                return ((ParameterizedType) elementType).getActualTypeArguments()[0];
            }
            return parameterType;
        }
        this.isConsumerRecords = rawType.equals(ConsumerRecords.class);
        return parameterType;
    }

    /**
     * Whether a list element type denotes {@link ConsumerRecord}: the raw class, a
     * parameterization of it, or a wildcard whose first upper bound is a
     * parameterization of it.
     */
    private static boolean isConsumerRecordElement(Type elementType) {
        if (elementType.equals(ConsumerRecord.class) || isParameterizedAs(elementType, ConsumerRecord.class)) {
            return true;
        }
        if (elementType instanceof WildcardType) {
            Type[] upperBounds = ((WildcardType) elementType).getUpperBounds();
            return upperBounds != null && upperBounds.length > 0 && isParameterizedAs(upperBounds[0], ConsumerRecord.class);
        }
        return false;
    }

    /** Whether {@code type} is a parameterized type whose raw type is {@code rawClass}. */
    private static boolean isParameterizedAs(Type type, Class<?> rawClass) {
        return type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(rawClass);
    }

    /**
     * Whether the parameter count is consistent with a batch listener signature: one
     * payload parameter plus an optional {@link Acknowledgment} and/or {@link Consumer}.
     */
    private boolean validParametersForBatch(int parameterCount, boolean hasAck, boolean hasConsumer) {
        if (hasAck) {
            return parameterCount == 2 || hasConsumer && parameterCount == 3;
        }
        if (hasConsumer) {
            return parameterCount == 2;
        }
        return parameterCount == 1;
    }

    /**
     * Whether a parameter may carry the payload. {@link Acknowledgment},
     * {@link ConsumerRecord}, {@link Consumer}, raw {@link Message} and
     * {@code Message<?>} (wildcard) parameters are not eligible.
     */
    private boolean eligibleParameter(MethodParameter methodParameter) {
        Type parameterType = methodParameter.getGenericParameterType();
        if (parameterType.equals(Acknowledgment.class) || parameterType.equals(ConsumerRecord.class) || parameterType.equals(Consumer.class)) {
            return false;
        }
        if (parameterType instanceof ParameterizedType) {
            ParameterizedType parameterizedType = (ParameterizedType) parameterType;
            Type rawType = parameterizedType.getRawType();
            if (rawType.equals(ConsumerRecord.class) || rawType.equals(Consumer.class)) {
                return false;
            }
            if (rawType.equals(Message.class)) {
                return !(parameterizedType.getActualTypeArguments()[0] instanceof WildcardType);
            }
        }
        return !parameterType.equals(Message.class);
    }

    /**
     * Root object for reply-topic expressions, exposing the {@code request},
     * {@code source} and {@code result} properties.
     */
    public static final class ReplyExpressionRoot {
        private final Object request;
        private final Object source;
        private final Object result;

        /**
         * @param request the request.
         * @param source the source data.
         * @param result the listener result.
         */
        public ReplyExpressionRoot(Object request, Object source, Object result) {
            this.request = request;
            this.source = source;
            this.result = result;
        }

        /** @return the request. */
        public Object getRequest() {
            return this.request;
        }

        /** @return the source data. */
        public Object getSource() {
            return this.source;
        }

        /** @return the listener result. */
        public Object getResult() {
            return this.result;
        }
    }
}

