package alluxio.underfs;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import alluxio.retry.RetryUtils;
import alluxio.util.CommonUtils;
import alluxio.util.io.PathUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/underfs/ObjectLowLevelOutputStream.class */
public abstract class ObjectLowLevelOutputStream extends OutputStream implements ContentHashable {
    protected static final Logger LOG = LoggerFactory.getLogger(ObjectLowLevelOutputStream.class);
    protected final List<String> mTmpDirs;
    protected static final long UPLOAD_THRESHOLD = 5242880;
    protected final String mBucketName;
    protected final String mKey;

    @Nullable
    protected MessageDigest mHash;
    protected long mPartitionOffset;
    protected final long mPartitionSize;

    @Nullable
    protected File mFile;

    @Nullable
    protected OutputStream mLocalOutputStream;
    private final AtomicInteger mPartNumber;
    private final ListeningExecutorService mExecutor;

    @Nullable
    private Long mUploadPartTimeoutMills;
    protected final RetryPolicy mRetryPolicy = new CountingRetry(5);
    protected final byte[] mSingleCharWrite = new byte[1];
    protected boolean mClosed = false;
    private final List<ListenableFuture<?>> mFutures = new ArrayList();
    private boolean mMultiPartUploadInitialized = false;

    public ObjectLowLevelOutputStream(String str, String str2, ListeningExecutorService listeningExecutorService, long j, AlluxioConfiguration alluxioConfiguration) {
        Preconditions.checkArgument((str == null || str.isEmpty()) ? false : true, "Bucket name must not be null or empty.");
        this.mBucketName = str;
        this.mTmpDirs = alluxioConfiguration.getList(PropertyKey.TMP_DIRS);
        Preconditions.checkArgument(!this.mTmpDirs.isEmpty(), "No temporary directories available");
        this.mExecutor = listeningExecutorService;
        this.mKey = str2;
        initHash();
        this.mPartitionSize = Math.max(UPLOAD_THRESHOLD, j);
        this.mPartNumber = new AtomicInteger(1);
        if (alluxioConfiguration.isSet(PropertyKey.UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT)) {
            this.mUploadPartTimeoutMills = Long.valueOf(alluxioConfiguration.getDuration(PropertyKey.UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT).toMillis());
        }
    }

    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        this.mSingleCharWrite[0] = (byte) i;
        write(this.mSingleCharWrite);
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr) throws IOException {
        write(bArr, 0, bArr.length);
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        if (bArr == null || i2 == 0) {
            return;
        }
        Preconditions.checkNotNull(bArr);
        Preconditions.checkArgument(i >= 0 && i <= bArr.length && i2 >= 0 && i + i2 <= bArr.length);
        if (this.mFile == null) {
            initNewFile();
        }
        if (this.mPartitionOffset + i2 <= this.mPartitionSize) {
            this.mLocalOutputStream.write(bArr, i, i2);
            this.mPartitionOffset += i2;
            return;
        }
        int i3 = (int) (this.mPartitionSize - this.mPartitionOffset);
        this.mLocalOutputStream.write(bArr, i, i3);
        this.mPartitionOffset += i3;
        uploadPart();
        write(bArr, i + i3, i2 - i3);
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (this.mMultiPartUploadInitialized) {
            if (this.mLocalOutputStream != null) {
                this.mLocalOutputStream.flush();
            }
            if (this.mPartitionOffset > UPLOAD_THRESHOLD) {
                uploadPart();
            }
            waitForAllPartsUpload();
        }
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        if (this.mMultiPartUploadInitialized) {
            try {
                if (this.mFile != null) {
                    this.mLocalOutputStream.close();
                    uploadPart(this.mFile, this.mPartNumber.getAndIncrement(), true);
                }
                waitForAllPartsUpload();
                RetryUtils.retry("complete multipart upload", this::completeMultiPartUploadInternal, this.mRetryPolicy);
                return;
            } catch (Exception e) {
                LOG.error("Failed to upload {}", this.mKey, e);
                throw new IOException(e);
            }
        }
        if (this.mFile == null) {
            LOG.debug("Streaming upload output stream closed without uploading any data.");
            RetryUtils.retry("put empty object for key" + this.mKey, () -> {
                createEmptyObject(this.mKey);
            }, this.mRetryPolicy);
            return;
        }
        try {
            this.mLocalOutputStream.close();
            String encodeBase64String = this.mHash != null ? Base64.encodeBase64String(this.mHash.digest()) : null;
            RetryUtils.retry("put object for key" + this.mKey, () -> {
                putObject(this.mKey, this.mFile, encodeBase64String);
            }, this.mRetryPolicy);
        } finally {
            if (!this.mFile.delete()) {
                LOG.error("Failed to delete temporary file @ {}", this.mFile.getPath());
            }
        }
    }

    private void initNewFile() throws IOException {
        this.mFile = new File(PathUtils.concatPath(CommonUtils.getTmpDir(this.mTmpDirs), UUID.randomUUID()));
        initHash();
        if (this.mHash != null) {
            this.mLocalOutputStream = new BufferedOutputStream(new DigestOutputStream(new FileOutputStream(this.mFile), this.mHash));
        } else {
            this.mLocalOutputStream = new BufferedOutputStream(new FileOutputStream(this.mFile));
        }
        this.mPartitionOffset = 0L;
        LOG.debug("Init new temp file @ {}", this.mFile.getPath());
    }

    private void initHash() {
        try {
            this.mHash = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            LOG.warn("Algorithm not available for MD5 hash.", e);
            this.mHash = null;
        }
    }

    protected void uploadPart() throws IOException {
        if (this.mFile == null) {
            return;
        }
        if (!this.mMultiPartUploadInitialized) {
            RetryUtils.retry("init multipart upload", this::initMultiPartUploadInternal, this.mRetryPolicy);
            this.mMultiPartUploadInitialized = true;
        }
        this.mLocalOutputStream.close();
        uploadPart(new File(this.mFile.getPath()), this.mPartNumber.getAndIncrement(), false);
        this.mFile = null;
        this.mLocalOutputStream = null;
    }

    protected void uploadPart(File file, int i, boolean z) {
        String encodeBase64String = this.mHash != null ? Base64.encodeBase64String(this.mHash.digest()) : null;
        this.mFutures.add(this.mExecutor.submit(() -> {
            try {
                RetryUtils.retry("upload part for key " + this.mKey + " and part number " + i, () -> {
                    uploadPartInternal(file, i, z, encodeBase64String);
                }, this.mRetryPolicy);
                if (!file.delete()) {
                    LOG.error("Failed to delete temporary file @ {}", file.getPath());
                }
                return null;
            } catch (Throwable th) {
                if (!file.delete()) {
                    LOG.error("Failed to delete temporary file @ {}", file.getPath());
                }
                throw th;
            }
        }));
        LOG.debug("Submit upload part request. key={}, partNum={}, file={}, fileSize={}, lastPart={}.", new Object[]{this.mKey, Integer.valueOf(i), file.getPath(), Long.valueOf(file.length()), Boolean.valueOf(z)});
    }

    protected void abortMultiPartUpload() throws IOException {
        RetryUtils.retry("abort multipart upload for key " + this.mKey, this::abortMultiPartUploadInternal, this.mRetryPolicy);
    }

    protected void waitForAllPartsUpload() throws IOException {
        try {
            for (ListenableFuture<?> listenableFuture : this.mFutures) {
                if (this.mUploadPartTimeoutMills == null) {
                    listenableFuture.get();
                } else {
                    listenableFuture.get(this.mUploadPartTimeoutMills.longValue(), TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted object upload.", e);
            Futures.allAsList(this.mFutures).cancel(true);
            abortMultiPartUpload();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            Futures.allAsList(this.mFutures).cancel(true);
            abortMultiPartUpload();
            throw new IOException("Part upload failed in multipart upload with to " + this.mKey, e2);
        } catch (TimeoutException e3) {
            LOG.error("timeout when upload part");
            Futures.allAsList(this.mFutures).cancel(true);
            abortMultiPartUpload();
            throw new IOException("timeout when upload part " + this.mKey, e3);
        }
        this.mFutures.clear();
    }

    @VisibleForTesting
    public int getPartNumber() {
        return this.mPartNumber.get();
    }

    protected abstract void uploadPartInternal(File file, int i, boolean z, @Nullable String str) throws IOException;

    protected abstract void initMultiPartUploadInternal() throws IOException;

    protected abstract void completeMultiPartUploadInternal() throws IOException;

    protected abstract void abortMultiPartUploadInternal() throws IOException;

    protected abstract void createEmptyObject(String str) throws IOException;

    protected abstract void putObject(String str, File file, @Nullable String str2) throws IOException;
}
