package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/standard/WaitNotifyProtocol.class */
public class WaitNotifyProtocol {
    public static final String DEFAULT_COUNT_NAME = "default";
    public static final String CONSUMED_COUNT_NAME = "consumed";
    private static final int MAX_REPLACE_RETRY_COUNT = 5;
    private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
    private final Deserializer<String> stringDeserializer = bArr -> {
        if (bArr == null) {
            return null;
        }
        return new String(bArr, StandardCharsets.UTF_8);
    };
    private final AtomicDistributedMapCacheClient cache;
    private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final Serializer<String> stringSerializer = (str, outputStream) -> {
        if (str != null) {
            outputStream.write(str.getBytes(StandardCharsets.UTF_8));
        }
    };

    /* loaded from: input_file:org/apache/nifi/processors/standard/WaitNotifyProtocol$Signal.class */
    public static class Signal {
        private transient String identifier;
        private transient AtomicCacheEntry<String, String, Object> cachedEntry;
        private Map<String, Long> counts = new HashMap();
        private Map<String, String> attributes = new HashMap();
        private int releasableCount = 0;

        public Map<String, Long> getCounts() {
            return this.counts;
        }

        public void setCounts(Map<String, Long> map) {
            this.counts = map;
        }

        public Map<String, String> getAttributes() {
            return this.attributes;
        }

        public void setAttributes(Map<String, String> map) {
            this.attributes = map;
        }

        @JsonIgnore
        public long getTotalCount() {
            return this.counts.values().stream().mapToLong((v0) -> {
                return v0.longValue();
            }).sum();
        }

        public boolean isTotalCountReached(long j) {
            return getTotalCount() >= j;
        }

        public boolean isCountReached(String str, long j) {
            return getCount(str) >= j;
        }

        public long getCount(String str) {
            if (str == null || str.isEmpty()) {
                return getTotalCount();
            }
            Long l = this.counts.get(str);
            if (l != null) {
                return l.longValue();
            }
            return 0L;
        }

        public int getReleasableCount() {
            return this.releasableCount;
        }

        public void setReleasableCount(int i) {
            this.releasableCount = i;
        }

        public <E> void releaseCandidates(String str, long j, int i, List<E> list, Consumer<List<E>> consumer, Consumer<List<E>> consumer2) {
            int size = list.size();
            if (this.releasableCount < size) {
                long count = getCount(str);
                this.releasableCount = (int) (this.releasableCount + ((count / j) * i));
                long j2 = count % j;
                if (str == null || str.isEmpty()) {
                    this.counts.put(WaitNotifyProtocol.CONSUMED_COUNT_NAME, Long.valueOf(this.counts.getOrDefault(WaitNotifyProtocol.CONSUMED_COUNT_NAME, 0L).longValue() - (count - j2)));
                } else {
                    this.counts.put(str, Long.valueOf(j2));
                }
            }
            int min = Math.min(this.releasableCount, size);
            consumer.accept(list.subList(0, min));
            consumer2.accept(list.subList(min, size));
            this.releasableCount -= min;
        }
    }

    public WaitNotifyProtocol(AtomicDistributedMapCacheClient atomicDistributedMapCacheClient) {
        this.cache = atomicDistributedMapCacheClient;
    }

    public Signal notify(String str, Map<String, Integer> map, Map<String, String> map2) throws IOException, ConcurrentModificationException {
        for (int i = 0; i < 5; i++) {
            Signal signal = getSignal(str);
            Signal signal2 = signal != null ? signal : new Signal();
            signal2.identifier = str;
            if (map2 != null) {
                signal2.attributes.putAll(map2);
            }
            map.forEach((str2, num) -> {
                signal2.counts.put(str2, Long.valueOf(num.intValue() == 0 ? 0L : (signal2.counts.containsKey(str2) ? ((Long) signal2.counts.get(str2)).longValue() : 0L) + num.intValue()));
            });
            if (replace(signal2)) {
                return signal2;
            }
            long j = 10 * (i + 1);
            logger.info("Waiting for {} ms to retry... {}.{}", new Object[]{Long.valueOf(j), str, map});
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                throw new ConcurrentModificationException(String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", str, map), e);
            }
        }
        throw new ConcurrentModificationException(String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", str, map, 5));
    }

    public Signal notify(String str, String str2, int i, Map<String, String> map) throws IOException, ConcurrentModificationException {
        HashMap hashMap = new HashMap();
        hashMap.put(str2, Integer.valueOf(i));
        return notify(str, hashMap, map);
    }

    public Signal getSignal(String str) throws IOException, DeserializationException {
        AtomicCacheEntry fetch = this.cache.fetch(str, stringSerializer, this.stringDeserializer);
        if (fetch == null) {
            return null;
        }
        String str2 = (String) fetch.getValue();
        try {
            Signal signal = (Signal) objectMapper.readValue(str2, Signal.class);
            signal.identifier = str;
            signal.cachedEntry = fetch;
            return signal;
        } catch (JsonParseException e) {
            try {
                Map<String, String> m144deserialize = new FlowFileAttributesSerializer().m144deserialize(str2.getBytes(StandardCharsets.UTF_8));
                Signal signal2 = new Signal();
                signal2.identifier = str;
                signal2.setAttributes(m144deserialize);
                signal2.getCounts().put("default", 1L);
                return signal2;
            } catch (Exception e2) {
                throw new DeserializationException(String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"", str, e.getMessage(), e2.getMessage()));
            }
        }
    }

    public void complete(String str) throws IOException {
        this.cache.remove(str, stringSerializer);
    }

    public boolean replace(Signal signal) throws IOException {
        String writeValueAsString = objectMapper.writeValueAsString(signal);
        if (signal.cachedEntry == null) {
            signal.cachedEntry = new AtomicCacheEntry(signal.identifier, writeValueAsString, (Object) null);
        } else {
            signal.cachedEntry.setValue(writeValueAsString);
        }
        return this.cache.replace(signal.cachedEntry, stringSerializer, stringSerializer);
    }
}
