package org.apache.flink.streaming.connectors.fs;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/connectors/fs/RollingSink.class */
public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
    // Default value of batchSize: 384 * 1024 * 1024. shouldRoll() compares it against writer.getPos().
    private static final long DEFAULT_BATCH_SIZE = 402653184;
    // Default suffix/prefix pairs used to derive the in-progress, pending and valid-length
    // file names from a part file's final name (see the get*PathFor helpers).
    private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
    private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
    private static final String DEFAULT_PENDING_SUFFIX = ".pending";
    private static final String DEFAULT_PENDING_PREFIX = "_";
    private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
    private static final String DEFAULT_VALID_PREFIX = "_";
    // Default part-file name prefix. NOTE(review): the constant name reads "REFIX" (sic).
    private static final String DEFAULT_PART_REFIX = "part";
    // Default value of asyncTimeout; compared against StopWatch.getTime() in the restore polling loops.
    private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60000;
    // Base directory (as a string) below which the bucketer places bucket directories.
    private final String basePath;
    // Writer actually used for output; assigned in open() from writerTemplate.duplicate().
    private Writer<T> writer;
    // Final (non-prefixed, non-suffixed) path of the part file currently being written.
    private transient Path currentPartPath;
    // Bucket directory the current part file lives in.
    private transient Path currentBucketDirectory;
    // Counter appended to part file names; reset in open() and when the bucketer asks for a new bucket.
    private transient int partCounter;
    // True between a successful writer.open(...) and the matching writer.close().
    private transient boolean isWriterOpen;
    // Reflective handle to the file system's truncate(Path, long), or null when reflectTruncate()
    // found it missing or unusable.
    private transient Method refTruncate;
    // Bookkeeping that is stored in operator state by snapshotState(); created in open().
    private transient BucketState bucketState;
    // Operator list state registered under the name "rolling-states" in initializeState().
    private transient ListState<BucketState> restoredBucketStates;

    // Optional extra configuration handed to BucketingSink.createHadoopFileSystem(); null unless
    // one of the setFSConfig(...) overloads was called.
    @Nullable
    private Configuration fsConfig;
    // File system for basePath; created lazily by initFileSystem().
    private transient FileSystem fs;
    // Effective naming settings; the three prefixes are initialised with the literal "_" here
    // rather than through the DEFAULT_*_PREFIX constants (same value).
    private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
    private String inProgressPrefix = "_";
    private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
    private String pendingPrefix = "_";
    private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
    private String validLengthPrefix = "_";
    private String partPrefix = DEFAULT_PART_REFIX;
    // Upper bound for the lease-recovery and truncate polling loops on restore.
    private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
    // Decides which bucket directory to write to and when to start a new one.
    private Bucketer bucketer = new DateTimeBucketer();
    // Part files are rolled once the writer position exceeds this value.
    private long batchSize = DEFAULT_BATCH_SIZE;
    // Prototype writer; open() duplicates it so the template itself is never opened.
    private Writer<T> writerTemplate = new StringWriter();

    /* loaded from: input_file:org/apache/flink/streaming/connectors/fs/RollingSink$BucketState.class */
    /**
     * Serializable bookkeeping of the sink that is stored in operator state: the part file that
     * was being written when the snapshot was taken, and the part files waiting for a
     * checkpoint to complete before they are moved to their final name.
     */
    public static final class BucketState implements Serializable {
        private static final long serialVersionUID = 1;

        /** Final path of the part file that was open at snapshot time, or {@code null}. */
        public String currentFile;

        /** Value returned by the writer's flush at snapshot time; {@code -1} when unset. */
        public long currentFileValidLength = -1;

        /** Part files closed since the last snapshot (not yet tied to a checkpoint id). */
        public List<String> pendingFiles = new ArrayList<>();

        /** Closed part files keyed by the id of the checkpoint whose snapshot recorded them. */
        public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

        @Override
        public String toString() {
            return "In-progress=" + this.currentFile
                    + " validLength=" + this.currentFileValidLength
                    + " pendingForNextCheckpoint=" + this.pendingFiles
                    + " pendingForPrevCheckpoints=" + this.pendingFilesPerCheckpoint;
        }
    }

    /**
     * Creates a sink that writes below the given base path.
     *
     * @param str the base directory; stored as a string and converted to a {@code Path} on use
     */
    public RollingSink(String str) {
        basePath = str;
    }

    /**
     * Replaces the file-system configuration with a fresh copy of the given Flink configuration.
     *
     * @param configuration the entries to copy
     * @return this sink, for call chaining
     */
    public RollingSink<T> setFSConfig(Configuration configuration) {
        // Assign first, then fill: the field holds a private copy, never the caller's instance.
        fsConfig = new Configuration();
        fsConfig.addAll(configuration);
        return this;
    }

    /**
     * Replaces the file-system configuration with a Flink {@code Configuration} that contains
     * every key/value pair of the given Hadoop configuration.
     *
     * @param configuration the Hadoop configuration whose entries are copied as strings
     * @return this sink, for call chaining
     */
    public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration configuration) {
        this.fsConfig = new Configuration();
        // Hadoop's Configuration is Iterable<Map.Entry<String, String>>, so no raw iterator
        // or unchecked casts are needed.
        for (Map.Entry<String, String> entry : configuration) {
            this.fsConfig.setString(entry.getKey(), entry.getValue());
        }
        return this;
    }

    /**
     * Forwards the input type information to the writer template when that template is itself
     * {@code InputTypeConfigurable}; otherwise does nothing.
     *
     * @param typeInformation the type of the elements this sink receives
     * @param executionConfig the execution config of the job
     */
    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        if (this.writerTemplate instanceof InputTypeConfigurable) {
            // The field is typed as Writer<T>; the call has to go through the
            // InputTypeConfigurable view that the instanceof check just established.
            ((InputTypeConfigurable) this.writerTemplate).setInputType(typeInformation, executionConfig);
        }
    }

    /**
     * Sets up the file system and truncate handle, registers the {@code "rolling-states"} list
     * state and, when the context reports a restore, replays every restored
     * {@link BucketState} through {@code handleRestoredBucketState}.
     *
     * @param functionInitializationContext gives access to the operator state store and the
     *     restore flag
     * @throws IllegalArgumentException if the list state has already been assigned
     * @throws RuntimeException wrapping any {@code IOException} raised inside the method
     * @throws Exception anything else propagated from the state backend
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkArgument(this.restoredBucketStates == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        try {
            initFileSystem();
            if (this.refTruncate == null) {
                this.refTruncate = reflectTruncate(this.fs);
            }
            this.restoredBucketStates = functionInitializationContext.getOperatorStateStore().getSerializableListState("rolling-states");
            int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            if (!functionInitializationContext.isRestored()) {
                LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
                return;
            }
            LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
            // The list state is typed, so it can be iterated directly without raw types or casts.
            for (BucketState restoredState : this.restoredBucketStates.get()) {
                handleRestoredBucketState(restoredState);
            }
            if (LOG.isDebugEnabled()) {
                // NOTE(review): this logs the sink's own bucketState field, not the restored
                // entries handled above - confirm that is the intended value.
                LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, this.bucketState);
            }
        } catch (IOException e) {
            LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
            throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
        }
    }

    /**
     * Prepares the per-instance runtime fields: resets the part counter, obtains a private
     * writer by duplicating the template, and starts with an empty {@link BucketState}.
     *
     * @param configuration passed through to the superclass
     * @throws Exception if the superclass {@code open} fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        partCounter = 0;
        writer = writerTemplate.duplicate();
        bucketState = new BucketState();
    }

    /**
     * Creates the file system for {@code basePath} on first use; later calls are no-ops.
     *
     * @throws IOException if the file system cannot be created
     */
    private void initFileSystem() throws IOException {
        if (fs != null) {
            return;
        }
        Path base = new Path(basePath);
        fs = BucketingSink.createHadoopFileSystem(base, fsConfig);
    }

    /**
     * Closes the sink by closing the current part file via {@code closeCurrentPartFile()}.
     *
     * @throws Exception if closing the writer or renaming the part file fails
     */
    public void close() throws Exception {
        closeCurrentPartFile();
    }

    /**
     * Writes one element, first rolling over to a new part file when {@code shouldRoll()}
     * says so.
     *
     * @param t the element to write
     * @throws Exception if rolling or writing fails
     */
    public void invoke(T t) throws Exception {
        boolean rollNeeded = shouldRoll();
        if (rollNeeded) {
            openNewPartFile();
        }
        writer.write(t);
    }

    /**
     * Decides whether a new part file has to be opened before the next write.
     *
     * <p>A roll is requested when no writer is open, when the bucketer asks for a new bucket
     * (which also resets the part counter), or when the open writer's position exceeds
     * {@code batchSize}. All three conditions are always evaluated so that each one logs.
     *
     * @return {@code true} if the caller should open a new part file
     * @throws IOException if the writer position cannot be determined
     */
    private boolean shouldRoll() throws IOException {
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();

        boolean roll = !isWriterOpen;
        if (roll) {
            LOG.debug("RollingSink {} starting new initial bucket. ", subtask);
        }

        if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
            roll = true;
            LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtask, bucketer);
            // A new bucket directory starts numbering its part files from zero again.
            partCounter = 0;
        }

        if (isWriterOpen) {
            final long position = writer.getPos();
            if (position > batchSize) {
                roll = true;
                LOG.debug("RollingSink {} starting new bucket because file position {} is above batch size {}.", subtask, position, batchSize);
            }
        }
        return roll;
    }

    /**
     * Closes the current part file and opens the next one.
     *
     * <p>The bucket directory is taken from the bucketer and created when it differs from the
     * current one. The part file name is {@code <partPrefix>-<subtask>-<counter>}; the counter
     * is advanced until neither the final, pending nor in-progress variant of the name exists.
     * The writer is then opened on the in-progress variant.
     *
     * @throws RuntimeException if the bucket directory cannot be created
     * @throws Exception if closing the previous file or opening the new one fails
     */
    private void openNewPartFile() throws Exception {
        closeCurrentPartFile();

        final Path bucketPath = bucketer.getNextBucketPath(new Path(basePath));
        if (!bucketPath.equals(currentBucketDirectory)) {
            currentBucketDirectory = bucketPath;
            try {
                if (fs.mkdirs(currentBucketDirectory)) {
                    LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not create base path for new rolling file.", e);
            }
        }

        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
        final String namePrefix = partPrefix + "-" + subtask + "-";

        currentPartPath = new Path(currentBucketDirectory, namePrefix + partCounter);
        // Skip every counter value whose name is already taken in any of its three forms.
        while (fs.exists(currentPartPath)
                || fs.exists(getPendingPathFor(currentPartPath))
                || fs.exists(getInProgressPathFor(currentPartPath))) {
            partCounter++;
            currentPartPath = new Path(currentBucketDirectory, namePrefix + partCounter);
        }
        // Advance past the chosen value so the next roll does not re-check this name.
        partCounter++;

        LOG.debug("Next part path is {}", currentPartPath.toString());
        writer.open(fs, getInProgressPathFor(currentPartPath));
        isWriterOpen = true;
    }

    /**
     * Derives the pending-file path for a part file: same parent directory, file name wrapped
     * in {@code pendingPrefix} and {@code pendingSuffix}.
     *
     * @param path the final path of the part file
     * @return the corresponding pending path
     */
    private Path getPendingPathFor(Path path) {
        Path directory = path.getParent();
        String prefixedName = pendingPrefix + path.getName();
        Path prefixed = new Path(directory, prefixedName);
        return prefixed.suffix(pendingSuffix);
    }

    /**
     * Derives the in-progress path for a part file: same parent directory, file name wrapped
     * in {@code inProgressPrefix} and {@code inProgressSuffix}.
     *
     * @param path the final path of the part file
     * @return the corresponding in-progress path
     */
    private Path getInProgressPathFor(Path path) {
        Path directory = path.getParent();
        String prefixedName = inProgressPrefix + path.getName();
        Path prefixed = new Path(directory, prefixedName);
        return prefixed.suffix(inProgressSuffix);
    }

    /**
     * Derives the path of the valid-length marker for a part file: same parent directory, file
     * name wrapped in {@code validLengthPrefix} and {@code validLengthSuffix}.
     *
     * @param path the final path of the part file
     * @return the corresponding valid-length marker path
     */
    private Path getValidLengthPathFor(Path path) {
        Path directory = path.getParent();
        String prefixedName = validLengthPrefix + path.getName();
        Path prefixed = new Path(directory, prefixedName);
        return prefixed.suffix(validLengthSuffix);
    }

    /**
     * Closes the writer (if open) and moves the current part file from its in-progress name to
     * its pending name, recording it in {@code bucketState.pendingFiles}.
     *
     * <p>After the file has been recorded, {@code currentPartPath} is cleared so that a second
     * call without an intervening {@code openNewPartFile()} neither renames again nor records
     * the same file twice.
     *
     * @throws Exception if closing the writer or the rename call fails
     */
    private void closeCurrentPartFile() throws Exception {
        if (this.isWriterOpen) {
            this.writer.close();
            this.isWriterOpen = false;
        }
        if (this.currentPartPath == null) {
            return;
        }

        Path inProgressPath = getInProgressPathFor(this.currentPartPath);
        Path pendingPath = getPendingPathFor(this.currentPartPath);
        boolean renamed = this.fs.rename(inProgressPath, pendingPath);
        LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
        if (!renamed) {
            // rename() reports failure through its return value; surface it instead of
            // dropping it, but keep going exactly as before.
            LOG.warn("Could not move in-progress file {} to pending file {}.", inProgressPath, pendingPath);
        }
        this.bucketState.pendingFiles.add(this.currentPartPath.toString());
        // This part file is done; forget it so that repeated calls are harmless.
        this.currentPartPath = null;
    }

    /**
     * Looks up {@code truncate(Path, long)} on the concrete file system class and verifies that
     * it can actually be invoked by truncating a small throw-away file.
     *
     * @param fileSystem the file system to probe; may be {@code null}
     * @return the reflective {@code truncate} handle, or {@code null} when the file system is
     *     {@code null}, has no such method, or the trial invocation failed
     * @throws RuntimeException if the probe file cannot be created or cannot be deleted
     */
    private Method reflectTruncate(FileSystem fileSystem) {
        if (fileSystem == null) {
            return null;
        }

        Method truncate;
        try {
            truncate = fileSystem.getClass().getMethod("truncate", Path.class, long.class);
        } catch (NoSuchMethodException e) {
            LOG.debug("Truncate not found. Will write a file with suffix '{}'  and prefix '{}' to specify how many bytes in a bucket are valid.", this.validLengthSuffix, this.validLengthPrefix);
            return null;
        }

        // The method exists; check that it really works on this file system.
        Path probePath = new Path(UUID.randomUUID().toString());
        try {
            // try-with-resources closes the stream even when writeUTF throws.
            try (FSDataOutputStream out = fileSystem.create(probePath)) {
                out.writeUTF("hello");
            } catch (IOException e) {
                LOG.error("Could not create file for checking if truncate works.", e);
                throw new RuntimeException("Could not create file for checking if truncate works.", e);
            }

            try {
                truncate.invoke(fileSystem, probePath, 2L);
            } catch (IllegalAccessException | InvocationTargetException e) {
                LOG.debug("Truncate is not supported.", e);
                truncate = null;
            }
        } finally {
            // Always remove the probe file, including when creating or truncating it failed.
            try {
                fileSystem.delete(probePath, false);
            } catch (IOException e) {
                LOG.error("Could not delete truncate test file.", e);
                throw new RuntimeException("Could not delete truncate test file.", e);
            }
        }
        return truncate;
    }

    /**
     * Moves the pending files of every checkpoint with an id up to and including {@code j} to
     * their final names and drops those checkpoints from the bookkeeping map.
     *
     * @param j id of the checkpoint that completed
     * @throws Exception if a rename call fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        final Map<Long, List<String>> pendingByCheckpoint = bucketState.pendingFilesPerCheckpoint;
        synchronized (pendingByCheckpoint) {
            Iterator<Map.Entry<Long, List<String>>> entries = pendingByCheckpoint.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Long, List<String>> entry = entries.next();
                Long checkpointId = entry.getKey();
                if (checkpointId > j) {
                    // Belongs to a later checkpoint; leave it pending.
                    continue;
                }
                LOG.debug("Moving pending files to final location for checkpoint {}", checkpointId);
                for (String fileName : entry.getValue()) {
                    Path finalPath = new Path(fileName);
                    Path pendingPath = getPendingPathFor(finalPath);
                    fs.rename(pendingPath, finalPath);
                    LOG.debug("Moving pending file {} to final location after complete checkpoint {}.", pendingPath, checkpointId);
                }
                // Remove through the iterator so the map can be modified while iterating.
                entries.remove();
            }
        }
    }

    /**
     * Records the current {@link BucketState} in operator state.
     *
     * <p>If a writer is open, it is flushed and the returned value is stored as the valid
     * length of the current file. The files closed since the previous snapshot are filed under
     * this checkpoint's id and a fresh pending list is started.
     *
     * @param functionSnapshotContext supplies the id of the checkpoint being taken
     * @throws NullPointerException if the list state has not been assigned yet
     * @throws Exception if flushing the writer or updating the state fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkNotNull(restoredBucketStates, "The " + getClass().getSimpleName() + " has not been properly initialized.");
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();

        if (isWriterOpen) {
            bucketState.currentFile = currentPartPath.toString();
            bucketState.currentFileValidLength = writer.flush();
        }

        final long checkpointId = functionSnapshotContext.getCheckpointId();
        synchronized (bucketState.pendingFilesPerCheckpoint) {
            bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
        }
        // The old list now belongs to the checkpoint entry; start collecting into a new one.
        bucketState.pendingFiles = new ArrayList<>();

        restoredBucketStates.clear();
        restoredBucketStates.add(bucketState);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtask, bucketState);
        }
    }

    /**
     * Brings the files described by one restored {@link BucketState} into a consistent state.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>discard the not-yet-checkpointed pending list;</li>
     *   <li>if a file was in progress at snapshot time, move it to its final name and cut it
     *       back to the recorded valid length - via {@code truncate} when available, otherwise
     *       by writing a valid-length marker file;</li>
     *   <li>move every pending file recorded per checkpoint to its final name and clear that
     *       map.</li>
     * </ol>
     *
     * @param bucketState the restored state; it is modified in place
     * @throws RuntimeException wrapping I/O or reflection failures, or when truncation does not
     *     reach the expected length within {@code asyncTimeout}
     */
    private void handleRestoredBucketState(BucketState bucketState) {
        bucketState.pendingFiles.clear();

        if (bucketState.currentFile != null) {
            Path partPath = new Path(bucketState.currentFile);
            long validLength = bucketState.currentFileValidLength;
            try {
                moveRestoredFileToFinalLocation(partPath, bucketState.currentFile);
                if (this.refTruncate == null) {
                    this.refTruncate = reflectTruncate(this.fs);
                }
                if (this.refTruncate != null) {
                    truncateRestoredFile(partPath, validLength);
                } else {
                    writeValidLengthFile(partPath, validLength);
                }
                bucketState.currentFile = null;
                bucketState.currentFileValidLength = -1L;
                this.isWriterOpen = false;
            } catch (IOException e) {
                LOG.error("Error while restoring RollingSink state.", e);
                throw new RuntimeException("Error while restoring RollingSink state.", e);
            } catch (IllegalAccessException | InvocationTargetException e) {
                LOG.error("Could not invoke truncate.", e);
                throw new RuntimeException("Could not invoke truncate.", e);
            }
        }

        movePendingFilesOnRestore(bucketState);
    }

    /** Renames the restored file from whichever intermediate name it still has to its final name. */
    private void moveRestoredFileToFinalLocation(Path partPath, String currentFile) throws IOException {
        Path pendingPath = getPendingPathFor(partPath);
        Path inProgressPath = getInProgressPathFor(partPath);
        if (this.fs.exists(pendingPath)) {
            LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
            this.fs.rename(pendingPath, partPath);
        } else if (this.fs.exists(inProgressPath)) {
            LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
            this.fs.rename(inProgressPath, partPath);
        } else if (this.fs.exists(partPath)) {
            LOG.debug("In-Progress file {} was already moved to final location {}.", currentFile, partPath);
        } else {
            LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, it was moved to final location by a previous snapshot restore", currentFile);
        }
    }

    /** Truncates the restored file to its valid length, waiting up to asyncTimeout where needed. */
    private void truncateRestoredFile(Path partPath, long validLength)
            throws IOException, IllegalAccessException, InvocationTargetException {
        LOG.debug("Truncating {} to valid length {}", partPath, validLength);

        if (this.fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
            LOG.debug("Trying to recover file lease {}", partPath);
            dfs.recoverLease(partPath);

            // Poll until the file is reported closed or the timeout elapses.
            StopWatch leaseTimer = new StopWatch();
            leaseTimer.start();
            boolean closed = dfs.isFileClosed(partPath);
            while (!closed && leaseTimer.getTime() <= this.asyncTimeout) {
                pauseBetweenPolls();
                closed = dfs.isFileClosed(partPath);
            }
        }

        boolean truncated = (Boolean) this.refTruncate.invoke(this.fs, partPath, validLength);
        if (truncated) {
            return;
        }

        LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
        StopWatch truncateTimer = new StopWatch();
        truncateTimer.start();
        long length = this.fs.getFileStatus(partPath).getLen();
        while (length != validLength && truncateTimer.getTime() <= this.asyncTimeout) {
            pauseBetweenPolls();
            length = this.fs.getFileStatus(partPath).getLen();
        }
        if (length != validLength) {
            throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + length + ".");
        }
    }

    /** Sleeps for one polling interval of the restore wait loops. */
    private void pauseBetweenPolls() {
        try {
            Thread.sleep(500L);
        } catch (InterruptedException ignored) {
            // Deliberately ignored, as before: the surrounding loop is bounded by asyncTimeout.
            // Re-setting the interrupt flag here would make every later sleep return
            // immediately and turn the wait into a busy loop.
        }
    }

    /** Writes the valid-length marker next to the restored file, unless one already exists. */
    private void writeValidLengthFile(Path partPath, long validLength) throws IOException {
        Path validLengthPath = getValidLengthPathFor(partPath);
        if (this.fs.exists(validLengthPath) || !this.fs.exists(partPath)) {
            return;
        }
        LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
        try (FSDataOutputStream out = this.fs.create(validLengthPath)) {
            out.writeUTF(Long.toString(validLength));
        }
    }

    /** Moves all per-checkpoint pending files of the restored state to their final names. */
    private void movePendingFilesOnRestore(BucketState bucketState) {
        LOG.debug("Moving pending files to final location on restore.");
        for (Map.Entry<Long, List<String>> entry : bucketState.pendingFilesPerCheckpoint.entrySet()) {
            Long checkpointId = entry.getKey();
            for (String fileName : entry.getValue()) {
                Path finalPath = new Path(fileName);
                Path pendingPath = getPendingPathFor(finalPath);
                try {
                    if (this.fs.exists(pendingPath)) {
                        LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, checkpointId);
                        this.fs.rename(pendingPath, finalPath);
                    }
                } catch (IOException e) {
                    LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
                    throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
                }
            }
        }
        synchronized (bucketState.pendingFilesPerCheckpoint) {
            bucketState.pendingFilesPerCheckpoint.clear();
        }
    }

    /**
     * Sets the writer position above which the current part file is rolled.
     *
     * @param j the threshold, compared against the writer's reported position
     * @return this sink, for call chaining
     */
    public RollingSink<T> setBatchSize(long j) {
        batchSize = j;
        return this;
    }

    /**
     * Sets the bucketer that chooses the bucket directory and decides when to start a new one.
     *
     * @param bucketer the bucketer to use
     * @return this sink, for call chaining
     */
    public RollingSink<T> setBucketer(Bucketer bucketer) {
        this.bucketer = bucketer;
        return this;
    }

    /**
     * Sets the writer template; {@code open} duplicates it to obtain the writer actually used.
     *
     * @param writer the template writer
     * @return this sink, for call chaining
     */
    public RollingSink<T> setWriter(Writer<T> writer) {
        // 'writer' is the parameter here; only the template field is assigned.
        writerTemplate = writer;
        return this;
    }

    /**
     * Sets the suffix appended to the name of a part file while it is being written.
     *
     * @param str the in-progress suffix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setInProgressSuffix(String str) {
        inProgressSuffix = str;
        return this;
    }

    /**
     * Sets the prefix prepended to the name of a part file while it is being written.
     *
     * @param str the in-progress prefix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setInProgressPrefix(String str) {
        inProgressPrefix = str;
        return this;
    }

    /**
     * Sets the suffix appended to the name of a closed part file that awaits a checkpoint.
     *
     * @param str the pending suffix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setPendingSuffix(String str) {
        pendingSuffix = str;
        return this;
    }

    /**
     * Sets the prefix prepended to the name of a closed part file that awaits a checkpoint.
     *
     * @param str the pending prefix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setPendingPrefix(String str) {
        pendingPrefix = str;
        return this;
    }

    /**
     * Sets the suffix of the marker file that records a part file's valid length.
     *
     * @param str the valid-length suffix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setValidLengthSuffix(String str) {
        validLengthSuffix = str;
        return this;
    }

    /**
     * Sets the prefix of the marker file that records a part file's valid length.
     *
     * @param str the valid-length prefix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setValidLengthPrefix(String str) {
        validLengthPrefix = str;
        return this;
    }

    /**
     * Sets the leading component of part file names ({@code <prefix>-<subtask>-<counter>}).
     *
     * @param str the part file prefix
     * @return this sink, for call chaining
     */
    public RollingSink<T> setPartPrefix(String str) {
        partPrefix = str;
        return this;
    }

    /**
     * Has no effect; the method only returns this sink.
     *
     * @return this sink, for call chaining
     * @deprecated the method body is empty apart from the return statement
     */
    @Deprecated
    public RollingSink<T> disableCleanupOnOpen() {
        return this;
    }

    /**
     * Sets the upper bound for the lease-recovery and truncate wait loops run on restore.
     *
     * @param j the timeout, compared against {@code StopWatch.getTime()}
     * @return this sink, for call chaining
     */
    public RollingSink<T> setAsyncTimeout(long j) {
        asyncTimeout = j;
        return this;
    }
}
