package org.apache.beam.sdk.io;

import com.google.api.client.util.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/Write.class */
public class Write {
    private static final Logger LOG = LoggerFactory.getLogger(Write.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/Write$Bound.class */
    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
        private final Sink<T> sink;
        private int numShards;

        /**
         * Assigns each element a shard number in {@code [0, numShards)}.
         *
         * <p>The first call on an instance picks a random starting shard; every later call on the
         * same instance advances to the next shard, wrapping around at {@code numShards}.
         *
         * @param <T> type of the elements being keyed
         */
        public static class ApplyShardingKey<T> implements SerializableFunction<T, Integer> {
            private final int numShards;

            /** Shard handed out by the previous call, or -1 if no element has been seen yet. */
            private int shardNumber = -1;

            /**
             * @param i number of shards to cycle through; it is used as the bound of
             *     {@link ThreadLocalRandom#nextInt(int)} and as a modulus, so it must be positive
             */
            ApplyShardingKey(int i) {
                this.numShards = i;
            }

            /**
             * Returns the shard number for the given element.
             *
             * @param t the element being keyed; its value does not influence the result
             * @return the next shard number in round-robin order
             */
            @Override
            public Integer apply(T t) {
                if (this.shardNumber == -1) {
                    // Random starting point for the first element seen by this instance.
                    this.shardNumber = ThreadLocalRandom.current().nextInt(this.numShards);
                } else {
                    this.shardNumber = (this.shardNumber + 1) % this.numShards;
                }
                return this.shardNumber;
            }

            // No explicit apply(Object) overload: it would have the same erasure as apply(T) and
            // is generated by the compiler as a bridge method when needed.
        }

        /**
         * Writes the elements of a bundle through a single {@link Sink.Writer}.
         *
         * <p>The writer is created from the side-input write operation when the first element
         * arrives, and {@link #finishBundle} emits the value returned by closing it.
         *
         * @param <WriteT> type of the result produced by closing a writer
         */
        public class WriteBundles<WriteT> extends DoFn<T, WriteT> {
            private Sink.Writer<T, WriteT> writer = null;
            private final PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView;

            /**
             * @param pCollectionView side-input view from which the write operation is read
             */
            WriteBundles(PCollectionView<Sink.WriteOperation<T, WriteT>> pCollectionView) {
                this.writeOperationView = pCollectionView;
            }

            /**
             * Writes the current element, opening the writer first if none is open. If the write
             * fails, the writer is closed and the original exception is rethrown.
             */
            @Override
            public void processElement(DoFn<T, WriteT>.ProcessContext processContext) throws Exception {
                if (writer == null) {
                    openWriter(processContext);
                }
                try {
                    writer.write(processContext.element());
                } catch (Exception writeFailure) {
                    closeAfterFailure(writeFailure);
                    throw writeFailure;
                }
            }

            /** Emits the result of closing the writer, if one was opened, and clears it. */
            @Override
            public void finishBundle(DoFn<T, WriteT>.Context context) throws Exception {
                if (writer == null) {
                    return;
                }
                context.output(writer.close());
                writer = null;
            }

            /** Delegates to the display data of the enclosing {@link Bound} transform. */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                Bound.this.populateDisplayData(builder);
            }

            /** Creates a writer from the side-input write operation and opens it under a random UUID. */
            private void openWriter(DoFn<T, WriteT>.ProcessContext processContext) throws Exception {
                Sink.WriteOperation<T, WriteT> operation = processContext.sideInput(writeOperationView);
                Write.LOG.info("Opening writer for write operation {}", operation);
                // The field is assigned before open() is called, so it stays set even if open() throws.
                writer = operation.createWriter(processContext.getPipelineOptions());
                writer.open(UUID.randomUUID().toString());
                Write.LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
            }

            /** Closes the writer after a failed write; a failure to close is attached as suppressed. */
            private void closeAfterFailure(Exception failure) {
                try {
                    writer.close();
                } catch (Exception closeFailure) {
                    if (closeFailure instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    failure.addSuppressed(closeFailure);
                }
            }
        }

        /**
         * Writes every grouped shard through its own {@link Sink.Writer} and emits the value
         * returned by closing that writer.
         *
         * @param <WriteT> type of the result produced by closing a writer
         */
        public class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
            private final PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView;

            /**
             * @param pCollectionView side-input view from which the write operation is read
             */
            WriteShardedBundles(PCollectionView<Sink.WriteOperation<T, WriteT>> pCollectionView) {
                this.writeOperationView = pCollectionView;
            }

            /**
             * Opens a writer under a random UUID, writes all values of the current shard and
             * outputs the result of closing the writer.
             *
             * <p>If a write fails, the writer is closed once and the write exception is rethrown,
             * with any close failure attached as suppressed. On the success path the writer is
             * closed exactly once; a failure of that close (or of the output) propagates without
             * a second close attempt.
             */
            @Override
            public void processElement(DoFn<KV<Integer, Iterable<T>>, WriteT>.ProcessContext processContext) throws Exception {
                Sink.WriteOperation<T, WriteT> writeOperation = processContext.sideInput(this.writeOperationView);
                Write.LOG.info("Opening writer for write operation {}", writeOperation);
                Sink.Writer<T, WriteT> writer = writeOperation.createWriter(processContext.getPipelineOptions());
                writer.open(UUID.randomUUID().toString());
                Write.LOG.debug("Done opening writer {} for operation {}", writer, this.writeOperationView);
                try {
                    for (T value : processContext.element().getValue()) {
                        writer.write(value);
                    }
                } catch (Exception e) {
                    // Discard the partial result; keep the write failure as the primary exception.
                    try {
                        writer.close();
                    } catch (Exception closeException) {
                        if (closeException instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        e.addSuppressed(closeException);
                    }
                    throw e;
                }
                // Deliberately outside the try block: the writer must not be closed twice when
                // close() or output() itself fails.
                processContext.output(writer.close());
            }

            /** Delegates to the display data of the enclosing {@link Bound} transform. */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                Bound.this.populateDisplayData(builder);
            }
        }

