/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source.batch;

import com.google.gson.Gson;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchSourceExecutor<T>
implements Source<T> {
    private static final Logger log = LoggerFactory.getLogger(BatchSourceExecutor.class);
    private Map<String, Object> config;
    private SourceContext sourceContext;
    private BatchSourceTriggerer discoveryTriggerer;
    private Consumer<byte[]> intermediateTopicConsumer;
    private Message<byte[]> currentTask;
    private BatchSourceConfig batchSourceConfig;
    private String batchSourceClassName;
    private BatchSource<T> batchSource;
    private String intermediateTopicName;
    private volatile Exception currentError = null;
    private volatile boolean isRunning = false;
    private ExecutorService discoveryThread;
    volatile boolean discoverInProgress = false;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.config = config;
        this.sourceContext = sourceContext;
        this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName((String)sourceContext.getTenant(), (String)sourceContext.getNamespace(), (String)sourceContext.getSourceName()).toString();
        this.discoveryThread = Executors.newSingleThreadExecutor((ThreadFactory)new DefaultThreadFactory(String.format("%s-batch-source-discovery", FunctionCommon.getFullyQualifiedName((String)sourceContext.getTenant(), (String)sourceContext.getNamespace(), (String)sourceContext.getSourceName()))));
        this.getBatchSourceConfigs(config);
        this.initializeBatchSource();
        this.start();
    }

    public Record<T> read() throws Exception {
        Record retval;
        while (true) {
            if (this.currentError != null) {
                throw this.currentError;
            }
            if (this.currentTask == null) {
                this.currentTask = this.retrieveNextTask();
                this.prepareInternal(this.currentTask);
            }
            if ((retval = this.batchSource.readNext()) != null) break;
            this.intermediateTopicConsumer.acknowledgeAsync(this.currentTask.getMessageId()).exceptionally(throwable -> {
                log.error("Encountered error when acknowledging completed task with id {}", (Object)this.currentTask.getMessageId(), throwable);
                this.setCurrentError((Throwable)throwable);
                return null;
            });
            this.currentTask = null;
        }
        return retval;
    }

    private void getBatchSourceConfigs(Map<String, Object> config) {
        if (!config.containsKey("__BATCHSOURCECONFIGS__") || !config.containsKey("__BATCHSOURCECLASSNAME__")) {
            throw new IllegalArgumentException("Batch Configs cannot be found");
        }
        String batchSourceConfigJson = (String)config.get("__BATCHSOURCECONFIGS__");
        this.batchSourceConfig = (BatchSourceConfig)new Gson().fromJson(batchSourceConfigJson, BatchSourceConfig.class);
        this.batchSourceClassName = (String)config.get("__BATCHSOURCECLASSNAME__");
    }

    private void initializeBatchSource() {
        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
        Object userClassObject = Reflections.createInstance((String)this.batchSourceClassName, (ClassLoader)clsLoader);
        if (!(userClassObject instanceof BatchSource)) {
            throw new IllegalArgumentException("BatchSource does not implement the correct interface");
        }
        this.batchSource = (BatchSource)userClassObject;
        Object discoveryClassObject = Reflections.createInstance((String)this.batchSourceConfig.getDiscoveryTriggererClassName(), (ClassLoader)clsLoader);
        if (!(discoveryClassObject instanceof BatchSourceTriggerer)) {
            throw new IllegalArgumentException("BatchSourceTriggerer does not implement the correct interface");
        }
        this.discoveryTriggerer = (BatchSourceTriggerer)discoveryClassObject;
    }

    private void start() throws Exception {
        this.isRunning = true;
        this.createIntermediateTopicConsumer();
        this.batchSource.open(this.config, this.sourceContext);
        if (this.sourceContext.getInstanceId() == 0) {
            this.discoveryTriggerer.init(this.batchSourceConfig.getDiscoveryTriggererConfig(), this.sourceContext);
            this.discoveryTriggerer.start(this::triggerDiscover);
        }
    }

    private synchronized void triggerDiscover(String discoveredEvent) {
        if (this.discoverInProgress) {
            log.info("Discovery is already in progress");
            return;
        }
        this.discoverInProgress = true;
        this.discoveryThread.execute(() -> {
            try {
                this.batchSource.discover(task -> this.taskEater(discoveredEvent, (byte[])task));
            }
            catch (Exception e) {
                if (this.isRunning || !(e instanceof InterruptedException)) {
                    log.error("Encountered error during task discovery", (Throwable)e);
                    this.setCurrentError(e);
                }
            }
            finally {
                this.discoverInProgress = false;
            }
        });
    }

    private void taskEater(String discoveredEvent, byte[] task) {
        try {
            HashMap<String, String> properties = new HashMap<String, String>();
            properties.put("discoveredEvent", discoveredEvent);
            properties.put("produceTime", String.valueOf(System.currentTimeMillis()));
            TypedMessageBuilder message = this.sourceContext.newOutputMessage(this.intermediateTopicName, Schema.BYTES);
            message.value((Object)task).properties(properties);
            message.send();
        }
        catch (Exception e) {
            log.error("error writing discovered task to intermediate topic", (Throwable)e);
            throw new RuntimeException("error writing discovered task to intermediate topic");
        }
    }

    private void prepareInternal(Message<byte[]> task) {
        try {
            this.batchSource.prepare((byte[])task.getValue());
        }
        catch (Exception e) {
            log.error("Error on prepare", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public void close() throws Exception {
        this.stop();
    }

    private void stop() throws Exception {
        Exception ex;
        block13: {
            this.isRunning = false;
            ex = null;
            if (this.discoveryTriggerer != null) {
                try {
                    this.discoveryTriggerer.stop();
                }
                catch (Exception e) {
                    log.error("Encountered exception when closing Batch Source Triggerer", (Throwable)e);
                    ex = e;
                }
                this.discoveryTriggerer = null;
            }
            this.discoveryThread.shutdownNow();
            try {
                this.discoveryThread.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                log.warn("Shutdown of discovery thread was interrupted");
                Thread.currentThread().interrupt();
            }
            if (this.intermediateTopicConsumer != null) {
                block12: {
                    try {
                        this.intermediateTopicConsumer.close();
                    }
                    catch (Exception e) {
                        log.error("Encountered exception when closing intermediate topic of Batch Source", (Throwable)e);
                        if (ex == null) break block12;
                        ex = e;
                    }
                }
                this.intermediateTopicConsumer = null;
            }
            if (this.batchSource != null) {
                try {
                    this.batchSource.close();
                }
                catch (Exception e) {
                    log.error("Encountered exception when closing Batch Source", (Throwable)e);
                    if (ex == null) break block13;
                    ex = e;
                }
            }
        }
        if (ex != null) {
            throw ex;
        }
    }

    private void createIntermediateTopicConsumer() {
        String subName = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName((String)this.sourceContext.getTenant(), (String)this.sourceContext.getNamespace(), (String)this.sourceContext.getSourceName());
        String fqfn = FunctionCommon.getFullyQualifiedName((String)this.sourceContext.getTenant(), (String)this.sourceContext.getNamespace(), (String)this.sourceContext.getSourceName());
        try {
            Actions.newBuilder().addAction(Actions.Action.builder().actionName(String.format("Setting up instance consumer for BatchSource intermediate topic for function %s", fqfn)).numRetries(10).sleepBetweenInvocationsMs(1000L).supplier(() -> {
                try {
                    CompletableFuture cf = this.sourceContext.newConsumerBuilder(Schema.BYTES).subscriptionName(subName).subscriptionType(SubscriptionType.Shared).topic(new String[]{this.intermediateTopicName}).properties(InstanceUtils.getProperties(Function.FunctionDetails.ComponentType.SOURCE, fqfn, this.sourceContext.getInstanceId())).subscribeAsync();
                    this.intermediateTopicConsumer = (Consumer)cf.join();
                    return Actions.ActionResult.builder().success(true).build();
                }
                catch (Exception e) {
                    return Actions.ActionResult.builder().success(false).errorMsg(e.getMessage()).build();
                }
            }).build()).run();
        }
        catch (InterruptedException e) {
            log.error("Error setting up instance subscription for intermediate topic", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private Message<byte[]> retrieveNextTask() throws Exception {
        Message taskMessage;
        do {
            if (this.currentError == null) continue;
            throw this.currentError;
        } while ((taskMessage = this.intermediateTopicConsumer.receive(5, TimeUnit.SECONDS)) == null);
        return taskMessage;
    }

    private void setCurrentError(Throwable error) {
        this.currentError = error instanceof Exception ? (Exception)error : new RuntimeException(error.getCause());
    }
}

