package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.NullableCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.hash.Hashing;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterFirst;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.PubsubClient;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.joda.time.Duration;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.class */
public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    /** Default publish batch size, as a message count (1000). */
    private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
    /** Default cap on the summed payload bytes of one publish call; see {@code WriterFn}. */
    private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
    /** Default processing-time delay (2 seconds) used by the window trigger built in {@code apply}. */
    private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);

    /** Coder for the messages passed from the shard step to the writer step. */
    @VisibleForTesting
    static final Coder<PubsubClient.OutgoingMessage> CODER = new OutgoingMessageCoder();
    /** Factory handed to {@code WriterFn}, which uses it to create a client in each bundle. */
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    /** Topic the writer step publishes to. */
    private final PubsubClient.TopicPath topic;
    /** Encodes each input element into the bytes carried by its outgoing message. */
    private final Coder<T> elementCoder;

    /** Forwarded unchanged to the client factory; may be {@code null}. */
    @Nullable
    private final String timestampLabel;

    /** Forwarded unchanged to the client factory; when {@code null}, no record ids are generated. */
    @Nullable
    private final String idLabel;
    /** Number of distinct shard keys that elements are randomly spread across before grouping. */
    private final int numShards;
    /** Element-count threshold of the window trigger; also the writer's initial batch capacity. */
    private final int publishBatchSize;
    /** Byte threshold at which the writer splits a group into a further publish call. */
    private final int publishBatchBytes;
    /** Processing-time delay after the first element of a pane before the trigger fires. */
    private final Duration maxLatency;
    /** How record ids are generated; always {@code NONE} when {@code idLabel} is {@code null}. */
    private final RecordIdMethod recordIdMethod;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink$OutgoingMessageCoder.class */
    /**
     * Coder for {@link PubsubClient.OutgoingMessage}.
     *
     * <p>The wire layout is, in order: the payload bytes, the timestamp as a big-endian long, and
     * the nullable record id. Every component is written in the nested context, so the
     * {@code context} argument passed by the caller does not influence the encoding.
     */
    private static class OutgoingMessageCoder extends CustomCoder<PubsubClient.OutgoingMessage> {
        /** Record ids may be absent, so the string coder is wrapped to tolerate {@code null}. */
        private static final NullableCoder<String> RECORD_ID_CODER = NullableCoder.of(StringUtf8Coder.of());

        private OutgoingMessageCoder() {
        }

        /**
         * Writes the three components of {@code outgoingMessage} to {@code outputStream}.
         *
         * @param outgoingMessage message to encode
         * @param outputStream destination stream
         * @param context ignored; all components are encoded as nested values
         */
        @Override
        public void encode(PubsubClient.OutgoingMessage outgoingMessage, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
            final Coder.Context nested = Coder.Context.NESTED;
            ByteArrayCoder.of().encode(outgoingMessage.elementBytes, outputStream, nested);
            BigEndianLongCoder.of().encode(outgoingMessage.timestampMsSinceEpoch, outputStream, nested);
            RECORD_ID_CODER.encode(outgoingMessage.recordId, outputStream, nested);
        }

        /**
         * Reads back a message written by {@link #encode}, consuming the components in the same
         * order they were written.
         *
         * @param inputStream source stream
         * @param context ignored; all components are decoded as nested values
         * @return the reconstructed message
         */
        @Override
        public PubsubClient.OutgoingMessage decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
            final Coder.Context nested = Coder.Context.NESTED;
            final byte[] payload = ByteArrayCoder.of().decode(inputStream, nested);
            final long timestampMs = BigEndianLongCoder.of().decode(inputStream, nested);
            final String recordId = RECORD_ID_CODER.decode(inputStream, nested);
            return new PubsubClient.OutgoingMessage(payload, timestampMs, recordId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Selects how {@code ShardFn} produces the record id attached to each outgoing message. */
    @VisibleForTesting
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink$RecordIdMethod.class */
    public enum RecordIdMethod {
        /** No id is generated; the message is created with a {@code null} record id. */
        NONE,
        /** A new random {@link UUID} string is generated for every element. */
        RANDOM,
        /** The id is the murmur3 128-bit hash of the element's encoded bytes. */
        DETERMINISTIC
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink$ShardFn.class */
    /**
     * Turns each input element into an outgoing message and keys it by a pseudo-random shard
     * number in {@code [0, numShards)} so that the following group-by-key spreads the publishing
     * work over that many keys.
     */
    public static class ShardFn<T> extends DoFn<T, KV<Integer, PubsubClient.OutgoingMessage>> {
        /** Counts every element seen by this step. */
        private final Aggregator<Long, Long> elementCounter = createAggregator(PropertyNames.ELEMENTS, new Sum.SumLongFn());
        private final Coder<T> elementCoder;
        private final int numShards;
        private final RecordIdMethod recordIdMethod;

        /**
         * @param coder coder used to serialise each element into the message payload
         * @param i number of shards to distribute elements over
         * @param recordIdMethod how the record id of each message is generated
         */
        ShardFn(Coder<T> coder, int i, RecordIdMethod recordIdMethod) {
            this.elementCoder = coder;
            this.numShards = i;
            this.recordIdMethod = recordIdMethod;
        }

        /**
         * Encodes the element, stamps it with the element timestamp in milliseconds, attaches a
         * record id according to the configured method, and emits it under a random shard key.
         */
        @Override
        public void processElement(DoFn<T, KV<Integer, PubsubClient.OutgoingMessage>>.ProcessContext processContext) throws Exception {
            elementCounter.addValue(1L);
            final byte[] payload = CoderUtils.encodeToByteArray(elementCoder, processContext.element());
            final long timestampMs = processContext.timestamp().getMillis();
            final String recordId = newRecordId(payload);
            final Integer shard = Integer.valueOf(ThreadLocalRandom.current().nextInt(numShards));
            processContext.output(KV.of(shard, new PubsubClient.OutgoingMessage(payload, timestampMs, recordId)));
        }

        /**
         * Produces the record id for a payload, or {@code null} when the configured method
         * generates none.
         */
        private String newRecordId(byte[] payload) {
            switch (recordIdMethod) {
                case DETERMINISTIC:
                    // Same bytes always hash to the same id.
                    return Hashing.murmur3_128().hashBytes(payload).toString();
                case RANDOM:
                    return UUID.randomUUID().toString();
                default:
                    return null;
            }
        }

        /** Adds the shard count to the display data registered by the superclass. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("numShards", Integer.valueOf(numShards)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink$WriterFn.class */
    /**
     * Publishes each group of messages to Pubsub, splitting a group into several publish calls
     * whenever the summed payload size would exceed {@code publishBatchBytes}.
     *
     * <p>A client is created in {@link #startBundle} and released in {@link #finishBundle}; the
     * {@code pubsubClient} field is {@code null} outside of a bundle.
     */
    public static class WriterFn extends DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void> {
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        private final PubsubClient.TopicPath topic;
        /** May be {@code null}; forwarded to the client factory as-is. */
        private final String timestampLabel;
        /** May be {@code null}; forwarded to the client factory as-is. */
        private final String idLabel;
        /** Only used as the initial capacity of the per-group batch list. */
        private final int publishBatchSize;
        /** Threshold on summed payload bytes that triggers an intermediate publish. */
        private final int publishBatchBytes;

        /** Client for the bundle currently in progress, or {@code null} between bundles. */
        @Nullable
        private transient PubsubClient pubsubClient;
        private final Aggregator<Long, Long> batchCounter = createAggregator("batches", new Sum.SumLongFn());
        private final Aggregator<Long, Long> elementCounter = createAggregator(PropertyNames.ELEMENTS, new Sum.SumLongFn());
        private final Aggregator<Long, Long> byteCounter = createAggregator("bytes", new Sum.SumLongFn());

        /**
         * @param pubsubClientFactory factory used to create a client for each bundle
         * @param topicPath topic to publish to
         * @param str timestamp label passed to the client factory (may be {@code null})
         * @param str2 id label passed to the client factory (may be {@code null})
         * @param i publish batch size (initial capacity of the batch list)
         * @param i2 publish batch byte threshold
         */
        WriterFn(PubsubClient.PubsubClientFactory pubsubClientFactory, PubsubClient.TopicPath topicPath, String str, String str2, int i, int i2) {
            this.pubsubFactory = pubsubClientFactory;
            this.topic = topicPath;
            this.timestampLabel = str;
            this.idLabel = str2;
            this.publishBatchSize = i;
            this.publishBatchBytes = i2;
        }

        /**
         * Publishes one batch and updates the counters.
         *
         * @param messages messages to publish in a single call
         * @param bytes summed payload size of {@code messages}
         * @throws IOException if the client fails to publish
         * @throws IllegalStateException if the client reports fewer published messages than sent
         */
        private void publishBatch(List<PubsubClient.OutgoingMessage> messages, int bytes) throws IOException {
            int published = this.pubsubClient.publish(this.topic, messages);
            Preconditions.checkState(published == messages.size(), "Attempted to publish %s messages but %s were successful", Integer.valueOf(messages.size()), Integer.valueOf(published));
            this.batchCounter.addValue(1L);
            this.elementCounter.addValue(Long.valueOf(messages.size()));
            this.byteCounter.addValue(Long.valueOf(bytes));
        }

        /** Creates the client for this bundle; fails if the previous bundle's client is still set. */
        @Override
        public void startBundle(DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void>.Context context) throws Exception {
            Preconditions.checkState(this.pubsubClient == null, "startBundle invoked without prior finishBundle");
            this.pubsubClient = this.pubsubFactory.newClient(this.timestampLabel, this.idLabel, (DataflowPipelineOptions) context.getPipelineOptions().as(DataflowPipelineOptions.class));
        }

        /**
         * Publishes every message of the incoming group. Messages are accumulated until adding
         * the next one would push the batch over {@code publishBatchBytes}; the batch is then
         * published and a new one started. A single message larger than the threshold is still
         * sent, alone in its own batch.
         */
        @Override
        public void processElement(DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void>.ProcessContext processContext) throws Exception {
            List<PubsubClient.OutgoingMessage> batch = new ArrayList<>(this.publishBatchSize);
            int batchBytes = 0;
            for (PubsubClient.OutgoingMessage message : processContext.element().getValue()) {
                int messageBytes = message.elementBytes.length;
                // Never flush an empty batch: an oversized message must still go out by itself.
                if (!batch.isEmpty() && batchBytes + messageBytes > this.publishBatchBytes) {
                    publishBatch(batch, batchBytes);
                    batch.clear();
                    batchBytes = 0;
                }
                batch.add(message);
                batchBytes += messageBytes;
            }
            if (!batch.isEmpty()) {
                publishBatch(batch, batchBytes);
            }
        }

        /**
         * Closes the bundle's client. The reference is cleared even when {@code close()} throws,
         * so a failed close cannot leave a stale client behind that would trip the state check in
         * {@link #startBundle} and hide the original failure.
         */
        @Override
        public void finishBundle(DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void>.Context context) throws Exception {
            try {
                this.pubsubClient.close();
            } finally {
                this.pubsubClient = null;
            }
        }

        /** Registers the topic, transport kind and, when set, the timestamp and id labels. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("topic", this.topic.getPath()));
            builder.add(DisplayData.item("transport", this.pubsubFactory.getKind()));
            builder.addIfNotNull(DisplayData.item("timestampLabel", this.timestampLabel));
            builder.addIfNotNull(DisplayData.item("idLabel", this.idLabel));
        }
    }

    /**
     * Fully parameterised constructor, exposed to the package for tests.
     *
     * @param pubsubClientFactory factory for the Pubsub client used by the writer step
     * @param topicPath topic to publish to
     * @param coder coder used to serialise each element into a message payload
     * @param str timestamp label, or {@code null}
     * @param str2 id label, or {@code null}; when {@code null} the record id method is forced to
     *     {@link RecordIdMethod#NONE} regardless of {@code recordIdMethod}
     * @param i number of shards
     * @param i2 publish batch size (message count)
     * @param i3 publish batch size in bytes
     * @param duration maximum processing-time delay before a pane fires
     * @param recordIdMethod how record ids are generated when an id label is present
     */
    @VisibleForTesting
    PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubClientFactory, PubsubClient.TopicPath topicPath, Coder<T> coder, String str, String str2, int i, int i2, int i3, Duration duration, RecordIdMethod recordIdMethod) {
        pubsubFactory = pubsubClientFactory;
        topic = topicPath;
        elementCoder = coder;
        timestampLabel = str;
        idLabel = str2;
        numShards = i;
        publishBatchSize = i2;
        publishBatchBytes = i3;
        maxLatency = duration;
        // Without an id label there is nowhere to put a record id, so none is generated.
        if (str2 == null) {
            this.recordIdMethod = RecordIdMethod.NONE;
        } else {
            this.recordIdMethod = recordIdMethod;
        }
    }

    /**
     * Creates a sink with the default batch size, batch byte limit and maximum latency, using
     * random record ids (when an id label is given).
     *
     * @param pubsubClientFactory factory for the Pubsub client used by the writer step
     * @param topicPath topic to publish to
     * @param coder coder used to serialise each element into a message payload
     * @param str timestamp label, or {@code null}
     * @param str2 id label, or {@code null}
     * @param i number of shards
     */
    public PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubClientFactory, PubsubClient.TopicPath topicPath, Coder<T> coder, String str, String str2, int i) {
        this(
                pubsubClientFactory,
                topicPath,
                coder,
                str,
                str2,
                i,
                DEFAULT_PUBLISH_BATCH_SIZE,
                DEFAULT_PUBLISH_BATCH_BYTES,
                DEFAULT_MAX_LATENCY,
                RecordIdMethod.RANDOM);
    }

    /** Returns the topic this sink publishes to. */
    public PubsubClient.TopicPath getTopic() {
        return topic;
    }

    /** Returns the timestamp label this sink was constructed with, or {@code null} if none. */
    @Nullable
    public String getTimestampLabel() {
        return timestampLabel;
    }

    /** Returns the id label this sink was constructed with, or {@code null} if none. */
    @Nullable
    public String getIdLabel() {
        return idLabel;
    }

    /** Returns the coder used to serialise elements into message payloads. */
    public Coder<T> getElementCoder() {
        return elementCoder;
    }

    /**
     * Expands the sink into four steps: re-window into the global window with a repeating
     * count-or-delay trigger, convert and shard the elements, group by shard, and publish each
     * group.
     *
     * @param pCollection elements to publish
     * @return a {@link PDone} in the input's pipeline
     */
    @Override
    public PDone apply(PCollection<T> pCollection) {
        // Fire a pane as soon as either enough elements have arrived or the first element of the
        // pane has waited for maxLatency; fired panes are discarded.
        PCollection<T> windowed = pCollection.apply(
                Window.named("PubsubUnboundedSink.Window")
                        .<T>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterFirst.of(
                                AfterPane.elementCountAtLeast(publishBatchSize),
                                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
                        .discardingFiredPanes());

        // Encode each element and key it by a random shard.
        PCollection<KV<Integer, PubsubClient.OutgoingMessage>> sharded = windowed
                .apply(ParDo.named("PubsubUnboundedSink.Shard")
                        .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
                .setCoder(KvCoder.of(VarIntCoder.of(), CODER));

        // Collect the messages of each shard per fired pane.
        PCollection<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>> grouped =
                sharded.apply(GroupByKey.<Integer, PubsubClient.OutgoingMessage>create());

        // Publish each group in byte-bounded batches.
        grouped.apply(ParDo.named("PubsubUnboundedSink.Writer")
                .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, publishBatchSize, publishBatchBytes)));

        return PDone.in(pCollection.getPipeline());
    }
}
