package org.apache.beam.sdk.io.elasticsearch;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.beam.repackaged.beam_sdks_java_io_elasticsearch.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_sdks_java_io_elasticsearch.com.google.common.base.Preconditions;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Read;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.class */
public class ElasticsearchIO {
    private static final ObjectMapper mapper = new ObjectMapper();

    /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$BoundedElasticsearchReader.class */
    /**
     * Reader that pages through an Elasticsearch scroll and exposes each hit's {@code _source} as a
     * JSON string.
     *
     * <p>{@link #start()} opens the scroll with the initial search request, {@link #advance()}
     * serves documents from the batch held in memory and fetches the next scroll page when that
     * batch is exhausted, and {@link #close()} deletes the scroll and closes the REST client.
     */
    private static class BoundedElasticsearchReader extends BoundedSource.BoundedReader<String> {
        private final BoundedElasticsearchSource source;
        private RestClient restClient;
        private String current;
        private String scrollId;
        private ListIterator<String> batchIterator;

        private BoundedElasticsearchReader(BoundedElasticsearchSource boundedElasticsearchSource) {
            this.source = boundedElasticsearchSource;
        }

        /**
         * Creates the REST client, issues the initial scrolling search and loads the first batch.
         *
         * @return {@code true} if the first response contained at least one hit
         * @throws IOException if the client cannot be created or the request fails
         */
        public boolean start() throws IOException {
            this.restClient = this.source.spec.getConnectionConfiguration().createClient();

            String query = this.source.spec.getQuery();
            if (query == null) {
                query = "{\"query\": { \"match_all\": {} }}";
            }
            // For a v5 backend with more than one slice, inject the slice clause right after the
            // opening brace of the query.
            if (this.source.backendVersion == 5 && this.source.numSlices != null && this.source.numSlices.intValue() > 1) {
                String sliceClause = String.format("\"slice\": {\"id\": %s,\"max\": %s}", this.source.sliceId, this.source.numSlices);
                query = query.replaceFirst("\\{", "{" + sliceClause + ",");
            }

            String endPoint = String.format("/%s/%s/_search", this.source.spec.getConnectionConfiguration().getIndex(), this.source.spec.getConnectionConfiguration().getType());
            Map<String, String> params = new HashMap<>();
            params.put("scroll", this.source.spec.getScrollKeepalive());
            if (this.source.backendVersion == 2) {
                params.put("size", String.valueOf(this.source.spec.getBatchSize()));
                if (this.source.shardPreference != null) {
                    params.put("preference", "_shards:" + this.source.shardPreference);
                }
            }

            Response response = this.restClient.performRequest("GET", endPoint, params, new NStringEntity(query, ContentType.APPLICATION_JSON), new Header[0]);
            JsonNode searchResult = ElasticsearchIO.parseResponse(response);
            updateScrollId(searchResult);
            return readNextBatchAndReturnFirstDocument(searchResult);
        }

        /** Remembers the {@code _scroll_id} of the latest response (empty string if absent). */
        private void updateScrollId(JsonNode jsonNode) {
            this.scrollId = jsonNode.path("_scroll_id").asText();
        }

        /**
         * Moves to the next document, fetching the next scroll page when the in-memory batch is
         * exhausted.
         *
         * @return {@code true} if a document is available through the current-element accessor
         * @throws IOException if the scroll request fails
         */
        public boolean advance() throws IOException {
            if (this.batchIterator.hasNext()) {
                this.current = this.batchIterator.next();
                return true;
            }
            String requestBody = String.format("{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}", this.source.spec.getScrollKeepalive(), this.scrollId);
            Response response = this.restClient.performRequest("GET", "/_search/scroll", Collections.emptyMap(), new NStringEntity(requestBody, ContentType.APPLICATION_JSON), new Header[0]);
            JsonNode searchResult = ElasticsearchIO.parseResponse(response);
            updateScrollId(searchResult);
            return readNextBatchAndReturnFirstDocument(searchResult);
        }

