/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.pulsar;

import java.io.Serializable;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.pulsar.PerTopicOffset;
import org.apache.spark.sql.pulsar.PulsarHelper;
import org.apache.spark.sql.pulsar.PulsarOffsetRange;
import org.apache.spark.sql.pulsar.PulsarOffsetRange$;
import org.apache.spark.sql.pulsar.PulsarSourceInitialOffsetWriter;
import org.apache.spark.sql.pulsar.PulsarSourceRDD;
import org.apache.spark.sql.pulsar.PulsarSourceUtils$;
import org.apache.spark.sql.pulsar.ReadMaxBytes;
import org.apache.spark.sql.pulsar.SchemaInfoSerializable;
import org.apache.spark.sql.pulsar.SchemaUtils$;
import org.apache.spark.sql.pulsar.SpecificPulsarOffset;
import org.apache.spark.sql.pulsar.SpecificPulsarOffset$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.GenSet;
import scala.collection.Seq;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\t\u001da!\u0002\u0011\"\u0001\u0005Z\u0003\u0002C&\u0001\u0005\u0003\u0005\u000b\u0011B'\t\u0011E\u0003!\u0011!Q\u0001\nIC\u0001B\u0016\u0001\u0003\u0002\u0003\u0006Ia\u0016\u0005\tU\u0002\u0011\t\u0011)A\u0005/\"A1\u000e\u0001B\u0001B\u0003%Q\f\u0003\u0005m\u0001\t\u0005\t\u0015!\u0003n\u0011!\u0001\bA!A!\u0002\u0013\t\b\u0002C;\u0001\u0005\u0003\u0005\u000b\u0011\u0002<\t\u0011e\u0004!\u0011!Q\u0001\niD\u0001\" \u0001\u0003\u0002\u0003\u0006I!\u0018\u0005\t}\u0002\u0011\t\u0011)A\u0005\u007f\"9\u0011q\u0002\u0001\u0005\u0002\u0005E\u0001\"CA\u0016\u0001\t\u0007I\u0011BA\u0017\u0011!\t9\u0004\u0001Q\u0001\n\u0005=\u0002\"CA\u001d\u0001\t\u0007I\u0011AA\u001e\u0011!\tI\u0005\u0001Q\u0001\n\u0005u\u0002\"CA&\u0001\u0001\u0007I\u0011BA'\u0011%\ty\u0005\u0001a\u0001\n\u0013\t\t\u0006C\u0004\u0002X\u0001\u0001\u000b\u0015\u0002>\t\u0015\u0005e\u0003\u0001#b\u0001\n\u0013\tY\u0006C\u0005\u0002d\u0001\u0001\r\u0011\"\u0003\u0002f!I\u00111\u0011\u0001A\u0002\u0013%\u0011Q\u0011\u0005\t\u0003\u0013\u0003\u0001\u0015)\u0003\u0002h!Q\u00111\u0012\u0001\t\u0006\u0004%I!!$\t\u000f\u0005U\u0005\u0001\"\u0011\u0002 
\"9\u0011Q\u0016\u0001\u0005B\u0005=\u0006bBA]\u0001\u0011\u0005\u00131\u0018\u0005\b\u0003\u001f\u0004A\u0011IAi\u0011\u001d\t\u0019\u000e\u0001C!\u0003+Dq!!@\u0001\t\u0003\ny\u0010C\u0004\u0003\u0004\u0001!\tE!\u0002\u0003\u0019A+Hn]1s'>,(oY3\u000b\u0005\t\u001a\u0013A\u00029vYN\f'O\u0003\u0002%K\u0005\u00191/\u001d7\u000b\u0005\u0019:\u0013!B:qCJ\\'B\u0001\u0015*\u0003\u0019\t\u0007/Y2iK*\t!&A\u0002pe\u001e\u001cR\u0001\u0001\u00175y\t\u0003\"!\f\u001a\u000e\u00039R!a\f\u0019\u0002\t1\fgn\u001a\u0006\u0002c\u0005!!.\u0019<b\u0013\t\u0019dF\u0001\u0004PE*,7\r\u001e\t\u0003kij\u0011A\u000e\u0006\u0003oa\n\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005e\u001a\u0013!C3yK\u000e,H/[8o\u0013\tYdG\u0001\u0004T_V\u00148-\u001a\t\u0003{\u0001k\u0011A\u0010\u0006\u0003\u007f\u0015\n\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003\u0003z\u0012q\u0001T8hO&tw\r\u0005\u0002D\u00136\tAI\u0003\u00028\u000b*\u0011aiR\u0001\u0005e\u0016\fGM\u0003\u0002IG\u0005I1m\u001c8oK\u000e$xN]\u0005\u0003\u0015\u0012\u0013\u0001dU;qa>\u0014Ho]!e[&\u001c8/[8o\u0007>tGO]8m\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010^\u0002\u0001!\tqu*D\u0001$\u0013\t\u00016E\u0001\u0006T#2\u001buN\u001c;fqR\fA\u0002];mg\u0006\u0014\b*\u001a7qKJ\u0004\"a\u0015+\u000e\u0003\u0005J!!V\u0011\u0003\u0019A+Hn]1s\u0011\u0016d\u0007/\u001a:\u0002\u0015\rd\u0017.\u001a8u\u0007>tg\r\u0005\u0003Y7vcS\"A-\u000b\u0005i\u0003\u0014\u0001B;uS2L!\u0001X-\u0003\u00075\u000b\u0007\u000f\u0005\u0002_O:\u0011q,\u001a\t\u0003A\u000el\u0011!\u0019\u0006\u0003E2\u000ba\u0001\u0010:p_Rt$\"\u00013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001c\u0017A\u0002)sK\u0012,g-\u0003\u0002iS\n11\u000b\u001e:j]\u001eT!AZ2\u0002\u0015I,\u0017\rZ3s\u0007>tg-\u0001\u0007nKR\fG-\u0019;b!\u0006$\b.A\bti\u0006\u0014H/\u001b8h\u001f\u001a47/\u001a;t!\t\u0019f.\u0003\u0002pC\tq\u0001+\u001a:U_BL7m\u00144gg\u0016$\u0018!\u00049pY2$\u0016.\\3pkRl5\u000f\u0005\u0002sg6\t1-\u0003\u0002uG\n\u0019\u0011J\u001c;\u0002%5\f\u0007PQ=uKN\u0004VM\u001d+sS\u001e<WM\u001d\t
\u0003e^L!\u0001_2\u0003\t1{gnZ\u0001\u000fM\u0006LGn\u00148ECR\fGj\\:t!\t\u001180\u0003\u0002}G\n9!i\\8mK\u0006t\u0017AF:vEN\u001c'/\u001b9uS>tg*Y7f!J,g-\u001b=\u0002\u0017)\u001cxN\\(qi&|gn\u001d\t\u0005\u0003\u0003\tY!\u0004\u0002\u0002\u0004)!\u0011QAA\u0004\u0003\u0011Q7o\u001c8\u000b\u0007\u0005%1%\u0001\u0005dCR\fG._:u\u0013\u0011\ti!a\u0001\u0003#)\u001bvJT(qi&|gn]%o%\u0016\fG-\u0001\u0004=S:LGO\u0010\u000b\u0019\u0003'\t)\"a\u0006\u0002\u001a\u0005m\u0011QDA\u0010\u0003C\t\u0019#!\n\u0002(\u0005%\u0002CA*\u0001\u0011\u0015YE\u00021\u0001N\u0011\u0015\tF\u00021\u0001S\u0011\u00151F\u00021\u0001X\u0011\u0015QG\u00021\u0001X\u0011\u0015YG\u00021\u0001^\u0011\u0015aG\u00021\u0001n\u0011\u0015\u0001H\u00021\u0001r\u0011\u0015)H\u00021\u0001w\u0011\u0015IH\u00021\u0001{\u0011\u0015iH\u00021\u0001^\u0011\u0015qH\u00021\u0001\u0000\u0003\t\u00198-\u0006\u0002\u00020A!\u0011\u0011GA\u001a\u001b\u0005)\u0013bAA\u001bK\ta1\u000b]1sW\u000e{g\u000e^3yi\u0006\u00191o\u0019\u0011\u0002\u001dI,\u0007o\u001c:u\t\u0006$\u0018\rT8tgV\u0011\u0011Q\b\t\u0007e\u0006}R,a\u0011\n\u0007\u0005\u00053MA\u0005Gk:\u001cG/[8ocA\u0019!/!\u0012\n\u0007\u0005\u001d3M\u0001\u0003V]&$\u0018a\u0004:fa>\u0014H\u000fR1uC2{7o\u001d\u0011\u0002\u000fM$x\u000e\u001d9fIV\t!0A\u0006ti>\u0004\b/\u001a3`I\u0015\fH\u0003BA\"\u0003'B\u0001\"!\u0016\u0013\u0003\u0003\u0005\rA_\u0001\u0004q\u0012\n\u0014\u0001C:u_B\u0004X\r\u001a\u0011\u0002'%t\u0017\u000e^5bYR{\u0007/[2PM\u001a\u001cX\r^:\u0016\u0005\u0005u\u0003cA*\u0002`%\u0019\u0011\u0011M\u0011\u0003)M\u0003XmY5gS\u000e\u0004V\u000f\\:be>3gm]3u\u0003M\u0019WO\u001d:f]R$v\u000e]5d\u001f\u001a47/\u001a;t+\t\t9\u0007E\u0003s\u0003S\ni'C\u0002\u0002l\r\u0014aa\u00149uS>t\u0007C\u00020\u0002pu\u000b\t(\u0003\u0002]SB!\u00111OA@\u001b\t\t)H\u0003\u0003\u0002x\u0005e\u0014aA1qS*!\u00111PA?\u0003\u0019\u0019G.[3oi*\u0011!eJ\u0005\u0005\u0003\u0003\u000b)HA\u0005NKN\u001c\u0018mZ3JI\u000692-\u001e:sK:$Hk\u001c9jG>3gm]3ug~#S-\u001d\u000b\u0005\u0003\u0007\n9\t
C\u0005\u0002VY\t\t\u00111\u0001\u0002h\u0005!2-\u001e:sK:$Hk\u001c9jG>3gm]3ug\u0002\nA\u0002];mg\u0006\u00148k\u00195f[\u0006,\"!a$\u0011\t\u0005E\u00151T\u0007\u0003\u0003'SA!!&\u0002\u0018\u000611o\u00195f[\u0006TA!!'\u0002~\u000511m\\7n_:LA!!(\u0002\u0014\nQ1k\u00195f[\u0006LeNZ8\u0015\u0005\u0005\u0005\u0006\u0003BAR\u0003Sk!!!*\u000b\u0007\u0005\u001d6%A\u0003usB,7/\u0003\u0003\u0002,\u0006\u0015&AC*ueV\u001cG\u000fV=qK\u0006Iq-\u001a;PM\u001a\u001cX\r^\u000b\u0003\u0003c\u0003RA]A5\u0003g\u00032!NA[\u0013\r\t9L\u000e\u0002\u0007\u001f\u001a47/\u001a;\u0002\u00191\fG/Z:u\u001f\u001a47/\u001a;\u0015\r\u0005u\u0016\u0011YAc!\r\u0019\u0015qX\u0005\u0004\u0003o#\u0005bBAb7\u0001\u0007\u0011QX\u0001\u000fgR\f'\u000f^5oO>3gm]3u\u0011\u001d\t9m\u0007a\u0001\u0003\u0013\f\u0011B]3bI2KW.\u001b;\u0011\u0007\r\u000bY-C\u0002\u0002N\u0012\u0013\u0011BU3bI2KW.\u001b;\u0002'\u001d,G\u000fR3gCVdGOU3bI2KW.\u001b;\u0015\u0005\u0005%\u0017\u0001C4fi\n\u000bGo\u00195\u0015\r\u0005]\u0017Q_A}!\u0011\tI.a<\u000f\t\u0005m\u00171\u001e\b\u0005\u0003;\fIO\u0004\u0003\u0002`\u0006\u001dh\u0002BAq\u0003Kt1\u0001YAr\u0013\u0005Q\u0013B\u0001\u0015*\u0013\t1s%\u0003\u0002%K%\u0019\u0011Q^\u0012\u0002\u000fA\f7m[1hK&!\u0011\u0011_Az\u0005%!\u0015\r^1Ge\u0006lWMC\u0002\u0002n\u000eBq!a>\u001e\u0001\u0004\t\t,A\u0003ti\u0006\u0014H\u000fC\u0004\u0002|v\u0001\r!a-\u0002\u0007\u0015tG-\u0001\u0004d_6l\u0017\u000e\u001e\u000b\u0005\u0003\u0007\u0012\t\u0001C\u0004\u0002|z\u0001\r!a-\u0002\tM$x\u000e\u001d\u000b\u0003\u0003\u0007\u0002")
public class PulsarSource
implements Source,
Logging,
SupportsAdmissionControl {
    // Lazily computed initial offsets; populated by initialTopicOffsets$lzycompute() and guarded by bit 1 of bitmap$0.
    private SpecificPulsarOffset initialTopicOffsets;
    // Lazily computed Pulsar schema; populated by pulsarSchema$lzycompute() and guarded by bit 2 of bitmap$0.
    private SchemaInfo pulsarSchema;
    private final SQLContext sqlContext;
    private final PulsarHelper pulsarHelper;
    // Passed through unchanged to PulsarSourceRDD in getBatch().
    private final Map<String, Object> clientConf;
    private final Map<String, Object> readerConf;
    // Handed to PulsarSourceInitialOffsetWriter when the initial offsets are resolved.
    private final String metadataPath;
    // Not final: set to null once the initial offsets have been computed.
    private PerTopicOffset startingOffsets;
    private final int pollTimeoutMs;
    // A value of 0 makes getDefaultReadLimit() return ReadLimit.allAvailable().
    private final long maxBytesPerTrigger;
    private final boolean failOnDataLoss;
    private final String subscriptionNamePrefix;
    private final JSONOptionsInRead jsonOptions;
    // Derived in the constructor from sqlContext.sparkContext().
    private final SparkContext sc;
    // Built in the constructor by PulsarSourceUtils$.reportDataLossFunc(failOnDataLoss).
    private final Function1<String, BoxedUnit> reportDataLoss;
    // Set to true by stop() so that a repeated stop() does nothing.
    private boolean stopped;
    // Starts as None; getBatch() stores the end offsets of the first batch it sees here.
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets;
    // Backing field for the accessor pair required by the Logging trait.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Initialisation flags for the two lazy fields above (bit 1 and bit 2).
    private volatile byte bitmap$0;

    /**
     * Defers to the default {@code reportLatestOffset()} implementation declared on
     * {@link SupportsAdmissionControl}; this source does not customise it.
     *
     * @return whatever the interface default returns
     */
    public Offset reportLatestOffset() {
        // An inherited interface default has to be invoked through the qualified
        // "Interface.super" form; a bare "super." call is resolved against Object,
        // which declares no such method.
        return SupportsAdmissionControl.super.reportLatestOffset();
    }

    /** Delegates to the static forwarder {@code Logging.logName$} for this instance. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Delegates to the static forwarder {@code Logging.log$} for this instance. */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards the message supplier to {@code Logging.logInfo$}.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards the message supplier to {@code Logging.logDebug$}.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards the message supplier to {@code Logging.logTrace$}.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards the message supplier to {@code Logging.logWarning$}.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards the message supplier to {@code Logging.logError$}.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards the message supplier and throwable to {@code Logging.logInfo$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards the message supplier and throwable to {@code Logging.logDebug$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards the message supplier and throwable to {@code Logging.logTrace$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards the message supplier and throwable to {@code Logging.logWarning$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards the message supplier and throwable to {@code Logging.logError$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /** Delegates to the static forwarder {@code Logging.isTraceEnabled$} for this instance. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards to the single-flag {@code Logging.initializeLogIfNecessary$} overload.
     *
     * @param isInterpreter flag handed through unchanged to the trait implementation
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Forwards to the two-flag {@code Logging.initializeLogIfNecessary$} overload.
     *
     * @param isInterpreter flag handed through unchanged to the trait implementation
     * @param silent        flag handed through unchanged to the trait implementation
     * @return the value produced by the trait implementation
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        boolean outcome = Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
        return outcome;
    }

    /** Returns the default for the second argument of {@code initializeLogIfNecessary}, as supplied by the {@link Logging} trait. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /**
     * Forwards to {@code Logging.initializeForcefully$}.
     *
     * @param isInterpreter flag handed through unchanged to the trait implementation
     * @param silent        flag handed through unchanged to the trait implementation
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /** Delegates to the {@link Source} trait's implementation through {@code Source.initialOffset$}. */
    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    /**
     * Delegates to the {@link Source} trait's implementation through {@code Source.deserializeOffset$}.
     *
     * @param json serialized offset, handed through unchanged
     * @return the value produced by the trait implementation
     */
    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$(this, json);
    }

    /**
     * Connector-API overload of commit; delegates to the {@link Source} trait through
     * {@code Source.commit$}.
     *
     * @param end offset handed through unchanged
     */
    public void commit(Offset end) {
        Source.commit$(this, end);
    }

    /** Getter for the logger backing field required by the {@link Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the logger backing field required by the {@link Logging} trait.
     *
     * @param x$1 logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the Spark context captured in the constructor. */
    private SparkContext sc() {
        return sc;
    }

    /** Returns the data-loss callback created in the constructor. */
    public Function1<String, BoxedUnit> reportDataLoss() {
        return reportDataLoss;
    }

    /** Returns the current value of the {@code stopped} flag. */
    private boolean stopped() {
        return stopped;
    }

    /**
     * Overwrites the {@code stopped} flag.
     *
     * @param x$1 new flag value
     */
    private void stopped_$eq(boolean x$1) {
        stopped = x$1;
    }

    /**
     * Slow path of the lazy {@code initialTopicOffsets} field: under this object's monitor,
     * resolves the initial offsets through a {@link PulsarSourceInitialOffsetWriter} if bit 1
     * of {@code bitmap$0} is still clear, then sets that bit.
     *
     * @return the (now initialised) initial topic offsets
     */
    private SpecificPulsarOffset initialTopicOffsets$lzycompute() {
        synchronized (this) {
            boolean pending = (this.bitmap$0 & 1) == 0;
            if (pending) {
                PulsarSourceInitialOffsetWriter offsetLog = new PulsarSourceInitialOffsetWriter(this.sqlContext.sparkSession(), this.metadataPath);
                SpecificPulsarOffset resolved = offsetLog.getInitialOffset(this.pulsarHelper, this.startingOffsets, this.pollTimeoutMs, this.reportDataLoss());
                this.initialTopicOffsets = resolved;
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        // The configured starting offsets are only needed for the computation above.
        this.startingOffsets = null;
        return this.initialTopicOffsets;
    }

    /**
     * Accessor for the lazy initial offsets: returns the cached value once bit 1 of
     * {@code bitmap$0} is set, otherwise takes the synchronized slow path.
     */
    private SpecificPulsarOffset initialTopicOffsets() {
        boolean initialised = (this.bitmap$0 & 1) != 0;
        return initialised ? this.initialTopicOffsets : this.initialTopicOffsets$lzycompute();
    }

    /** Returns the currently recorded per-topic offsets, if any have been stored. */
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets() {
        return currentTopicOffsets;
    }

    /**
     * Replaces the recorded per-topic offsets.
     *
     * @param x$1 new value to store
     */
    private void currentTopicOffsets_$eq(Option<scala.collection.immutable.Map<String, MessageId>> x$1) {
        currentTopicOffsets = x$1;
    }

    /**
     * Slow path of the lazy {@code pulsarSchema} field: under this object's monitor, asks the
     * helper for the schema if bit 2 of {@code bitmap$0} is still clear, then sets that bit.
     *
     * @return the (now initialised) schema info
     */
    private SchemaInfo pulsarSchema$lzycompute() {
        synchronized (this) {
            boolean pending = (this.bitmap$0 & 2) == 0;
            if (pending) {
                SchemaInfo fetched = this.pulsarHelper.getPulsarSchema();
                this.pulsarSchema = fetched;
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.pulsarSchema;
    }

    /**
     * Accessor for the lazy schema: returns the cached value once bit 2 of {@code bitmap$0}
     * is set, otherwise takes the synchronized slow path.
     */
    private SchemaInfo pulsarSchema() {
        boolean initialised = (this.bitmap$0 & 2) != 0;
        return initialised ? this.pulsarSchema : this.pulsarSchema$lzycompute();
    }

    /**
     * Returns the Spark schema that {@code SchemaUtils$.pulsarSourceSchema} derives from the
     * (lazily fetched) Pulsar schema.
     */
    public StructType schema() {
        SchemaUtils$ schemaUtils = SchemaUtils$.MODULE$;
        return schemaUtils.pulsarSourceSchema(this.pulsarSchema());
    }

    /**
     * Not supported by this source.
     *
     * @throws UnsupportedOperationException always; callers are pointed at
     *         {@code latestOffset(Offset, ReadLimit)} instead
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        String reason = "latestOffset(Offset, ReadLimit) should be called instead of this method";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Computes the end offset for the next batch according to the given read limit.
     *
     * <p>For a {@link ReadMaxBytes} limit the helper is asked for the latest offsets within
     * the byte budget, starting from {@code startingOffset} or, when that is {@code null},
     * from the initial topic offsets. For {@link ReadAllAvailable} the helper's latest
     * offsets are returned as-is.
     *
     * @param startingOffset offset to start from; may be {@code null}
     * @param readLimit      limit to apply
     * @return the offset computed by the helper
     * @throws MatchError if {@code readLimit} is neither of the two supported kinds
     */
    public Offset latestOffset(Offset startingOffset, ReadLimit readLimit) {
        // Force the lazy initial offsets before anything else, as the original does.
        this.initialTopicOffsets();
        if (readLimit instanceof ReadMaxBytes) {
            long byteBudget = ((ReadMaxBytes)readLimit).maxBytes();
            Offset from = startingOffset != null ? startingOffset : (Offset)this.initialTopicOffsets();
            return this.pulsarHelper.latestOffsets(from, byteBudget);
        }
        if (readLimit instanceof ReadAllAvailable) {
            return this.pulsarHelper.fetchLatestOffsets();
        }
        throw new MatchError((Object)readLimit);
    }

    /**
     * Returns a {@link ReadMaxBytes} limit of {@code maxBytesPerTrigger} bytes, or
     * {@code ReadLimit.allAvailable()} when that setting is 0.
     */
    public ReadLimit getDefaultReadLimit() {
        if (this.maxBytesPerTrigger != 0L) {
            return new ReadMaxBytes(this.maxBytesPerTrigger);
        }
        return ReadLimit.allAvailable();
    }

    /**
     * Builds the DataFrame covering the offsets between {@code start} and {@code end}.
     *
     * <p>Steps, in order: force the lazy initial offsets; record {@code end}'s topic offsets
     * as the current offsets if none are recorded yet; return an empty DataFrame when
     * {@code start} is defined and equals {@code end}; otherwise derive per-topic offset
     * ranges (fetching earliest offsets for topics that only appear in {@code end}, and
     * reporting data loss for topics that only appear in the start offsets), drop ranges
     * rejected by {@code $anonfun$getBatch$4}, and wrap them in a {@link PulsarSourceRDD}.
     *
     * @param start end offset of the previous batch, or {@code None} to start from the initial offsets
     * @param end   end offset of this batch
     * @return a streaming DataFrame over the computed offset ranges
     * @throws IllegalStateException if a topic in {@code end} has no from-offset
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start, org.apache.spark.sql.execution.streaming.Offset end) {
        scala.collection.immutable.Map<String, MessageId> map;
        Option<org.apache.spark.sql.execution.streaming.Offset> option;
        // Force the lazy initial offsets before doing anything else.
        this.initialTopicOffsets();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(37).append("getBatch called with start = ").append(start).append(", end = ").append(end).toString());
        scala.collection.immutable.Map<String, MessageId> endTopicOffsets = SpecificPulsarOffset$.MODULE$.getTopicOffsets(end);
        // Only the first batch's end offsets are stored here; later calls leave the value alone.
        if (this.currentTopicOffsets().isEmpty()) {
            this.currentTopicOffsets_$eq((Option<scala.collection.immutable.Map<String, MessageId>>)new Some(endTopicOffsets));
        }
        // start == end: nothing to read, so return an empty RDD with the source schema.
        if (start.isDefined()) {
            Object object = start.get();
            org.apache.spark.sql.execution.streaming.Offset offset = end;
            if (!(object != null ? !object.equals(offset) : offset != null)) {
                return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
            }
        }
        // From-offsets: the previous batch's end offsets, or the initial offsets for the first batch.
        if ((option = start) instanceof Some) {
            Some some = (Some)option;
            org.apache.spark.sql.execution.streaming.Offset prevBatchEndOffset = (org.apache.spark.sql.execution.streaming.Offset)some.value();
            map = SpecificPulsarOffset$.MODULE$.getTopicOffsets(prevBatchEndOffset);
        } else if (None$.MODULE$.equals(option)) {
            map = this.initialTopicOffsets().topicOffsets();
        } else {
            throw new MatchError(option);
        }
        scala.collection.immutable.Map<String, MessageId> fromTopicOffsets = map;
        String[] sortedExecutors = PulsarSourceUtils$.MODULE$.getSortedExecutorList(this.sc());
        int numExecutors = sortedExecutors.length;
        // Topics present in the end offsets but not in the from-offsets start at their earliest offset.
        Set newTopics = (Set)endTopicOffsets.keySet().diff((GenSet)fromTopicOffsets.keySet());
        scala.collection.immutable.Map<String, MessageId> newTopicOffsets = this.pulsarHelper.fetchEarliestOffsets((Seq<String>)newTopics.toSeq());
        // Topics present in the from-offsets but missing from the end offsets are reported as data loss.
        Set deletedPartitions = (Set)fromTopicOffsets.keySet().diff((GenSet)endTopicOffsets.keySet());
        // NOTE(review): the local below is never read; it looks like a decompiler artifact of an if-expression.
        Object object = deletedPartitions.nonEmpty() ? this.reportDataLoss().apply((Object)new StringBuilder(41).append(deletedPartitions).append(" are gone. Some data may have been missed").toString()) : BoxedUnit.UNIT;
        scala.collection.immutable.Map newFromTopicOffsets = fromTopicOffsets.$plus$plus(newTopicOffsets);
        // One range per topic in the end offsets; the preferred executor is chosen by the topic name's hash.
        // NOTE(review): the declared type None$ of preferredLoc looks like a decompiler artifact (it may hold a Some).
        Seq offsetRanges = ((SetLike)((TraversableLike)endTopicOffsets.keySet().map((Function1 & Serializable & scala.Serializable)tp -> {
            MessageId fromOffset = (MessageId)newFromTopicOffsets.getOrElse(tp, (Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException(new StringBuilder(27).append((String)tp).append(" doesn't have a from offset").toString());
            });
            MessageId untilOffset = (MessageId)endTopicOffsets.apply(tp);
            None$ preferredLoc = numExecutors > 0 ? new Some((Object)sortedExecutors[Math.floorMod(tp.hashCode(), numExecutors)]) : None$.MODULE$;
            return PulsarOffsetRange$.MODULE$.apply((String)tp, fromOffset, untilOffset, (Option<String>)preferredLoc);
        }, Set$.MODULE$.canBuildFrom())).filter((Function1 & Serializable & scala.Serializable)range -> BoxesRunTime.boxToBoolean((boolean)PulsarSource.$anonfun$getBatch$4(this, range)))).toSeq();
        PulsarSourceRDD rdd = new PulsarSourceRDD(this.sc(), new SchemaInfoSerializable(this.pulsarSchema()), this.clientConf, this.readerConf, (Seq<PulsarOffsetRange>)offsetRanges, this.pollTimeoutMs, this.failOnDataLoss, this.subscriptionNamePrefix, this.jsonOptions);
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(41).append("GetBatch generating RDD of offset range: ").append(((TraversableOnce)offsetRanges.sortBy((Function1 & Serializable & scala.Serializable)x$1 -> x$1.topic(), (Ordering)Ordering.String$.MODULE$)).mkString(", ")).toString());
        return this.sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), this.schema(), true);
    }

    /**
     * Extracts the per-topic offsets from {@code end} and asks the helper to commit its
     * cursor to them.
     *
     * @param end offset whose topic offsets are committed
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        this.pulsarHelper.commitCursorToOffset(SpecificPulsarOffset$.MODULE$.getTopicOffsets(end));
    }

    /**
     * Stops the source: removes the helper's cursor and closes the helper. Calls after the
     * first one are no-ops.
     *
     * <p>The helper is closed and the source marked as stopped even if removing the cursor
     * throws, so a failed cursor removal no longer leaks the helper. Any exception still
     * propagates to the caller.
     */
    public synchronized void stop() {
        if (this.stopped()) {
            return;
        }
        try {
            this.pulsarHelper.removeCursor();
        } finally {
            try {
                this.pulsarHelper.close();
            } finally {
                // Mark as stopped regardless of the outcome so the helper is never closed twice.
                this.stopped_$eq(true);
            }
        }
    }

    /**
     * Filter predicate used by {@code getBatch}: keeps a range unless its until-offset sorts
     * before its from-offset.
     *
     * <p>A backwards range is always dropped. Data loss is reported for it unless the
     * from-offset equals {@code MessageId.latest}.
     *
     * @param $this the source whose data-loss callback is used
     * @param range the offset range to check
     * @return {@code true} if the range should be read, {@code false} if it is dropped
     */
    public static final /* synthetic */ boolean $anonfun$getBatch$4(PulsarSource $this, PulsarOffsetRange range) {
        if (range.untilOffset().compareTo(range.fromOffset()) >= 0) {
            return true;
        }
        // Compare through the MessageId interface: equals() is dispatched virtually, so the
        // former downcast to MessageIdImpl added nothing except a possible ClassCastException
        // for other MessageId implementations.
        MessageId from = range.fromOffset();
        boolean startsAtLatest = from != null && from.equals(MessageId.latest);
        if (!startsAtLatest) {
            $this.reportDataLoss().apply((Object)new StringBuilder(64).append(range.topic()).append("'s offset was changed ").append("from ").append(range.fromOffset()).append(" to ").append(range.untilOffset()).append(", ").append("some data might has been missed").toString());
        }
        return false;
    }

    /**
     * Creates the source. Stores every argument in the matching field, runs the trait
     * initialisers, and derives the Spark context and the data-loss callback.
     *
     * @param sqlContext             SQL context used to create DataFrames and to obtain the Spark context
     * @param pulsarHelper           helper used for offset lookup, schema lookup, cursor handling and close
     * @param clientConf             configuration passed through to {@link PulsarSourceRDD}
     * @param readerConf             configuration passed through to {@link PulsarSourceRDD}
     * @param metadataPath           path given to {@link PulsarSourceInitialOffsetWriter}
     * @param startingOffsets        offsets used when the initial offsets are first resolved
     * @param pollTimeoutMs          timeout passed to the initial-offset lookup and to the RDD
     * @param maxBytesPerTrigger     byte budget for the default read limit; 0 selects "all available"
     * @param failOnDataLoss         passed to {@code reportDataLossFunc} and to the RDD
     * @param subscriptionNamePrefix passed through to {@link PulsarSourceRDD}
     * @param jsonOptions            passed through to {@link PulsarSourceRDD}
     */
    public PulsarSource(SQLContext sqlContext, PulsarHelper pulsarHelper, Map<String, Object> clientConf, Map<String, Object> readerConf, String metadataPath, PerTopicOffset startingOffsets, int pollTimeoutMs, long maxBytesPerTrigger, boolean failOnDataLoss, String subscriptionNamePrefix, JSONOptionsInRead jsonOptions) {
        this.sqlContext = sqlContext;
        this.pulsarHelper = pulsarHelper;
        this.clientConf = clientConf;
        this.readerConf = readerConf;
        this.metadataPath = metadataPath;
        this.startingOffsets = startingOffsets;
        this.pollTimeoutMs = pollTimeoutMs;
        this.maxBytesPerTrigger = maxBytesPerTrigger;
        this.failOnDataLoss = failOnDataLoss;
        this.subscriptionNamePrefix = subscriptionNamePrefix;
        this.jsonOptions = jsonOptions;
        // Trait initialisers for the mixed-in Scala traits.
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        this.sc = sqlContext.sparkContext();
        this.reportDataLoss = PulsarSourceUtils$.MODULE$.reportDataLossFunc(failOnDataLoss);
        this.stopped = false;
        // No offsets recorded yet; getBatch() fills this in on its first call.
        this.currentTopicOffsets = None$.MODULE$;
    }
}

