/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConsumerConfig;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleConsumerPulsarSource<T>
extends PulsarSource<T> {
    private static final Logger log = LoggerFactory.getLogger(SingleConsumerPulsarSource.class);
    private final PulsarClient pulsarClient;
    private final SingleConsumerPulsarSourceConfig pulsarSourceConfig;
    private final Map<String, String> properties;
    private final ClassLoader functionClassLoader;
    private final TopicSchema topicSchema;
    private Consumer<T> consumer;
    private final List<Consumer<T>> inputConsumers = new LinkedList<Consumer<T>>();

    public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig pulsarSourceConfig, Map<String, String> properties, ClassLoader functionClassLoader) {
        super(pulsarClient, pulsarSourceConfig, properties, functionClassLoader);
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarSourceConfig;
        this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader);
        this.properties = properties;
        this.functionClassLoader = functionClassLoader;
    }

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", (Object)this.pulsarSourceConfig);
        Class typeArg = Reflections.loadClass((String)this.pulsarSourceConfig.getTypeClassName(), (ClassLoader)this.functionClassLoader);
        Preconditions.checkArgument((!Void.class.equals((Object)typeArg) ? 1 : 0) != 0, (Object)"Input type of Pulsar Function cannot be Void");
        String topic = this.pulsarSourceConfig.getTopic();
        PulsarSourceConsumerConfig pulsarSourceConsumerConfig = this.buildPulsarSourceConsumerConfig(topic, this.pulsarSourceConfig.getConsumerConfig(), typeArg);
        log.info("Creating consumer for topic : {}, schema : {}, schemaInfo: {}", new Object[]{topic, pulsarSourceConsumerConfig.getSchema(), pulsarSourceConsumerConfig.getSchema().getSchemaInfo()});
        ConsumerBuilder cb = this.createConsumeBuilder(topic, pulsarSourceConsumerConfig);
        this.consumer = (Consumer)cb.subscribeAsync().join();
        if (this.pulsarSourceConfig.getSkipToLatest() != null && this.pulsarSourceConfig.getSkipToLatest().booleanValue()) {
            this.consumer.seek(MessageId.latest);
        }
        this.inputConsumers.add(this.consumer);
    }

    public Record<T> read() throws Exception {
        Message message = this.consumer.receive();
        return this.buildRecord(this.consumer, message);
    }

    @VisibleForTesting
    Consumer<T> getInputConsumer() {
        return this.consumer;
    }

    @Override
    public List<Consumer<T>> getInputConsumers() {
        return this.inputConsumers;
    }

    public void close() throws Exception {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}

