/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;

/**
 * Tracks, per {@link FlowFile}, how many messages have been sent and how many of those
 * have been acknowledged, together with the first failure recorded for each FlowFile.
 *
 * <p>State is held in concurrent maps and atomic counters. Threads that record progress
 * (an acknowledgement or a failure) wake up any thread blocked in
 * {@link #awaitCompletion(long)} through a shared monitor.</p>
 */
public class InFlightMessageTracker {
    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
    /** Monitor used to signal progress to threads blocked in {@link #awaitCompletion(long)}. */
    private final Object progressMutex = new Object();
    private final ComponentLog logger;

    /**
     * Creates a tracker with no FlowFiles registered.
     *
     * @param logger the logger used to report failures passed to {@link #fail(FlowFile, Exception)}
     */
    public InFlightMessageTracker(final ComponentLog logger) {
        this.logger = logger;
    }

    /**
     * Records one acknowledged message for the given FlowFile, registering the FlowFile
     * if it is not tracked yet, and wakes up threads waiting for completion.
     *
     * @param flowFile the FlowFile whose message was acknowledged
     */
    public void incrementAcknowledgedCount(final FlowFile flowFile) {
        countsFor(flowFile).incrementAcknowledgedCount();
        signalProgress();
    }

    /**
     * Registers the given FlowFile with zero sent and zero acknowledged messages,
     * unless it is already tracked.
     *
     * @param flowFile the FlowFile to register
     */
    public void trackEmpty(final FlowFile flowFile) {
        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
    }

    /**
     * Returns the number of acknowledged messages recorded for the given FlowFile.
     *
     * @param flowFile the FlowFile to look up
     * @return the acknowledged count, or {@code 0} if the FlowFile is not tracked
     */
    public int getAcknowledgedCount(final FlowFile flowFile) {
        final Counts counts = messageCountsByFlowFile.get(flowFile);
        return counts == null ? 0 : counts.getAcknowledgedCount();
    }

    /**
     * Records one sent message for the given FlowFile, registering the FlowFile
     * if it is not tracked yet.
     *
     * @param flowFile the FlowFile a message was sent for
     */
    public void incrementSentCount(final FlowFile flowFile) {
        countsFor(flowFile).incrementSentCount();
    }

    /**
     * Returns the number of sent messages recorded for the given FlowFile.
     *
     * @param flowFile the FlowFile to look up
     * @return the sent count, or {@code 0} if the FlowFile is not tracked
     */
    public int getSentCount(final FlowFile flowFile) {
        final Counts counts = messageCountsByFlowFile.get(flowFile);
        return counts == null ? 0 : counts.getSentCount();
    }

    /**
     * Marks the given FlowFile as failed, logs the failure, and wakes up threads waiting
     * for completion. Only the first failure recorded for a FlowFile is retained.
     *
     * @param flowFile the FlowFile that failed
     * @param exception the reason for the failure
     */
    public void fail(final FlowFile flowFile, final Exception exception) {
        failures.putIfAbsent(flowFile, exception);
        logger.error("Failed to send " + flowFile + " to Kafka", exception);
        signalProgress();
    }

    /**
     * Returns the failure recorded for the given FlowFile.
     *
     * @param flowFile the FlowFile to look up
     * @return the recorded exception, or {@code null} if no failure has been recorded
     */
    public Exception getFailure(final FlowFile flowFile) {
        return failures.get(flowFile);
    }

    /**
     * Indicates whether a failure has been recorded for the given FlowFile.
     *
     * @param flowFile the FlowFile to look up
     * @return {@code true} if a failure has been recorded for the FlowFile
     */
    public boolean isFailed(final FlowFile flowFile) {
        return getFailure(flowFile) != null;
    }

    /**
     * Discards all tracked counts and all recorded failures.
     */
    public void reset() {
        messageCountsByFlowFile.clear();
        failures.clear();
    }