        /**
         * Replaces the in-memory batch with the hits of {@code jsonNode} and positions the reader on
         * the first one.
         *
         * @return {@code false} (and clears the reader state) when the response holds no hits
         */
        private boolean readNextBatchAndReturnFirstDocument(JsonNode jsonNode) {
            JsonNode hits = jsonNode.path("hits").path("hits");
            if (hits.size() == 0) {
                this.current = null;
                this.batchIterator = null;
                return false;
            }
            List<String> batch = new ArrayList<>();
            for (JsonNode hit : hits) {
                batch.add(hit.path("_source").toString());
            }
            this.batchIterator = batch.listIterator();
            this.current = this.batchIterator.next();
            return true;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        /**
         * Returns the document the reader is currently positioned on.
         *
         * @throws NoSuchElementException if there is no current document
         */
        public String m245getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        /**
         * Deletes the server-side scroll (if one was opened) and closes the REST client.
         *
         * <p>Safe to call when {@link #start()} failed: nothing is done if the client was never
         * created, and the scroll DELETE is skipped when no scroll id was obtained.
         *
         * @throws IOException if deleting the scroll or closing the client fails
         */
        public void close() throws IOException {
            if (this.restClient == null) {
                // start() never got as far as creating a client, so there is nothing to release.
                return;
            }
            try {
                if (this.scrollId != null && !this.scrollId.isEmpty()) {
                    String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", this.scrollId);
                    this.restClient.performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), new NStringEntity(requestBody, ContentType.APPLICATION_JSON), new Header[0]);
                }
            } finally {
                this.restClient.close();
            }
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        /** Returns the source this reader was created from. */
        public BoundedSource<String> m244getCurrentSource() {
            return this.source;
        }
    }

    /**
     * A {@link BoundedSource} reading JSON documents from an Elasticsearch index.
     *
     * <p>For a v2 backend the source is split per shard (via {@code shardPreference}); for a v5
     * backend it is split into slices ({@code numSlices}/{@code sliceId}).
     */
    @VisibleForTesting
    public static class BoundedElasticsearchSource extends BoundedSource<String> {
        // Set by the private constructor or by split(); stays 0 when the test constructor is used
        // and split() has not been called.
        private int backendVersion;
        private final Read spec;

        @Nullable
        private final String shardPreference;

        @Nullable
        private final Integer numSlices;

        @Nullable
        private final Integer sliceId;

        private BoundedElasticsearchSource(Read spec, @Nullable String shardPreference, @Nullable Integer numSlices, @Nullable Integer sliceId, int backendVersion) {
            this.backendVersion = backendVersion;
            this.spec = spec;
            this.shardPreference = shardPreference;
            this.numSlices = numSlices;
            this.sliceId = sliceId;
        }

        @VisibleForTesting
        BoundedElasticsearchSource(Read spec, @Nullable String shardPreference, @Nullable Integer numSlices, @Nullable Integer sliceId) {
            this.spec = spec;
            this.shardPreference = shardPreference;
            this.numSlices = numSlices;
            this.sliceId = sliceId;
        }

        /**
         * Splits this source according to the detected backend version.
         *
         * <p>v2: one sub-source per shard reported by the index stats. v5: {@code
         * ceil(indexSize / desiredBundleSizeBytes)} slices, capped at 1024.
         *
         * @param desiredBundleSizeBytes target size of each bundle, used only for v5
         * @throws IllegalArgumentException for v2 when the stats report no shard
         */
        public List<? extends BoundedSource<String>> split(long desiredBundleSizeBytes, PipelineOptions pipelineOptions) throws Exception {
            ConnectionConfiguration connectionConfiguration = this.spec.getConnectionConfiguration();
            this.backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
            List<BoundedElasticsearchSource> sources = new ArrayList<>();
            if (this.backendVersion == 2) {
                JsonNode shards = getStats(connectionConfiguration, true).path("indices").path(connectionConfiguration.getIndex()).path("shards");
                Iterator<Map.Entry<String, JsonNode>> shardEntries = shards.fields();
                while (shardEntries.hasNext()) {
                    String shardId = shardEntries.next().getKey();
                    sources.add(new BoundedElasticsearchSource(this.spec, shardId, null, null, this.backendVersion));
                }
                Preconditions.checkArgument(!sources.isEmpty(), "No shard found");
            } else if (this.backendVersion == 5) {
                int sliceCount = (int) Math.ceil(((float) estimateIndexSize(connectionConfiguration)) / ((float) desiredBundleSizeBytes));
                if (sliceCount > 1024) {
                    sliceCount = 1024;
                }
                for (int slice = 0; slice < sliceCount; slice++) {
                    sources.add(new BoundedElasticsearchSource(this.spec, null, Integer.valueOf(sliceCount), Integer.valueOf(slice), this.backendVersion));
                }
            }
            return sources;
        }

