/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.messages;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Duration;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.searchbox.action.Action;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.DocumentResult;
import io.searchbox.core.Get;
import io.searchbox.core.Index;
import io.searchbox.indices.Analyze;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.http.client.config.RequestConfig;
import org.graylog2.indexer.ElasticsearchException;
import org.graylog2.indexer.IndexFailure;
import org.graylog2.indexer.IndexFailureImpl;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.jest.JestUtils;
import org.graylog2.indexer.messages.DocumentNotFoundException;
import org.graylog2.indexer.messages.IndexBlockRetryAttempt;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class Messages {
    private static final Logger LOG = LoggerFactory.getLogger(Messages.class);

    // Maximum wait handed to both exponential wait strategies below.
    private static final Duration MAX_WAIT_TIME = Duration.seconds(30L);

    // Exponential wait without an explicit multiplier; used by BULK_REQUEST_RETRYER.
    @VisibleForTesting
    static final WaitStrategy exponentialWaitMilliseconds =
            WaitStrategies.exponentialWait(MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit());

    // Multiplier passed to the exponential wait strategy used for index-block retries.
    private static final int retrySecondsMultiplier = 500;

    // Exponential wait with an explicit multiplier; used by waitBeforeRetrying().
    @VisibleForTesting
    static final WaitStrategy exponentialWaitSeconds =
            WaitStrategies.exponentialWait(retrySecondsMultiplier, MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit());
    /**
     * Retryer wrapped around every bulk request.
     *
     * <p>An attempt is retried when it throws an {@link IOException} or when its result is classified
     * as retryable by {@link #requestFailedForRetryableReason(BulkResult)}. No stop strategy is
     * configured in this builder chain. The listener logs the outcome of each attempt.
     */
    private static final Retryer<BulkResult> BULK_REQUEST_RETRYER = RetryerBuilder.<BulkResult>newBuilder()
            .retryIfException(t -> t instanceof IOException)
            .retryIfResult(Messages::requestFailedForRetryableReason)
            .withWaitStrategy(exponentialWaitMilliseconds)
            .withRetryListener(new RetryListener() {
                @Override
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        LOG.error("Caught exception during bulk indexing: {}, retrying (attempt #{}).", attempt.getExceptionCause(), attempt.getAttemptNumber());
                    } else if (attempt.getResult() instanceof BulkResult && Messages.requestFailedForRetryableReason((BulkResult) attempt.getResult())) {
                        final BulkResult bulkResult = (BulkResult) attempt.getResult();
                        LOG.error("Bulk indexing failed: {}, retrying (attempt #{})", Messages.reasonOfFailure(bulkResult), attempt.getAttemptNumber());
                    } else if (attempt.getAttemptNumber() > 1L) {
                        // Only worth mentioning when at least one earlier attempt had failed.
                        LOG.info("Bulk indexing finally successful (attempt #{}).", attempt.getAttemptNumber());
                    }
                }
            })
            .build();
    // Error type compared against a failed bulk item's errorType in hasFailedDueToBlockedIndex().
    static final String INDEX_BLOCK_ERROR = "cluster_block_exception";
    // Error reason compared against a failed bulk item's errorReason in hasFailedDueToBlockedIndex().
    static final String INDEX_BLOCK_REASON = "blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];";
    // Meter handed to Message#toElasticSearchObject when building index actions.
    private final Meter invalidTimestampMeter;
    // Client used to execute all requests issued by this class.
    private final JestClient client;
    // Receives the receive time of every message that was not reported as failed.
    private final ProcessingStatusRecorder processingStatusRecorder;
    // When true, bulk requests are executed with HTTP Expect-Continue enabled.
    private final boolean useExpectContinue;
    // Bounded queue of index failure batches, exposed via getIndexFailureQueue().
    private final LinkedBlockingQueue<List<IndexFailure>> indexFailureQueue;
    // Incremented by the summed message sizes of chunks that are not system traffic.
    private final Counter outputByteCounter;
    // Incremented by the summed message sizes of chunks flagged as system traffic.
    private final Counter systemTrafficCounter;

    /**
     * Tells whether a bulk result represents a failure that should be retried.
     *
     * <p>The only retryable condition recognised here is the one detected by
     * {@link #isAliasWithInvalidTargets(BulkResult)}.
     *
     * @param result the bulk result to inspect
     * @return {@code true} if the request should be retried
     */
    private static boolean requestFailedForRetryableReason(BulkResult result) {
        final boolean aliasHasInvalidTargets = isAliasWithInvalidTargets(result);
        return aliasHasInvalidTargets;
    }

    /**
     * Detects the "no write index is defined for alias" failure.
     *
     * @param result the bulk result to inspect
     * @return {@code true} if the response code is 400 and the failure reason starts with
     *         {@code "no write index is defined for alias"}
     */
    private static boolean isAliasWithInvalidTargets(BulkResult result) {
        if (result.getResponseCode() != 400) {
            return false;
        }
        final String reason = reasonOfFailure(result);
        return reason.startsWith("no write index is defined for alias");
    }

    /**
     * Extracts the textual failure reason from a bulk result.
     *
     * <p>Reads the {@code reason} field nested under {@code error} in the result's JSON body.
     * NOTE(review): {@code path(...)}/{@code asText()} look like Jackson tree accessors, which would
     * yield an empty string when the field is absent - confirm against the JSON library in use.
     *
     * @param result the bulk result whose JSON body is inspected
     * @return the text of {@code error.reason}
     */
    private static String reasonOfFailure(BulkResult result) {
        return result.getJsonObject().path("error").path("reason").asText();
    }

    @Inject
    public Messages(MetricRegistry metricRegistry, JestClient client, ProcessingStatusRecorder processingStatusRecorder, @Named(value="elasticsearch_use_expect_continue") boolean useExpectContinue) {
        this.invalidTimestampMeter = metricRegistry.meter(MetricRegistry.name(Messages.class, (String[])new String[]{"invalid-timestamps"}));
        this.outputByteCounter = metricRegistry.counter("org.graylog2.traffic.output");
        this.systemTrafficCounter = metricRegistry.counter("org.graylog2.traffic.system-output-traffic");
        this.client = client;
        this.processingStatusRecorder = processingStatusRecorder;
        this.useExpectContinue = useExpectContinue;
        this.indexFailureQueue = new LinkedBlockingQueue(1000);
    }

    /**
     * Fetches a single message document by ID from the given index.
     *
     * @param messageId ID of the document to fetch
     * @param index     name of the index to read from
     * @return the document converted via {@code ResultMessage.parseFromSource}
     * @throws DocumentNotFoundException if the request did not succeed
     * @throws IOException               declared because executing the request may fail with it
     */
    public ResultMessage get(String messageId, String index) throws DocumentNotFoundException, IOException {
        final Get request = new Get.Builder(index, messageId).type("message").build();
        final DocumentResult response = client.execute(request);

        if (response.isSucceeded()) {
            @SuppressWarnings("unchecked")
            final Map<String, Object> source = (Map<String, Object>) response.getSourceAsObject(Map.class, false);
            return ResultMessage.parseFromSource(response.getId(), response.getIndex(), source);
        }

        // Any unsuccessful response is reported as "document not found".
        throw new DocumentNotFoundException(index, messageId);
    }

    /**
     * Runs the given analyzer of an index over a piece of text and returns the produced tokens.
     *
     * @param toAnalyze the text to analyze
     * @param index     the index whose analyzer is used
     * @param analyzer  the name of the analyzer
     * @return the {@code token} value of every entry in the response's {@code tokens} list, in
     *         response order
     * @throws IOException            declared because executing the request may fail with it
     * @throws ElasticsearchException if the response carries no {@code tokens} value
     */
    public List<String> analyze(String toAnalyze, String index, String analyzer) throws IOException {
        final Analyze analyze = new Analyze.Builder().index(index).analyzer(analyzer).text(toAnalyze).build();
        final JestResult result = client.execute(analyze);

        @SuppressWarnings("unchecked")
        final List<Map<String, Object>> tokens = (List<Map<String, Object>>) result.getValue("tokens");
        if (tokens == null) {
            // Without this guard a response lacking "tokens" surfaces as a bare NullPointerException.
            throw new ElasticsearchException("Unable to analyze text with analyzer <" + analyzer
                    + "> on index <" + index + ">: " + result.getErrorMessage());
        }

        final List<String> terms = new ArrayList<>(tokens.size());
        for (Map<String, Object> token : tokens) {
            terms.add((String) token.get("token"));
        }
        return terms;
    }

    /**
     * Bulk-indexes the given messages as regular (non-system) traffic.
     *
     * @param messageList pairs of target index set and message
     * @return the IDs of the messages that could not be indexed
     */
    public List<String> bulkIndex(List<Map.Entry<IndexSet, Message>> messageList) {
        final boolean isSystemTraffic = false;
        return bulkIndex(messageList, isSystemTraffic);
    }

    /**
     * Bulk-indexes the given messages, halving the chunk size whenever a request is rejected as
     * too large.
     *
     * <p>Indexing starts with one chunk spanning the whole list. Each rejection advances the start
     * offset past the chunks that were already sent and halves the chunk size. Once all chunks have
     * been sent, receive times of the non-failed messages are recorded and the failures are
     * propagated.
     *
     * @param messageList     pairs of target index set and message
     * @param isSystemTraffic whether the message sizes are counted as system traffic
     * @return the IDs of the messages that could not be indexed; empty if there were none
     * @throws ElasticsearchException if the chunk size reaches zero while splitting
     */
    public List<String> bulkIndex(List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) {
        if (messageList.isEmpty()) {
            return Collections.emptyList();
        }

        final List<BulkResult.BulkResultItem> allFailures = new ArrayList<>();
        int batchSize = messageList.size();
        int startAt = 0;
        boolean finished = false;
        while (!finished) {
            try {
                allFailures.addAll(bulkIndexChunked(messageList, isSystemTraffic, startAt, batchSize));
                finished = true;
            } catch (EntityTooLargeException tooLarge) {
                LOG.warn("Bulk index failed with 'Request Entity Too Large' error. Retrying by splitting up batch size <{}>.", batchSize);
                if (batchSize == messageList.size()) {
                    LOG.warn("Consider lowering the \"output_batch_size\" setting.");
                }
                // Keep what the aborted pass already achieved before starting over with smaller chunks.
                allFailures.addAll(tooLarge.failedItems);
                startAt += tooLarge.indexedSuccessfully;
                batchSize /= 2;
                if (batchSize == 0) {
                    throw new ElasticsearchException("Bulk index cannot split output batch any further.");
                }
            }
        }

        if (allFailures.isEmpty()) {
            recordTimestamp(messageList, Collections.emptySet());
            return Collections.emptyList();
        }

        final Set<String> failedIds = allFailures.stream()
                .map(item -> item.id)
                .collect(Collectors.toSet());
        recordTimestamp(messageList, failedIds);
        return propagateFailure(allFailures, messageList);
    }

    /**
     * Indexes the messages from {@code offset} to the end of {@code messageList} in chunks of at
     * most {@code chunkSize} entries.
     *
     * <p>For every chunk the bulk request is sent, items that failed because of a blocked index are
     * retried until they are no longer blocked, and the summed message size is added to the system
     * or output traffic counter.
     *
     * @param messageList     all messages of the current batch
     * @param isSystemTraffic whether sizes go to the system traffic counter instead of the output one
     * @param offset          index of the first message to send in this pass
     * @param chunkSize       maximum number of messages per bulk request (capped at the list size)
     * @return the items that failed for reasons other than a blocked index
     * @throws EntityTooLargeException if a bulk request is answered with response code 413; it
     *                                 carries the number of messages sent before that chunk and
     *                                 the failures collected so far
     */
    private List<BulkResult.BulkResultItem> bulkIndexChunked(List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic, int offset, int chunkSize) throws EntityTooLargeException {
        chunkSize = Math.min(messageList.size(), chunkSize);

        final List<BulkResult.BulkResultItem> failedItems = new ArrayList<>();
        final List<Map.Entry<IndexSet, Message>> remaining = messageList.subList(offset, messageList.size());
        final Iterable<List<Map.Entry<IndexSet, Message>>> chunks = Iterables.partition(remaining, chunkSize);
        int chunkCount = 1;
        int indexedSuccessfully = 0;
        for (List<Map.Entry<IndexSet, Message>> chunk : chunks) {
            final long messageSizes = chunk.stream().mapToLong(entry -> entry.getValue().getSize()).sum();

            final BulkResult result = bulkIndexChunk(chunk);
            if (result.getResponseCode() == 413) {
                throw new EntityTooLargeException(indexedSuccessfully, failedItems);
            }
            // Counts every message of an accepted request, including items reported as failed.
            indexedSuccessfully += chunk.size();

            final Set<BulkResult.BulkResultItem> remainingFailures = retryOnlyIndexBlockItemsForever(chunk, result.getFailedItems());
            failedItems.addAll(remainingFailures);

            if (isSystemTraffic) {
                systemTrafficCounter.inc(messageSizes);
            } else {
                outputByteCounter.inc(messageSizes);
            }

            if (LOG.isDebugEnabled()) {
                String chunkInfo = "";
                if (chunkSize != messageList.size()) {
                    // Chunk numbering restarts at 1 for every pass, so the total must be derived
                    // from the messages remaining after the offset, not from the whole list.
                    final int totalChunks = (int) Math.ceil((double) remaining.size() / (double) chunkSize);
                    chunkInfo = String.format(Locale.ROOT, " (chunk %d/%d offset %d)", chunkCount, totalChunks, offset);
                }
                LOG.debug("Index: Bulk indexed {} messages{}, failures: {}", result.getItems().size(), chunkInfo, failedItems.size());
            }
            if (!remainingFailures.isEmpty()) {
                LOG.error("Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}", remainingFailures.size(), result.getErrorMessage());
            }
            ++chunkCount;
        }
        return failedItems;
    }

    /**
     * Builds one bulk request containing an index action per message and executes it.
     *
     * <p>Each action targets the write index alias of the entry's index set, uses the document type
     * {@code "message"} and the message's own ID.
     *
     * @param chunk the messages to put into a single bulk request
     * @return the result of the bulk request
     */
    private BulkResult bulkIndexChunk(List<Map.Entry<IndexSet, Message>> chunk) {
        final Bulk.Builder bulkBuilder = new Bulk.Builder();
        chunk.forEach(entry -> {
            final Message message = entry.getValue();
            final Index indexAction = new Index.Builder(message.toElasticSearchObject(invalidTimestampMeter))
                    .index(entry.getKey().getWriteIndexAlias())
                    .type("message")
                    .id(message.getId())
                    .build();
            bulkBuilder.addAction(indexAction);
        });
        final Bulk bulkRequest = bulkBuilder.build();
        return runBulkRequest(bulkRequest, chunk.size());
    }

    /**
     * Re-sends the messages whose bulk items failed because of a blocked index until none of them is
     * reported as blocked any more.
     *
     * <p>Failures with any other cause are never retried; they are collected from the initial result
     * and from every retry and returned to the caller. The loop has no attempt limit.
     *
     * @param chunk          the messages of the bulk request that produced {@code allFailedItems}
     * @param allFailedItems every failed item of that bulk request
     * @return all failed items that were not caused by a blocked index
     */
    private Set<BulkResult.BulkResultItem> retryOnlyIndexBlockItemsForever(List<Map.Entry<IndexSet, Message>> chunk, List<BulkResult.BulkResultItem> allFailedItems) {
        Set<BulkResult.BulkResultItem> blockedItems = indexBlocksFrom(allFailedItems);

        final Set<BulkResult.BulkResultItem> initialFailures = new HashSet<>(allFailedItems);
        final Set<BulkResult.BulkResultItem> permanentFailures = new HashSet<>(Sets.difference(initialFailures, blockedItems));

        List<Map.Entry<IndexSet, Message>> pendingMessages = messagesForResultItems(chunk, blockedItems);

        if (blockedItems.isEmpty()) {
            return permanentFailures;
        }
        LOG.warn("Retrying {} messages, because their indices are blocked with status [read-only / allow delete]", blockedItems.size());

        long attempt = 1L;
        do {
            waitBeforeRetrying(attempt);
            attempt++;

            final List<BulkResult.BulkResultItem> retryFailures = bulkIndexChunk(pendingMessages).getFailedItems();
            blockedItems = indexBlocksFrom(retryFailures);
            pendingMessages = messagesForResultItems(pendingMessages, blockedItems);

            // Anything that failed for a different reason during the retry becomes a permanent failure.
            final Set<BulkResult.BulkResultItem> retryFailureSet = new HashSet<>(retryFailures);
            permanentFailures.addAll(Sets.difference(retryFailureSet, blockedItems));
        } while (!blockedItems.isEmpty());

        LOG.info("Retries were successful after {} attempts. Ingestion will continue now.", attempt);
        return permanentFailures;
    }

    /**
     * Sleeps for the time {@code exponentialWaitSeconds} computes for the given attempt number.
     *
     * @param attempt the number of the retry attempt about to be made
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted;
     *                          the thread's interrupt flag is restored before throwing
     */
    private void waitBeforeRetrying(long attempt) {
        try {
            final long sleepTime = exponentialWaitSeconds.computeSleepTime(new IndexBlockRetryAttempt(attempt));
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so that callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Selects the entries of {@code chunk} whose message ID matches one of the given result items.
     *
     * @param chunk       the candidate messages
     * @param indexBlocks the result items whose IDs are looked up
     * @return the matching entries, in the order they appear in {@code chunk}
     */
    private List<Map.Entry<IndexSet, Message>> messagesForResultItems(List<Map.Entry<IndexSet, Message>> chunk, Set<BulkResult.BulkResultItem> indexBlocks) {
        final Set<String> wantedIds = new HashSet<>();
        for (BulkResult.BulkResultItem item : indexBlocks) {
            wantedIds.add(item.id);
        }

        final List<Map.Entry<IndexSet, Message>> matching = new ArrayList<>();
        for (Map.Entry<IndexSet, Message> entry : chunk) {
            if (wantedIds.contains(entry.getValue().getId())) {
                matching.add(entry);
            }
        }
        return matching;
    }

    /**
     * Filters the failed items down to those that failed because of a blocked index.
     *
     * @param allFailedItems the failed items of a bulk result
     * @return the items for which {@link #hasFailedDueToBlockedIndex} holds
     */
    private Set<BulkResult.BulkResultItem> indexBlocksFrom(List<BulkResult.BulkResultItem> allFailedItems) {
        final Set<BulkResult.BulkResultItem> blocked = new HashSet<>();
        for (BulkResult.BulkResultItem item : allFailedItems) {
            if (hasFailedDueToBlockedIndex(item)) {
                blocked.add(item);
            }
        }
        return blocked;
    }

    /**
     * Tells whether a failed bulk item was rejected because its index is blocked.
     *
     * <p>The comparison is done constant-first, so an item without an error type or error reason
     * is treated as "not blocked" instead of causing a {@link NullPointerException}.
     *
     * @param item the failed bulk item to inspect
     * @return {@code true} if the item's error type equals {@code INDEX_BLOCK_ERROR} and its error
     *         reason equals {@code INDEX_BLOCK_REASON}
     */
    private boolean hasFailedDueToBlockedIndex(BulkResult.BulkResultItem item) {
        return INDEX_BLOCK_ERROR.equals(item.errorType) && INDEX_BLOCK_REASON.equals(item.errorReason);
    }

    /**
     * Reports the receive time of every message that is not listed as failed to the processing
     * status recorder.
     *
     * @param messageList all messages of the batch
     * @param failedIds   IDs of the messages to skip
     */
    private void recordTimestamp(List<Map.Entry<IndexSet, Message>> messageList, Set<String> failedIds) {
        messageList.stream()
                .map(Map.Entry::getValue)
                .filter(message -> !failedIds.contains(message.getId()))
                .forEach(message -> processingStatusRecorder.updatePostIndexingReceiveTime(message.getReceiveTime()));
    }

    /**
     * Executes a bulk request through {@code BULK_REQUEST_RETRYER}.
     *
     * <p>When {@code useExpectContinue} is set, the request is executed with a request config that
     * has Expect-Continue enabled; otherwise it is executed directly on the client.
     *
     * @param request the bulk request to execute
     * @param count   number of messages in the request, used for log output only
     * @return the result of the bulk request
     * @throws RuntimeException wrapping the {@link RetryException} or {@link ExecutionException}
     *                          raised by the retryer
     */
    private BulkResult runBulkRequest(Bulk request, int count) {
        try {
            if (!useExpectContinue) {
                return BULK_REQUEST_RETRYER.call(() -> client.execute(request));
            }
            final RequestConfig expectContinueConfig = RequestConfig.custom().setExpectContinueEnabled(true).build();
            return BULK_REQUEST_RETRYER.call(() -> JestUtils.execute(client, expectContinueConfig, request));
        } catch (RetryException e) {
            LOG.error("Could not bulk index {} messages. Giving up after {} attempts.", count, e.getNumberOfFailedAttempts());
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            LOG.error("Couldn't bulk index " + count + " messages.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Logs every failed bulk item, converts it into an index failure and hands the batch of failures
     * to the index failure queue.
     *
     * <p>The queue hand-over waits at most 25 milliseconds. If the queue has no room within that
     * time, or the wait is interrupted, the batch is dropped and a warning is logged.
     *
     * @param items       the failed bulk items
     * @param messageList the messages of the batch, used to look up each failed message's timestamp
     * @return the IDs of the failed items, in iteration order of {@code items}
     */
    private List<String> propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.Entry<IndexSet, Message>> messageList) {
        // A merge function keeps the first message per ID; the same message may appear once per
        // target index set, and toMap without it throws on any repeated key.
        final Map<String, Message> messagesById = messageList.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toMap(Message::getId, Function.identity(), (first, duplicate) -> first));

        final List<String> failedMessageIds = new ArrayList<>(items.size());
        final List<IndexFailure> indexFailures = new ArrayList<>(items.size());
        for (BulkResult.BulkResultItem item : items) {
            LOG.warn("Failed to index message: index=<{}> id=<{}> error=<{}>", item.index, item.id, item.error);

            final Message message = messagesById.get(item.id);
            final Map<String, Object> doc = ImmutableMap.<String, Object>builder()
                    .put("letter_id", item.id)
                    .put("index", item.index)
                    .put("type", item.type)
                    .put("message", item.error)
                    .put("timestamp", message.getTimestamp())
                    .build();
            indexFailures.add(new IndexFailureImpl(doc));
            failedMessageIds.add(item.id);
        }

        try {
            final boolean queued = indexFailureQueue.offer(indexFailures, 25L, TimeUnit.MILLISECONDS);
            if (!queued) {
                LOG.warn("Couldn't save index failures: index failure queue is full, dropping {} failures.", indexFailures.size());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost for the caller.
            Thread.currentThread().interrupt();
            LOG.warn("Couldn't save index failures.", e);
        }
        return failedMessageIds;
    }

    /**
     * Builds an index request for a single document of type {@code "message"}.
     *
     * <p>The {@code "_id"} key is removed from {@code source} before the request is built, so the
     * map passed in by the caller is modified.
     *
     * @param index  name of the target index
     * @param source the document fields
     * @param id     the document ID
     * @return the index request
     */
    public Index prepareIndexRequest(String index, Map<String, Object> source, String id) {
        source.remove("_id");

        final Index.Builder requestBuilder = new Index.Builder(source)
                .index(index)
                .type("message")
                .id(id);
        return requestBuilder.build();
    }

    /**
     * Returns the queue into which batches of index failures are placed.
     *
     * @return the queue instance held by this object (not a copy)
     */
    public LinkedBlockingQueue<List<IndexFailure>> getIndexFailureQueue() {
        return indexFailureQueue;
    }

    private class EntityTooLargeException
    extends Exception {
        private final int indexedSuccessfully;
        private final List<BulkResult.BulkResultItem> failedItems;

        public EntityTooLargeException(int indexedSuccessfully, List<BulkResult.BulkResultItem> failedItems) {
            this.indexedSuccessfully = indexedSuccessfully;
            this.failedItems = failedItems;
        }
    }
}