    /**
     * Records the given exception as the failure for every tracked FlowFile that is not
     * yet complete, then returns a result view over this tracker.
     *
     * @param exception the failure to record for each incomplete FlowFile
     * @return a {@link PublishResult} backed by this tracker
     */
    public PublishResult failOutstanding(final Exception exception) {
        // isComplete(FlowFile) is already true for FlowFiles with a recorded failure, so no
        // separate "not yet failed" filter is needed. putIfAbsent keeps the earliest failure
        // if another thread records one concurrently, matching fail().
        messageCountsByFlowFile.keySet().stream()
            .filter(flowFile -> !isComplete(flowFile))
            .forEach(flowFile -> failures.putIfAbsent(flowFile, exception));
        return createPublishResult();
    }

    /**
     * Blocks until every tracked FlowFile is complete (all sent messages acknowledged, or a
     * failure recorded) or until the timeout elapses. Completion is always checked before
     * the timeout is evaluated, so an already-complete tracker returns immediately even
     * for a zero or negative timeout.
     *
     * @param millis the maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the timeout elapses before all FlowFiles are complete
     */
    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(millis);
        final long startNanos = System.nanoTime();

        synchronized (progressMutex) {
            while (!isComplete()) {
                // Compare elapsed time rather than absolute nanoTime values, which may overflow.
                final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                if (remainingNanos <= 0L) {
                    throw new TimeoutException("Timed out after " + millis + " ms waiting for in-flight messages to complete");
                }

                // Wait only for the time that is left. Round up to at least 1 ms because
                // Object.wait(0) would block indefinitely.
                final long remainingMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                progressMutex.wait(remainingMillis);
            }
        }
    }

    /**
     * Creates a live view over this tracker's state; the returned object reads the
     * tracker's current values each time one of its methods is called.
     *
     * @return a {@link PublishResult} backed by this tracker
     */
    PublishResult createPublishResult() {
        return new PublishResult() {
            @Override
            public boolean isFailure() {
                return !failures.isEmpty();
            }

            @Override
            public int getSuccessfulMessageCount(final FlowFile flowFile) {
                return getAcknowledgedCount(flowFile);
            }

            @Override
            public Exception getReasonForFailure(final FlowFile flowFile) {
                return getFailure(flowFile);
            }
        };
    }

    /** Returns the counters for the given FlowFile, creating them if it is not tracked yet. */
    private Counts countsFor(final FlowFile flowFile) {
        return messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
    }

    /** Wakes up every thread blocked in {@link #awaitCompletion(long)} so it can re-check completion. */
    private void signalProgress() {
        synchronized (progressMutex) {
            // notifyAll rather than notify: every waiter re-checks its condition in a loop,
            // so waking all of them is safe and avoids leaving a waiter asleep.
            progressMutex.notifyAll();
        }
    }

    /**
     * Indicates whether the given FlowFile needs no further waiting: it is no longer
     * tracked, all of its sent messages are acknowledged, or a failure has been recorded.
     */
    private boolean isComplete(final FlowFile flowFile) {
        final Counts counts = messageCountsByFlowFile.get(flowFile);
        if (counts == null) {
            // The entry was removed (e.g. by a concurrent reset()); nothing is outstanding.
            return true;
        }
        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
            return true;
        }
        return failures.containsKey(flowFile);
    }

    /** Indicates whether every tracked FlowFile is complete. */
    private boolean isComplete() {
        return messageCountsByFlowFile.keySet().stream().allMatch(this::isComplete);
    }

    /**
     * Pair of atomic counters holding the number of sent and acknowledged messages.
     */
    public static class Counts {
        private final AtomicInteger sentCount = new AtomicInteger(0);
        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);

        /** Atomically adds one to the sent count. */
        public void incrementSentCount() {
            sentCount.incrementAndGet();
        }

        /** Atomically adds one to the acknowledged count. */
        public void incrementAcknowledgedCount() {
            acknowledgedCount.incrementAndGet();
        }

        /**
         * @return the current acknowledged count
         */
        public int getAcknowledgedCount() {
            return acknowledgedCount.get();
        }

        /**
         * @return the current sent count
         */
        public int getSentCount() {
            return sentCount.get();
        }
    }
}

