/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.pulsar;

import java.io.Serializable;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.pulsar.PerTopicOffset;
import org.apache.spark.sql.pulsar.PulsarHelper;
import org.apache.spark.sql.pulsar.PulsarOffsetRange;
import org.apache.spark.sql.pulsar.PulsarOffsetRange$;
import org.apache.spark.sql.pulsar.PulsarSourceInitialOffsetWriter;
import org.apache.spark.sql.pulsar.PulsarSourceRDD;
import org.apache.spark.sql.pulsar.PulsarSourceUtils$;
import org.apache.spark.sql.pulsar.SchemaInfoSerializable;
import org.apache.spark.sql.pulsar.SchemaUtils$;
import org.apache.spark.sql.pulsar.SpecificPulsarOffset;
import org.apache.spark.sql.pulsar.SpecificPulsarOffset$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.GenSet;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005-g!B\u000f\u001f\u0001yA\u0003\u0002C \u0001\u0005\u0003\u0005\u000b\u0011B!\t\u0011\u0015\u0003!\u0011!Q\u0001\n\u0019C\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0013\u0005\t=\u0002\u0011\t\u0011)A\u0005\u0017\"Aq\f\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005a\u0001\t\u0005\t\u0015!\u0003b\u0011!!\u0007A!A!\u0002\u0013)\u0007\u0002C5\u0001\u0005\u0003\u0005\u000b\u0011\u00026\t\u00115\u0004!\u0011!Q\u0001\nEC\u0001B\u001c\u0001\u0003\u0002\u0003\u0006Ia\u001c\u0005\u0006o\u0002!\t\u0001\u001f\u0005\n\u0003\u0013\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\u0006\u0001A\u0003%\u0011Q\u0002\u0005\n\u0003/\u0001!\u0019!C\u0001\u00033A\u0001\"a\n\u0001A\u0003%\u00111\u0004\u0005\n\u0003S\u0001\u0001\u0019!C\u0005\u0003WA\u0011\"!\f\u0001\u0001\u0004%I!a\f\t\u000f\u0005U\u0002\u0001)Q\u0005U\"Q\u0011q\u0007\u0001\t\u0006\u0004%I!!\u000f\t\u0013\u0005\u0005\u0003\u00011A\u0005\n\u0005\r\u0003\"CA1\u0001\u0001\u0007I\u0011BA2\u0011!\t9\u0007\u0001Q!\n\u0005\u0015\u0003BCA5\u0001!\u0015\r\u0011\"\u0003\u0002l!9\u00111\u000f\u0001\u0005B\u0005u\u0004bBAF\u0001\u0011\u0005\u0013Q\u0012\u0005\b\u0003/\u0003A\u0011IAM\u0011\u001d\t\t\r\u0001C!\u0003\u0007Dq!a2\u0001\t\u0003\nIM\u0001\u0007Qk2\u001c\u0018M]*pkJ\u001cWM\u0003\u0002 
A\u00051\u0001/\u001e7tCJT!!\t\u0012\u0002\u0007M\fHN\u0003\u0002$I\u0005)1\u000f]1sW*\u0011QEJ\u0001\u0007CB\f7\r[3\u000b\u0003\u001d\n1a\u001c:h'\u0011\u0001\u0011&M\u001d\u0011\u0005)zS\"A\u0016\u000b\u00051j\u0013\u0001\u00027b]\u001eT\u0011AL\u0001\u0005U\u00064\u0018-\u0003\u00021W\t1qJ\u00196fGR\u0004\"AM\u001c\u000e\u0003MR!\u0001N\u001b\u0002\u0013M$(/Z1nS:<'B\u0001\u001c!\u0003%)\u00070Z2vi&|g.\u0003\u00029g\t11k\\;sG\u0016\u0004\"AO\u001f\u000e\u0003mR!\u0001\u0010\u0012\u0002\u0011%tG/\u001a:oC2L!AP\u001e\u0003\u000f1{wmZ5oO\u0006Q1/\u001d7D_:$X\r\u001f;\u0004\u0001A\u0011!iQ\u0007\u0002A%\u0011A\t\t\u0002\u000b'Fc5i\u001c8uKb$\u0018\u0001\u00049vYN\f'\u000fS3ma\u0016\u0014\bCA$I\u001b\u0005q\u0012BA%\u001f\u00051\u0001V\u000f\\:be\"+G\u000e]3s\u0003)\u0019G.[3oi\u000e{gN\u001a\t\u0005\u0019>\u000b\u0016&D\u0001N\u0015\tqU&\u0001\u0003vi&d\u0017B\u0001)N\u0005\ri\u0015\r\u001d\t\u0003%ns!aU-\u0011\u0005Q;V\"A+\u000b\u0005Y\u0003\u0015A\u0002\u001fs_>$hHC\u0001Y\u0003\u0015\u00198-\u00197b\u0013\tQv+\u0001\u0004Qe\u0016$WMZ\u0005\u00039v\u0013aa\u0015;sS:<'B\u0001.X\u0003)\u0011X-\u00193fe\u000e{gNZ\u0001\r[\u0016$\u0018\rZ1uCB\u000bG\u000f[\u0001\u0010gR\f'\u000f^5oO>3gm]3ugB\u0011qIY\u0005\u0003Gz\u0011a\u0002U3s)>\u0004\u0018nY(gMN,G/A\u0007q_2dG+[7f_V$Xj\u001d\t\u0003M\u001el\u0011aV\u0005\u0003Q^\u00131!\u00138u\u000391\u0017-\u001b7P]\u0012\u000bG/\u0019'pgN\u0004\"AZ6\n\u00051<&a\u0002\"p_2,\u0017M\\\u0001\u0017gV\u00147o\u0019:jaRLwN\u001c(b[\u0016\u0004&/\u001a4jq\u0006Y!n]8o\u001fB$\u0018n\u001c8t!\t\u0001X/D\u0001r\u0015\t\u00118/\u0001\u0003kg>t'B\u0001;!\u0003!\u0019\u0017\r^1msN$\u0018B\u0001<r\u0005EQ5k\u0014(PaRLwN\\:J]J+\u0017\rZ\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u001feT8\u0010`?\u007f\u007f\u0006\u0005\u00111AA\u0003\u0003\u000f\u0001\"a\u0012\u0001\t\u000b}Z\u0001\u0019A!\t\u000b\u0015[\u0001\u0019\u0001$\t\u000b)[\u0001\u0019A&\t\u000by[\u0001\u0019A&\t\u000b}[\u0001\u0019A)\t\u000b\u0001\\\u0001\u0019A1\t\u000b\u0011\\\u0001\u0019A3\t\u000b%\\\u0001\u0019\u00016\t\u000b5\\\u0001\u0019A)\t\u000b9\\\u0001\u0019A8\u0002\u0005M\u001cWCAA\u0007!\u0011\ty!!\u0005\u000e\u0003\tJ1!a\u0005#\u00051\u0019\u0006/\u0019:l\u0007>tG/\u001a=u\u0003\r\u00198\rI\u0001\u000fe\u0016\u0004xN\u001d;ECR\fGj\\:t+\t\tY\u0002\u0005\u0004g\u0003;\t\u0016\u0011E\u0005\u0004\u0003?9&!\u0003$v]\u000e$\u0018n\u001c82!\r1\u00171E\u0005\u0004\u0003K9&\u0001B+oSR\fqB]3q_J$H)\u0019;b\u0019>\u001c8\u000fI\u0001\bgR|\u0007\u000f]3e+\u0005Q\u0017aC:u_B\u0004X\rZ0%KF$B!!\t\u00022!A\u00111G\t\u0002\u0002\u0003\u0007!.A\u0002yIE\n\u0001b\u001d;paB,G\rI\u0001\u0014S:LG/[1m)>\u0004\u0018nY(gMN,Go]\u000b\u0003\u0003w\u00012aRA\u001f\u0013\r\tyD\b\u0002\u0015'B,7-\u001b4jGB+Hn]1s\u001f\u001a47/\u001a;\u0002'\r,(O]3oiR{\u0007/[2PM\u001a\u001cX\r^:\u0016\u0005\u0005\u0015\u0003#\u00024\u0002H\u0005-\u0013bAA%/\n1q\n\u001d;j_:\u0004bAUA'#\u0006=\u0013B\u0001)^!\u0011\t\t&!\u0018\u000e\u0005\u0005M#\u0002BA+\u0003/\n1!\u00199j\u0015\u0011\tI&a\u0017\u0002\r\rd\u0017.\u001a8u\u0015\tyB%\u0003\u0003\u0002`\u0005M#!C'fgN\fw-Z%e\u0003]\u0019WO\u001d:f]R$v\u000e]5d\u001f\u001a47/\u001a;t?\u0012*\u0017\u000f\u0006\u0003\u0002\"\u0005\u0015\u0004\"CA\u001a+\u0005\u0005\t\u0019AA#\u0003Q\u0019WO\u001d:f]R$v\u000e]5d\u001f\u001a47/\u001a;tA\u0005a\u0001/\u001e7tCJ\u001c6\r[3nCV\u0011\u0011Q\u000e\t\u0005\u0003_\nI(\u0004\u0002\u0002r)!\u00111OA;\u0003\u0019\u00198\r[3nC*!\u0011qOA.\u0003\u0019\u0019w.\\7p]&!\u00111PA9\u0005)\u00196\r[3nC&sgm\u001c\u000b\u0003\u0003\u007f\u0002B!!!\u0002\b6\u0011\u00111\u0011\u0006\u0004\u0003\u000b\u0003\u0013!\u0002;za\u0016\u001c\u0018\u0002BAE\u0003\u0007\u0013!b\u0015;sk\u000e$H+\u001f9f\u0003%9W\r^(gMN,G/\u0006\u0002\u0002\u0010B)a-
a\u0012\u0002\u0012B\u0019!'a%\n\u0007\u0005U5G\u0001\u0004PM\u001a\u001cX\r^\u0001\tO\u0016$()\u0019;dQR1\u00111TA]\u0003{\u0003B!!(\u00024:!\u0011qTAX\u001d\u0011\t\t+!,\u000f\t\u0005\r\u00161\u0016\b\u0005\u0003K\u000bIKD\u0002U\u0003OK\u0011aJ\u0005\u0003K\u0019J!a\t\u0013\n\u0005\u0005\u0012\u0013bAAYA\u00059\u0001/Y2lC\u001e,\u0017\u0002BA[\u0003o\u0013\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0007\u0005E\u0006\u0005C\u0004\u0002<j\u0001\r!a$\u0002\u000bM$\u0018M\u001d;\t\u000f\u0005}&\u00041\u0001\u0002\u0012\u0006\u0019QM\u001c3\u0002\r\r|W.\\5u)\u0011\t\t#!2\t\u000f\u0005}6\u00041\u0001\u0002\u0012\u0006!1\u000f^8q)\t\t\t\u0003")
public class PulsarSource
implements Source,
Logging {
    // Cached result of the lazily computed initial offsets; populated by initialTopicOffsets$lzycompute().
    private SpecificPulsarOffset initialTopicOffsets;
    // Cached result of the lazily fetched Pulsar schema; populated by pulsarSchema$lzycompute().
    private SchemaInfo pulsarSchema;
    // Used to reach the SparkSession/SparkContext and to build the DataFrames returned by getBatch.
    private final SQLContext sqlContext;
    // Collaborator used for offset lookups, schema lookup, cursor commit/removal and close.
    private final PulsarHelper pulsarHelper;
    // Handed unchanged to PulsarSourceRDD in getBatch.
    private final Map<String, Object> clientConf;
    // Handed unchanged to PulsarSourceRDD in getBatch.
    private final Map<String, Object> readerConf;
    // Location passed to PulsarSourceInitialOffsetWriter when the initial offsets are computed.
    private final String metadataPath;
    // Starting position passed to getInitialOffset when the initial offsets are computed.
    private final PerTopicOffset startingOffsets;
    // Passed to getInitialOffset and to PulsarSourceRDD.
    private final int pollTimeoutMs;
    // Passed to reportDataLossFunc in the constructor and to PulsarSourceRDD in getBatch.
    private final boolean failOnDataLoss;
    // Handed unchanged to PulsarSourceRDD in getBatch.
    private final String subscriptionNamePrefix;
    // Handed unchanged to PulsarSourceRDD in getBatch.
    private final JSONOptionsInRead jsonOptions;
    // Captured from sqlContext.sparkContext() in the constructor.
    private final SparkContext sc;
    // Callback built by PulsarSourceUtils.reportDataLossFunc(failOnDataLoss); invoked with a message when offsets look inconsistent.
    private final Function1<String, BoxedUnit> reportDataLoss;
    // Becomes true after stop() has removed the cursor and closed the helper.
    private boolean stopped;
    // None until getOffset() or getBatch() stores a topic -> MessageId map.
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets;
    // Backing field for the Logging trait's logger accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Bit flags for the lazily computed fields: bit 1 marks initialTopicOffsets, bit 2 marks pulsarSchema.
    private volatile byte bitmap$0;

    /**
     * Returns the logger name by delegating to the {@code Logging} trait's static implementation.
     */
    public String logName() {
        return Logging.logName$(this);
    }

    /**
     * Returns the logger by delegating to the {@code Logging} trait's static implementation.
     */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards an info-level message supplier to the {@code Logging} trait.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards a debug-level message supplier to the {@code Logging} trait.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards a trace-level message supplier to the {@code Logging} trait.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards a warning-level message supplier to the {@code Logging} trait.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards an error-level message supplier to the {@code Logging} trait.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards an info-level message supplier and its associated throwable to the {@code Logging} trait.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards a debug-level message supplier and its associated throwable to the {@code Logging} trait.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards a trace-level message supplier and its associated throwable to the {@code Logging} trait.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards a warning-level message supplier and its associated throwable to the {@code Logging} trait.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards an error-level message supplier and its associated throwable to the {@code Logging} trait.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /**
     * Returns whatever the {@code Logging} trait's static {@code isTraceEnabled$} reports for this instance.
     */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Single-argument overload; delegates to the {@code Logging} trait's static implementation.
     *
     * @param isInterpreter flag forwarded unchanged to the trait
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Two-argument overload; delegates to the {@code Logging} trait's static implementation.
     *
     * @param isInterpreter flag forwarded unchanged to the trait
     * @param silent        flag forwarded unchanged to the trait
     * @return the value returned by the trait implementation
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /**
     * Returns the default value of the second {@code initializeLogIfNecessary} argument,
     * as supplied by the {@code Logging} trait.
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /**
     * Delegates to the {@code Logging} trait's static {@code initializeForcefully$}.
     *
     * @param isInterpreter flag forwarded unchanged to the trait
     * @param silent        flag forwarded unchanged to the trait
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /**
     * Returns the initial offset as defined by the {@code Source} trait's static implementation.
     */
    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    /**
     * Turns a JSON string into an offset by delegating to the {@code Source} trait's static implementation.
     *
     * @param json serialized offset text
     * @return the offset produced by the trait implementation
     */
    public Offset deserializeOffset(String json) {
        return Source.deserializeOffset$(this, json);
    }

    /**
     * Connector-{@code Offset} overload of commit; delegates to the {@code Source} trait's static implementation.
     *
     * @param end offset forwarded unchanged to the trait
     */
    public void commit(Offset end) {
        Source.commit$(this, end);
    }

    /**
     * Getter for the logger field backing the {@code Logging} trait.
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the logger field backing the {@code Logging} trait.
     *
     * @param x$1 logger instance to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Returns the {@code SparkContext} captured in the constructor.
     */
    private SparkContext sc() {
        return sc;
    }

    /**
     * Returns the data-loss reporting callback created in the constructor.
     */
    public Function1<String, BoxedUnit> reportDataLoss() {
        return reportDataLoss;
    }

    /**
     * Returns the current value of the {@code stopped} flag.
     */
    private boolean stopped() {
        return stopped;
    }

    /**
     * Overwrites the {@code stopped} flag.
     *
     * @param x$1 new flag value
     */
    private void stopped_$eq(boolean x$1) {
        stopped = x$1;
    }

    /**
     * Slow path of the lazily initialised {@code initialTopicOffsets} value.
     *
     * <p>While holding this instance's monitor, checks bit 1 of {@code bitmap$0}; if it is
     * clear, asks a {@link PulsarSourceInitialOffsetWriter} rooted at {@code metadataPath}
     * for the initial offset, stores the result and sets the bit so later calls skip the work.
     *
     * <p>The decompiled form also assigned {@code null} to {@code startingOffsets} here.
     * That field is declared {@code final}, so the assignment cannot compile in Java and
     * has been dropped; the reference is simply kept for the lifetime of this source.
     *
     * @return the cached initial topic offsets
     */
    private SpecificPulsarOffset initialTopicOffsets$lzycompute() {
        synchronized (this) {
            // Re-check under the lock: another thread may have finished the computation already.
            if ((byte)(this.bitmap$0 & 1) == 0) {
                PulsarSourceInitialOffsetWriter metadataLog = new PulsarSourceInitialOffsetWriter(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialTopicOffsets = metadataLog.getInitialOffset(this.pulsarHelper, this.startingOffsets, this.pollTimeoutMs, this.reportDataLoss());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.initialTopicOffsets;
    }

    /**
     * Returns the initial topic offsets, computing them on first use.
     * Bit 1 of {@code bitmap$0} tells whether the cached field is already populated.
     */
    private SpecificPulsarOffset initialTopicOffsets() {
        if ((byte)(this.bitmap$0 & 1) != 0) {
            return this.initialTopicOffsets;
        }
        return this.initialTopicOffsets$lzycompute();
    }

    /**
     * Returns the most recently stored topic-offset map, or {@code None} if nothing has been stored yet.
     */
    private Option<scala.collection.immutable.Map<String, MessageId>> currentTopicOffsets() {
        return currentTopicOffsets;
    }

    /**
     * Replaces the stored topic-offset map.
     *
     * @param x$1 new value for the {@code currentTopicOffsets} field
     */
    private void currentTopicOffsets_$eq(Option<scala.collection.immutable.Map<String, MessageId>> x$1) {
        currentTopicOffsets = x$1;
    }

    /**
     * Slow path of the lazily initialised {@code pulsarSchema} value.
     *
     * <p>While holding this instance's monitor, fetches the schema from the helper if bit 2
     * of {@code bitmap$0} is still clear, then sets that bit.
     *
     * @return the cached schema
     */
    private SchemaInfo pulsarSchema$lzycompute() {
        synchronized (this) {
            boolean alreadyLoaded = (byte)(this.bitmap$0 & 2) != 0;
            if (!alreadyLoaded) {
                this.pulsarSchema = this.pulsarHelper.getPulsarSchema();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.pulsarSchema;
    }

    /**
     * Returns the Pulsar schema, fetching it on first use.
     * Bit 2 of {@code bitmap$0} tells whether the cached field is already populated.
     */
    private SchemaInfo pulsarSchema() {
        if ((byte)(this.bitmap$0 & 2) != 0) {
            return this.pulsarSchema;
        }
        return this.pulsarSchema$lzycompute();
    }

    /**
     * Returns the Spark schema derived from the Pulsar schema via {@code SchemaUtils.pulsarSourceSchema}.
     */
    public StructType schema() {
        SchemaInfo topicSchema = this.pulsarSchema();
        return SchemaUtils$.MODULE$.pulsarSourceSchema(topicSchema);
    }

    /**
     * Fetches the latest offsets from the helper, remembers their topic-offset map as the
     * current offsets, logs them at debug level and returns them wrapped in {@code Some}.
     *
     * @return {@code Some} of the latest offsets reported by the helper
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        // Touch the lazy value first so the initial offsets are computed before anything else happens.
        this.initialTopicOffsets();
        SpecificPulsarOffset newest = this.pulsarHelper.fetchLatestOffsets();
        Option<scala.collection.immutable.Map<String, MessageId>> remembered = (Option<scala.collection.immutable.Map<String, MessageId>>)new Some(newest.topicOffsets());
        this.currentTopicOffsets_$eq(remembered);
        // The message is built lazily: entries are rendered with toString and sorted as strings.
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(11).append("GetOffset: ").append(((SeqLike)newest.topicOffsets().toSeq().map((Function1 & Serializable & scala.Serializable)entry -> entry.toString(), Seq$.MODULE$.canBuildFrom())).sorted((Ordering)Ordering.String$.MODULE$)).toString());
        return new Some((Object)newest);
    }

    /**
     * Builds the DataFrame covering the offsets between {@code start} and {@code end}.
     *
     * <p>When {@code start} is defined and equal to {@code end}, an empty DataFrame is returned.
     * Otherwise one {@code PulsarOffsetRange} is produced per topic present in {@code end},
     * ranges rejected by {@code $anonfun$getBatch$4} are dropped, and the remainder is
     * wrapped in a {@code PulsarSourceRDD}.
     *
     * @param start end offset of the previous batch, or {@code None} to start from the initial topic offsets
     * @param end   offset up to which this batch should read
     * @return a DataFrame with this source's schema
     * @throws IllegalStateException if a topic in {@code end} has no starting offset
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> start, org.apache.spark.sql.execution.streaming.Offset end) {
        scala.collection.immutable.Map<String, MessageId> map;
        Option<org.apache.spark.sql.execution.streaming.Offset> option;
        // Touch the lazy value so the initial offsets are computed before the batch is planned.
        this.initialTopicOffsets();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(37).append("getBatch called with start = ").append(start).append(", end = ").append(end).toString());
        scala.collection.immutable.Map<String, MessageId> endTopicOffsets = SpecificPulsarOffset$.MODULE$.getTopicOffsets(end);
        // If no current offsets have been stored yet, seed them from the requested end offsets.
        if (this.currentTopicOffsets().isEmpty()) {
            this.currentTopicOffsets_$eq((Option<scala.collection.immutable.Map<String, MessageId>>)new Some(endTopicOffsets));
        }
        // start == end means there is nothing to read: answer with an empty RDD named "empty".
        if (start.isDefined()) {
            Object object = start.get();
            org.apache.spark.sql.execution.streaming.Offset offset = end;
            if (!(object != null ? !object.equals(offset) : offset != null)) {
                return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
            }
        }
        // Starting offsets: the previous batch's end when given, otherwise the initial topic offsets.
        if ((option = start) instanceof Some) {
            Some some = (Some)option;
            org.apache.spark.sql.execution.streaming.Offset prevBatchEndOffset = (org.apache.spark.sql.execution.streaming.Offset)some.value();
            map = SpecificPulsarOffset$.MODULE$.getTopicOffsets(prevBatchEndOffset);
        } else if (None$.MODULE$.equals(option)) {
            map = this.initialTopicOffsets().topicOffsets();
        } else {
            throw new MatchError(option);
        }
        scala.collection.immutable.Map<String, MessageId> fromTopicOffsets = map;
        String[] sortedExecutors = PulsarSourceUtils$.MODULE$.getSortedExecutorList(this.sc());
        int numExecutors = sortedExecutors.length;
        // Topics present in end but not in the starting offsets begin at their earliest offsets.
        Set newTopics = (Set)endTopicOffsets.keySet().diff((GenSet)fromTopicOffsets.keySet());
        scala.collection.immutable.Map<String, MessageId> newTopicOffsets = this.pulsarHelper.fetchEarliestOffsets((Seq<String>)newTopics.toSeq());
        // Topics present in the starting offsets but missing from end are reported as possible data loss.
        Set deletedPartitions = (Set)fromTopicOffsets.keySet().diff((GenSet)endTopicOffsets.keySet());
        Object object = deletedPartitions.nonEmpty() ? this.reportDataLoss().apply((Object)new StringBuilder(41).append(deletedPartitions).append(" are gone. Some data may have been missed").toString()) : BoxedUnit.UNIT;
        scala.collection.immutable.Map newFromTopicOffsets = fromTopicOffsets.$plus$plus(newTopicOffsets);
        // One range per topic in end; the preferred executor is picked by hashing the topic name.
        Seq offsetRanges = ((SetLike)((TraversableLike)endTopicOffsets.keySet().map((Function1 & Serializable & scala.Serializable)tp -> {
            MessageId fromOffset = (MessageId)newFromTopicOffsets.getOrElse(tp, (Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException(new StringBuilder(27).append((String)tp).append(" doesn't have a from offset").toString());
            });
            MessageId untilOffset = (MessageId)endTopicOffsets.apply(tp);
            // No preferred location when the executor list is empty.
            None$ preferredLoc = numExecutors > 0 ? new Some((Object)sortedExecutors[Math.floorMod(tp.hashCode(), numExecutors)]) : None$.MODULE$;
            return PulsarOffsetRange$.MODULE$.apply((String)tp, fromOffset, untilOffset, (Option<String>)preferredLoc);
        }, Set$.MODULE$.canBuildFrom())).filter((Function1 & Serializable & scala.Serializable)range -> BoxesRunTime.boxToBoolean((boolean)PulsarSource.$anonfun$getBatch$4(this, range)))).toSeq();
        PulsarSourceRDD rdd = new PulsarSourceRDD(this.sc(), new SchemaInfoSerializable(this.pulsarSchema()), this.clientConf, this.readerConf, (Seq<PulsarOffsetRange>)offsetRanges, this.pollTimeoutMs, this.failOnDataLoss, this.subscriptionNamePrefix, this.jsonOptions);
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(41).append("GetBatch generating RDD of offset range: ").append(((TraversableOnce)offsetRanges.sortBy((Function1 & Serializable & scala.Serializable)x$2 -> x$2.topic(), (Ordering)Ordering.String$.MODULE$)).mkString(", ")).toString());
        return this.sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), this.schema(), true);
    }

    /**
     * Extracts the topic-offset map from {@code end} and asks the helper to commit its cursor to it.
     *
     * @param end offset whose topic offsets are committed
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset end) {
        this.pulsarHelper.commitCursorToOffset(SpecificPulsarOffset$.MODULE$.getTopicOffsets(end));
    }

    /**
     * Stops this source once: removes the helper's cursor, closes the helper and marks the
     * source as stopped. Calls made after the flag is set do nothing.
     */
    public synchronized void stop() {
        if (this.stopped()) {
            return;
        }
        this.pulsarHelper.removeCursor();
        this.pulsarHelper.close();
        this.stopped_$eq(true);
    }

    /**
     * Filter predicate applied to every offset range built by {@code getBatch}.
     *
     * <p>A range whose until-offset is not before its from-offset is kept. A range that goes
     * backwards is always dropped; if its from-offset is not {@code MessageId.latest}, the
     * situation is additionally reported through {@code reportDataLoss}.
     *
     * @param $this the enclosing source, used to reach {@code reportDataLoss}
     * @param range the candidate range
     * @return {@code true} to keep the range, {@code false} to drop it
     */
    public static final /* synthetic */ boolean $anonfun$getBatch$4(PulsarSource $this, PulsarOffsetRange range) {
        // Forward or empty ranges pass straight through.
        if (range.untilOffset().compareTo(range.fromOffset()) >= 0) {
            return true;
        }
        MessageIdImpl startId = (MessageIdImpl)range.fromOffset();
        MessageId latestMarker = MessageId.latest;
        boolean startsAtLatest = startId == null ? latestMarker == null : ((Object)startId).equals(latestMarker);
        if (!startsAtLatest) {
            // The range runs backwards from a concrete position: surface it as possible data loss.
            $this.reportDataLoss().apply(new StringBuilder(64).append(range.topic()).append("'s offset was changed ").append("from ").append(range.fromOffset()).append(" to ").append(range.untilOffset()).append(", ").append("some data might has been missed").toString());
        }
        return false;
    }

    /**
     * Creates a source bound to the given helper and configuration.
     *
     * @param sqlContext             context used to obtain the SparkContext and to build DataFrames
     * @param pulsarHelper           helper used for offset and schema lookups, cursor handling and close
     * @param clientConf             configuration map handed to {@code PulsarSourceRDD}
     * @param readerConf             configuration map handed to {@code PulsarSourceRDD}
     * @param metadataPath           path given to {@code PulsarSourceInitialOffsetWriter}
     * @param startingOffsets        starting position given to {@code getInitialOffset}
     * @param pollTimeoutMs          timeout given to {@code getInitialOffset} and {@code PulsarSourceRDD}
     * @param failOnDataLoss         flag given to {@code reportDataLossFunc} and {@code PulsarSourceRDD}
     * @param subscriptionNamePrefix prefix handed to {@code PulsarSourceRDD}
     * @param jsonOptions            JSON options handed to {@code PulsarSourceRDD}
     */
    public PulsarSource(SQLContext sqlContext, PulsarHelper pulsarHelper, Map<String, Object> clientConf, Map<String, Object> readerConf, String metadataPath, PerTopicOffset startingOffsets, int pollTimeoutMs, boolean failOnDataLoss, String subscriptionNamePrefix, JSONOptionsInRead jsonOptions) {
        // Constructor-parameter fields are stored before the trait initialisers run.
        this.sqlContext = sqlContext;
        this.pulsarHelper = pulsarHelper;
        this.clientConf = clientConf;
        this.readerConf = readerConf;
        this.metadataPath = metadataPath;
        this.startingOffsets = startingOffsets;
        this.pollTimeoutMs = pollTimeoutMs;
        this.failOnDataLoss = failOnDataLoss;
        this.subscriptionNamePrefix = subscriptionNamePrefix;
        this.jsonOptions = jsonOptions;
        // Trait initialisation hooks for the two implemented interfaces.
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        // Derived state: Spark context, data-loss callback, and the initial "not stopped / no offsets yet" values.
        this.sc = sqlContext.sparkContext();
        this.reportDataLoss = PulsarSourceUtils$.MODULE$.reportDataLossFunc(failOnDataLoss);
        this.stopped = false;
        this.currentTopicOffsets = None$.MODULE$;
    }
}

