package org.apache.pulsar.broker.service.schema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.class */
public class BookkeeperSchemaStorage implements SchemaStorage {
    private static final String SchemaPath = "/schemas";
    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private static final byte[] LedgerPassword = "".getBytes();
    private final PulsarService pulsar;
    private final ZooKeeper zooKeeper;
    private final ZooKeeperCache localZkCache;
    private final ServiceConfiguration config;
    private BookKeeper bookKeeper;
    private final ConcurrentMap<String, CompletableFuture<LocatorEntry>> locatorEntries = new ConcurrentHashMap();
    private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage$Functions.class */
    public interface Functions {
        static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledgerHandle, long j) {
            CompletableFuture<LedgerEntry> completableFuture = new CompletableFuture<>();
            ledgerHandle.asyncReadEntries(j, j, (i, ledgerHandle2, enumeration, obj) -> {
                if (i != 0) {
                    completableFuture.completeExceptionally(BKException.create(i));
                } else {
                    completableFuture.complete((LedgerEntry) enumeration.nextElement());
                }
            }, (Object) null);
            return completableFuture;
        }

        static CompletableFuture<SchemaStorageFormat.SchemaEntry> parseSchemaEntry(LedgerEntry ledgerEntry) {
            CompletableFuture<SchemaStorageFormat.SchemaEntry> completableFuture = new CompletableFuture<>();
            try {
                completableFuture.complete(SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry()));
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            }
            return completableFuture;
        }

        static SchemaStorageFormat.SchemaEntry newSchemaEntry(List<SchemaStorageFormat.IndexEntry> list, byte[] bArr) {
            return SchemaStorageFormat.SchemaEntry.newBuilder().setSchemaData(ByteString.copyFrom(bArr)).addAllIndex(list).m254build();
        }

        static SchemaStorageFormat.PositionInfo newPositionInfo(long j, long j2) {
            return SchemaStorageFormat.PositionInfo.newBuilder().setLedgerId(j).setEntryId(j2).m214build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage$LocatorEntry.class */
    public static class LocatorEntry {
        final SchemaStorageFormat.SchemaLocator locator;
        final Integer zkZnodeVersion;

        LocatorEntry(SchemaStorageFormat.SchemaLocator schemaLocator, Integer num) {
            this.locator = schemaLocator;
            this.zkZnodeVersion = num;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage$SchemaLocatorDeserializer.class */
    public static class SchemaLocatorDeserializer implements ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
        SchemaLocatorDeserializer() {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public SchemaStorageFormat.SchemaLocator m127deserialize(String str, byte[] bArr) throws Exception {
            return SchemaStorageFormat.SchemaLocator.parseFrom(bArr);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public BookkeeperSchemaStorage(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.localZkCache = pulsarService.getLocalZkCache();
        this.zooKeeper = this.localZkCache.getZooKeeper();
        this.config = pulsarService.getConfiguration();
    }

    @VisibleForTesting
    public void init() throws KeeperException, InterruptedException {
        try {
            if (this.zooKeeper.exists(SchemaPath, false) == null) {
                this.zooKeeper.create(SchemaPath, new byte[0], Acl, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException unused) {
        }
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public void start() throws IOException {
        this.bookKeeper = this.pulsar.getBookKeeperClientFactory().create(this.pulsar.getConfiguration(), this.pulsar.getZkClient());
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public CompletableFuture<SchemaVersion> put(String str, byte[] bArr, byte[] bArr2) {
        return putSchemaIfAbsent(str, bArr, bArr2).thenApply((v1) -> {
            return new LongSchemaVersion(v1);
        });
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public CompletableFuture<StoredSchema> get(String str, SchemaVersion schemaVersion) {
        return schemaVersion == SchemaVersion.Latest ? getSchema(str) : getSchema(str, ((LongSchemaVersion) schemaVersion).getVersion());
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public CompletableFuture<SchemaVersion> delete(String str) {
        return deleteSchema(str).thenApply((v1) -> {
            return new LongSchemaVersion(v1);
        });
    }

    @NotNull
    private CompletableFuture<StoredSchema> getSchema(String str) {
        return this.readSchemaOperations.computeIfAbsent(str, str2 -> {
            CompletableFuture completableFuture = new CompletableFuture();
            getSchemaLocator(getSchemaPath(str)).thenCompose(optional -> {
                if (!optional.isPresent()) {
                    return CompletableFuture.completedFuture(null);
                }
                SchemaStorageFormat.SchemaLocator schemaLocator = ((LocatorEntry) optional.get()).locator;
                return readSchemaEntry(schemaLocator.getInfo().getPosition()).thenApply(schemaEntry -> {
                    return new StoredSchema(schemaEntry.getSchemaData().toByteArray(), new LongSchemaVersion(schemaLocator.getInfo().getVersion()));
                });
            }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (storedSchema, th) -> {
                this.readSchemaOperations.remove(str, completableFuture);
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                    return null;
                }
                completableFuture.complete(storedSchema);
                return null;
            });
            return completableFuture;
        });
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public SchemaVersion versionFromBytes(byte[] bArr) {
        return new LongSchemaVersion(ByteBuffer.wrap(bArr).getLong());
    }

    @Override // org.apache.pulsar.broker.service.schema.SchemaStorage
    public void close() throws Exception {
        if (Objects.nonNull(this.bookKeeper)) {
            this.bookKeeper.close();
        }
    }

    @NotNull
    private CompletableFuture<StoredSchema> getSchema(String str, long j) {
        return getSchemaLocator(getSchemaPath(str)).thenCompose(optional -> {
            if (!optional.isPresent()) {
                return CompletableFuture.completedFuture(null);
            }
            SchemaStorageFormat.SchemaLocator schemaLocator = ((LocatorEntry) optional.get()).locator;
            return j > schemaLocator.getInfo().getVersion() ? CompletableFuture.completedFuture(null) : findSchemaEntryByVersion(schemaLocator.getIndexList(), j).thenApply(schemaEntry -> {
                return new StoredSchema(schemaEntry.getSchemaData().toByteArray(), new LongSchemaVersion(j));
            });
        });
    }

    @NotNull
    private CompletableFuture<Long> putSchema(String str, byte[] bArr, byte[] bArr2) {
        return getOrCreateSchemaLocator(getSchemaPath(str)).thenCompose(locatorEntry -> {
            return addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), bArr).thenCompose(positionInfo -> {
                return updateSchemaLocator(str, locatorEntry, positionInfo, bArr2);
            });
        });
    }

    @NotNull
    private CompletableFuture<Long> putSchemaIfAbsent(String str, byte[] bArr, byte[] bArr2) {
        return getOrCreateSchemaLocator(getSchemaPath(str)).thenCompose(locatorEntry -> {
            byte[] byteArray = locatorEntry.locator.getInfo().getHash().toByteArray();
            return (byteArray.length <= 0 || !Arrays.equals(byteArray, bArr2)) ? findSchemaEntryByHash(locatorEntry.locator.getIndexList(), bArr2).thenCompose(l -> {
                return Objects.isNull(l) ? addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), bArr).thenCompose(positionInfo -> {
                    return updateSchemaLocator(str, locatorEntry, positionInfo, bArr2);
                }) : CompletableFuture.completedFuture(l);
            }) : CompletableFuture.completedFuture(Long.valueOf(locatorEntry.locator.getInfo().getVersion()));
        });
    }

    @NotNull
    private CompletableFuture<Long> deleteSchema(String str) {
        return getSchema(str).thenCompose(storedSchema -> {
            return Objects.isNull(storedSchema) ? CompletableFuture.completedFuture(null) : putSchema(str, new byte[0], new byte[0]);
        });
    }

    @NotNull
    private String getSchemaPath(String str) {
        return "/schemas/" + str;
    }

    @NotNull
    private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToStore(List<SchemaStorageFormat.IndexEntry> list, byte[] bArr) {
        SchemaStorageFormat.SchemaEntry newSchemaEntry = Functions.newSchemaEntry(list, bArr);
        return createLedger().thenCompose(ledgerHandle -> {
            return addEntry(ledgerHandle, newSchemaEntry).thenApply(l -> {
                return Functions.newPositionInfo(ledgerHandle.getId(), l.longValue());
            });
        });
    }

    @NotNull
    private CompletableFuture<Long> updateSchemaLocator(String str, LocatorEntry locatorEntry, SchemaStorageFormat.PositionInfo positionInfo, byte[] bArr) {
        long version = locatorEntry.locator.getInfo().getVersion() + 1;
        SchemaStorageFormat.SchemaLocator schemaLocator = locatorEntry.locator;
        SchemaStorageFormat.IndexEntry m174build = SchemaStorageFormat.IndexEntry.newBuilder().setVersion(version).setPosition(positionInfo).setHash(ByteString.copyFrom(bArr)).m174build();
        return updateSchemaLocator(getSchemaPath(str), SchemaStorageFormat.SchemaLocator.newBuilder().setInfo(m174build).addAllIndex(Iterables.concat(schemaLocator.getIndexList(), Lists.newArrayList(new SchemaStorageFormat.IndexEntry[]{m174build}))).m294build(), locatorEntry.zkZnodeVersion.intValue()).thenApply(r5 -> {
            return Long.valueOf(version);
        });
    }

    @NotNull
    private CompletableFuture<SchemaStorageFormat.SchemaEntry> findSchemaEntryByVersion(List<SchemaStorageFormat.IndexEntry> list, long j) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        SchemaStorageFormat.IndexEntry indexEntry = list.get(0);
        if (j < indexEntry.getVersion()) {
            return readSchemaEntry(indexEntry.getPosition()).thenCompose(schemaEntry -> {
                return findSchemaEntryByVersion(schemaEntry.getIndexList(), j);
            });
        }
        for (SchemaStorageFormat.IndexEntry indexEntry2 : list) {
            if (indexEntry2.getVersion() == j) {
                return readSchemaEntry(indexEntry2.getPosition());
            }
            if (indexEntry2.getVersion() > j) {
                break;
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @NotNull
    private CompletableFuture<Long> findSchemaEntryByHash(List<SchemaStorageFormat.IndexEntry> list, byte[] bArr) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        for (SchemaStorageFormat.IndexEntry indexEntry : list) {
            if (Arrays.equals(indexEntry.getHash().toByteArray(), bArr)) {
                return CompletableFuture.completedFuture(Long.valueOf(indexEntry.getVersion()));
            }
        }
        return readSchemaEntry(list.get(0).getPosition()).thenCompose(schemaEntry -> {
            return findSchemaEntryByHash(schemaEntry.getIndexList(), bArr);
        });
    }

    @NotNull
    private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(SchemaStorageFormat.PositionInfo positionInfo) {
        return openLedger(Long.valueOf(positionInfo.getLedgerId())).thenCompose(ledgerHandle -> {
            return Functions.getLedgerEntry(ledgerHandle, positionInfo.getEntryId()).thenCompose(ledgerEntry -> {
                return closeLedger(ledgerHandle).thenApply(r3 -> {
                    return ledgerEntry;
                });
            });
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Functions::parseSchemaEntry);
    }

    @NotNull
    private CompletableFuture<Void> updateSchemaLocator(String str, SchemaStorageFormat.SchemaLocator schemaLocator, int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.zooKeeper.setData(str, schemaLocator.toByteArray(), i, (i2, str2, obj, stat) -> {
            KeeperException.Code code = KeeperException.Code.get(i2);
            if (code != KeeperException.Code.OK) {
                completableFuture.completeExceptionally(KeeperException.create(code));
            } else {
                completableFuture.complete(null);
            }
        }, null);
        return completableFuture;
    }

    @NotNull
    private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String str) {
        return this.localZkCache.getEntryAsync(str, new SchemaLocatorDeserializer()).thenApply(optional -> {
            return optional.map(entry -> {
                return new LocatorEntry((SchemaStorageFormat.SchemaLocator) entry.getKey(), Integer.valueOf(((Stat) entry.getValue()).getVersion()));
            });
        });
    }

    @NotNull
    private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String str) {
        return this.locatorEntries.computeIfAbsent(str, str2 -> {
            CompletableFuture completableFuture = new CompletableFuture();
            getSchemaLocator(str).thenCompose(optional -> {
                if (optional.isPresent()) {
                    return CompletableFuture.completedFuture((LocatorEntry) optional.get());
                }
                SchemaStorageFormat.SchemaLocator m294build = SchemaStorageFormat.SchemaLocator.newBuilder().setInfo(SchemaStorageFormat.IndexEntry.newBuilder().setVersion(-1L).setHash(ByteString.EMPTY).setPosition(SchemaStorageFormat.PositionInfo.newBuilder().setEntryId(-1L).setLedgerId(-1L))).m294build();
                CompletableFuture completableFuture2 = new CompletableFuture();
                ZkUtils.asyncCreateFullPathOptimistic(this.zooKeeper, str, m294build.toByteArray(), Acl, CreateMode.PERSISTENT, (i, str2, obj, str3) -> {
                    KeeperException.Code code = KeeperException.Code.get(i);
                    if (code != KeeperException.Code.OK) {
                        completableFuture2.completeExceptionally(KeeperException.create(code));
                    } else {
                        completableFuture2.complete(new LocatorEntry(m294build, -1));
                    }
                }, (Object) null);
                return completableFuture2;
            }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (locatorEntry, th) -> {
                this.locatorEntries.remove(str, completableFuture);
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                    return null;
                }
                completableFuture.complete(locatorEntry);
                return null;
            });
            return completableFuture;
        });
    }

    @NotNull
    private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorageFormat.SchemaEntry schemaEntry) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        ledgerHandle.asyncAddEntry(schemaEntry.toByteArray(), (i, ledgerHandle2, j, obj) -> {
            if (i != 0) {
                completableFuture.completeExceptionally(BKException.create(i));
            } else {
                completableFuture.complete(Long.valueOf(j));
            }
        }, (Object) null);
        return completableFuture;
    }

    @NotNull
    private CompletableFuture<LedgerHandle> createLedger() {
        CompletableFuture<LedgerHandle> completableFuture = new CompletableFuture<>();
        this.bookKeeper.asyncCreateLedger(this.config.getManagedLedgerDefaultEnsembleSize(), this.config.getManagedLedgerDefaultWriteQuorum(), this.config.getManagedLedgerDefaultAckQuorum(), BookKeeper.DigestType.fromApiDigestType(this.config.getManagedLedgerDigestType()), LedgerPassword, (i, ledgerHandle, obj) -> {
            if (i != 0) {
                completableFuture.completeExceptionally(BKException.create(i));
            } else {
                completableFuture.complete(ledgerHandle);
            }
        }, (Object) null, Collections.emptyMap());
        return completableFuture;
    }

    @NotNull
    private CompletableFuture<LedgerHandle> openLedger(Long l) {
        CompletableFuture<LedgerHandle> completableFuture = new CompletableFuture<>();
        this.bookKeeper.asyncOpenLedger(l.longValue(), BookKeeper.DigestType.fromApiDigestType(this.config.getManagedLedgerDigestType()), LedgerPassword, (i, ledgerHandle, obj) -> {
            if (i != 0) {
                completableFuture.completeExceptionally(BKException.create(i));
            } else {
                completableFuture.complete(ledgerHandle);
            }
        }, (Object) null);
        return completableFuture;
    }

    @NotNull
    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ledgerHandle.asyncClose((i, ledgerHandle2, obj) -> {
            if (i != 0) {
                completableFuture.completeExceptionally(BKException.create(i));
            } else {
                completableFuture.complete(null);
            }
        }, (Object) null);
        return completableFuture;
    }
}
