package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.class */
public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, PDone> {
    static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
    static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
    private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);

    @VisibleForTesting
    static final Coder<PubsubClient.OutgoingMessage> CODER = new OutgoingMessageCoder();
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    private final ValueProvider<PubsubClient.TopicPath> topic;

    @Nullable
    private final String timestampAttribute;

    @Nullable
    private final String idAttribute;
    private final int numShards;
    private final int publishBatchSize;
    private final int publishBatchBytes;
    private final Duration maxLatency;
    private final RecordIdMethod recordIdMethod;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink$OutgoingMessageCoder.class */
    private static class OutgoingMessageCoder extends AtomicCoder<PubsubClient.OutgoingMessage> {
        private static final NullableCoder<String> RECORD_ID_CODER = NullableCoder.of(StringUtf8Coder.of());
        private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER = NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        private OutgoingMessageCoder() {
        }

        public void encode(PubsubClient.OutgoingMessage outgoingMessage, OutputStream outputStream) throws CoderException, IOException {
            ByteArrayCoder.of().encode(outgoingMessage.elementBytes, outputStream);
            ATTRIBUTES_CODER.encode(outgoingMessage.attributes, outputStream);
            BigEndianLongCoder.of().encode(Long.valueOf(outgoingMessage.timestampMsSinceEpoch), outputStream);
            RECORD_ID_CODER.encode(outgoingMessage.recordId, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public PubsubClient.OutgoingMessage m69decode(InputStream inputStream) throws CoderException, IOException {
            return new PubsubClient.OutgoingMessage(ByteArrayCoder.of().decode(inputStream), (Map) ATTRIBUTES_CODER.decode(inputStream), BigEndianLongCoder.of().decode(inputStream).longValue(), (String) RECORD_ID_CODER.decode(inputStream));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink$RecordIdMethod.class */
    public enum RecordIdMethod {
        NONE,
        RANDOM,
        DETERMINISTIC
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink$ShardFn.class */
    public static class ShardFn extends DoFn<PubsubMessage, KV<Integer, PubsubClient.OutgoingMessage>> {
        private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
        private final int numShards;
        private final RecordIdMethod recordIdMethod;

        ShardFn(int i, RecordIdMethod recordIdMethod) {
            this.numShards = i;
            this.recordIdMethod = recordIdMethod;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<PubsubMessage, KV<Integer, PubsubClient.OutgoingMessage>>.ProcessContext processContext) throws Exception {
            this.elementCounter.inc();
            PubsubMessage pubsubMessage = (PubsubMessage) processContext.element();
            byte[] payload = pubsubMessage.getPayload();
            Map<String, String> attributeMap = pubsubMessage.getAttributeMap();
            long millis = processContext.timestamp().getMillis();
            String str = null;
            switch (this.recordIdMethod) {
                case DETERMINISTIC:
                    str = Hashing.murmur3_128().hashBytes(payload).toString();
                    break;
                case RANDOM:
                    str = UUID.randomUUID().toString();
                    break;
            }
            processContext.output(KV.of(Integer.valueOf(ThreadLocalRandom.current().nextInt(this.numShards)), new PubsubClient.OutgoingMessage(payload, attributeMap, millis, str)));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("numShards", Integer.valueOf(this.numShards)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink$WriterFn.class */
    public static class WriterFn extends DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void> {
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        private final ValueProvider<PubsubClient.TopicPath> topic;
        private final String timestampAttribute;
        private final String idAttribute;
        private final int publishBatchSize;
        private final int publishBatchBytes;

        @Nullable
        private transient PubsubClient pubsubClient;
        private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
        private final Counter elementCounter = SinkMetrics.elementsWritten();
        private final Counter byteCounter = SinkMetrics.bytesWritten();

        WriterFn(PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.TopicPath> valueProvider, String str, String str2, int i, int i2) {
            this.pubsubFactory = pubsubClientFactory;
            this.topic = valueProvider;
            this.timestampAttribute = str;
            this.idAttribute = str2;
            this.publishBatchSize = i;
            this.publishBatchBytes = i2;
        }

        private void publishBatch(List<PubsubClient.OutgoingMessage> list, int i) throws IOException {
            int publish = this.pubsubClient.publish((PubsubClient.TopicPath) this.topic.get(), list);
            Preconditions.checkState(publish == list.size(), "Attempted to publish %s messages but %s were successful", list.size(), publish);
            this.batchCounter.inc();
            this.elementCounter.inc(list.size());
            this.byteCounter.inc(i);
        }

        @DoFn.StartBundle
        public void startBundle(DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void>.StartBundleContext startBundleContext) throws Exception {
            Preconditions.checkState(this.pubsubClient == null, "startBundle invoked without prior finishBundle");
            this.pubsubClient = this.pubsubFactory.newClient(this.timestampAttribute, this.idAttribute, (PubsubOptions) startBundleContext.getPipelineOptions().as(PubsubOptions.class));
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void>.ProcessContext processContext) throws Exception {
            ArrayList arrayList = new ArrayList(this.publishBatchSize);
            int i = 0;
            for (PubsubClient.OutgoingMessage outgoingMessage : (Iterable) ((KV) processContext.element()).getValue()) {
                if (!arrayList.isEmpty() && i + outgoingMessage.elementBytes.length > this.publishBatchBytes) {
                    publishBatch(arrayList, i);
                    arrayList.clear();
                    i = 0;
                }
                arrayList.add(outgoingMessage);
                i += outgoingMessage.elementBytes.length;
            }
            if (arrayList.isEmpty()) {
                return;
            }
            publishBatch(arrayList, i);
        }

        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            this.pubsubClient.close();
            this.pubsubClient = null;
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("topic", this.topic));
            builder.add(DisplayData.item("transport", this.pubsubFactory.getKind()));
            builder.addIfNotNull(DisplayData.item("timestampAttribute", this.timestampAttribute));
            builder.addIfNotNull(DisplayData.item("idAttribute", this.idAttribute));
        }
    }

    @VisibleForTesting
    PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.TopicPath> valueProvider, String str, String str2, int i, int i2, int i3, Duration duration, RecordIdMethod recordIdMethod) {
        this.pubsubFactory = pubsubClientFactory;
        this.topic = valueProvider;
        this.timestampAttribute = str;
        this.idAttribute = str2;
        this.numShards = i;
        this.publishBatchSize = i2;
        this.publishBatchBytes = i3;
        this.maxLatency = duration;
        this.recordIdMethod = str2 == null ? RecordIdMethod.NONE : recordIdMethod;
    }

    public PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.TopicPath> valueProvider, String str, String str2, int i) {
        this(pubsubClientFactory, valueProvider, str, str2, i, DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, RecordIdMethod.RANDOM);
    }

    public PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.TopicPath> valueProvider, String str, String str2, int i, int i2, int i3) {
        this(pubsubClientFactory, valueProvider, str, str2, i, i2, i3, DEFAULT_MAX_LATENCY, RecordIdMethod.RANDOM);
    }

    public PubsubClient.TopicPath getTopic() {
        return (PubsubClient.TopicPath) this.topic.get();
    }

    public ValueProvider<PubsubClient.TopicPath> getTopicProvider() {
        return this.topic;
    }

    @Nullable
    public String getTimestampAttribute() {
        return this.timestampAttribute;
    }

    @Nullable
    public String getIdAttribute() {
        return this.idAttribute;
    }

    public PDone expand(PCollection<PubsubMessage> pCollection) {
        pCollection.apply("PubsubUnboundedSink.Window", Window.into(new GlobalWindows()).triggering(Repeatedly.forever(AfterFirst.of(new Trigger.OnceTrigger[]{AfterPane.elementCountAtLeast(this.publishBatchSize), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(this.maxLatency)}))).discardingFiredPanes()).apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(this.numShards, this.recordIdMethod))).setCoder(KvCoder.of(VarIntCoder.of(), CODER)).apply(GroupByKey.create()).apply("PubsubUnboundedSink.Writer", ParDo.of(new WriterFn(this.pubsubFactory, this.topic, this.timestampAttribute, this.idAttribute, this.publishBatchSize, this.publishBatchBytes)));
        return PDone.in(pCollection.getPipeline());
    }
}
