/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager;
import org.apache.spark.util.NextIterator;
import org.slf4j.Logger;
import org.sparkproject.guava.io.ByteStreams;
import scala.Function0;
import scala.Tuple2;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u000514A\u0001D\u0007\u00019!AQ\u0007\u0001B\u0001B\u0003%a\u0007\u0003\u0005;\u0001\t\u0005\t\u0015!\u0003<\u0011!\u0019\u0005A!A!\u0002\u0013!\u0005\"\u0002&\u0001\t\u0003Y\u0005\"B)\u0001\t\u0013\u0011\u0006b\u0002/\u0001\u0005\u0004%I!\u0018\u0005\u0007C\u0002\u0001\u000b\u0011\u00020\t\u000f\t\u0004!\u0019!C\u0005G\"1A\r\u0001Q\u0001\nMCQ!\u001a\u0001\u0005\u0002\u0019DQA\u001b\u0001\u0005B-\u0014\u0011d\u0015;bi\u0016\u001cFo\u001c:f\u0007\"\fgnZ3m_\u001e\u0014V-\u00193fe*\u0011abD\u0001\u0006gR\fG/\u001a\u0006\u0003!E\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005I\u0019\u0012!C3yK\u000e,H/[8o\u0015\t!R#A\u0002tc2T!AF\f\u0002\u000bM\u0004\u0018M]6\u000b\u0005aI\u0012AB1qC\u000eDWMC\u0001\u001b\u0003\ry'oZ\u0002\u0001'\r\u0001Qd\f\t\u0004=\u0005\u001aS\"A\u0010\u000b\u0005\u0001*\u0012\u0001B;uS2L!AI\u0010\u0003\u00199+\u0007\u0010^%uKJ\fGo\u001c:\u0011\t\u0011:\u0013&K\u0007\u0002K)\ta%A\u0003tG\u0006d\u0017-\u0003\u0002)K\t1A+\u001e9mKJ\u00022\u0001\n\u0016-\u0013\tYSEA\u0003BeJ\f\u0017\u0010\u0005\u0002%[%\u0011a&\n\u0002\u0005\u0005f$X\r\u0005\u00021g5\t\u0011G\u0003\u00023+\u0005A\u0011N\u001c;fe:\fG.\u0003\u00025c\t9Aj\\4hS:<\u0017A\u00014n!\t9\u0004(D\u0001\u0010\u0013\tItBA\u000bDQ\u0016\u001c7\u000e]8j]R4\u0015\u000e\\3NC:\fw-\u001a:\u0002\u0015\u0019LG.\u001a+p%\u0016\fG\r\u0005\u0002=\u00036\tQH\u0003\u0002?\u007f\u0005\u0011am\u001d\u0006\u0003\u0001^\ta\u0001[1e_>\u0004\u0018B\u0001\">\u0005\u0011\u0001\u0016\r\u001e5\u0002!\r|W\u000e\u001d:fgNLwN\\\"pI\u0016\u001c\u0007CA#I\u001b\u00051%BA$\u0016\u0003\tIw.\u0003\u0002J\r\n\u00012i\\7qe\u0016\u001c8/[8o\u0007>$WmY\u0001\u0007y%t\u0017\u000e\u001e \u0015\t1su\n\u0015\t\u0003\u001b\u0002i\u0011!\u0004\u0005\u0006k\u0011\u0001\rA\u000e\u0005\u0006u\u0011\u0001\ra\u000f\u0005\u0006\u0007\u0012\u0001\r\u0001R\u0001\u0011I\u0016\u001cw.\u001c9sKN\u001c8\u000b\u001e:fC6$\"a\u0015.\u0011\u0005QCV\"A+\u000b\u0005\u001d3&\"A,\u0002\t)\fg/Y\u0005\u00033V\u0013q\u0002R1uC&s\u0007/\u001e;TiJ,\u0017-\u001c\u0005\u00067\u0016\u0001\raU\u0001\fS:\u0004X\u000f^*ue\u0016\fW.\u0001\u0007t_V\u00148-Z*ue\u0016\fW.F\u0001_!\tat,\u0003\u0002a{\t\tbi\u0015#bi\u0006Le\u000e];u'R\u0014X-Y7\u0002\u001bM|WO]2f'R\u0014X-Y7!\u0003\u0015Ig\u000e];u+\u0005\u0019\u0016AB5oaV$\b%A\u0003dY>\u001cX\rF\u0001h!\t!\u0003.\u0003\u0002jK\t!QK\\5u\u0003\u001d9W\r\u001e(fqR$\u0012a\t")
public class StateStoreChangelogReader
extends NextIterator<Tuple2<byte[], byte[]>>
implements Logging {
    private final CheckpointFileManager fm;
    private final Path fileToRead;
    private final CompressionCodec compressionCodec;
    private final FSDataInputStream sourceStream;
    private final DataInputStream input;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private DataInputStream decompressStream(DataInputStream inputStream) {
        InputStream compressed = this.compressionCodec.compressedInputStream((InputStream)inputStream);
        return new DataInputStream(compressed);
    }

    private FSDataInputStream sourceStream() {
        return this.sourceStream;
    }

    private DataInputStream input() {
        return this.input;
    }

    public void close() {
        block0: {
            if (this.input() == null) break block0;
            this.input().close();
        }
    }

    public Tuple2<byte[], byte[]> getNext() {
        Tuple2 tuple2;
        int keySize = this.input().readInt();
        if (keySize == -1) {
            this.finished_$eq(true);
            tuple2 = null;
        } else {
            if (keySize < 0) {
                throw new IOException(new StringBuilder(56).append("Error reading streaming state file ").append(this.fileToRead).append(": key size cannot be ").append(keySize).toString());
            }
            byte[] keyBuffer = new byte[keySize];
            ByteStreams.readFully((InputStream)this.input(), (byte[])keyBuffer, (int)0, (int)keySize);
            int valueSize = this.input().readInt();
            if (valueSize < 0) {
                tuple2 = new Tuple2((Object)keyBuffer, null);
            } else {
                byte[] valueBuffer = new byte[valueSize];
                ByteStreams.readFully((InputStream)this.input(), (byte[])valueBuffer, (int)0, (int)valueSize);
                tuple2 = new Tuple2((Object)keyBuffer, (Object)valueBuffer);
            }
        }
        return tuple2;
    }

    private final /* synthetic */ FSDataInputStream liftedTree1$1() {
        FSDataInputStream fSDataInputStream;
        try {
            fSDataInputStream = this.fm.open(this.fileToRead);
        }
        catch (FileNotFoundException f) {
            throw new IllegalStateException(new StringBuilder(152).append(new StringBuilder(55).append("Error reading streaming state file of ").append(this.fileToRead).append(" does not exist. ").toString()).append("If the stream job is restarted with a new or updated state operation, please").append(" create a new checkpoint location or clear the existing checkpoint location.").toString(), f);
        }
        return fSDataInputStream;
    }

    public StateStoreChangelogReader(CheckpointFileManager fm, Path fileToRead, CompressionCodec compressionCodec) {
        this.fm = fm;
        this.fileToRead = fileToRead;
        this.compressionCodec = compressionCodec;
        Logging.$init$((Logging)this);
        this.sourceStream = this.liftedTree1$1();
        this.input = this.decompressStream((DataInputStream)this.sourceStream());
    }
}

