/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.fs.s3base.shaded.com.google.common.base.Preconditions;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DirectBufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class S3ADataBlocks {
    /** Logger shared by the nested block, factory and stream classes declared in this file. */
    private static final Logger LOG = LoggerFactory.getLogger(S3ADataBlocks.class);

    /** Not instantiable: this class only hosts static helpers and nested types. */
    private S3ADataBlocks() {
    }

    /**
     * Validate the arguments of a {@code write(byte[], int, int)} style call.
     *
     * @param b source array; must not be null
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws NullPointerException if {@code b} is null
     * @throws IndexOutOfBoundsException if the offset/length pair does not lie within {@code b}
     * @throws IOException declared for callers; not raised by this implementation
     */
    static void validateWriteArgs(byte[] b, int off, int len) throws IOException {
        Preconditions.checkNotNull(b);
        final int capacity = b.length;
        // may wrap to a negative value on int overflow; that case is rejected below
        final int end = off + len;
        final boolean offsetValid = off >= 0 && off <= capacity;
        final boolean rangeValid = len >= 0 && end >= 0 && end <= capacity;
        if (!(offsetValid && rangeValid)) {
            throw new IndexOutOfBoundsException("write (b[" + capacity + "], " + off + ", " + len + ')');
        }
    }

    /**
     * Create the block factory selected by a buffer mechanism name.
     *
     * @param owner filesystem handed to the new factory
     * @param name one of {@code "array"}, {@code "disk"} or {@code "bytebuffer"}
     * @return a new factory of the matching kind
     * @throws IllegalArgumentException if the name is not one of the supported values
     */
    static BlockFactory createFactory(S3AFileSystem owner, String name) {
        final BlockFactory factory;
        // name.equals(..) is used deliberately: a null name fails with a
        // NullPointerException, exactly as a switch on a null string does.
        if (name.equals("array")) {
            factory = new ArrayBlockFactory(owner);
        } else if (name.equals("disk")) {
            factory = new DiskBlockFactory(owner);
        } else if (name.equals("bytebuffer")) {
            factory = new ByteBufferBlockFactory(owner);
        } else {
            throw new IllegalArgumentException("Unsupported block buffer \"" + name + '\"');
        }
        return factory;
    }

    /**
     * A data block which buffers its content in a local file.
     * Bytes are written through a {@link BufferedOutputStream}; when the upload
     * starts the stream is closed and the file itself is handed over for upload.
     * The buffer file is deleted by {@link #closeBlock()}.
     */
    static class DiskBlock
    extends DataBlock {
        /** Number of bytes accepted by {@link #write(byte[], int, int)} so far. */
        private int bytesWritten;
        private final File bufferFile;
        /** Maximum number of bytes this block may hold. */
        private final int limit;
        /** Stream to the buffer file; null once it has been closed. */
        private BufferedOutputStream out;
        /** Guards {@link #closeBlock()} so that its cleanup only runs once. */
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * Create a block writing to the given file.
         *
         * @param bufferFile file to buffer the data in
         * @param limit maximum number of bytes the block may hold
         * @param index block index, used in log messages and {@link #toString()}
         * @param statistics statistics to update; may be null
         * @throws FileNotFoundException if the buffer file cannot be opened for writing
         */
        DiskBlock(File bufferFile, int limit, long index, S3AInstrumentation.OutputStreamStatistics statistics) throws FileNotFoundException {
            super(index, statistics);
            this.limit = limit;
            this.bufferFile = bufferFile;
            // Open the file before recording the allocation: if opening fails no
            // block exists which could ever report the matching release.
            this.out = new BufferedOutputStream(new FileOutputStream(bufferFile));
            this.blockAllocated();
        }

        @Override
        int dataSize() {
            return this.bytesWritten;
        }

        @Override
        boolean hasCapacity(long bytes) {
            return (long)this.dataSize() + bytes <= (long)this.limit;
        }

        @Override
        int remainingCapacity() {
            return this.limit - this.bytesWritten;
        }

        /**
         * Write as much of the supplied range as still fits into the block.
         *
         * @return the number of bytes actually written, which may be less than {@code len}
         */
        @Override
        int write(byte[] b, int offset, int len) throws IOException {
            super.write(b, offset, len);
            int written = Math.min(this.remainingCapacity(), len);
            this.out.write(b, offset, written);
            this.bytesWritten += written;
            return written;
        }

        /**
         * Switch to the upload state, flush and close the file stream and
         * return upload data referring to the buffer file.
         */
        @Override
        BlockUploadData startUpload() throws IOException {
            super.startUpload();
            try {
                this.out.flush();
            }
            finally {
                // close even if the flush failed so the file handle is not leaked
                this.out.close();
                this.out = null;
            }
            return new BlockUploadData(this.bufferFile);
        }

        /**
         * Release the block's resources according to the state reported by
         * {@link #getState()} at the time of the call.
         */
        @Override
        protected void innerClose() throws IOException {
            DataBlock.DestState state = this.getState();
            LOG.debug("Closing {}", this);
            switch (state) {
                case Writing: {
                    if (!this.bufferFile.exists()) break;
                    LOG.debug("Block[{}]: Deleting buffer file as upload did not start", this.index);
                    this.closeBlock();
                    break;
                }
                case Upload: {
                    LOG.debug("Block[{}]: Buffer file {} exists \u2014close upload stream", this.index, this.bufferFile);
                    break;
                }
                case Closed: {
                    this.closeBlock();
                    break;
                }
            }
        }

        @Override
        void flush() throws IOException {
            super.flush();
            this.out.flush();
        }

        @Override
        public String toString() {
            return "FileBlock{index=" + this.index + ", destFile=" + this.bufferFile + ", state=" + this.getState() + ", dataSize=" + this.dataSize() + ", limit=" + this.limit + '}';
        }

        /**
         * Release the block: update the statistics, close the file stream if
         * it is still open and delete the buffer file. Only the first call
         * does any work; later calls are logged and ignored.
         */
        void closeBlock() {
            LOG.debug("block[{}]: closeBlock()", this.index);
            if (this.closed.getAndSet(true)) {
                LOG.debug("block[{}]: skipping re-entrant closeBlock()", this.index);
                return;
            }
            this.blockReleased();
            // A block closed before startUpload() still holds an open stream;
            // close it so the file handle does not outlive the deleted file.
            this.closeOutputQuietly();
            if (!this.bufferFile.delete() && this.bufferFile.exists()) {
                LOG.warn("delete({}) returned false", this.bufferFile.getAbsoluteFile());
            }
        }

        /** Close the buffer file stream if it is still open, logging rather than propagating failures. */
        private void closeOutputQuietly() {
            BufferedOutputStream stream = this.out;
            this.out = null;
            if (stream == null) {
                return;
            }
            try {
                stream.close();
            } catch (IOException e) {
                // the file is about to be deleted, so a failed flush/close is not fatal
                LOG.debug("block[{}]: failed to close buffer file stream", this.index, e);
            }
        }
    }

    /** Factory producing {@link DiskBlock} instances backed by temporary files. */
    static class DiskBlockFactory
    extends BlockFactory {
        DiskBlockFactory(S3AFileSystem owner) {
            super(owner);
        }

        /**
         * Ask the owning filesystem for a temporary file and wrap it in a disk block.
         *
         * @param index block index; also embedded in the temporary file's name prefix
         * @param limit maximum block size, passed on to the temporary file request
         * @param statistics statistics for the new block
         * @return the new block
         * @throws IOException if the temporary file cannot be created or opened
         */
        @Override
        DataBlock create(long index, int limit, S3AInstrumentation.OutputStreamStatistics statistics) throws IOException {
            final S3AFileSystem fs = this.getOwner();
            final String prefix = String.format("s3ablock-%04d-", index);
            final File destFile = fs.createTmpFileForWrite(prefix, limit, fs.getConf());
            return new DiskBlock(destFile, limit, index, statistics);
        }
    }

    /**
     * Factory for blocks which buffer their data in {@link ByteBuffer}s
     * obtained from a {@link DirectBufferPool}. The factory counts how many
     * buffers have been requested and not yet returned.
     */
    static class ByteBufferBlockFactory
    extends BlockFactory {
        private final DirectBufferPool bufferPool = new DirectBufferPool();
        /** Buffers requested from the pool and not yet released. */
        private final AtomicInteger buffersOutstanding = new AtomicInteger(0);

        ByteBufferBlockFactory(S3AFileSystem owner) {
            super(owner);
        }

        @Override
        ByteBufferBlock create(long index, int limit, S3AInstrumentation.OutputStreamStatistics statistics) throws IOException {
            return new ByteBufferBlock(index, limit, statistics);
        }

        /** Take a buffer of the given size from the pool and count it as outstanding. */
        private ByteBuffer requestBuffer(int limit) {
            LOG.debug("Requesting buffer of size {}", limit);
            this.buffersOutstanding.incrementAndGet();
            return this.bufferPool.getBuffer(limit);
        }

        /** Hand a buffer back to the pool and decrement the outstanding count. */
        private void releaseBuffer(ByteBuffer buffer) {
            LOG.debug("Releasing buffer");
            this.bufferPool.returnBuffer(buffer);
            this.buffersOutstanding.decrementAndGet();
        }

        /**
         * @return the number of buffers currently requested and not released
         */
        public int getOutstandingBufferCount() {
            return this.buffersOutstanding.get();
        }

        @Override
        public String toString() {
            return "ByteBufferBlockFactory{buffersOutstanding=" + this.buffersOutstanding + '}';
        }

        /**
         * A block whose data lives in a pooled {@link ByteBuffer}. The buffer
         * is returned to the factory's pool when the block is closed.
         */
        class ByteBufferBlock
        extends DataBlock {
            /** The pooled buffer; null after the block has been closed. */
            private ByteBuffer blockBuffer;
            private final int bufferSize;
            /** Size of the data, frozen when the upload starts; null before that. */
            private Integer dataSize;

            ByteBufferBlock(long index, int bufferSize, S3AInstrumentation.OutputStreamStatistics statistics) {
                super(index, statistics);
                this.bufferSize = bufferSize;
                this.blockBuffer = ByteBufferBlockFactory.this.requestBuffer(bufferSize);
                this.blockAllocated();
            }

            /**
             * @return the size frozen at upload start if there is one, otherwise
             *     the amount of the buffer currently used (0 if the buffer has
             *     already been released)
             */
            @Override
            int dataSize() {
                return this.dataSize != null ? this.dataSize.intValue() : this.bufferCapacityUsed();
            }

            /**
             * Freeze the data size, rewind the buffer so it covers exactly the
             * written bytes and expose it as an input stream for upload.
             */
            @Override
            BlockUploadData startUpload() throws IOException {
                super.startUpload();
                this.dataSize = this.bufferCapacityUsed();
                // limit = bytes written, position = 0: the readable range is the data
                this.blockBuffer.limit(this.blockBuffer.position());
                this.blockBuffer.position(0);
                return new BlockUploadData(new ByteBufferInputStream(this.dataSize, this.blockBuffer));
            }

            @Override
            public boolean hasCapacity(long bytes) {
                return bytes <= (long)this.remainingCapacity();
            }

            @Override
            public int remainingCapacity() {
                return this.blockBuffer != null ? this.blockBuffer.remaining() : 0;
            }

            /**
             * Capacity of the buffer minus its remaining space.
             * Returns 0 once the buffer has been released so that
             * {@link #dataSize()} and {@link #toString()} stay usable on a
             * closed block instead of failing with a NullPointerException
             * (which would also hide the state error that embeds this
             * block's string form in its message).
             */
            private int bufferCapacityUsed() {
                ByteBuffer buffer = this.blockBuffer;
                return buffer != null ? buffer.capacity() - buffer.remaining() : 0;
            }

            /**
             * Copy as much of the supplied range as still fits into the buffer.
             *
             * @return the number of bytes actually written, which may be less than {@code len}
             */
            @Override
            int write(byte[] b, int offset, int len) throws IOException {
                super.write(b, offset, len);
                int written = Math.min(this.remainingCapacity(), len);
                this.blockBuffer.put(b, offset, written);
                return written;
            }

            /** Return the buffer to the pool, if it has not been returned already. */
            @Override
            protected void innerClose() {
                if (this.blockBuffer != null) {
                    this.blockReleased();
                    ByteBufferBlockFactory.this.releaseBuffer(this.blockBuffer);
                    this.blockBuffer = null;
                }
            }

            @Override
            public String toString() {
                return "ByteBufferBlock{index=" + this.index + ", state=" + this.getState() + ", dataSize=" + this.dataSize() + ", limit=" + this.bufferSize + ", remainingCapacity=" + this.remainingCapacity() + '}';
            }

            /**
             * Input stream reading from the block's byte buffer. Closing the
             * stream only drops its reference to the buffer; the buffer
             * itself is released when the owning block is closed.
             */
            class ByteBufferInputStream
            extends InputStream {
                /** Number of bytes of data in the buffer. */
                private final int size;
                /** Buffer being read; null once the stream is closed. */
                private ByteBuffer byteBuffer;

                ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
                    LOG.debug("Creating ByteBufferInputStream of size {}", size);
                    this.size = size;
                    this.byteBuffer = byteBuffer;
                }

                @Override
                public synchronized void close() {
                    LOG.debug("ByteBufferInputStream.close() for {}", ByteBufferBlock.super.toString());
                    this.byteBuffer = null;
                }

                /**
                 * @throws IOException if the stream has been closed
                 */
                private void verifyOpen() throws IOException {
                    if (this.byteBuffer == null) {
                        throw new IOException("Stream is closed!");
                    }
                }

                @Override
                public synchronized int read() throws IOException {
                    if (this.available() > 0) {
                        // mask so the byte is returned as an unsigned value in 0..255
                        return this.byteBuffer.get() & 0xFF;
                    }
                    return -1;
                }

                /**
                 * Move the read position by {@code offset} bytes.
                 *
                 * @param offset number of bytes to move by, relative to the current position
                 * @return the number of bytes skipped, as required by
                 *     {@link InputStream#skip(long)} (not the new absolute position)
                 * @throws EOFException if the target position is negative or beyond the data
                 * @throws IOException if the stream is closed
                 */
                @Override
                public synchronized long skip(long offset) throws IOException {
                    this.verifyOpen();
                    long newPos = (long)this.position() + offset;
                    if (newPos < 0L) {
                        throw new EOFException("Cannot seek to a negative offset");
                    }
                    if (newPos > (long)this.size) {
                        throw new EOFException("Attempted to seek or read past the end of the file");
                    }
                    this.byteBuffer.position((int)newPos);
                    // the position moved by exactly 'offset' bytes
                    return offset;
                }

                /**
                 * @throws IllegalStateException if the stream has been closed
                 */
                @Override
                public synchronized int available() {
                    Preconditions.checkState(this.byteBuffer != null, "Stream is closed!");
                    return this.byteBuffer.remaining();
                }

                /**
                 * @return the current read position within the buffer
                 */
                public synchronized int position() {
                    return this.byteBuffer.position();
                }

                /**
                 * @return true if there are bytes left to read
                 */
                public synchronized boolean hasRemaining() {
                    return this.byteBuffer.hasRemaining();
                }

                /** Mark the current position; the read limit argument is ignored. */
                @Override
                public synchronized void mark(int readlimit) {
                    LOG.debug("mark at {}", this.position());
                    this.byteBuffer.mark();
                }

                @Override
                public synchronized void reset() throws IOException {
                    LOG.debug("reset");
                    this.byteBuffer.reset();
                }

                @Override
                public boolean markSupported() {
                    return true;
                }

                /**
                 * Read up to {@code length} bytes into {@code b}.
                 *
                 * @return the number of bytes read, or -1 if no data remains
                 * @throws IllegalArgumentException if the length is negative or the array is null
                 * @throws IndexOutOfBoundsException if the destination range is too small
                 * @throws IOException if the stream is closed
                 */
                @Override
                public synchronized int read(byte[] b, int offset, int length) throws IOException {
                    Preconditions.checkArgument(length >= 0, "length is negative");
                    Preconditions.checkArgument(b != null, "Null buffer");
                    if (b.length - offset < length) {
                        throw new IndexOutOfBoundsException("Requested more bytes than destination buffer size: request length =" + length + ", with offset =" + offset + "; buffer capacity =" + (b.length - offset));
                    }
                    this.verifyOpen();
                    if (!this.hasRemaining()) {
                        return -1;
                    }
                    int toRead = Math.min(length, this.available());
                    this.byteBuffer.get(b, offset, toRead);
                    return toRead;
                }

                @Override
                public String toString() {
                    StringBuilder sb = new StringBuilder("ByteBufferInputStream{");
                    sb.append("size=").append(this.size);
                    // read the field once: close() may null it concurrently
                    ByteBuffer buf = this.byteBuffer;
                    if (buf != null) {
                        sb.append(", available=").append(buf.remaining());
                    }
                    sb.append(", ").append(ByteBufferBlock.super.toString());
                    sb.append('}');
                    return sb.toString();
                }
            }
        }
    }

    /**
     * A data block which buffers its content in an in-memory byte array
     * stream until the upload starts.
     */
    static class ByteArrayBlock
    extends DataBlock {
        /** In-memory buffer; null once the upload has started or the block is closed. */
        private S3AByteArrayOutputStream buffer;
        /** Maximum number of bytes this block may hold. */
        private final int limit;
        /** Size of the data, recorded when the upload starts; null before that. */
        private Integer dataSize;

        ByteArrayBlock(long index, int limit, S3AInstrumentation.OutputStreamStatistics statistics) {
            super(index, statistics);
            this.limit = limit;
            this.buffer = new S3AByteArrayOutputStream(limit);
            this.blockAllocated();
        }

        /**
         * @return the recorded size if the upload has started, otherwise the
         *     current size of the buffer
         */
        @Override
        int dataSize() {
            if (this.dataSize == null) {
                return this.buffer.size();
            }
            return this.dataSize.intValue();
        }

        /**
         * Record the data size, convert the buffer into an input stream and
         * drop the reference to the buffer.
         */
        @Override
        BlockUploadData startUpload() throws IOException {
            super.startUpload();
            final S3AByteArrayOutputStream filled = this.buffer;
            this.dataSize = filled.size();
            final ByteArrayInputStream uploadSource = filled.getInputStream();
            this.buffer = null;
            return new BlockUploadData(uploadSource);
        }

        @Override
        boolean hasCapacity(long bytes) {
            final long required = (long)this.dataSize() + bytes;
            return required <= (long)this.limit;
        }

        @Override
        int remainingCapacity() {
            final int used = this.dataSize();
            return this.limit - used;
        }

        /**
         * Append as much of the supplied range as still fits into the block.
         *
         * @return the number of bytes actually written, which may be less than {@code len}
         */
        @Override
        int write(byte[] b, int offset, int len) throws IOException {
            super.write(b, offset, len);
            final int accepted = Math.min(this.remainingCapacity(), len);
            this.buffer.write(b, offset, accepted);
            return accepted;
        }

        /** Drop the buffer reference and report the release to the statistics. */
        @Override
        protected void innerClose() {
            this.buffer = null;
            this.blockReleased();
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("ByteArrayBlock{");
            text.append("index=").append(this.index);
            text.append(", state=").append(this.getState());
            text.append(", limit=").append(this.limit);
            // the field, not dataSize(): prints "null" until the upload has started
            text.append(", dataSize=").append(this.dataSize);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * A {@link ByteArrayOutputStream} which can hand its backing array to an
     * input stream without copying it.
     */
    static class S3AByteArrayOutputStream
    extends ByteArrayOutputStream {
        S3AByteArrayOutputStream(int size) {
            super(size);
        }

        /**
         * Wrap the bytes written so far in an input stream which shares the
         * backing array, then reset this stream and drop its array reference.
         *
         * @return a stream over the bytes written before this call
         */
        ByteArrayInputStream getInputStream() {
            final byte[] data = this.buf;
            final int length = this.count;
            this.reset();
            // release the array so only the returned stream refers to it
            this.buf = null;
            return new ByteArrayInputStream(data, 0, length);
        }
    }

    /** Factory producing {@link ByteArrayBlock} instances. */
    static class ArrayBlockFactory
    extends BlockFactory {
        ArrayBlockFactory(S3AFileSystem owner) {
            super(owner);
        }

        /**
         * Create a new in-memory block.
         *
         * @param index block index, used by the block in log messages and its string form
         * @param limit maximum number of bytes the block may hold
         * @param statistics statistics for the new block
         * @return the new block
         * @throws IOException declared by the factory contract; not raised here
         */
        @Override
        DataBlock create(long index, int limit, S3AInstrumentation.OutputStreamStatistics statistics) throws IOException {
            // pass the caller's index through, as the other factories do,
            // instead of labelling every block with index 0
            return new ByteArrayBlock(index, limit, statistics);
        }
    }

    /**
     * Base class of all data blocks. A block starts in the
     * {@link DestState#Writing} state, moves to {@link DestState#Upload} when
     * {@link #startUpload()} is called and to {@link DestState#Closed} when
     * {@link #close()} is called.
     */
    static abstract class DataBlock
    implements Closeable {
        /** Lifecycle states of a block. */
        enum DestState {
            Writing,
            Upload,
            Closed
        }

        private volatile DestState state = DestState.Writing;
        protected final long index;
        /** Statistics to update on allocation/release; may be null. */
        protected final S3AInstrumentation.OutputStreamStatistics statistics;

        protected DataBlock(long index, S3AInstrumentation.OutputStreamStatistics statistics) {
            this.index = index;
            this.statistics = statistics;
        }

        /**
         * Atomically verify the current state and switch to the next one.
         *
         * @param current state the block must be in; null skips the check
         * @param next state to enter
         * @throws IllegalStateException if the block is not in the expected state
         */
        protected final synchronized void enterState(DestState current, DestState next) throws IllegalStateException {
            this.verifyState(current);
            LOG.debug("{}: entering state {}", this, next);
            this.state = next;
        }

        /**
         * Check that the block is in the expected state.
         *
         * @param expected required state; null means any state is acceptable
         * @throws IllegalStateException if the block is in a different state
         */
        protected final void verifyState(DestState expected) throws IllegalStateException {
            if (expected == null || this.state == expected) {
                return;
            }
            throw new IllegalStateException("Expected stream state " + expected + " -but actual state is " + this.state + " in " + this);
        }

        final DestState getState() {
            return this.state;
        }

        /**
         * @return the number of bytes held by the block
         */
        abstract int dataSize();

        /**
         * @param bytes number of bytes which should be written
         * @return true if the block can take that many more bytes
         */
        abstract boolean hasCapacity(long bytes);

        /**
         * @return true if at least one byte has been written to the block
         */
        boolean hasData() {
            final int size = this.dataSize();
            return size > 0;
        }

        /**
         * @return the number of bytes which can still be written
         */
        abstract int remainingCapacity();

        /**
         * Validate a write request. Subclasses call this first and then
         * perform the actual write.
         *
         * @return always 0 in this base implementation
         * @throws IllegalStateException if the block is not in the writing state
         * @throws IllegalArgumentException if the buffer/offset/length are invalid
         */
        int write(byte[] buffer, int offset, int length) throws IOException {
            this.verifyState(DestState.Writing);
            // argument checks, in the same order as the messages below
            final boolean bufferPresent = buffer != null;
            Preconditions.checkArgument(bufferPresent, "Null buffer");
            final boolean lengthValid = length >= 0;
            Preconditions.checkArgument(lengthValid, "length is negative");
            final boolean offsetValid = offset >= 0;
            Preconditions.checkArgument(offsetValid, "offset is negative");
            final int available = buffer.length - offset;
            Preconditions.checkArgument(available >= length, "buffer shorter than amount of data to write");
            return 0;
        }

        /**
         * Verify that the block may still be flushed.
         *
         * @throws IllegalStateException if the block is not in the writing state
         */
        void flush() throws IOException {
            this.verifyState(DestState.Writing);
        }

        /**
         * Move from the writing state to the upload state.
         *
         * @return always null in this base implementation; subclasses return the data to upload
         * @throws IllegalStateException if the block is not in the writing state
         */
        BlockUploadData startUpload() throws IOException {
            LOG.debug("Start datablock[{}] upload", this.index);
            this.enterState(DestState.Writing, DestState.Upload);
            return null;
        }

        /**
         * Enter the closed state unless the block is already closed.
         *
         * @return true if this call performed the transition
         */
        protected synchronized boolean enterClosedState() {
            if (this.state.equals(DestState.Closed)) {
                return false;
            }
            this.enterState(null, DestState.Closed);
            return true;
        }

        /** Close the block; the cleanup in {@link #innerClose()} runs at most once. */
        @Override
        public void close() throws IOException {
            if (!this.enterClosedState()) {
                return;
            }
            LOG.debug("Closed {}", this);
            this.innerClose();
        }

        /** Hook for subclasses to release their resources; does nothing here. */
        protected void innerClose() throws IOException {
        }

        /** Report a block allocation to the statistics, if there are any. */
        protected void blockAllocated() {
            final S3AInstrumentation.OutputStreamStatistics stats = this.statistics;
            if (stats == null) {
                return;
            }
            stats.blockAllocated();
        }

        /** Report a block release to the statistics, if there are any. */
        protected void blockReleased() {
            final S3AInstrumentation.OutputStreamStatistics stats = this.statistics;
            if (stats == null) {
                return;
            }
            stats.blockReleased();
        }
    }

    /** Base class of the block factories; holds the filesystem which owns the factory. */
    static abstract class BlockFactory
    implements Closeable {
        private final S3AFileSystem owner;

        protected BlockFactory(S3AFileSystem owner) {
            this.owner = owner;
        }

        /**
         * @return the filesystem passed to the constructor
         */
        protected S3AFileSystem getOwner() {
            return this.owner;
        }

        /**
         * Create a new block.
         *
         * @param index index of the block
         * @param limit maximum number of bytes the block may hold
         * @param statistics statistics for the block
         * @return the new block
         * @throws IOException if the block cannot be created
         */
        abstract DataBlock create(long index, int limit, S3AInstrumentation.OutputStreamStatistics statistics) throws IOException;

        /** Does nothing in this base class. */
        @Override
        public void close() throws IOException {
        }
    }

    /**
     * The data of a block which is ready for upload: either a file or an
     * input stream, never both.
     */
    static final class BlockUploadData
    implements Closeable {
        /** File to upload; null when the data is supplied as a stream. */
        private final File file;
        /** Stream to upload; null when the data is supplied as a file. */
        private final InputStream uploadStream;

        /**
         * File-based upload data.
         *
         * @param file file to upload
         * @throws IllegalArgumentException if the file does not exist
         */
        BlockUploadData(File file) {
            final boolean present = file.exists();
            Preconditions.checkArgument(present, "No file: " + file);
            this.uploadStream = null;
            this.file = file;
        }

        /**
         * Stream-based upload data.
         *
         * @param uploadStream stream to upload; must not be null
         * @throws NullPointerException if the stream is null
         */
        BlockUploadData(InputStream uploadStream) {
            Preconditions.checkNotNull(uploadStream, "rawUploadStream");
            this.file = null;
            this.uploadStream = uploadStream;
        }

        /**
         * @return true if the data is supplied as a file
         */
        boolean hasFile() {
            return null != this.file;
        }

        /**
         * @return the file to upload, or null for stream-based data
         */
        File getFile() {
            return this.file;
        }

        /**
         * @return the stream to upload, or null for file-based data
         */
        InputStream getUploadStream() {
            return this.uploadStream;
        }

        /** Pass the upload stream to {@code S3AUtils.closeAll}; the file is left untouched. */
        @Override
        public void close() throws IOException {
            S3AUtils.closeAll(LOG, this.uploadStream);
        }
    }
}

