/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.hadoop.rest;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.FieldPresenceValidation;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.NetworkUtils;
import org.elasticsearch.hadoop.rest.PartitionDefinition;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.rest.SearchRequestBuilder;
import org.elasticsearch.hadoop.rest.query.BoolQueryBuilder;
import org.elasticsearch.hadoop.rest.query.ConstantScoreQueryBuilder;
import org.elasticsearch.hadoop.rest.query.QueryBuilder;
import org.elasticsearch.hadoop.rest.query.QueryUtils;
import org.elasticsearch.hadoop.rest.query.RawQueryBuilder;
import org.elasticsearch.hadoop.rest.request.GetAliasesRequestBuilder;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.elasticsearch.hadoop.serialization.builder.ValueReader;
import org.elasticsearch.hadoop.serialization.dto.IndicesAliases;
import org.elasticsearch.hadoop.serialization.dto.NodeInfo;
import org.elasticsearch.hadoop.serialization.dto.ShardInfo;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ClusterInfo;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.Version;

public abstract class RestService
implements Serializable {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public static List<PartitionDefinition> findPartitions(Settings settings, Log log) {
        Version.logVersion();
        InitializationUtils.validateSettings(settings);
        ClusterInfo clusterInfo = InitializationUtils.discoverAndValidateClusterInfo(settings, log);
        InitializationUtils.validateSettingsForReading(settings);
        List<NodeInfo> nodes = InitializationUtils.discoverNodesIfNeeded(settings, log);
        InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
        InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
        InitializationUtils.filterNonIngestNodesIfNeeded(settings, log);
        try (RestRepository client = new RestRepository(settings);){
            boolean indexExists = client.resourceExists(true);
            List<List<Map<String, Object>>> shards = null;
            if (!indexExists) {
                if (!settings.getIndexReadMissingAsEmpty()) throw new EsHadoopIllegalArgumentException(String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), "es.index.read.missing.as.empty"));
                log.info((Object)String.format("Index [%s] missing - treating it as empty", settings.getResourceRead()));
                shards = Collections.emptyList();
            } else {
                shards = client.getReadTargetShards();
                if (log.isTraceEnabled()) {
                    log.trace((Object)("Creating splits for shards " + shards));
                }
            }
            log.info((Object)String.format("Reading from [%s]", settings.getResourceRead()));
            MappingSet mapping = null;
            if (!shards.isEmpty()) {
                FieldPresenceValidation validation;
                mapping = client.getMappings();
                if (log.isDebugEnabled()) {
                    log.debug((Object)String.format("Discovered resolved mapping {%s} for [%s]", mapping.getResolvedView(), settings.getResourceRead()));
                }
                if ((validation = settings.getReadFieldExistanceValidation()).isRequired()) {
                    MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping.getResolvedView(), validation, log);
                }
            }
            HashMap<String, NodeInfo> nodesMap = new HashMap<String, NodeInfo>();
            if (nodes != null) {
                for (NodeInfo node : nodes) {
                    nodesMap.put(node.getId(), node);
                }
            }
            List<PartitionDefinition> partitions = clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null ? RestService.findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log) : RestService.findShardPartitions(settings, mapping, nodesMap, shards, log);
            Collections.shuffle(partitions);
            List<PartitionDefinition> list = partitions;
            return list;
        }
    }

    static List<PartitionDefinition> findShardPartitions(Settings settings, MappingSet mappingSet, Map<String, NodeInfo> nodes, List<List<Map<String, Object>>> shards, Log log) {
        Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
        ArrayList<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
        PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
        for (List<Map<String, Object>> group : shards) {
            String index = null;
            int shardId = -1;
            ArrayList<String> locationList = new ArrayList<String>();
            for (Map<String, Object> replica : group) {
                ShardInfo shard = new ShardInfo(replica);
                index = shard.getIndex();
                shardId = shard.getName();
                if (!nodes.containsKey(shard.getNode())) continue;
                locationList.add(nodes.get(shard.getNode()).getPublishAddress());
            }
            if (index == null) {
                if (settings.getIndexReadAllowRedStatus()) {
                    log.warn((Object)"Shard information is missing from an index and will not be reached during job execution. Assuming shard is unavailable and cluster is red! Continuing with read operation by skipping this shard! This may result in incomplete data retrieval!");
                    continue;
                }
                throw new IllegalStateException("Could not locate shard information for one of the read indices. Check your cluster status to see if it is unstable!");
            }
            PartitionDefinition partition = partitionBuilder.build(index, shardId, locationList.toArray(new String[0]));
            partitions.add(partition);
        }
        return partitions;
    }

    static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings settings, MappingSet mappingSet, Map<String, NodeInfo> nodes, List<List<Map<String, Object>>> shards, Log log) {
        QueryBuilder query = QueryUtils.parseQueryAndFilters(settings);
        Integer maxDocsPerPartition = settings.getMaxDocsPerPartition();
        Assert.notNull(maxDocsPerPartition, "Attempting to find slice partitions but maximum documents per partition is not set.");
        Resource readResource = new Resource(settings, true);
        Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
        PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
        ArrayList<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
        for (List<Map<String, Object>> group : shards) {
            String index = null;
            int shardId = -1;
            ArrayList<String> locationList = new ArrayList<String>();
            for (Map<String, Object> replica : group) {
                ShardInfo shard = new ShardInfo(replica);
                index = shard.getIndex();
                shardId = shard.getName();
                if (!nodes.containsKey(shard.getNode())) continue;
                locationList.add(nodes.get(shard.getNode()).getPublishAddress());
            }
            String[] locations = locationList.toArray(new String[0]);
            if (index == null) {
                if (settings.getIndexReadAllowRedStatus()) {
                    log.warn((Object)"Shard information is missing from an index and will not be reached during job execution. Assuming shard is unavailable and cluster is red! Continuing with read operation by skipping this shard! This may result in incomplete data retrieval!");
                    continue;
                }
                throw new IllegalStateException("Could not locate shard information for one of the read indices. Check your cluster status to see if it is unstable!");
            }
            long numDocs = readResource.isTyped() ? client.count(index, readResource.type(), Integer.toString(shardId), query) : client.countIndexShard(index, Integer.toString(shardId), query);
            int numPartitions = (int)Math.max(1L, numDocs / (long)maxDocsPerPartition.intValue());
            for (int i = 0; i < numPartitions; ++i) {
                PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
                partitions.add(partitionBuilder.build(index, shardId, slice, locations));
            }
        }
        return partitions;
    }

    static String checkLocality(String[] locations, Log log) {
        block6: {
            try {
                InetAddress[] candidates = NetworkUtils.getGlobalInterfaces();
                for (String address : locations) {
                    StringUtils.IpAndPort ipAndPort = StringUtils.parseIpAddress(address);
                    InetAddress addr = InetAddress.getByName(ipAndPort.ip);
                    for (InetAddress candidate : candidates) {
                        if (!addr.equals(candidate)) continue;
                        return address;
                    }
                }
            }
            catch (SocketException e) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)"Unable to retrieve the global interfaces of the system", (Throwable)e);
                }
            }
            catch (UnknownHostException e) {
                if (!log.isDebugEnabled()) break block6;
                log.debug((Object)"Unable to retrieve IP address", (Throwable)e);
            }
        }
        return null;
    }

    public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {
        IndicesAliases indicesAliases;
        Map<String, IndicesAliases.Alias> aliases;
        String pinAddress;
        if (!SettingsUtils.hasPinnedNode(settings) && partition.getLocations().length > 0 && (pinAddress = RestService.checkLocality(partition.getLocations(), log)) != null) {
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Partition reader instance [%s] assigned to [%s]", partition, pinAddress));
            }
            SettingsUtils.pinNode(settings, pinAddress);
        }
        ClusterInfo clusterInfo = InitializationUtils.discoverClusterInfo(settings, log);
        ValueReader reader = (ValueReader)ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
        RestRepository repository = new RestRepository(settings);
        Mapping fieldMapping = null;
        if (StringUtils.hasText(partition.getSerializedMapping())) {
            fieldMapping = IOUtils.deserializeFromJsonString(partition.getSerializedMapping(), Mapping.class);
        } else {
            log.warn((Object)String.format("No mapping found for [%s] - either no index exists or the partition configuration has been corrupted", partition));
        }
        ScrollReader scrollReader = new ScrollReader(ScrollReaderConfigBuilder.builder(reader, fieldMapping, settings));
        if (settings.getNodesClientOnly()) {
            String clientNode = repository.getRestClient().getCurrentNode();
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Client-node routing detected; partition reader instance [%s] assigned to [%s]", partition, clientNode));
            }
            SettingsUtils.pinNode(settings, clientNode);
        }
        boolean includeVersion = settings.getReadMetadata() && settings.getReadMetadataVersion();
        Resource read = new Resource(settings, true);
        SearchRequestBuilder requestBuilder = new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion).resource(read).indices(partition.getIndex()).query(QueryUtils.parseQuery(settings)).scroll(settings.getScrollKeepAlive()).size(settings.getScrollSize()).limit(settings.getScrollLimit()).fields(SettingsUtils.determineSourceFields(settings)).filters(QueryUtils.parseFilters(settings)).shard(Integer.toString(partition.getShardId())).readMetadata(settings.getReadMetadata()).local(true).preference(settings.getShardPreference()).excludeSource(settings.getExcludeSource());
        if (partition.getSlice() != null && partition.getSlice().max > 1) {
            requestBuilder.slice(partition.getSlice().id, partition.getSlice().max);
        }
        String[] indices = read.index().split(",");
        if (!QueryUtils.isExplicitlyRequested(partition.getIndex(), indices) && (aliases = (indicesAliases = new GetAliasesRequestBuilder(repository.getRestClient()).indices(partition.getIndex()).execute().getIndices()).getAliases(partition.getIndex())) != null && aliases.size() > 0) {
            requestBuilder = RestService.applyAliasMetadata(clusterInfo.getMajorVersion(), aliases, requestBuilder, partition.getIndex(), indices);
        }
        return new PartitionReader(scrollReader, repository, requestBuilder);
    }

    static SearchRequestBuilder applyAliasMetadata(EsMajorVersion version, Map<String, IndicesAliases.Alias> aliases, SearchRequestBuilder searchRequestBuilder, String index, String ... indicesOrAliases) {
        if (QueryUtils.isExplicitlyRequested(index, indicesOrAliases)) {
            return searchRequestBuilder;
        }
        HashSet<String> routing = new HashSet<String>();
        ArrayList<RawQueryBuilder> aliasFilters = new ArrayList<RawQueryBuilder>();
        for (IndicesAliases.Alias alias : aliases.values()) {
            if (!QueryUtils.isExplicitlyRequested(alias.getName(), indicesOrAliases)) continue;
            if (StringUtils.hasLength(alias.getSearchRouting())) {
                for (String value : alias.getSearchRouting().split(",")) {
                    routing.add(value.trim());
                }
            }
            if (alias.getFilter() == null) continue;
            try {
                aliasFilters.add(new RawQueryBuilder(alias.getFilter(), false));
            }
            catch (IOException iOException) {
                throw new EsHadoopIllegalArgumentException("Failed to parse alias filter: [" + alias.getFilter() + "]");
            }
        }
        if (aliasFilters.size() > 0) {
            QueryBuilder aliasQuery;
            if (aliasFilters.size() == 1) {
                aliasQuery = (QueryBuilder)aliasFilters.get(0);
            } else {
                aliasQuery = new BoolQueryBuilder();
                for (QueryBuilder queryBuilder : aliasFilters) {
                    ((BoolQueryBuilder)aliasQuery).should(queryBuilder);
                }
            }
            if (searchRequestBuilder.query() == null) {
                searchRequestBuilder.query(aliasQuery);
            } else {
                BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
                boolQueryBuilder.must(searchRequestBuilder.query());
                if (version.after(EsMajorVersion.V_1_X)) {
                    boolQueryBuilder.filter(aliasQuery);
                } else {
                    boolQueryBuilder.must(new ConstantScoreQueryBuilder().filter(aliasQuery).boost(0.0f));
                }
                searchRequestBuilder.query(boolQueryBuilder);
            }
        }
        if (routing.size() > 0) {
            searchRequestBuilder.routing(StringUtils.concatenate(routing, ","));
        }
        return searchRequestBuilder;
    }

    public static List<PartitionDefinition> assignPartitions(List<PartitionDefinition> partitions, int currentTask, int totalTasks) {
        int esPartitions = partitions.size();
        if (totalTasks >= esPartitions) {
            return currentTask >= esPartitions ? Collections.emptyList() : Collections.singletonList(partitions.get(currentTask));
        }
        int partitionsPerTask = esPartitions / totalTasks;
        int remainder = esPartitions % totalTasks;
        int partitionsPerCurrentTask = partitionsPerTask;
        if (currentTask < remainder) {
            ++partitionsPerCurrentTask;
        }
        int offset = partitionsPerTask * currentTask;
        if (currentTask != 0) {
            offset += remainder > currentTask ? 1 : remainder;
        }
        if (partitionsPerCurrentTask == 1) {
            return Collections.singletonList(partitions.get(offset));
        }
        ArrayList<PartitionDefinition> pa = new ArrayList<PartitionDefinition>(partitionsPerCurrentTask);
        for (int index = offset; index < offset + partitionsPerCurrentTask; ++index) {
            pa.add(partitions.get(index));
        }
        return pa;
    }

    public static MultiReaderIterator multiReader(Settings settings, List<PartitionDefinition> definitions, Log log) {
        return new MultiReaderIterator(definitions, settings, log);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static PartitionWriter createWriter(Settings settings, long currentSplit, int totalSplits, Log log) {
        RestRepository repository;
        Version.logVersion();
        InitializationUtils.validateSettings(settings);
        InitializationUtils.discoverAndValidateClusterInfo(settings, log);
        InitializationUtils.validateSettingsForWriting(settings);
        InitializationUtils.discoverNodesIfNeeded(settings, log);
        InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
        InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
        InitializationUtils.filterNonIngestNodesIfNeeded(settings, log);
        List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
        int selectedNode = currentSplit < 0L ? new Random().nextInt(nodes.size()) : (int)(currentSplit % (long)nodes.size());
        SettingsUtils.pinNode(settings, nodes.get(selectedNode));
        Resource resource = new Resource(settings, false);
        log.info((Object)String.format("Writing to [%s]", resource));
        IndexExtractor iformat = (IndexExtractor)ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
        iformat.compile(resource.toString());
        if (iformat.hasPattern()) {
            repository = RestService.initMultiIndices(settings, currentSplit, resource, log);
        } else {
            if (!StringUtils.isValidSingularIndexName(resource.index())) {
                throw new EsHadoopIllegalArgumentException("Illegal write index name [" + resource.index() + "]. Write resources must be lowercase singular index names, with no illegal pattern characters except for multi-resource writes.");
            }
            GetAliasesRequestBuilder.Response response = null;
            try (RestClient bootstrap = new RestClient(settings);){
                response = new GetAliasesRequestBuilder(bootstrap).aliases(resource.index()).execute();
            }
            repository = response != null && response.hasAliases() ? RestService.initAliasWrite(response, settings, currentSplit, resource, log) : RestService.initSingleIndex(settings, currentSplit, resource, log);
        }
        return new PartitionWriter(settings, currentSplit, totalSplits, repository);
    }

    private static RestRepository initSingleIndex(Settings settings, long currentInstance, Resource resource, Log log) {
        RestRepository repository;
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Resource [%s] resolves as a single index", resource));
        }
        if ((repository = new RestRepository(settings)).touch() && repository.waitForYellow()) {
            log.warn((Object)String.format("Timed out waiting for index [%s] to reach yellow health", resource));
        }
        if (settings.getNodesWANOnly()) {
            String node = SettingsUtils.getPinnedNode(settings);
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
            }
            return repository;
        }
        if (settings.getNodesClientOnly()) {
            String clientNode = repository.getRestClient().getCurrentNode();
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]", currentInstance, clientNode));
            }
            return repository;
        }
        Map<ShardInfo, NodeInfo> targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
        repository.close();
        Assert.isTrue(!targetShards.isEmpty(), String.format("Cannot determine write shards for [%s]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)", resource));
        ArrayList<ShardInfo> orderedShards = new ArrayList<ShardInfo>(targetShards.keySet());
        Collections.sort(orderedShards);
        if (log.isTraceEnabled()) {
            log.trace((Object)String.format("Partition writer instance [%s] discovered [%s] primary shards %s", currentInstance, orderedShards.size(), orderedShards));
        }
        if (currentInstance <= 0L) {
            currentInstance = new Random().nextInt(targetShards.size()) + 1;
        }
        int bucket = (int)(currentInstance % (long)targetShards.size());
        ShardInfo chosenShard = (ShardInfo)orderedShards.get(bucket);
        NodeInfo targetNode = targetShards.get(chosenShard);
        SettingsUtils.pinNode(settings, targetNode.getPublishAddress());
        String node = SettingsUtils.getPinnedNode(settings);
        repository = new RestRepository(settings);
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Partition writer instance [%s] assigned to primary shard [%s] at address [%s]", currentInstance, chosenShard.getName(), node));
        }
        return repository;
    }

    private static RestRepository initMultiIndices(Settings settings, long currentInstance, Resource resource, Log log) {
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Resource [%s] resolves as an index pattern", resource));
        }
        String node = SettingsUtils.getPinnedNode(settings);
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
        }
        return new RestRepository(settings);
    }

    private static RestRepository initAliasWrite(GetAliasesRequestBuilder.Response response, Settings settings, long currentInstance, Resource resource, Log log) {
        Map<String, Map<String, IndicesAliases.Alias>> indexAliasTable;
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Resource [%s] resolves as an index alias", resource));
        }
        if ((indexAliasTable = response.getIndices().getAll()).size() < 1) {
            throw new EsHadoopIllegalArgumentException("Cannot initialize alias write resource [" + resource.index() + "] if it does not have any alias entries.");
        }
        if (indexAliasTable.size() > 1) {
            String currentWriteIndex = null;
            for (Map.Entry<String, Map<String, IndicesAliases.Alias>> indexRow : indexAliasTable.entrySet()) {
                String indexName = indexRow.getKey();
                Map<String, IndicesAliases.Alias> aliases = indexRow.getValue();
                IndicesAliases.Alias aliasInfo = aliases.get(resource.index());
                if (!aliasInfo.isWriteIndex()) continue;
                currentWriteIndex = indexName;
                break;
            }
            if (currentWriteIndex == null) {
                throw new EsHadoopIllegalArgumentException("Attempting to write to alias [" + resource.index() + "], but detected multiple indices [" + indexAliasTable.size() + "] with no write index selected. Bailing out...");
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Writing to currently configured write-index [%s]", currentWriteIndex));
            }
        } else if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Writing to the alias's single configured index [%s]", indexAliasTable.keySet().iterator().next()));
        }
        String node = SettingsUtils.getPinnedNode(settings);
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
        }
        return new RestRepository(settings);
    }

    public static class PartitionReader
    implements Closeable {
        public final ScrollReader scrollReader;
        public final RestRepository client;
        public final SearchRequestBuilder queryBuilder;
        private ScrollQuery scrollQuery;
        private boolean closed = false;

        PartitionReader(ScrollReader scrollReader, RestRepository client, SearchRequestBuilder queryBuilder) {
            this.scrollReader = scrollReader;
            this.client = client;
            this.queryBuilder = queryBuilder;
        }

        @Override
        public void close() {
            if (!this.closed) {
                this.closed = true;
                if (this.scrollQuery != null) {
                    this.scrollQuery.close();
                }
                this.client.close();
            }
        }

        public ScrollQuery scrollQuery() {
            if (this.scrollQuery == null) {
                this.scrollQuery = this.queryBuilder.build(this.client, this.scrollReader);
            }
            return this.scrollQuery;
        }
    }

    public static class MultiReaderIterator
    implements Closeable,
    Iterator {
        private final List<PartitionDefinition> definitions;
        private final Iterator<PartitionDefinition> definitionIterator;
        private PartitionReader currentReader;
        private ScrollQuery currentScroll;
        private boolean finished = false;
        private final Settings settings;
        private final Log log;

        MultiReaderIterator(List<PartitionDefinition> defs, Settings settings, Log log) {
            this.definitions = defs;
            this.definitionIterator = defs.iterator();
            this.settings = settings;
            this.log = log;
        }

        @Override
        public void close() {
            if (this.finished) {
                return;
            }
            ScrollQuery sq = this.getCurrent();
            if (sq != null) {
                sq.close();
            }
            if (this.currentReader != null) {
                this.currentReader.close();
            }
            this.finished = true;
        }

        @Override
        public boolean hasNext() {
            ScrollQuery sq = this.getCurrent();
            return sq != null ? sq.hasNext() : false;
        }

        private ScrollQuery getCurrent() {
            if (this.finished) {
                return null;
            }
            boolean hasValue = false;
            while (!hasValue) {
                if (this.currentReader == null) {
                    if (this.definitionIterator.hasNext()) {
                        this.currentReader = RestService.createReader(this.settings, this.definitionIterator.next(), this.log);
                    } else {
                        this.finished = true;
                        return null;
                    }
                }
                if (this.currentScroll == null) {
                    this.currentScroll = this.currentReader.scrollQuery();
                }
                if (hasValue = this.currentScroll.hasNext()) continue;
                this.currentScroll.close();
                this.currentScroll = null;
                this.currentReader.close();
                this.currentReader = null;
            }
            return this.currentScroll;
        }

        public Object[] next() {
            ScrollQuery sq = this.getCurrent();
            return sq.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    public static class PartitionWriter
    implements Closeable {
        public final RestRepository repository;
        public final long number;
        public final int total;
        public final Settings settings;
        private boolean closed = false;

        PartitionWriter(Settings settings, long splitIndex, int splitsSize, RestRepository repository) {
            this.settings = settings;
            this.repository = repository;
            this.number = splitIndex;
            this.total = splitsSize;
        }

        @Override
        public void close() {
            if (!this.closed) {
                this.closed = true;
                this.repository.close();
            }
        }
    }
}

