package org.apache.spark.sql.execution.streaming.sources;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.spark.internal.Logging;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.execution.streaming.LongOffset$;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.Clock;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.SystemClock;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.StringOps$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: RateStreamMicroBatchReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-e\u0001\u0002\u0011\"\u0001AB\u0001\"\u0013\u0001\u0003\u0002\u0003\u0006IA\u0013\u0005\t\u001d\u0002\u0011\t\u0011)A\u0005\u001f\")A\f\u0001C\u0001;\"A!\r\u0001b\u0001\n\u0003\t3\r\u0003\u0004k\u0001\u0001\u0006I\u0001\u001a\u0005\bW\u0002\u0011\r\u0011\"\u0003m\u0011\u0019\t\b\u0001)A\u0005[\"9!\u000f\u0001b\u0001\n\u0013a\u0007BB:\u0001A\u0003%Q\u000eC\u0004u\u0001\t\u0007I\u0011\u00027\t\rU\u0004\u0001\u0015!\u0003n\u0011!1\bA1A\u0005\u0002\u0005b\u0007BB<\u0001A\u0003%Q\u000eC\u0004y\u0001\u0001\u0007I\u0011\u00027\t\u000fe\u0004\u0001\u0019!C\u0005u\"9\u0011\u0011\u0001\u0001!B\u0013i\u0007bCA\u0006\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u001bA1\"a\u0006\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u001a!Y\u0011Q\u0004\u0001A\u0002\u0003\u0005\u000b\u0015BA\b\u0011-\ty\u0002\u0001a\u0001\u0002\u0004%I!!\u0004\t\u0017\u0005\u0005\u0002\u00011AA\u0002\u0013%\u00111\u0005\u0005\f\u0003O\u0001\u0001\u0019!A!B\u0013\ty\u0001C\u0004\u0002*\u0001!\t%a\u000b\t\u000f\u0005e\u0002\u0001\"\u0011\u0002<!9\u0011\u0011\u000b\u0001\u0005B\u0005M\u0003bBA+\u0001\u0011\u0005\u00131\u000b\u0005\b\u0003/\u0002A\u0011IA-\u0011\u001d\ty\u0006\u0001C!\u0003CBq!! \u0001\t\u0003\ny\bC\u0004\u0002\u0004\u0002!\t%!\"\t\u000f\u0005\u001d\u0005\u0001\"\u0011\u0002\n\nQ\"+\u0019;f'R\u0014X-Y7NS\u000e\u0014xNQ1uG\"\u0014V-\u00193fe*\u0011!eI\u0001\bg>,(oY3t\u0015\t!S%A\u0005tiJ,\u0017-\\5oO*\u0011aeJ\u0001\nKb,7-\u001e;j_:T!\u0001K\u0015\u0002\u0007M\fHN\u0003\u0002+W\u0005)1\u000f]1sW*\u0011A&L\u0001\u0007CB\f7\r[3\u000b\u00039\n1a\u001c:h\u0007\u0001\u0019B\u0001A\u0019:\u0007B\u0011!gN\u0007\u0002g)\u0011A'N\u0001\u0005Y\u0006twMC\u00017\u0003\u0011Q\u0017M^1\n\u0005a\u001a$AB(cU\u0016\u001cG\u000f\u0005\u0002;\u00036\t1H\u0003\u0002%y)\u0011QHP\u0001\u0007e\u0016\fG-\u001a:\u000b\u0005}\u0002\u0015A\u0001<3\u0015\t\u0011s%\u0003\u0002Cw\t\u0001R*[2s_\n\u000bGo\u00195SK\u0006$WM\u001d\t\u0003\t\u001ek\u0011!\u0012\u0006\u0003\r&\n\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003\u0011\u0016\u0013q\u0001T8hO&tw-A\u0004paRLwN\\:\u0011\u0005-cU\"\u0001 \n\u00055s$!\u0005#bi\u0006\u001cv.\u001e:dK>\u0003H/[8og\u0006\u00112\r[3dWB|\u0017N\u001c;M_\u000e\fG/[8o!\t\u0001\u0016L\u0004\u0002R/B\u0011!+V\u0007\u0002'*\u0011AkL\u0001\u0007yI|w\u000e\u001e \u000b\u0003Y\u000bQa]2bY\u0006L!\u0001W+\u0002\rA\u0013X\rZ3g\u0013\tQ6L\u0001\u0004TiJLgn\u001a\u0006\u00031V\u000ba\u0001P5oSRtDc\u00010aCB\u0011q\fA\u0007\u0002C!)\u0011j\u0001a\u0001\u0015\")aj\u0001a\u0001\u001f\u0006)1\r\\8dWV\tA\r\u0005\u0002fQ6\taM\u0003\u0002hS\u0005!Q\u000f^5m\u0013\tIgMA\u0003DY>\u001c7.\u0001\u0004dY>\u001c7\u000eI\u0001\u000ee><8\u000fU3s'\u0016\u001cwN\u001c3\u0016\u00035\u0004\"A\\8\u000e\u0003UK!\u0001]+\u0003\t1{gnZ\u0001\u000fe><8\u000fU3s'\u0016\u001cwN\u001c3!\u0003E\u0011\u0018-\u001c9VaRKW.Z*fG>tGm]\u0001\u0013e\u0006l\u0007/\u00169US6,7+Z2p]\u0012\u001c\b%\u0001\u0006nCb\u001cVmY8oIN\f1\"\\1y'\u0016\u001cwN\u001c3tA\u0005q1M]3bi&|g\u000eV5nK6\u001b\u0018aD2sK\u0006$\u0018n\u001c8US6,Wj\u001d\u0011\u0002\u00151\f7\u000f\u001e+j[\u0016l5/\u0001\bmCN$H+[7f\u001bN|F%Z9\u0015\u0005mt\bC\u00018}\u0013\tiXK\u0001\u0003V]&$\bbB@\u0010\u0003\u0003\u0005\r!\\\u0001\u0004q\u0012\n\u0014a\u00037bgR$\u0016.\\3Ng\u0002B3\u0001EA\u0003!\rq\u0017qA\u0005\u0004\u0003\u0013)&\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002\u000bM$\u0018M\u001d;\u0016\u0005\u0005=\u0001\u0003BA\t\u0003'i\u0011aI\u0005\u0004\u0003+\u0019#A\u0003'p]\u001e|eMZ:fi\u0006I1\u000f^1si~#S-\u001d\u000b\u0004w\u0006m\u0001\u0002C@\u0013\u0003\u0003\u0005\r!a\u0004\u0002\rM$\u0018M\u001d;!\u0003\r)g\u000eZ\u0001\bK:$w\fJ3r)\rY\u0018Q\u0005\u0005\t\u007fV\t\t\u00111\u0001\u0002\u0010\u0005!QM\u001c3!\u0003)\u0011X-\u00193TG\",W.\u0019\u000b\u0003\u0003[\u0001B!a\f\u000265\u0011\u0011\u0011\u0007\u0006\u0004\u0003g9\u0013!\u0002;za\u0016\u001c\u0018\u0002BA\u001c\u0003c\u0011!b\u0015;sk\u000e$H+\u001f9f\u00039\u0019X\r^(gMN,GOU1oO\u0016$Ra_A\u001f\u0003\u001fBq!a\u0003\u0019\u0001\u0004\ty\u0004\u0005\u0004\u0002B\u0005\u0015\u0013\u0011J\u0007\u0003\u0003\u0007R!aZ\u001b\n\t\u0005\u001d\u00131\t\u0002\t\u001fB$\u0018n\u001c8bYB\u0019!(a\u0013\n\u0007\u000553H\u0001\u0004PM\u001a\u001cX\r\u001e\u0005\b\u0003?A\u0002\u0019AA \u000399W\r^*uCJ$xJ\u001a4tKR$\"!!\u0013\u0002\u0019\u001d,G/\u00128e\u001f\u001a47/\u001a;\u0002#\u0011,7/\u001a:jC2L'0Z(gMN,G\u000f\u0006\u0003\u0002J\u0005m\u0003BBA/7\u0001\u0007q*\u0001\u0003kg>t\u0017a\u00059mC:Le\u000e];u!\u0006\u0014H/\u001b;j_:\u001cHCAA2!\u0019\t\t%!\u001a\u0002j%!\u0011qMA\"\u0005\u0011a\u0015n\u001d;\u0011\r\u0005-\u0014QNA9\u001b\u0005a\u0014bAA8y\tq\u0011J\u001c9viB\u000b'\u000f^5uS>t\u0007\u0003BA:\u0003sj!!!\u001e\u000b\u0007\u0005]t%\u0001\u0005dCR\fG._:u\u0013\u0011\tY(!\u001e\u0003\u0017%sG/\u001a:oC2\u0014vn^\u0001\u0007G>lW.\u001b;\u0015\u0007m\f\t\tC\u0004\u0002 u\u0001\r!!\u0013\u0002\tM$x\u000e\u001d\u000b\u0002w\u0006AAo\\*ue&tw\rF\u0001P\u0001")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.class */
public class RateStreamMicroBatchReader implements MicroBatchReader, Logging {
    private final DataSourceOptions options;
    public final String org$apache$spark$sql$execution$streaming$sources$RateStreamMicroBatchReader$$checkpointLocation;
    private final Clock clock;
    private final long rowsPerSecond;
    private final long rampUpTimeSeconds;
    private final long maxSeconds;
    private final long creationTimeMs;
    private volatile long lastTimeMs;
    private LongOffset start;
    private LongOffset end;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public Clock clock() {
        return this.clock;
    }

