package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.UncheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.Subscription;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.class */
public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
    private static final long MEBIBYTE = 1048576;
    private final SubscriberOptions options;
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeTransform.class);
    private static final long MIN_PER_PARTITION_MEMORY = 4194304;
    private static final long MAX_PER_PARTITION_MEMORY = 104857600;
    private static final long SOFT_MEMORY_LIMIT = 536870912;
    private static final MemoryLimiter LIMITER = new MemoryLimiterImpl(MIN_PER_PARTITION_MEMORY, MAX_PER_PARTITION_MEMORY, SOFT_MEMORY_LIMIT);
    public static final PTransformOverride V1_READ_OVERRIDE = PTransformOverride.of(PTransformMatchers.classEqualTo(SubscribeTransform.class), new ReadOverrideFactory());

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform$ReadOverrideFactory.class */
    private static class ReadOverrideFactory implements PTransformOverrideFactory<PBegin, PCollection<SequencedMessage>, SubscribeTransform> {
        private ReadOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<SequencedMessage>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<SequencedMessage>, SubscribeTransform> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(appliedPTransform.getPipeline().begin(), new SourceTransform());
        }

        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<SequencedMessage> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PCollection<?>>) map, (PCollection<SequencedMessage>) pOutput);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform$SourceTransform.class */
    private static final class SourceTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
        private final SubscribeTransform impl;

        private SourceTransform(SubscribeTransform subscribeTransform) {
            this.impl = subscribeTransform;
        }

        public PCollection<SequencedMessage> expand(PBegin pBegin) {
            return this.impl.expandSource(pBegin);
        }
    }

    public SubscribeTransform(SubscriberOptions subscriberOptions) {
        this.options = subscriberOptions;
    }

    private void checkSubscription(SubscriptionPartition subscriptionPartition) throws ApiException {
        UncheckedApiPreconditions.checkArgument(subscriptionPartition.subscription().equals(this.options.subscriptionPath()));
    }

    private Subscriber newSubscriber(Partition partition, Offset offset, Consumer<List<SequencedMessage>> consumer) {
        try {
            return new SubscriberAssembler(this.options, partition).getSubscriberFactory(offset).newSubscriber(list -> {
                consumer.accept((List) list.stream().map(sequencedMessage -> {
                    return sequencedMessage.toProto();
                }).collect(Collectors.toList()));
            });
        } catch (Throwable th) {
            throw ExtractStatus.toCanonical(th).underlying;
        }
    }

    private MemoryBufferedSubscriber newBufferedSubscriber(SubscriptionPartition subscriptionPartition, Offset offset) throws ApiException {
        checkSubscription(subscriptionPartition);
        return new MemoryBufferedSubscriberImpl(subscriptionPartition.partition(), offset, LIMITER, consumer -> {
            return newSubscriber(subscriptionPartition.partition(), offset, consumer);
        });
    }

    private MemoryBufferedSubscriber getCachedSubscriber(SubscriptionPartition subscriptionPartition, Offset offset) {
        Supplier supplier = () -> {
            return PerServerSubscriberCache.CACHE.get(subscriptionPartition, () -> {
                return newBufferedSubscriber(subscriptionPartition, offset);
            });
        };
        while (true) {
            MemoryBufferedSubscriber memoryBufferedSubscriber = (MemoryBufferedSubscriber) supplier.get();
            Offset fetchOffset = memoryBufferedSubscriber.fetchOffset();
            if (offset.equals(fetchOffset)) {
                return memoryBufferedSubscriber;
            }
            LOG.info("Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}", new Object[]{subscriptionPartition, offset, fetchOffset});
            try {
                memoryBufferedSubscriber.stopAsync().awaitTerminated();
            } catch (Exception e) {
            }
        }
    }

    private SubscriptionPartitionProcessor newPartitionProcessor(SubscriptionPartition subscriptionPartition, RestrictionTracker<OffsetByteRange, OffsetByteProgress> restrictionTracker, DoFn.OutputReceiver<SequencedMessage> outputReceiver) {
        return new SubscriptionPartitionProcessorImpl(restrictionTracker, outputReceiver, getCachedSubscriber(subscriptionPartition, Offset.of(((OffsetByteRange) restrictionTracker.currentRestriction()).getRange().getFrom())));
    }

    private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
        checkSubscription(subscriptionPartition);
        return new SubscriberAssembler(this.options, subscriptionPartition.partition()).getBacklogReader();
    }

    private TrackerWithProgress newRestrictionTracker(TopicBacklogReader topicBacklogReader, OffsetByteRange offsetByteRange) {
        return new OffsetByteRangeTracker(offsetByteRange, topicBacklogReader);
    }

    private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
        checkSubscription(subscriptionPartition);
        return new SubscriberAssembler(this.options, subscriptionPartition.partition()).getInitialOffsetReader();
    }

    private BlockingCommitter newCommitter(SubscriptionPartition subscriptionPartition) {
        checkSubscription(subscriptionPartition);
        return new SubscriberAssembler(this.options, subscriptionPartition.partition()).newCommitter();
    }

    private TopicPath getTopicPath() {
        try {
            AdminClient create = AdminClient.create(AdminClientSettings.newBuilder().setRegion(this.options.subscriptionPath().location().extractRegion()).build());
            Throwable th = null;
            try {
                TopicPath parse = TopicPath.parse(((Subscription) create.getSubscription(this.options.subscriptionPath()).get()).getTopic());
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return parse;
            } finally {
            }
        } catch (Throwable th3) {
            throw ExtractStatus.toCanonical(th3).underlying;
        }
    }

    private PCollection<SequencedMessage> expandSdf(PBegin pBegin) {
        return pBegin.apply(new SubscriptionPartitionLoader(getTopicPath(), this.options.subscriptionPath())).apply(ParDo.of(new PerSubscriptionPartitionSdf(new ManagedFactoryImpl(this::newBacklogReader), new ManagedFactoryImpl(this::newCommitter), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PCollection<SequencedMessage> expandSource(PBegin pBegin) {
        return pBegin.apply(Read.from(new UnboundedSourceImpl(this.options, this::newBufferedSubscriber, this::newBacklogReader)));
    }

    public PCollection<SequencedMessage> expand(PBegin pBegin) {
        return expandSdf(pBegin);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1186709046:
                if (implMethodName.equals("newCommitter")) {
                    z = 3;
                    break;
                }
                break;
            case -413196704:
                if (implMethodName.equals("newBacklogReader")) {
                    z = 4;
                    break;
                }
                break;
            case 304615130:
                if (implMethodName.equals("newInitialOffsetReader")) {
                    z = true;
                    break;
                }
                break;
            case 725106312:
                if (implMethodName.equals("newPartitionProcessor")) {
                    z = false;
                    break;
                }
                break;
            case 1391668327:
                if (implMethodName.equals("newBufferedSubscriber")) {
                    z = 2;
                    break;
                }
                break;
            case 1483996044:
                if (implMethodName.equals("newRestrictionTracker")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("newProcessor") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lorg/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker;Lorg/apache/beam/sdk/transforms/DoFn$OutputReceiver;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lorg/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker;Lorg/apache/beam/sdk/transforms/DoFn$OutputReceiver;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor;")) {
                    SubscribeTransform subscribeTransform = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform::newPartitionProcessor;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader;")) {
                    SubscribeTransform subscribeTransform2 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform2::newInitialOffsetReader;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl$SubscriberFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lcom/google/cloud/pubsublite/Offset;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lcom/google/cloud/pubsublite/Offset;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber;")) {
                    SubscribeTransform subscribeTransform3 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform3::newBufferedSubscriber;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/BlockingCommitter;")) {
                    SubscribeTransform subscribeTransform4 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform4::newCommitter;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReader;")) {
                    SubscribeTransform subscribeTransform5 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform5::newBacklogReader;
                }
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl$BacklogReaderFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReader;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReader;")) {
                    SubscribeTransform subscribeTransform6 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform6::newBacklogReader;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReader;Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRange;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TrackerWithProgress;")) {
                    SubscribeTransform subscribeTransform7 = (SubscribeTransform) serializedLambda.getCapturedArg(0);
                    return subscribeTransform7::newRestrictionTracker;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
