package org.springframework.integration.kafka.inbound;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-2.1.0.RELEASE.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.class */
public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSupport implements OrderlyShutdownCapable {
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final RecordMessagingMessageListenerAdapter<K, V> recordListener;
    private final BatchMessagingMessageListenerAdapter<K, V> batchListener;
    private final ListenerMode mode;
    private RecordFilterStrategy<K, V> recordFilterStrategy;
    private boolean ackDiscarded;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Void> recoveryCallback;
    private boolean filterInRetry;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-2.1.0.RELEASE.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.class */
    private class IntegrationBatchMessageListener extends BatchMessagingMessageListenerAdapter<K, V> {
        IntegrationBatchMessageListener() {
            super(null, null);
        }

        @Override // org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter, org.springframework.kafka.listener.GenericAcknowledgingMessageListener
        public void onMessage(List<ConsumerRecord<K, V>> list, Acknowledgment acknowledgment) {
            KafkaMessageDrivenChannelAdapter.this.sendMessage(toMessagingMessage(list, acknowledgment));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-2.1.0.RELEASE.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.class */
    private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.GenericAcknowledgingMessageListener
        public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
            KafkaMessageDrivenChannelAdapter.this.sendMessage(toMessagingMessage(consumerRecord, acknowledgment));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-2.1.0.RELEASE.jar:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$ListenerMode.class */
    public enum ListenerMode {
        record,
        batch
    }

    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer) {
        this(abstractMessageListenerContainer, ListenerMode.record);
    }

    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, ListenerMode listenerMode) {
        this.recordListener = new IntegrationRecordMessageListener();
        this.batchListener = new IntegrationBatchMessageListener();
        Assert.notNull(abstractMessageListenerContainer, "messageListenerContainer is required");
        Assert.isNull(abstractMessageListenerContainer.getContainerProperties().getMessageListener(), "Container must not already have a listener");
        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.mode = listenerMode;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            this.recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
        } else {
            if (!(messageConverter instanceof BatchMessageConverter)) {
                throw new IllegalArgumentException("Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
            }
            this.batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
        }
    }

    public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.recordListener.setMessageConverter(recordMessageConverter);
    }

    public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
        this.batchListener.setBatchMessageConverter(batchMessageConverter);
    }

    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    public void setAckDiscarded(boolean z) {
        this.ackDiscarded = z;
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.isTrue(retryTemplate == null || this.mode.equals(ListenerMode.record), "Retry is not supported with mode=batch");
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<Void> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setFilterInRetry(boolean z) {
        this.filterInRetry = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        if (!this.mode.equals(ListenerMode.record)) {
            BatchAcknowledgingMessageListener batchAcknowledgingMessageListener = this.batchListener;
            if (this.recordFilterStrategy != null) {
                batchAcknowledgingMessageListener = new FilteringBatchAcknowledgingMessageListenerAdapter(batchAcknowledgingMessageListener, this.recordFilterStrategy, this.ackDiscarded);
            }
            this.messageListenerContainer.getContainerProperties().setMessageListener(batchAcknowledgingMessageListener);
            return;
        }
        AcknowledgingMessageListener acknowledgingMessageListener = this.recordListener;
        if ((!this.filterInRetry || this.retryTemplate == null || this.recordFilterStrategy == null) ? false : true) {
            acknowledgingMessageListener = new RetryingAcknowledgingMessageListenerAdapter(new FilteringAcknowledgingMessageListenerAdapter(acknowledgingMessageListener, this.recordFilterStrategy, this.ackDiscarded), this.retryTemplate, this.recoveryCallback);
        } else {
            if (this.retryTemplate != null) {
                acknowledgingMessageListener = new RetryingAcknowledgingMessageListenerAdapter(acknowledgingMessageListener, this.retryTemplate, this.recoveryCallback);
            }
            if (this.recordFilterStrategy != null) {
                acknowledgingMessageListener = new FilteringAcknowledgingMessageListenerAdapter(acknowledgingMessageListener, this.recordFilterStrategy, this.ackDiscarded);
            }
        }
        this.messageListenerContainer.getContainerProperties().setMessageListener(acknowledgingMessageListener);
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    @Override // org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return getPhase();
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int afterShutdown() {
        return getPhase();
    }
}