    private long rowsPerSecond() {
        return this.rowsPerSecond;
    }

    private long rampUpTimeSeconds() {
        return this.rampUpTimeSeconds;
    }

    private long maxSeconds() {
        return this.maxSeconds;
    }

    public long creationTimeMs() {
        return this.creationTimeMs;
    }

    private long lastTimeMs() {
        return this.lastTimeMs;
    }

    private void lastTimeMs_$eq(long j) {
        this.lastTimeMs = j;
    }

    private LongOffset start() {
        return this.start;
    }

    private void start_$eq(LongOffset longOffset) {
        this.start = longOffset;
    }

    private LongOffset end() {
        return this.end;
    }

    private void end_$eq(LongOffset longOffset) {
        this.end = longOffset;
    }

    @Override // org.apache.spark.sql.sources.v2.reader.DataSourceReader
    public StructType readSchema() {
        return RateStreamProvider$.MODULE$.SCHEMA();
    }

    @Override // org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    public void setOffsetRange(Optional<Offset> optional, Optional<Offset> optional2) {
        start_$eq((LongOffset) optional.orElse(new LongOffset(0L)));
        long timeMillis = clock().getTimeMillis();
        if (lastTimeMs() < timeMillis) {
            lastTimeMs_$eq(timeMillis);
        }
        end_$eq((LongOffset) optional2.orElse(new LongOffset(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs() - creationTimeMs()))));
    }

    @Override // org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    public Offset getStartOffset() {
        if (start() == null) {
            throw new IllegalStateException("start offset not set");
        }
        return start();
    }

    @Override // org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    public Offset getEndOffset() {
        if (end() == null) {
            throw new IllegalStateException("end offset not set");
        }
        return end();
    }

    @Override // org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    public Offset deserializeOffset(String str) {
        return new LongOffset(new StringOps(Predef$.MODULE$.augmentString(str)).toLong());
    }

    @Override // org.apache.spark.sql.sources.v2.reader.DataSourceReader
    public List<InputPartition<InternalRow>> planInputPartitions() {
        long unboxToLong = BoxesRunTime.unboxToLong(LongOffset$.MODULE$.convert(start()).map(longOffset -> {
            return BoxesRunTime.boxToLong(longOffset.offset());
        }).getOrElse(() -> {
            return 0L;
        }));
        long unboxToLong2 = BoxesRunTime.unboxToLong(LongOffset$.MODULE$.convert(end()).map(longOffset2 -> {
            return BoxesRunTime.boxToLong(longOffset2.offset());
        }).getOrElse(() -> {
            return 0L;
        }));
        Predef$.MODULE$.assert(unboxToLong <= unboxToLong2, () -> {
            return new StringBuilder(29).append("startSeconds(").append(unboxToLong).append(") > endSeconds(").append(unboxToLong2).append(")").toString();
        });
        if (unboxToLong2 > maxSeconds()) {
            throw new ArithmeticException(new StringBuilder(34).append("Integer overflow. Max offset with ").append(new StringBuilder(34).append(rowsPerSecond()).append(" rowsPerSecond is ").append(maxSeconds()).append(", but it's ").append(unboxToLong2).append(" now.").toString()).toString());
        }
        if (lastTimeMs() < TimeUnit.SECONDS.toMillis(unboxToLong2) + creationTimeMs()) {
            lastTimeMs_$eq(TimeUnit.SECONDS.toMillis(unboxToLong2) + creationTimeMs());
        }
        long valueAtSecond = RateStreamProvider$.MODULE$.valueAtSecond(unboxToLong, rowsPerSecond(), rampUpTimeSeconds());
        long valueAtSecond2 = RateStreamProvider$.MODULE$.valueAtSecond(unboxToLong2, rowsPerSecond(), rampUpTimeSeconds());
        logDebug(() -> {
            return new StringBuilder(30).append("startSeconds: ").append(unboxToLong).append(", endSeconds: ").append(unboxToLong2).append(", ").append(new StringBuilder(24).append("rangeStart: ").append(valueAtSecond).append(", rangeEnd: ").append(valueAtSecond2).toString()).toString();
        });
        if (valueAtSecond == valueAtSecond2) {
            return (List) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.empty()).asJava();
        }
        long creationTimeMs = creationTimeMs() + TimeUnit.SECONDS.toMillis(unboxToLong);
        double millis = TimeUnit.SECONDS.toMillis(unboxToLong2 - unboxToLong) / (valueAtSecond2 - valueAtSecond);
        Option<SparkSession> activeSession = SparkSession$.MODULE$.getActiveSession();
        Predef$.MODULE$.require(activeSession.isDefined());
        int unboxToInt = BoxesRunTime.unboxToInt(Option$.MODULE$.apply(this.options.get(RateStreamProvider$.MODULE$.NUM_PARTITIONS()).orElse(null)).map(str -> {
            return BoxesRunTime.boxToInteger($anonfun$planInputPartitions$7(str));
        }).getOrElse(() -> {
            return ((SparkSession) activeSession.get()).sparkContext().defaultParallelism();
        }));
        return (List) JavaConverters$.MODULE$.seqAsJavaListConverter(((TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), unboxToInt).map(obj -> {
            return $anonfun$planInputPartitions$9(unboxToInt, valueAtSecond, valueAtSecond2, creationTimeMs, millis, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toList()).asJava();
    }

    @Override // org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    public void commit(Offset offset) {
    }

    @Override // org.apache.spark.sql.execution.streaming.BaseStreamingSource
    public void stop() {
    }

    public String toString() {
        return new StringBuilder(29).append("RateStreamV2[rowsPerSecond=").append(rowsPerSecond()).append(", ").append(new StringBuilder(20).append("rampUpTimeSeconds=").append(rampUpTimeSeconds()).append(", ").toString()).append(new StringBuilder(14).append("numPartitions=").append((Object) this.options.get(RateStreamProvider$.MODULE$.NUM_PARTITIONS()).orElse("default")).toString()).toString();
    }

    public static final /* synthetic */ int $anonfun$planInputPartitions$7(String str) {
        return new StringOps(Predef$.MODULE$.augmentString(str)).toInt();
    }

    public static final /* synthetic */ InputPartition $anonfun$planInputPartitions$9(int i, long j, long j2, long j3, double d, int i2) {
        return new RateStreamMicroBatchInputPartition(i2, i, j, j2, j3, d);
    }

    public RateStreamMicroBatchReader(DataSourceOptions dataSourceOptions, String str) {
        this.options = dataSourceOptions;
        this.org$apache$spark$sql$execution$streaming$sources$RateStreamMicroBatchReader$$checkpointLocation = str;
        Logging.$init$(this);
        this.clock = dataSourceOptions.getBoolean("useManualClock", false) ? new ManualClock() : new SystemClock();
        this.rowsPerSecond = new StringOps(Predef$.MODULE$.augmentString(dataSourceOptions.get(RateStreamProvider$.MODULE$.ROWS_PER_SECOND()).orElse("1"))).toLong();
        this.rampUpTimeSeconds = BoxesRunTime.unboxToLong(Option$.MODULE$.apply(dataSourceOptions.get(RateStreamProvider$.MODULE$.RAMP_UP_TIME()).orElse(null)).map(str2 -> {
            return BoxesRunTime.boxToLong(JavaUtils.timeStringAsSec(str2));
        }).getOrElse(() -> {
            return 0L;
        }));
        this.maxSeconds = Long.MAX_VALUE / rowsPerSecond();
        if (rampUpTimeSeconds() > maxSeconds()) {
            throw new ArithmeticException(new StringBuilder(48).append("Integer overflow. Max offset with ").append(rowsPerSecond()).append(" rowsPerSecond").append(new StringBuilder(34).append(" is ").append(maxSeconds()).append(", but 'rampUpTimeSeconds' is ").append(rampUpTimeSeconds()).append(".").toString()).toString());
        }
        final Option orElse = SparkSession$.MODULE$.getActiveSession().orElse(() -> {
            return SparkSession$.MODULE$.getDefaultSession();
        });
        Predef$.MODULE$.require(orElse.isDefined());
        HDFSMetadataLog<LongOffset> hDFSMetadataLog = new HDFSMetadataLog<LongOffset>(this, orElse) { // from class: org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader$$anon$1
            @Override // org.apache.spark.sql.execution.streaming.HDFSMetadataLog
            public void serialize(LongOffset longOffset, OutputStream outputStream) {
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                bufferedWriter.write(new StringBuilder(2).append("v").append(RateStreamProvider$.MODULE$.VERSION()).append("\n").toString());
                bufferedWriter.write(longOffset.json());
                bufferedWriter.flush();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.spark.sql.execution.streaming.HDFSMetadataLog
            public LongOffset deserialize(InputStream inputStream) {
                String iOUtils = IOUtils.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                Predef$.MODULE$.assert(iOUtils.length() != 0);
                if (StringOps$.MODULE$.apply$extension(Predef$.MODULE$.augmentString(iOUtils), 0) != 'v') {
                    throw new IllegalStateException("Log file was malformed: failed to detect the log file version line.");
                }
                int indexOf = iOUtils.indexOf("\n");
                if (indexOf <= 0) {
                    throw new IllegalStateException("Log file was malformed: failed to detect the log file version line.");
                }
                parseVersion(iOUtils.substring(0, indexOf), RateStreamProvider$.MODULE$.VERSION());
                return LongOffset$.MODULE$.apply(new SerializedOffset(iOUtils.substring(indexOf + 1)));
            }

            {
                super((SparkSession) orElse.get(), this.org$apache$spark$sql$execution$streaming$sources$RateStreamMicroBatchReader$$checkpointLocation, ClassTag$.MODULE$.apply(LongOffset.class));
            }
        };
        this.creationTimeMs = ((LongOffset) hDFSMetadataLog.get(0L).getOrElse(() -> {
            LongOffset longOffset = new LongOffset(this.clock().getTimeMillis());
            hDFSMetadataLog.add(0L, longOffset);
            this.logInfo(() -> {
                return new StringBuilder(12).append("Start time: ").append(longOffset).toString();
            });
            return longOffset;
        })).offset();
        this.lastTimeMs = creationTimeMs();
    }
}
