/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream
extends OutputStream {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(S3ABlockOutputStream.class);
    /** Owning filesystem; used here for part uploads and for statistic/write-operation counters. */
    private final S3AFileSystem fs;
    /** Destination object key; used in this class when building operation and error text. */
    private final String key;
    /** Size handed to the block factory for every new block (narrowed from the constructor's long). */
    private final int blockSize;
    /** Running total of the data sizes of blocks queued on the multipart upload; reported on a successful multipart close. */
    private long bytesSubmitted;
    /** Listener chained after each block's own progress callback. */
    private final ProgressListener progressListener;
    /** Listening wrapper around the caller-supplied executor; all uploads are submitted to it. */
    private final ListeningExecutorService executorService;
    /** Policy consulted when completing/aborting a multipart upload fails (constructed with count 5, 2000 ms). */
    private final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithProportionalSleep((int)5, (long)2000L, (TimeUnit)TimeUnit.MILLISECONDS);
    /** Creates data blocks; closed together with the stream. */
    private final S3ADataBlocks.BlockFactory blockFactory;
    /** One-byte scratch buffer reused by {@code write(int)}. */
    private final byte[] singleCharWrite = new byte[1];
    /** Multipart upload state; stays {@code null} until the first block is uploaded. */
    private MultiPartUpload multiPartUpload;
    /** Set once {@code close()} has been entered; makes close idempotent and fails later writes. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Block currently receiving writes, or {@code null} when none is open. */
    private S3ADataBlocks.DataBlock activeBlock;
    /** Number of blocks created so far; also passed to the factory as the new block's index. */
    private long blockCount = 0L;
    /** Per-stream statistics sink. */
    private final S3AInstrumentation.OutputStreamStatistics statistics;
    /** Helper that builds and issues the S3 requests for this write and receives the final success/failure callback. */
    private final S3AFileSystem.WriteOperationHelper writeOperationHelper;

    /**
     * Creates a block output stream and eagerly opens its first data block.
     *
     * @param fs owning filesystem
     * @param key destination object key
     * @param executorService executor for upload tasks; wrapped in a listening decorator
     * @param progress progress callback; used directly if it is already a
     *     {@link ProgressListener}, otherwise adapted (may be {@code null})
     * @param blockSize size of each block in bytes; must be at least 0x500000 (5 MiB)
     * @param blockFactory factory used to create data blocks
     * @param statistics per-stream statistics sink
     * @param writeOperationHelper helper that builds and issues the S3 requests
     * @throws IllegalArgumentException if {@code blockSize} is below the minimum
     * @throws IOException if the first block cannot be created
     */
    S3ABlockOutputStream(S3AFileSystem fs, String key, ExecutorService executorService, Progressable progress, long blockSize, S3ADataBlocks.BlockFactory blockFactory, S3AInstrumentation.OutputStreamStatistics statistics, S3AFileSystem.WriteOperationHelper writeOperationHelper) throws IOException {
        // Guava's Preconditions templates only understand %s; a %d placeholder is
        // left verbatim and the offending value is merely appended in brackets.
        Preconditions.checkArgument(blockSize >= 0x500000L, "Block size is too small: %s", blockSize);
        this.fs = fs;
        this.key = key;
        this.blockFactory = blockFactory;
        // NOTE(review): values above Integer.MAX_VALUE are silently truncated by this cast.
        this.blockSize = (int) blockSize;
        this.statistics = statistics;
        this.writeOperationHelper = writeOperationHelper;
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        this.multiPartUpload = null;
        this.progressListener = progress instanceof ProgressListener
            ? (ProgressListener) progress
            : new ProgressableListener(progress);
        this.createBlockIfNeeded();
        LOG.debug("Initialized S3ABlockOutputStream for {} output to {}", writeOperationHelper, this.activeBlock);
    }

    /**
     * Returns the active block, creating one through the block factory when none exists.
     * Creating a block bumps the block counter first; once the counter reaches 10000 an
     * error is logged but creation still proceeds.
     *
     * @return the active block, never {@code null} on normal return
     * @throws IOException if the factory fails to create a block
     */
    private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() throws IOException {
        if (activeBlock != null) {
            return activeBlock;
        }
        blockCount++;
        if (blockCount >= 10000L) {
            LOG.error("Number of partitions in stream exceeds limit for S3: 10000 write may fail.");
        }
        activeBlock = blockFactory.create(blockCount, blockSize, statistics);
        return activeBlock;
    }

    /**
     * Reads the active block while holding this stream's monitor.
     *
     * @return the block currently accepting writes, or {@code null} if there is none
     */
    private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
        final S3ADataBlocks.DataBlock current = activeBlock;
        return current;
    }

    /**
     * Tells whether a block is currently open, checked under this stream's monitor.
     *
     * @return {@code true} if an active block exists
     */
    private synchronized boolean hasActiveBlock() {
        final boolean present = activeBlock != null;
        return present;
    }

    /**
     * Drops the reference to the active block. A debug line is logged when a block
     * was present; that check is made before the monitor is taken, the reset itself
     * happens while holding it.
     */
    private void clearActiveBlock() {
        final boolean hadBlock = activeBlock != null;
        if (hadBlock) {
            LOG.debug("Clearing active block");
        }
        synchronized (this) {
            activeBlock = null;
        }
    }

    /**
     * Verifies that the stream has not been closed.
     *
     * @throws IOException if {@code close()} has already been called
     */
    void checkOpen() throws IOException {
        final boolean alreadyClosed = closed.get();
        if (!alreadyClosed) {
            return;
        }
        throw new IOException("Filesystem " + writeOperationHelper + " closed");
    }

    /**
     * Flushes the active block, if there is one. Nothing is uploaded by this call.
     *
     * @throws IOException if the stream is closed or the block's flush fails
     */
    @Override
    public synchronized void flush() throws IOException {
        checkOpen();
        final S3ADataBlocks.DataBlock current = getActiveBlock();
        if (current == null) {
            return;
        }
        current.flush();
    }

    /**
     * Writes a single byte by staging it in the reusable one-byte buffer and
     * delegating to the array-based write.
     *
     * @param b value whose low-order byte is written
     * @throws IOException if the delegated write fails
     */
    @Override
    public synchronized void write(int b) throws IOException {
        final byte[] scratch = singleCharWrite;
        scratch[0] = (byte) b;
        write(scratch, 0, 1);
    }

    /**
     * Writes a range of bytes into the active block, creating the block if necessary.
     * When the block cannot take all of the data it is uploaded and the remainder is
     * written recursively into a fresh block; when the block ends up exactly full it
     * is uploaded straight away.
     *
     * @param source buffer holding the data
     * @param offset index of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if the stream is closed, or block creation, writing or
     *     upload submission fails
     */
    @Override
    public synchronized void write(byte[] source, int offset, int len) throws IOException {
        S3ADataBlocks.validateWriteArgs(source, offset, len);
        checkOpen();
        if (len != 0) {
            final S3ADataBlocks.DataBlock target = createBlockIfNeeded();
            final int accepted = target.write(source, offset, len);
            final int spaceLeft = target.remainingCapacity();
            final boolean overflowed = accepted < len;
            if (overflowed) {
                LOG.debug("writing more data than block has capacity -triggering upload");
                uploadCurrentBlock();
                // Push whatever the full block could not take into the next block.
                write(source, offset + accepted, len - accepted);
            } else if (spaceLeft == 0) {
                uploadCurrentBlock();
            }
        }
    }

    /**
     * Queues the active block as the next part of the multipart upload, starting the
     * multipart upload on first use. The active block reference is cleared afterwards
     * whether or not queueing succeeded.
     *
     * @throws IllegalStateException if there is no active block
     * @throws IOException if initiating the upload or queueing the block fails
     */
    private synchronized void uploadCurrentBlock() throws IOException {
        Preconditions.checkState(hasActiveBlock(), (Object) "No active block");
        LOG.debug("Writing block # {}", (Object) blockCount);
        if (multiPartUpload == null) {
            LOG.debug("Initiating Multipart upload");
            multiPartUpload = new MultiPartUpload();
        }
        try {
            final S3ADataBlocks.DataBlock outgoing = getActiveBlock();
            multiPartUpload.uploadBlockAsync(outgoing);
            bytesSubmitted = bytesSubmitted + (long) outgoing.dataSize();
        } finally {
            clearActiveBlock();
        }
    }

    /**
     * Closes the stream, uploading any outstanding data. Second and later calls are no-ops.
     *
     * <p>If no multipart upload was started, the active block (if any) is sent as a
     * single PUT. Otherwise a non-empty active block is queued as the last part, all
     * part uploads are awaited and the multipart upload is completed.
     *
     * <p>The block, block factory and statistics are closed and the active block is
     * cleared on every exit path, including unchecked exceptions. The decompiled form
     * only ran that cleanup on success or {@link IOException}.
     *
     * @throws IOException if the upload fails; the write helper is told of the
     *     failure before the exception is rethrown
     */
    @Override
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            LOG.debug("Ignoring close() as stream is already closed");
            return;
        }
        S3ADataBlocks.DataBlock block = this.getActiveBlock();
        boolean hasBlock = this.hasActiveBlock();
        LOG.debug("{}: Closing block #{}: current block= {}", new Object[]{this, this.blockCount, hasBlock ? block : "(none)"});
        long bytes = 0L;
        try {
            if (this.multiPartUpload == null) {
                if (hasBlock) {
                    // Everything fitted in one block: plain PUT instead of multipart.
                    bytes = this.putObject();
                }
            } else {
                if (hasBlock && block.hasData()) {
                    this.uploadCurrentBlock();
                }
                List<PartETag> partETags = this.multiPartUpload.waitForAllPartUploads();
                this.multiPartUpload.complete(partETags);
                bytes = this.bytesSubmitted;
            }
            LOG.debug("Upload complete for {}", (Object) this.writeOperationHelper);
        } catch (IOException ioe) {
            this.writeOperationHelper.writeFailed(ioe);
            throw ioe;
        } finally {
            // Must run for unchecked failures too, otherwise block buffers leak.
            S3AUtils.closeAll(LOG, block, this.blockFactory);
            LOG.debug("Statistics: {}", (Object) this.statistics);
            S3AUtils.closeAll(LOG, this.statistics);
            this.clearActiveBlock();
        }
        // Only reached when the upload path completed without throwing.
        this.writeOperationHelper.writeSuccessful(bytes);
    }

    /**
     * Uploads the active block as a single PUT request and waits for it to finish.
     * The request is built from the block's file when it has one, otherwise from its
     * upload stream. The upload data and the block are closed by the upload task once
     * the PUT returns or fails.
     *
     * @return the number of bytes in the uploaded block, or 0 if the wait was
     *     interrupted (the interrupt flag is restored)
     * @throws IOException if the upload task fails
     */
    private int putObject() throws IOException {
        LOG.debug("Executing regular upload for {}", (Object) writeOperationHelper);
        final S3ADataBlocks.DataBlock pending = getActiveBlock();
        final int length = pending.dataSize();
        final S3ADataBlocks.BlockUploadData payload = pending.startUpload();
        final PutObjectRequest request;
        if (payload.hasFile()) {
            request = writeOperationHelper.newPutRequest(payload.getFile());
        } else {
            request = writeOperationHelper.newPutRequest(payload.getUploadStream(), length);
        }
        final long queuedAt = now();
        final BlockUploadProgress tracker = new BlockUploadProgress(pending, progressListener, queuedAt);
        request.setGeneralProgressListener(tracker);
        statistics.blockUploadQueued(length);
        final ListenableFuture<PutObjectResult> pendingPut = executorService.submit(new Callable<PutObjectResult>() {

            @Override
            public PutObjectResult call() throws Exception {
                try {
                    return writeOperationHelper.putObject(request);
                } finally {
                    // Release the payload and block whether the PUT worked or not.
                    S3AUtils.closeAll(LOG, payload, pending);
                }
            }
        });
        clearActiveBlock();
        try {
            pendingPut.get();
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted object upload", (Throwable) ie);
            Thread.currentThread().interrupt();
            return 0;
        } catch (ExecutionException ee) {
            throw S3AUtils.extractException("regular upload", key, ee);
        }
        return length;
    }

    /**
     * Describes the stream: the write helper, the block size and, when present,
     * the active block.
     *
     * @return a diagnostic string for logging
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("S3ABlockOutputStream{")
            .append(writeOperationHelper.toString())
            .append(", blockSize=")
            .append(blockSize);
        // Read the field once so the null check and the append see the same value.
        final S3ADataBlocks.DataBlock snapshot = activeBlock;
        if (snapshot != null) {
            text.append(", activeBlock=").append(snapshot);
        }
        return text.append('}').toString();
    }

    /** Forwards to the filesystem's write-operation counter. */
    private void incrementWriteOperations() {
        final S3AFileSystem owner = fs;
        owner.incrementWriteOperations();
    }

    /**
     * Current wall-clock time, used for upload queue/transfer timing.
     *
     * @return {@link System#currentTimeMillis()}
     */
    private long now() {
        final long millis = System.currentTimeMillis();
        return millis;
    }

    /**
     * Exposes the statistics object this stream reports to.
     *
     * @return the statistics instance supplied at construction
     */
    S3AInstrumentation.OutputStreamStatistics getStatistics() {
        return statistics;
    }

    private static class ProgressableListener
    implements ProgressListener {
        private final Progressable progress;

        public ProgressableListener(Progressable progress) {
            this.progress = progress;
        }

        public void progressChanged(ProgressEvent progressEvent) {
            if (this.progress != null) {
                this.progress.progress();
            }
        }
    }

    private final class BlockUploadProgress
    implements ProgressListener {
        private final S3ADataBlocks.DataBlock block;
        private final ProgressListener nextListener;
        private final long transferQueueTime;
        private long transferStartTime;

        private BlockUploadProgress(S3ADataBlocks.DataBlock block, ProgressListener nextListener, long transferQueueTime) {
            this.block = block;
            this.transferQueueTime = transferQueueTime;
            this.nextListener = nextListener;
        }

        public void progressChanged(ProgressEvent progressEvent) {
            ProgressEventType eventType = progressEvent.getEventType();
            long bytesTransferred = progressEvent.getBytesTransferred();
            int size = this.block.dataSize();
            switch (eventType) {
                case REQUEST_BYTE_TRANSFER_EVENT: {
                    S3ABlockOutputStream.this.statistics.bytesTransferred(bytesTransferred);
                    break;
                }
                case TRANSFER_PART_STARTED_EVENT: {
                    this.transferStartTime = S3ABlockOutputStream.this.now();
                    S3ABlockOutputStream.this.statistics.blockUploadStarted(this.transferStartTime - this.transferQueueTime, size);
                    S3ABlockOutputStream.this.incrementWriteOperations();
                    break;
                }
                case TRANSFER_PART_COMPLETED_EVENT: {
                    S3ABlockOutputStream.this.statistics.blockUploadCompleted(S3ABlockOutputStream.this.now() - this.transferStartTime, size);
                    break;
                }
                case TRANSFER_PART_FAILED_EVENT: {
                    S3ABlockOutputStream.this.statistics.blockUploadFailed(S3ABlockOutputStream.this.now() - this.transferStartTime, size);
                    LOG.warn("Transfer failure of block {}", (Object)this.block);
                    break;
                }
            }
            if (this.nextListener != null) {
                this.nextListener.progressChanged(progressEvent);
            }
        }
    }

    private class MultiPartUpload {
        /** Upload ID obtained from the write helper when the upload was initiated; sent with every part, complete and abort request. */
        private final String uploadId;
        /** Futures of the queued part uploads in submission order; its size plus one is used as the next part number. */
        private final List<ListenableFuture<PartETag>> partETagsFutures;

        /**
         * Initiates a multipart upload through the write helper and prepares an
         * empty list of part futures.
         *
         * @throws IOException if the upload cannot be initiated
         */
        MultiPartUpload() throws IOException {
            uploadId = writeOperationHelper.initiateMultiPartUpload();
            partETagsFutures = new ArrayList<ListenableFuture<PartETag>>(2);
            LOG.debug("Initiated multi-part upload for {} with id '{}'", (Object) writeOperationHelper, (Object) uploadId);
        }

        /**
         * Queues one block as the next part of this multipart upload. The part number
         * is the count of parts queued so far plus one. The upload data and the block
         * are closed by the upload task when the part upload returns or fails.
         *
         * @param block block to upload
         * @throws IOException if the block's upload data or the part request cannot be prepared
         */
        private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) throws IOException {
            LOG.debug("Queueing upload of {}", (Object) block);
            final int length = block.dataSize();
            final S3ADataBlocks.BlockUploadData payload = block.startUpload();
            final int partNumber = partETagsFutures.size() + 1;
            final UploadPartRequest partRequest = writeOperationHelper.newUploadPartRequest(
                uploadId, partNumber, length, payload.getUploadStream(), payload.getFile());
            final long queuedAt = now();
            final BlockUploadProgress tracker = new BlockUploadProgress(block, progressListener, queuedAt);
            partRequest.setGeneralProgressListener(tracker);
            statistics.blockUploadQueued(block.dataSize());
            final ListenableFuture<PartETag> pendingPart = executorService.submit(new Callable<PartETag>() {

                @Override
                public PartETag call() throws Exception {
                    LOG.debug("Uploading part {} for id '{}'", (Object) partNumber, (Object) uploadId);
                    try {
                        final PartETag etag = fs.uploadPart(partRequest).getPartETag();
                        LOG.debug("Completed upload of {} to part {}", (Object) block, (Object) etag.getETag());
                        LOG.debug("Stream statistics of {}", (Object) statistics);
                        return etag;
                    } finally {
                        // Release the payload and block whether the part upload worked or not.
                        S3AUtils.closeAll(LOG, payload, block);
                    }
                }
            });
            partETagsFutures.add(pendingPart);
        }

        /**
         * Blocks until every queued part upload has finished.
         *
         * <p>If any part fails, all part futures are cancelled, the multipart upload is
         * aborted and the failure is rethrown as an {@link IOException}.
         *
         * <p>If the waiting thread is interrupted, the interrupt flag is restored and an
         * {@code InterruptedIOException} is thrown. Previously {@code null} was returned,
         * which the caller passed to {@code complete()} where it caused a
         * {@link NullPointerException}. In-flight parts are neither cancelled nor
         * aborted on interrupt, as before.
         *
         * @return the part ETags in submission order; never {@code null}
         * @throws IOException if a part upload failed or the wait was interrupted
         */
        private List<PartETag> waitForAllPartUploads() throws IOException {
            LOG.debug("Waiting for {} uploads to complete", (Object) this.partETagsFutures.size());
            try {
                return Futures.allAsList(this.partETagsFutures).get();
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted partUpload", (Throwable) ie);
                Thread.currentThread().interrupt();
                // Fully qualified so this block needs no new file-level import.
                java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
                    "Interrupted while waiting for multi-part upload with id '" + this.uploadId
                        + "' to " + S3ABlockOutputStream.this.key);
                interrupted.initCause(ie);
                throw interrupted;
            } catch (ExecutionException ee) {
                LOG.debug("While waiting for upload completion", (Throwable) ee);
                LOG.debug("Cancelling futures");
                for (ListenableFuture<PartETag> future : this.partETagsFutures) {
                    future.cancel(true);
                }
                this.abort();
                throw S3AUtils.extractException("Multi-part upload with id '" + this.uploadId + "' to " + S3ABlockOutputStream.this.key, S3ABlockOutputStream.this.key, ee);
            }
        }

        /**
         * Completes the multipart upload, retrying on {@link AmazonClientException}
         * for as long as {@code shouldRetry} allows.
         *
         * <p>The decompiled {@code while (true)} form ended with an unreachable
         * {@code break;} and had no return after the loop, so it did not compile.
         * This is the equivalent do/while retry loop.
         *
         * @param partETags ETags of all uploaded parts; must not be {@code null}
         * @return the result returned by the write helper
         * @throws IOException the last client exception, translated, once retries stop
         */
        private CompleteMultipartUploadResult complete(List<PartETag> partETags) throws IOException {
            int retryCount = 0;
            AmazonClientException lastException;
            String operation = String.format("Completing multi-part upload for key '%s', id '%s' with %s partitions ", S3ABlockOutputStream.this.key, this.uploadId, partETags.size());
            do {
                try {
                    LOG.debug(operation);
                    return S3ABlockOutputStream.this.writeOperationHelper.completeMultipartUpload(this.uploadId, partETags);
                } catch (AmazonClientException e) {
                    lastException = e;
                    S3ABlockOutputStream.this.statistics.exceptionInMultipartComplete();
                }
            } while (this.shouldRetry(operation, lastException, retryCount++));
            // Retries stopped: surface the last failure.
            throw S3AUtils.translateException(operation, S3ABlockOutputStream.this.key, lastException);
        }

        /**
         * Aborts the multipart upload on a best-effort basis. The aborted-upload
         * statistic is incremented first. {@link AmazonClientException}s are retried
         * for as long as {@code shouldRetry} allows. When retries stop, a warning is
         * logged and the method returns normally rather than throwing.
         *
         * <p>The decompiled {@code while (true)} form ended with an unreachable
         * {@code break;} and did not compile; this is the equivalent do/while loop.
         * The closing quote missing from the operation description is also restored.
         */
        public void abort() {
            int retryCount = 0;
            AmazonClientException lastException;
            S3ABlockOutputStream.this.fs.incrementStatistic(Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED);
            String operation = String.format("Aborting multi-part upload for '%s', id '%s'", S3ABlockOutputStream.this.writeOperationHelper, this.uploadId);
            do {
                try {
                    LOG.debug(operation);
                    S3ABlockOutputStream.this.writeOperationHelper.abortMultipartUpload(this.uploadId);
                    return;
                } catch (AmazonClientException e) {
                    lastException = e;
                    S3ABlockOutputStream.this.statistics.exceptionInMultipartAbort();
                }
            } while (this.shouldRetry(operation, lastException, retryCount++));
            // Deliberately not rethrown: abort is cleanup on an already-failing path.
            LOG.warn("Unable to abort multipart upload, you may need to purge  uploaded parts", (Throwable) lastException);
        }

        /**
         * Asks the stream's retry policy whether a failed operation should be retried
         * and, if so, sleeps for the delay the policy requests.
         *
         * <p>The decision is read from the returned action's {@code action} field.
         * The earlier identity comparison against the shared
         * {@code RetryAction.RETRY} constant could not match an action the policy
         * builds per call to carry its own {@code delayMillis}, so no retry ever happened.
         *
         * @param operation description of the operation, for logging
         * @param e exception that caused the failure
         * @param retryCount number of retries already attempted
         * @return {@code true} if the caller should retry; {@code false} if the policy
         *     declines, the sleep is interrupted (the interrupt flag is restored) or
         *     the policy itself throws
         */
        private boolean shouldRetry(String operation, AmazonClientException e, int retryCount) {
            try {
                RetryPolicy.RetryAction retryAction = S3ABlockOutputStream.this.retryPolicy.shouldRetry(e, retryCount, 0, true);
                boolean retry = retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
                if (retry) {
                    S3ABlockOutputStream.this.fs.incrementStatistic(Statistic.IGNORED_ERRORS);
                    LOG.info("Retrying {} after exception ", (Object) operation, (Object) e);
                    Thread.sleep(retryAction.delayMillis);
                }
                return retry;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception ignored) {
                // A failure inside the retry policy is treated as "do not retry".
                return false;
            }
        }
    }
}

