/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.offload.impl;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.google.common.base.Strings;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.broker.offload.impl.BlockAwareSegmentInputStreamImpl;
import org.apache.pulsar.broker.offload.impl.S3BackedReadHandleImpl;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3ManagedLedgerOffloader
implements LedgerOffloader {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);

    /** Name of this offload driver. */
    public static final String DRIVER_NAME = "S3";

    /** User-metadata key under which the object format version is stored. */
    static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";

    /** User-metadata key under which the broker version string is stored. */
    static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";

    /** User-metadata key under which the broker git SHA is stored. */
    static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";

    /** Format version written to new objects and required when reading them back. */
    static final String CURRENT_VERSION = String.valueOf(1);

    /**
     * Rejects an object whose format-version user metadata is absent or differs from
     * {@link #CURRENT_VERSION}.
     */
    private final VersionCheck VERSION_CHECK = (key, metadata) -> {
        String storedVersion = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
        // CURRENT_VERSION is never null, so this single comparison also covers a missing entry.
        if (!CURRENT_VERSION.equals(storedVersion)) {
            throw new IOException(String.format("Invalid object version %s for %s, expect %s", storedVersion, key, CURRENT_VERSION));
        }
    };

    /** Executor provider; a thread is chosen per ledger id for every operation. */
    private final OrderedScheduler scheduler;

    /** Client used for every bucket operation. */
    private final AmazonS3 s3client;

    /** Bucket that holds the data and index objects. */
    private final String bucket;

    /** Upper bound handed to the block-size calculation when uploading data blocks. */
    private int maxBlockSize;

    /** Buffer size handed to the read handle opened by {@code readOffloaded}. */
    private final int readBufferSize;

    /**
     * Builds an offloader from the broker configuration.
     *
     * <p>When a service endpoint is configured the client is pointed at it (with path-style access
     * enabled); otherwise the client is configured with the region only.
     *
     * @param conf broker configuration supplying region, bucket, endpoint, block size and read buffer size
     * @param scheduler scheduler handed to the new offloader
     * @return the configured offloader
     * @throws PulsarServerException if neither region nor endpoint is set, the bucket is empty,
     *         or the maximum block size is below 5MB
     */
    public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, OrderedScheduler scheduler) throws PulsarServerException {
        final String awsRegion = conf.getS3ManagedLedgerOffloadRegion();
        final String bucketName = conf.getS3ManagedLedgerOffloadBucket();
        final String serviceEndpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
        final int blockSizeLimit = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
        final int readBufferBytes = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();

        final boolean endpointConfigured = !Strings.isNullOrEmpty(serviceEndpoint);
        if (!endpointConfigured && Strings.isNullOrEmpty(awsRegion)) {
            throw new PulsarServerException("Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled");
        }
        if (Strings.isNullOrEmpty(bucketName)) {
            throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty if s3 offload enabled");
        }
        // 5MB, the smallest block size this offloader accepts.
        final int minBlockSizeBytes = 5 * 1024 * 1024;
        if (blockSizeLimit < minBlockSizeBytes) {
            throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB");
        }

        final AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard();
        if (endpointConfigured) {
            clientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, awsRegion));
            clientBuilder.setPathStyleAccessEnabled(Boolean.TRUE);
        } else {
            clientBuilder.setRegion(awsRegion);
        }
        final AmazonS3 client = clientBuilder.build();
        return new S3ManagedLedgerOffloader(client, bucketName, scheduler, blockSizeLimit, readBufferBytes);
    }

    /**
     * Creates an offloader around an already-built client. No argument is validated here.
     *
     * @param s3client client used for every bucket operation
     * @param bucket bucket that receives the data and index objects
     * @param scheduler scheduler whose threads run the offload, read and delete tasks
     * @param maxBlockSize upper bound passed to the data-block size calculation
     * @param readBufferSize buffer size passed to read handles opened on offloaded ledgers
     */
    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, OrderedScheduler scheduler, int maxBlockSize, int readBufferSize) {
        // Collaborators first, then sizing parameters.
        this.scheduler = scheduler;
        this.s3client = s3client;
        this.bucket = bucket;
        this.readBufferSize = readBufferSize;
        this.maxBlockSize = maxBlockSize;
    }

    /**
     * Builds the object key of the data object for a ledger.
     *
     * <p>The key is assembled by plain concatenation rather than {@code String.format("%d")}:
     * {@code %d} renders digits according to the JVM default locale, which would make the key
     * differ between brokers running under different locales.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uuid unique id of this offload attempt; must not be null
     * @return {@code "<uuid>-ledger-<ledgerId>"} with ASCII digits
     */
    static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
        return uuid.toString() + "-ledger-" + ledgerId;
    }

    /**
     * Builds the object key of the index object for a ledger.
     *
     * <p>The key is assembled by plain concatenation rather than {@code String.format("%d")}:
     * {@code %d} renders digits according to the JVM default locale, which would make the key
     * differ between brokers running under different locales.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uuid unique id of this offload attempt; must not be null
     * @return {@code "<uuid>-ledger-<ledgerId>-index"} with ASCII digits
     */
    static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
        return uuid.toString() + "-ledger-" + ledgerId + "-index";
    }

    /**
     * Schedules the upload of a ledger on the scheduler thread chosen for the ledger's id.
     *
     * @param readHandle handle of the ledger to copy
     * @param uuid unique id used to derive the object keys
     * @param extraMetadata not used by this implementation
     * @return future completed by the scheduled upload task
     */
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final Runnable uploadTask = () -> this.lambda$1(readHandle, result, uuid);
        this.scheduler.chooseThread(readHandle.getId()).submit(uploadTask);
        return result;
    }

    /**
     * Opens a read handle on a previously offloaded ledger, on the scheduler thread chosen for
     * the ledger id.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uid unique id the ledger was offloaded under
     * @return future holding the opened handle, or failed with whatever the open attempt threw
     */
    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
        final String dataKey = S3ManagedLedgerOffloader.dataBlockOffloadKey(ledgerId, uid);
        final String indexKey = S3ManagedLedgerOffloader.indexBlockOffloadKey(ledgerId, uid);
        final CompletableFuture<ReadHandle> result = new CompletableFuture<>();

        final Runnable openTask = () -> {
            try {
                ReadHandle handle = S3BackedReadHandleImpl.open(
                        (ScheduledExecutorService) this.scheduler.chooseThread(ledgerId),
                        this.s3client,
                        this.bucket,
                        dataKey,
                        indexKey,
                        this.VERSION_CHECK,
                        ledgerId,
                        this.readBufferSize);
                result.complete(handle);
            } catch (Throwable failure) {
                result.completeExceptionally(failure);
            }
        };
        this.scheduler.chooseThread(ledgerId).submit(openTask);
        return result;
    }

    /**
     * Stamps the format version, broker version string and broker git SHA into the user metadata
     * of {@code metadata}.
     *
     * @param metadata object metadata to annotate
     */
    private static void addVersionInfo(ObjectMetadata metadata) {
        final Map<String, String> userMetadata = metadata.getUserMetadata();
        userMetadata.put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION);
        userMetadata.put(METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString());
        userMetadata.put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha());
    }

    /**
     * Deletes the data and index objects of an offloaded ledger in a single request, on the
     * scheduler thread chosen for the ledger id.
     *
     * @param ledgerId id of the offloaded ledger
     * @param uid unique id the ledger was offloaded under
     * @return future completed with {@code null} on success, or failed (after logging) with the error
     */
    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final Runnable deleteTask = () -> {
            try {
                // Keys are derived inside the task so that any failure there also fails the future.
                DeleteObjectsRequest request = new DeleteObjectsRequest(this.bucket).withKeys(
                        S3ManagedLedgerOffloader.dataBlockOffloadKey(ledgerId, uid),
                        S3ManagedLedgerOffloader.indexBlockOffloadKey(ledgerId, uid));
                this.s3client.deleteObjects(request);
                result.complete(null);
            } catch (Throwable failure) {
                log.error("Failed delete s3 Object ", failure);
                result.completeExceptionally(failure);
            }
        };
        this.scheduler.chooseThread(ledgerId).submit(deleteTask);
        return result;
    }

    /*
     * Unable to fully structure code
     */
    private /* synthetic */ void lambda$1(ReadHandle var1_1, CompletableFuture var2_2, UUID var3_3) {
        if (var1_1.getLength() == 0L || !var1_1.isClosed() || var1_1.getLastAddConfirmed() < 0L) {
            var2_2.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
            return;
        }
        indexBuilder = OffloadIndexBlockBuilder.create().withLedgerMetadata(var1_1.getLedgerMetadata()).withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
        dataBlockKey = S3ManagedLedgerOffloader.dataBlockOffloadKey(var1_1.getId(), var3_3);
        indexBlockKey = S3ManagedLedgerOffloader.indexBlockOffloadKey(var1_1.getId(), var3_3);
        dataMetadata = new ObjectMetadata();
        S3ManagedLedgerOffloader.addVersionInfo(dataMetadata);
        dataBlockReq = new InitiateMultipartUploadRequest(this.bucket, dataBlockKey, dataMetadata);
        dataBlockRes = null;
        try {
            dataBlockRes = this.s3client.initiateMultipartUpload(dataBlockReq);
        }
        catch (Throwable t) {
            var2_2.completeExceptionally(t);
            return;
        }
        dataObjectLength = 0L;
        try {
            startEntry = 0L;
            partId = 1;
            entryBytesWritten = 0L;
            etags = new LinkedList<PartETag>();
            while (startEntry <= var1_1.getLastAddConfirmed()) {
                blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(this.maxBlockSize, var1_1, startEntry, entryBytesWritten);
                var19_23 = null;
                var20_25 = null;
                try {
                    blockStream = new BlockAwareSegmentInputStreamImpl(var1_1, startEntry, blockSize);
                    try {
                        block41: {
                            uploadRes = this.s3client.uploadPart(new UploadPartRequest().withBucketName(this.bucket).withKey(dataBlockKey).withUploadId(dataBlockRes.getUploadId()).withInputStream((InputStream)blockStream).withPartSize((long)blockSize).withPartNumber(partId));
                            etags.add(uploadRes.getPartETag());
                            indexBuilder.addBlock(startEntry, partId, blockSize);
                            if (blockStream.getEndEntryId() == -1L) break block41;
                            startEntry = blockStream.getEndEntryId() + 1L;
                            ** GOTO lbl49
                        }
                        if (blockStream == null) break;
                    }
                    catch (Throwable var19_24) {
                        if (blockStream != null) {
                            blockStream.close();
                        }
                        throw var19_24;
                    }
                    blockStream.close();
                    break;
lbl49:
                    // 1 sources

                    entryBytesWritten += (long)blockStream.getBlockEntryBytesCount();
                    ++partId;
                    ** if (blockStream == null) goto lbl-1000
lbl-1000:
                    // 1 sources

                    {
                        blockStream.close();
                    }
lbl-1000:
                    // 2 sources

                    {
                    }
                }
                catch (Throwable var20_26) {
                    if (var19_23 == null) {
                        var19_23 = var20_26;
                    } else if (var19_23 != var20_26) {
                        var19_23.addSuppressed(var20_26);
                    }
                    throw var19_23;
                }
                dataObjectLength += (long)blockSize;
            }
            this.s3client.completeMultipartUpload(new CompleteMultipartUploadRequest().withBucketName(this.bucket).withKey(dataBlockKey).withUploadId(dataBlockRes.getUploadId()).withPartETags(etags));
        }
        catch (Throwable t) {
            try {
                this.s3client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, dataBlockKey, dataBlockRes.getUploadId()));
            }
            catch (Throwable throwable) {
                S3ManagedLedgerOffloader.log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", new Object[]{this.bucket, dataBlockKey, dataBlockRes.getUploadId(), throwable});
            }
            var2_2.completeExceptionally(t);
            return;
        }
        try {
            t = null;
            throwable = null;
            try {
                index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                try {
                    indexStream = index.toStream();
                    try {
                        metadata = new ObjectMetadata();
                        metadata.setContentLength(indexStream.getStreamSize());
                        S3ManagedLedgerOffloader.addVersionInfo(metadata);
                        this.s3client.putObject(new PutObjectRequest(this.bucket, indexBlockKey, (InputStream)indexStream, metadata));
                        var2_2.complete(null);
                    }
                    finally {
                        if (indexStream != null) {
                            indexStream.close();
                        }
                    }
                    ** if (index == null) goto lbl-1000
                }
                catch (Throwable throwable) {
                    if (t == null) {
                        t = throwable;
                    } else if (t != throwable) {
                        t.addSuppressed(throwable);
                    }
                    if (index != null) {
                        index.close();
                    }
                    throw t;
                }
lbl-1000:
                // 1 sources

                {
                    index.close();
                }
lbl-1000:
                // 2 sources

                {
                }
            }
            catch (Throwable throwable) {
                if (t == null) {
                    t = throwable;
                } else if (t != throwable) {
                    t.addSuppressed(throwable);
                }
                throw t;
            }
        }
        catch (Throwable t) {
            try {
                this.s3client.deleteObject(this.bucket, dataBlockKey);
            }
            catch (Throwable throwable) {
                S3ManagedLedgerOffloader.log.error("Failed deleteObject in bucket - {} with key - {}.", new Object[]{this.bucket, dataBlockKey, throwable});
            }
            var2_2.completeExceptionally(t);
            return;
        }
    }

    public static interface VersionCheck {
        public void check(String var1, ObjectMetadata var2) throws IOException;
    }
}

