package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/**
 * A {@link DoFn} that buffers {@link TableRow}s per destination table while a bundle is being
 * processed and hands them to {@code DatasetService.insertAll} when the bundle finishes.
 *
 * <p>Input elements are keyed by a {@link ShardedKey} whose inner key is the table spec string
 * (it is later parsed with {@code BigQueryHelpers.parseTableSpec}). Errors collected by
 * {@code insertAll} are emitted to {@code failedOutputTag}.
 *
 * @param <ErrorT> type of the elements emitted to the failed-inserts output
 */
@VisibleForTesting
@SystemDoFnInternal
class StreamingWriteFn<ErrorT> extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
    private final BigQueryServices bqServices;
    private final InsertRetryPolicy retryPolicy;
    private final TupleTag<ErrorT> failedOutputTag;
    private final ErrorContainer<ErrorT> errorContainer;
    private final boolean skipInvalidRows;
    private final boolean ignoreUnknownValues;

    /** Rows buffered for the current bundle, keyed by table spec. Reset in {@link #startBundle()}. */
    private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;

    /**
     * Unique ids buffered for the current bundle, keyed by table spec. Each list is appended to in
     * lock-step with the matching list in {@link #tableRows}, so index {@code i} of both lists
     * refers to the same input element.
     */
    private transient Map<String, List<String>> uniqueIdsForTableRows;

    /** Counter incremented with the value returned by each {@code insertAll} call. */
    private final Counter byteCounter = SinkMetrics.bytesWritten();

    /**
     * Creates the function.
     *
     * @param bigQueryServices provider of the dataset service used to insert rows
     * @param insertRetryPolicy retry policy forwarded to {@code insertAll}
     * @param tupleTag output tag that receives the failed-insert values
     * @param errorContainer error container forwarded to {@code insertAll}
     * @param z stored as {@code skipInvalidRows} and forwarded to {@code insertAll}
     * @param z2 stored as {@code ignoreUnknownValues} and forwarded to {@code insertAll}
     */
    public StreamingWriteFn(BigQueryServices bigQueryServices, InsertRetryPolicy insertRetryPolicy, TupleTag<ErrorT> tupleTag, ErrorContainer<ErrorT> errorContainer, boolean z, boolean z2) {
        this.bqServices = bigQueryServices;
        this.retryPolicy = insertRetryPolicy;
        this.failedOutputTag = tupleTag;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = z;
        this.ignoreUnknownValues = z2;
    }

    /** Allocates fresh, empty per-table buffers for the bundle that is about to start. */
    @DoFn.StartBundle
    public void startBundle() {
        this.tableRows = new HashMap<>();
        this.uniqueIdsForTableRows = new HashMap<>();
    }

    /**
     * Buffers the element's row (wrapped with its timestamp, window and pane) and its unique id
     * under the element's table spec. Nothing is sent to BigQuery here.
     *
     * @param processContext context supplying the element, its timestamp and its pane
     * @param boundedWindow window the element belongs to
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<KV<ShardedKey<String>, TableRowInfo>, Void>.ProcessContext processContext, BoundedWindow boundedWindow) {
        KV<ShardedKey<String>, TableRowInfo> element = processContext.element();
        String tableSpec = element.getKey().getKey();
        TableRowInfo rowInfo = element.getValue();

        // Resolve both buffers before appending to either, so the two lists stay index-aligned.
        List<ValueInSingleWindow<TableRow>> rows =
                BigQueryHelpers.getOrCreateMapListValue(this.tableRows, tableSpec);
        List<String> uniqueIds =
                BigQueryHelpers.getOrCreateMapListValue(this.uniqueIdsForTableRows, tableSpec);

        rows.add(ValueInSingleWindow.of(rowInfo.tableRow, processContext.timestamp(), boundedWindow, processContext.pane()));
        uniqueIds.add(rowInfo.uniqueId);
    }

    /**
     * Flushes every buffered table, clears the buffers, and then emits the collected failed
     * inserts to {@code failedOutputTag} with their original timestamp and window.
     *
     * @param finishBundleContext context supplying the pipeline options and the output sink
     * @throws Exception if flushing fails; {@link IOException}s from the insert call surface as
     *     {@link RuntimeException}
     */
    @DoFn.FinishBundle
    public void finishBundle(DoFn<KV<ShardedKey<String>, TableRowInfo>, Void>.FinishBundleContext finishBundleContext) throws Exception {
        List<ValueInSingleWindow<ErrorT>> failedInserts = new ArrayList<>();
        BigQueryOptions bigQueryOptions = finishBundleContext.getPipelineOptions().as(BigQueryOptions.class);
        for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry : this.tableRows.entrySet()) {
            String tableSpec = entry.getKey();
            flushRows(BigQueryHelpers.parseTableSpec(tableSpec), entry.getValue(), this.uniqueIdsForTableRows.get(tableSpec), bigQueryOptions, failedInserts);
        }
        this.tableRows.clear();
        this.uniqueIdsForTableRows.clear();
        for (ValueInSingleWindow<ErrorT> failed : failedInserts) {
            finishBundleContext.output(this.failedOutputTag, failed.getValue(), failed.getTimestamp(), failed.getWindow());
        }
    }

    /**
     * Sends one table's buffered rows through {@code insertAll} and adds the returned value to
     * {@link #byteCounter}. Does nothing when {@code list} is empty.
     *
     * @param tableReference destination table
     * @param list rows to insert
     * @param list2 unique ids, index-aligned with {@code list}
     * @param bigQueryOptions options used to obtain the dataset service
     * @param list3 accumulator passed to {@code insertAll} to collect failed inserts
     * @throws InterruptedException propagated from {@code insertAll}
     * @throws RuntimeException wrapping any {@link IOException} raised by the insert call
     */
    private void flushRows(TableReference tableReference, List<ValueInSingleWindow<TableRow>> list, List<String> list2, BigQueryOptions bigQueryOptions, List<ValueInSingleWindow<ErrorT>> list3) throws InterruptedException {
        if (list.isEmpty()) {
            return;
        }
        try {
            long written = this.bqServices.getDatasetService(bigQueryOptions).insertAll(tableReference, list, list2, this.retryPolicy, list3, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues);
            this.byteCounter.inc(written);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