        /**
         * Creates a write transform for the given sink.
         *
         * @param sink the sink to write to
         * @param i the number of shards to store; stored as given, without validation
         */
        private Bound(Sink<T> sink, int i) {
            this.numShards = i;
            this.sink = sink;
        }

        /**
         * Validates the sink against the pipeline's options, then builds the write steps for the
         * write operation created by the sink.
         *
         * @param pCollection the collection whose elements are written
         * @return the {@link PDone} produced by {@code createWrite}
         */
        @Override
        public PDone apply(PCollection<T> pCollection) {
            final PipelineOptions pipelineOptions = pCollection.getPipeline().getOptions();
            sink.validate(pipelineOptions);
            return createWrite(pCollection, sink.createWriteOperation(pipelineOptions));
        }

        /**
         * Registers the sink class, the sink's own display data and, unless it equals 0, the
         * fixed number of shards.
         *
         * @param builder the display data builder to populate
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                    .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
                    .include(sink)
                    .addIfNotDefault(
                            DisplayData.item("numShards", Integer.valueOf(getNumShards()))
                                    .withLabel("Fixed Number of Shards"),
                            0);
        }

        /**
         * Returns the number of shards configured for this transform.
         *
         * @return the stored shard count; {@link #to} creates transforms with a count of 0
         */
        public int getNumShards() {
            return numShards;
        }

        /**
         * Returns the sink this transform writes to.
         *
         * @return the sink passed at construction
         */
        public Sink<T> getSink() {
            return sink;
        }

        /**
         * Returns a new transform for the same sink with the given number of shards.
         *
         * @param i the requested number of shards; negative values are clamped to 0
         * @return a new {@link Bound}; this instance is left unchanged
         */
        public Bound<T> withNumShards(int i) {
            final int nonNegativeShards = i < 0 ? 0 : i;
            return new Bound<>(sink, nonNegativeShards);
        }