        /** Returns the primaries store size of the configured index, in bytes. */
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws IOException {
            return estimateIndexSize(this.spec.getConnectionConfiguration());
        }

        /**
         * Reads {@code indices.<index>.primaries.store.size_in_bytes} from the index stats.
         *
         * @return the reported size, or 0 when the path is absent from the response
         */
        @VisibleForTesting
        static long estimateIndexSize(ConnectionConfiguration connectionConfiguration) throws IOException {
            JsonNode stats = getStats(connectionConfiguration, false);
            return stats.path("indices").path(connectionConfiguration.getIndex()).path("primaries").path("store").path("size_in_bytes").asLong();
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("shard", this.shardPreference));
            builder.addIfNotNull(DisplayData.item("numSlices", this.numSlices));
            builder.addIfNotNull(DisplayData.item("sliceId", this.sliceId));
        }

        public BoundedSource.BoundedReader<String> createReader(PipelineOptions pipelineOptions) {
            return new BoundedElasticsearchReader(this);
        }

        public void validate() {
            this.spec.validate(null);
        }

        public Coder<String> getOutputCoder() {
            return StringUtf8Coder.of();
        }

        /**
         * Fetches {@code /<index>/_stats} using a short-lived client.
         *
         * @param shardLevel when {@code true}, requests shard-level stats ({@code level=shards})
         * @throws IOException if the client cannot be created or the request fails; a failure to
         *     close the client after another failure is attached as a suppressed exception instead
         *     of replacing it
         */
        private static JsonNode getStats(ConnectionConfiguration connectionConfiguration, boolean shardLevel) throws IOException {
            Map<String, String> params = new HashMap<>();
            if (shardLevel) {
                params.put("level", "shards");
            }
            String endpoint = String.format("/%s/_stats", connectionConfiguration.getIndex());
            try (RestClient restClient = connectionConfiguration.createClient()) {
                return ElasticsearchIO.parseResponse(restClient.performRequest("GET", endpoint, params, new Header[0]));
            }
        }
    }

    /**
     * Immutable description of how to reach an Elasticsearch cluster: node addresses, target index
     * and type, and optional basic-auth credentials and keystore.
     */
    @AutoValue
    public static abstract class ConnectionConfiguration implements Serializable {

        @AutoValue.Builder
        public static abstract class Builder {
            abstract Builder setAddresses(List<String> list);

            abstract Builder setUsername(String str);

            abstract Builder setPassword(String str);

            abstract Builder setKeystorePath(String str);

            abstract Builder setKeystorePassword(String str);

            abstract Builder setIndex(String str);

            abstract Builder setType(String str);

            abstract ConnectionConfiguration build();
        }

        public abstract List<String> getAddresses();

        @Nullable
        public abstract String getUsername();

        @Nullable
        public abstract String getPassword();

        @Nullable
        public abstract String getKeystorePath();

        @Nullable
        public abstract String getKeystorePassword();

        public abstract String getIndex();

        public abstract String getType();

        abstract Builder builder();

        /**
         * Creates a configuration for the given nodes, index and type.
         *
         * @param addresses node URLs; must be non-null and non-empty
         * @param index index name; must be non-null
         * @param type document type; must be non-null
         * @throws IllegalArgumentException if any precondition above is violated
         */
        public static ConnectionConfiguration create(String[] addresses, String index, String type) {
            Preconditions.checkArgument(addresses != null, "addresses can not be null");
            Preconditions.checkArgument(addresses.length > 0, "addresses can not be empty");
            Preconditions.checkArgument(index != null, "index can not be null");
            Preconditions.checkArgument(type != null, "type can not be null");
            return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder().setAddresses(Arrays.asList(addresses)).setIndex(index).setType(type).build();
        }

        /** Returns a copy using {@code username} for basic authentication (non-null, non-empty). */
        public ConnectionConfiguration withUsername(String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            Preconditions.checkArgument(!username.isEmpty(), "username can not be empty");
            return builder().setUsername(username).build();
        }

        /** Returns a copy using {@code password} for basic authentication (non-null, non-empty). */
        public ConnectionConfiguration withPassword(String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            Preconditions.checkArgument(!password.isEmpty(), "password can not be empty");
            return builder().setPassword(password).build();
        }

        /** Returns a copy that loads trust material from the JKS keystore at {@code keystorePath}. */
        public ConnectionConfiguration withKeystorePath(String keystorePath) {
            Preconditions.checkArgument(keystorePath != null, "keystorePath can not be null");
            Preconditions.checkArgument(!keystorePath.isEmpty(), "keystorePath can not be empty");
            return builder().setKeystorePath(keystorePath).build();
        }

        /** Returns a copy using {@code keystorePassword} to open the keystore (non-null). */
        public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
            Preconditions.checkArgument(keystorePassword != null, "keystorePassword can not be null");
            return builder().setKeystorePassword(keystorePassword).build();
        }

        /** Adds address, index, type, username and keystore path; passwords are never added. */
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("address", getAddresses().toString()));
            builder.add(DisplayData.item("index", getIndex()));
            builder.add(DisplayData.item("type", getType()));
            builder.addIfNotNull(DisplayData.item("username", getUsername()));
            builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath()));
        }

        /**
         * Builds a REST client for the configured addresses.
         *
         * <p>Credentials and SSL settings are applied through a single HTTP client configuration
         * callback. The client builder keeps only one callback, so registering them separately
         * would make the later one discard the earlier one when both are configured.
         *
         * @throws IOException if an address is not a valid URL or the keystore cannot be loaded
         */
        @VisibleForTesting
        RestClient createClient() throws IOException {
            List<String> addresses = getAddresses();
            HttpHost[] hosts = new HttpHost[addresses.size()];
            int position = 0;
            for (String address : addresses) {
                URL url = new URL(address);
                hosts[position++] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
            }
            RestClientBuilder restClientBuilder = RestClient.builder(hosts);

            final BasicCredentialsProvider credentialsProvider = createCredentialsProvider();
            final SSLContext sslContext = loadSslContext();
            final SSLIOSessionStrategy sessionStrategy = sslContext == null ? null : new SSLIOSessionStrategy(sslContext);
            if (credentialsProvider != null || sslContext != null) {
                restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                    if (credentialsProvider != null) {
                        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                    if (sslContext != null) {
                        httpAsyncClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy);
                    }
                    return httpAsyncClientBuilder;
                });
            }
            return restClientBuilder.build();
        }

        /** Returns a provider holding the configured username/password, or null without a username. */
        @Nullable
        private BasicCredentialsProvider createCredentialsProvider() {
            if (getUsername() == null) {
                return null;
            }
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(getUsername(), getPassword()));
            return credentialsProvider;
        }

        /**
         * Loads the JKS keystore, if one is configured, and builds an SSL context trusting it.
         *
         * @return the SSL context, or null when no keystore path is configured
         * @throws IOException wrapping any failure to read the keystore or build the context
         */
        @Nullable
        private SSLContext loadSslContext() throws IOException {
            String keystorePath = getKeystorePath();
            if (keystorePath == null || keystorePath.isEmpty()) {
                return null;
            }
            try {
                KeyStore keyStore = KeyStore.getInstance("jks");
                try (FileInputStream keystoreStream = new FileInputStream(new File(keystorePath))) {
                    String keystorePassword = getKeystorePassword();
                    keyStore.load(keystoreStream, keystorePassword == null ? null : keystorePassword.toCharArray());
                }
                return SSLContexts.custom().loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build();
            } catch (Exception e) {
                throw new IOException("Can't load the client certificate from the keystore", e);
            }
        }
    }

    /**
     * A {@link PTransform} reading documents from Elasticsearch as JSON strings. Configure it with
     * the {@code with*} methods; a connection configuration is required before expansion.
     */
    @AutoValue
    public static abstract class Read extends PTransform<PBegin, PCollection<String>> {
        private static final long MAX_BATCH_SIZE = 10000;

        @AutoValue.Builder
        public static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setQuery(String str);

            abstract Builder setScrollKeepalive(String str);

            abstract Builder setBatchSize(long j);

            abstract Read build();
        }

        @Nullable
        public abstract ConnectionConfiguration getConnectionConfiguration();

        @Nullable
        public abstract String getQuery();

        public abstract String getScrollKeepalive();

        public abstract long getBatchSize();

        abstract Builder builder();

        /** Returns a copy reading from the cluster described by {@code connectionConfiguration}. */
        public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        /** Returns a copy using the given JSON {@code query} (non-null, non-empty). */
        public Read withQuery(String query) {
            Preconditions.checkArgument(query != null, "query can not be null");
            Preconditions.checkArgument(!query.isEmpty(), "query can not be empty");
            return builder().setQuery(query).build();
        }

        /** Returns a copy using the given scroll keepalive; {@code null} and "0m" are rejected. */
        public Read withScrollKeepalive(String scrollKeepalive) {
            Preconditions.checkArgument(scrollKeepalive != null, "scrollKeepalive can not be null");
            Preconditions.checkArgument(!"0m".equals(scrollKeepalive), "scrollKeepalive can not be 0m");
            return builder().setScrollKeepalive(scrollKeepalive).build();
        }

        /** Returns a copy using the given batch size, which must be in {@code (0, 10000]}. */
        public Read withBatchSize(long batchSize) {
            Preconditions.checkArgument(batchSize > 0 && batchSize <= MAX_BATCH_SIZE, "batchSize must be > 0 and <= %s, but was: %s", MAX_BATCH_SIZE, batchSize);
            return builder().setBatchSize(batchSize).build();
        }

        /**
         * Applies a read from a {@link BoundedElasticsearchSource} built from this configuration.
         *
         * @throws IllegalStateException if no connection configuration was set
         */
        public PCollection<String> expand(PBegin pBegin) {
            Preconditions.checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
            return pBegin.apply(org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null, null, null)));
        }

        /**
         * Adds query, batch size, scroll keepalive and, when set, the connection details.
         *
         * <p>The connection configuration is nullable until configured, so it is only described
         * when present rather than dereferenced unconditionally.
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("query", getQuery()));
            builder.addIfNotNull(DisplayData.item("batchSize", Long.valueOf(getBatchSize())));
            builder.addIfNotNull(DisplayData.item("scrollKeepalive", getScrollKeepalive()));
            ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
            if (connectionConfiguration != null) {
                connectionConfiguration.populateDisplayData(builder);
            }
        }
    }

    /**
     * A {@link PTransform} writing JSON documents to Elasticsearch through the bulk API. Documents
     * are buffered per bundle and flushed when either the document-count or the byte-size limit is
     * reached, and at the end of each bundle.
     */
    @AutoValue
    public static abstract class Write extends PTransform<PCollection<String>, PDone> {

        @AutoValue.Builder
        public static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setMaxBatchSize(long j);

            abstract Builder setMaxBatchSizeBytes(long j);

            abstract Builder setIdFn(FieldValueExtractFn fieldValueExtractFn);

            abstract Builder setIndexFn(FieldValueExtractFn fieldValueExtractFn);

            abstract Builder setTypeFn(FieldValueExtractFn fieldValueExtractFn);

            abstract Builder setUsePartialUpdate(boolean z);

            abstract Write build();
        }

        /** Extracts a metadata value (id, index or type) from a parsed document. */
        public interface FieldValueExtractFn extends SerializableFunction<JsonNode, String> {
        }

        /** {@link DoFn} that batches documents and sends them with the bulk API. */
        @VisibleForTesting
        public static class WriteFn extends DoFn<String, Void> {
            private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
            private static final int DEFAULT_RETRY_ON_CONFLICT = 5;
            private int backendVersion;
            private final Write spec;
            private transient RestClient restClient;
            private ArrayList<String> batch;
            private long currentBatchSizeBytes;

            /** Bulk action metadata; null fields are omitted from the serialized JSON. */
            @JsonInclude(JsonInclude.Include.NON_NULL)
            @JsonPropertyOrder({"_index", "_type", "_id"})
            public static class DocumentMetadata implements Serializable {

                @JsonProperty("_index")
                final String index;

                @JsonProperty("_type")
                final String type;

                @JsonProperty("_id")
                final String id;

                @JsonProperty("_retry_on_conflict")
                final Integer retryOnConflict;

                DocumentMetadata(String index, String type, String id, Integer retryOnConflict) {
                    this.index = index;
                    this.type = type;
                    this.id = id;
                    this.retryOnConflict = retryOnConflict;
                }
            }

            @VisibleForTesting
            WriteFn(Write write) {
                this.spec = write;
            }

            /** Detects the backend version and opens the REST client used for bulk requests. */
            @DoFn.Setup
            public void setup() throws Exception {
                ConnectionConfiguration connectionConfiguration = this.spec.getConnectionConfiguration();
                this.backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
                this.restClient = connectionConfiguration.createClient();
            }

            /** Starts each bundle with an empty batch. */
            @DoFn.StartBundle
            public void startBundle(DoFn<String, Void>.StartBundleContext startBundleContext) {
                this.batch = new ArrayList<>();
                this.currentBatchSizeBytes = 0L;
            }

            /**
             * Builds the bulk action metadata JSON for {@code document}.
             *
             * @return {@code "{}"} when no id/index/type function is configured, otherwise the
             *     serialized {@link DocumentMetadata}
             * @throws IOException if the document cannot be parsed or the metadata serialized
             */
            private String getDocumentMetadata(String document) throws IOException {
                FieldValueExtractFn indexFn = this.spec.getIndexFn();
                FieldValueExtractFn typeFn = this.spec.getTypeFn();
                FieldValueExtractFn idFn = this.spec.getIdFn();
                if (indexFn == null && typeFn == null && idFn == null) {
                    return "{}";
                }
                JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
                String index = indexFn != null ? lowerCaseOrNull(indexFn.apply(parsedDocument)) : null;
                String type = typeFn != null ? typeFn.apply(parsedDocument) : null;
                String id = idFn != null ? idFn.apply(parsedDocument) : null;
                Integer retryOnConflict = this.spec.getUsePartialUpdate() ? Integer.valueOf(DEFAULT_RETRY_ON_CONFLICT) : null;
                return OBJECT_MAPPER.writeValueAsString(new DocumentMetadata(index, type, id, retryOnConflict));
            }

            /**
             * Lower-cases {@code input} independently of the JVM default locale, so that the result
             * does not vary with locale-specific case mappings (e.g. the Turkish dotless i).
             *
             * @return the lower-cased string, or null when {@code input} is null
             */
            private static String lowerCaseOrNull(String input) {
                if (input == null) {
                    return null;
                }
                return input.toLowerCase(Locale.ROOT);
            }

            /** Appends the element to the batch and flushes when a size limit is reached. */
            @DoFn.ProcessElement
            public void processElement(DoFn<String, Void>.ProcessContext processContext) throws Exception {
                String document = (String) processContext.element();
                String documentMetadata = getDocumentMetadata(document);
                if (this.spec.getUsePartialUpdate()) {
                    this.batch.add(String.format("{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n", documentMetadata, document));
                } else {
                    this.batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
                }
                // Only the document bytes count towards the byte limit, not the action metadata.
                this.currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length;
                if (this.batch.size() >= this.spec.getMaxBatchSize() || this.currentBatchSizeBytes >= this.spec.getMaxBatchSizeBytes()) {
                    flushBatch();
                }
            }

            /** Sends whatever is still buffered at the end of the bundle. */
            @DoFn.FinishBundle
            public void finishBundle(DoFn<String, Void>.FinishBundleContext finishBundleContext) throws Exception {
                flushBatch();
            }

            /**
             * Sends the buffered actions as one bulk request and checks the response for errors.
             * The buffer is reset before the request is sent.
             *
             * @throws IOException if the request fails or the response reports errors
             */
            private void flushBatch() throws IOException {
                if (this.batch.isEmpty()) {
                    return;
                }
                StringBuilder bulkRequest = new StringBuilder();
                for (String action : this.batch) {
                    bulkRequest.append(action);
                }
                this.batch.clear();
                this.currentBatchSizeBytes = 0L;
                String endPoint = String.format("/%s/%s/_bulk", this.spec.getConnectionConfiguration().getIndex(), this.spec.getConnectionConfiguration().getType());
                Response response = this.restClient.performRequest("POST", endPoint, Collections.emptyMap(), new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON), new Header[0]);
                ElasticsearchIO.checkForErrors(response, this.backendVersion);
            }

            /** Closes the REST client if one was opened. */
            @DoFn.Teardown
            public void closeClient() throws Exception {
                if (this.restClient != null) {
                    this.restClient.close();
                }
            }
        }

        @Nullable
        public abstract ConnectionConfiguration getConnectionConfiguration();

        public abstract long getMaxBatchSize();

        public abstract long getMaxBatchSizeBytes();

        @Nullable
        public abstract FieldValueExtractFn getIdFn();

        @Nullable
        public abstract FieldValueExtractFn getIndexFn();

        @Nullable
        public abstract FieldValueExtractFn getTypeFn();

        public abstract boolean getUsePartialUpdate();

        abstract Builder builder();

        /** Returns a copy writing to the cluster described by {@code connectionConfiguration}. */
        public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        /** Returns a copy flushing after {@code batchSize} documents (must be &gt; 0). */
        public Write withMaxBatchSize(long batchSize) {
            Preconditions.checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
            return builder().setMaxBatchSize(batchSize).build();
        }

        /** Returns a copy flushing after {@code batchSizeBytes} document bytes (must be &gt; 0). */
        public Write withMaxBatchSizeBytes(long batchSizeBytes) {
            Preconditions.checkArgument(batchSizeBytes > 0, "batchSizeBytes must be > 0, but was %s", batchSizeBytes);
            return builder().setMaxBatchSizeBytes(batchSizeBytes).build();
        }

        /** Returns a copy deriving each document's {@code _id} with {@code idFn} (non-null). */
        public Write withIdFn(FieldValueExtractFn idFn) {
            Preconditions.checkArgument(idFn != null, "idFn must not be null");
            return builder().setIdFn(idFn).build();
        }

        /** Returns a copy deriving each document's {@code _index} with {@code indexFn} (non-null). */
        public Write withIndexFn(FieldValueExtractFn indexFn) {
            Preconditions.checkArgument(indexFn != null, "indexFn must not be null");
            return builder().setIndexFn(indexFn).build();
        }

        /** Returns a copy deriving each document's {@code _type} with {@code typeFn} (non-null). */
        public Write withTypeFn(FieldValueExtractFn typeFn) {
            Preconditions.checkArgument(typeFn != null, "typeFn must not be null");
            return builder().setTypeFn(typeFn).build();
        }

        /** Returns a copy that sends "update" (upsert) actions instead of "index" when true. */
        public Write withUsePartialUpdate(boolean usePartialUpdate) {
            return builder().setUsePartialUpdate(usePartialUpdate).build();
        }

        /**
         * Applies the {@link WriteFn} to the input collection.
         *
         * @throws IllegalStateException if no connection configuration was set
         */
        public PDone expand(PCollection<String> pCollection) {
            Preconditions.checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
            pCollection.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Creates an unconfigured {@link Read} with a scroll keepalive of "5m" and a batch size of 100.
     */
    public static Read read() {
        return new AutoValue_ElasticsearchIO_Read.Builder()
                .setScrollKeepalive("5m")
                .setBatchSize(100L)
                .build();
    }

    /**
     * Creates an unconfigured {@link Write} flushing after 1000 documents or 5242880 bytes, with
     * partial updates disabled.
     */
    public static Write write() {
        return new AutoValue_ElasticsearchIO_Write.Builder()
                .setMaxBatchSize(1000L)
                .setMaxBatchSizeBytes(5242880L)
                .setUsePartialUpdate(false)
                .build();
    }

    /** Static factory holder; not meant to be instantiated. */
    private ElasticsearchIO() {}

    /**
     * Parses the body of an Elasticsearch response into a JSON tree.
     *
     * @throws IOException if the body cannot be read or is not valid JSON
     */
    @VisibleForTesting
    static JsonNode parseResponse(Response response) throws IOException {
        final JsonNode tree = mapper.readValue(response.getEntity().getContent(), JsonNode.class);
        return tree;
    }

    /**
     * Inspects a bulk response and fails if it reports errors.
     *
     * <p>Each failed item contributes its document id, error reason and type (plus the
     * {@code caused_by} reason and type when present) to the exception message.
     *
     * @param response the bulk API response
     * @param i the backend major version; selects the item key looked up first ("create" for 2,
     *     "index" for 5)
     * @throws IOException if the response's {@code errors} flag is true, or the body cannot be parsed
     */
    static void checkForErrors(Response response, int i) throws IOException {
        JsonNode bulkResult = parseResponse(response);
        if (!bulkResult.path("errors").asBoolean()) {
            return;
        }

        // The expected key does not depend on the item, so compute it once.
        String expectedAction = "";
        if (i == 2) {
            expectedAction = "create";
        } else if (i == 5) {
            expectedAction = "index";
        }

        StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
        for (JsonNode item : bulkResult.path("items")) {
            JsonNode actionResult = item.path(expectedAction);
            if (actionResult.isMissingNode() && item.size() > 0) {
                // The item is keyed by another action (e.g. "update" for partial updates);
                // fall back to its first entry so the failure is still reported.
                actionResult = item.elements().next();
            }
            JsonNode error = actionResult.get("error");
            if (error != null) {
                errorMessages.append(String.format("%nDocument id %s: %s (%s)", actionResult.path("_id").asText(), error.path("reason").asText(), error.path("type").asText()));
                JsonNode causedBy = error.get("caused_by");
                if (causedBy != null) {
                    errorMessages.append(String.format("%nCaused by: %s (%s)", causedBy.path("reason").asText(), causedBy.path("type").asText()));
                }
            }
        }
        throw new IOException(errorMessages.toString());
    }

    /**
     * Queries the cluster root endpoint and returns the backend's major version.
     *
     * <p>The major version is taken from the first character of {@code version.number}. The
     * temporary client is closed on every path, including failures.
     *
     * @return 2 or 5
     * @throws IllegalArgumentException if the version cannot be retrieved (the I/O failure is kept
     *     as the cause), is missing from the response, or is neither 2.x nor 5.x
     */
    static int getBackendVersion(ConnectionConfiguration connectionConfiguration) {
        try (RestClient restClient = connectionConfiguration.createClient()) {
            Response response = restClient.performRequest("GET", "", new Header[0]);
            String versionNumber = parseResponse(response).path("version").path("number").asText();
            if (versionNumber.isEmpty()) {
                // asText() yields "" when the field is absent; substring(0, 1) would then throw.
                throw new IllegalArgumentException("Cannot get Elasticsearch version");
            }
            int majorVersion = Integer.parseInt(versionNumber.substring(0, 1));
            Preconditions.checkArgument(majorVersion == 2 || majorVersion == 5, "The Elasticsearch version to connect to is %s.x. This version of the ElasticsearchIO is only compatible with Elasticsearch v5.x and v2.x", majorVersion);
            return majorVersion;
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot get Elasticsearch version", e);
        }
    }
}