        /**
         * Builds the pipeline steps that perform the write.
         *
         * <ol>
         *   <li>"Initialize": a ParDo over a single-element collection holding the write
         *       operation calls {@code initialize} on it and re-emits it.
         *   <li>The input is re-windowed into the global window with the default trigger.
         *   <li>The elements are written either by "WriteBundles" (when {@code getNumShards() <= 0})
         *       or, after being keyed by shard and grouped, by "WriteShardedBundles". Both read
         *       the initialized write operation as a singleton side input.
         *   <li>"Finalize": a ParDo over the write-operation collection reads all writer results
         *       as a side input, creates empty shards until at least
         *       {@code max(1, getNumShards())} results exist, and calls {@code finalize}.
         * </ol>
         *
         * @param pCollection the elements to write
         * @param writeOperation the write operation created by the sink
         * @param <WriteT> type of the result produced by closing a writer
         * @return {@link PDone} in the input's pipeline
         */
        private <WriteT> PDone createWrite(PCollection<T> pCollection, Sink.WriteOperation<T, WriteT> writeOperation) {
            Pipeline pipeline = pCollection.getPipeline();

            // SerializableCoder.of(Class) cannot express the operation's type arguments, hence the cast.
            @SuppressWarnings("unchecked")
            Coder<Sink.WriteOperation<T, WriteT>> operationCoder =
                    (Coder<Sink.WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());

            // Single-element collection holding the write operation, initialized in a ParDo.
            PCollection<Sink.WriteOperation<T, WriteT>> operationCollection =
                    pipeline.apply(Create.of(writeOperation).withCoder(operationCoder));
            operationCollection = operationCollection
                    .apply("Initialize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>() {
                        @Override
                        public void processElement(DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>.ProcessContext processContext) throws Exception {
                            Sink.WriteOperation<T, WriteT> element = processContext.element();
                            Write.LOG.info("Initializing write operation {}", element);
                            element.initialize(processContext.getPipelineOptions());
                            Write.LOG.debug("Done initializing write operation {}", element);
                            // Re-emit the operation so later steps consume the initialized instance.
                            processContext.output(element);
                        }
                    }))
                    .setCoder(operationCoder);

            // Side-input view of the initialized write operation for the write step.
            PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView =
                    operationCollection.apply(View.<Sink.WriteOperation<T, WriteT>>asSingleton());

            // Re-window the input into the global window with the default trigger.
            PCollection<T> inputInGlobalWindow = pCollection.apply(
                    Window.<T>into(new GlobalWindows())
                            .triggering(DefaultTrigger.of())
                            .discardingFiredPanes());

            PCollection<WriteT> writeResults;
            if (getNumShards() <= 0) {
                // No fixed shard count: one writer per bundle.
                writeResults = inputInGlobalWindow.apply(
                        "WriteBundles",
                        ParDo.of(new WriteBundles<>(writeOperationView)).withSideInputs(writeOperationView));
            } else {
                // Fixed shard count: key by shard number, group, then one writer per group.
                writeResults = inputInGlobalWindow
                        .apply("ApplyShardLabel", WithKeys.of(new ApplyShardingKey<T>(getNumShards())))
                        .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
                        .apply(
                                "WriteShardedBundles",
                                ParDo.of(new WriteShardedBundles<>(writeOperationView)).withSideInputs(writeOperationView));
            }
            writeResults.setCoder(writeOperation.getWriterResultCoder());

            // Declared final because the anonymous Finalize DoFn below captures it.
            final PCollectionView<Iterable<WriteT>> resultsView = writeResults.apply(View.<WriteT>asIterable());

            operationCollection.apply("Finalize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Integer>() {
                @Override
                public void processElement(DoFn<Sink.WriteOperation<T, WriteT>, Integer>.ProcessContext processContext) throws Exception {
                    Sink.WriteOperation<T, WriteT> element = processContext.element();
                    Write.LOG.info("Finalizing write operation {}.", element);
                    ArrayList<WriteT> writerResults = Lists.newArrayList(processContext.sideInput(resultsView));
                    Write.LOG.debug("Side input initialized to finalize write operation {}.", element);

                    // At least one shard is always produced; a fixed shard count raises that minimum.
                    int minShardsNeeded = Math.max(1, Bound.this.getNumShards());
                    int extraShardsNeeded = minShardsNeeded - writerResults.size();
                    if (extraShardsNeeded > 0) {
                        Write.LOG.info(
                                "Creating {} empty output shards in addition to {} written for a total of {}.",
                                extraShardsNeeded, writerResults.size(), minShardsNeeded);
                        for (int i = 0; i < extraShardsNeeded; i++) {
                            // Opening and immediately closing a writer yields an empty shard result.
                            Sink.Writer<T, WriteT> emptyWriter = element.createWriter(processContext.getPipelineOptions());
                            emptyWriter.open(UUID.randomUUID().toString());
                            writerResults.add(emptyWriter.close());
                        }
                        Write.LOG.debug("Done creating extra shards.");
                    }
                    element.finalize(writerResults, processContext.getPipelineOptions());
                    Write.LOG.debug("Done finalizing write operation {}", element);
                }
            }).withSideInputs(resultsView));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Creates a write transform for the given sink with a shard count of 0.
     *
     * @param sink the sink to write to; checked with {@code Preconditions.checkNotNull}
     * @param <T> type of the elements written to the sink
     * @return a new {@link Bound} for {@code sink}
     */
    public static <T> Bound<T> to(Sink<T> sink) {
        final Sink<T> checkedSink = Preconditions.checkNotNull(sink, "sink");
        return new Bound<>(checkedSink, 0);
    }
}
