package org.graylog.integrations.ipfix.codecs;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.graylog.integrations.ipfix.InformationElementDefinitions;
import org.graylog.integrations.ipfix.IpfixJournal;
import org.graylog.integrations.ipfix.IpfixParser;
import org.graylog.integrations.ipfix.ShallowDataSet;
import org.graylog.integrations.ipfix.ShallowTemplateSet;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/integrations/ipfix/codecs/IpfixAggregator.class */
public class IpfixAggregator implements RemoteAddressCodecAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(IpfixAggregator.class);
    private final IpfixParser shallowParser = new IpfixParser(InformationElementDefinitions.empty());
    private final Cache<TemplateKey, ShallowTemplateSet.Record> templateCache = CacheBuilder.newBuilder().maximumSize(5000).removalListener(removalNotification -> {
        LOG.debug("Removed [{}] from template cache for reason [{}]", removalNotification.getKey(), removalNotification.getCause());
    }).recordStats().build();
    private final Cache<TemplateKey, Queue<ShallowDataSet>> packetCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).maximumWeight(Size.megabytes(1).toBytes()).removalListener(removalNotification -> {
        LOG.debug("Removed [{}] from packet cache for reason [{}]", removalNotification.getKey(), removalNotification.getCause());
    }).weigher((templateKey, queue) -> {
        return ((Integer) queue.stream().map(shallowDataSet -> {
            return Integer.valueOf(shallowDataSet.content().length);
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue();
    }).recordStats().build();

    @Override // org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator
    @Nonnull
    public CodecAggregator.Result addChunk(ByteBuf byteBuf, @Nullable SocketAddress socketAddress) {
        if (!byteBuf.isReadable(2)) {
            return new CodecAggregator.Result(null, false);
        }
        try {
            IpfixParser.MessageDescription shallowParseMessage = this.shallowParser.shallowParseMessage(byteBuf);
            long observationDomainId = shallowParseMessage.getHeader().observationDomainId();
            addTemplateKeyInCache(socketAddress, shallowParseMessage, observationDomainId);
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            if (!shallowParseMessage.declaredTemplateIds().isEmpty()) {
                HashSet hashSet3 = new HashSet();
                collectAllTemplateIds(socketAddress, observationDomainId, hashSet3);
                handleBufferedPackets(hashSet, hashSet2, hashSet3, (Queue) this.packetCache.getIfPresent(TemplateKey.idForExporter(socketAddress, observationDomainId)));
            }
            boolean z = false;
            HashSet hashSet4 = new HashSet(this.templateCache.asMap().keySet());
            hashSet2.addAll(shallowParseMessage.referencedTemplateIds());
            LOG.debug("Finding the needed templates for the buffered and current packets");
            Iterator<Integer> it = hashSet2.iterator();
            while (it.hasNext()) {
                TemplateKey templateKey = new TemplateKey(socketAddress, observationDomainId, it.next().intValue());
                if (this.templateCache.getIfPresent(templateKey) == null) {
                    LOG.debug("Template is null, packet needs to be buffered until templates have been received.");
                    try {
                        Queue queue = (Queue) this.packetCache.get(TemplateKey.idForExporter(socketAddress, observationDomainId), ConcurrentLinkedQueue::new);
                        ByteBufUtil.getBytes(byteBuf);
                        queue.addAll(shallowParseMessage.dataSets());
                        z = true;
                    } catch (ExecutionException e) {
                    }
                } else {
                    LOG.debug("Template [{}] has been added to template list.", templateKey);
                    hashSet4.add(templateKey);
                    hashSet.addAll(shallowParseMessage.dataSets());
                }
            }
            if (z) {
                LOG.debug("Packet has been buffered and will not be processed now, returning result.");
                return new CodecAggregator.Result(null, true);
            }
            if (hashSet.isEmpty()) {
                LOG.debug("Packet has not been buffered and no packet is queued.");
                return new CodecAggregator.Result(null, true);
            }
            IpfixJournal.RawIpfix.Builder newBuilder = IpfixJournal.RawIpfix.newBuilder();
            buildJournalObject(hashSet, hashSet4, newBuilder);
            return getCompleteResult(newBuilder.m202build());
        } catch (Exception e2) {
            LOG.error("Unable to aggregate IPFIX message due to the following error ", e2);
            return new CodecAggregator.Result(null, false);
        }
    }

    public void buildJournalObject(Set<ShallowDataSet> set, Set<TemplateKey> set2, IpfixJournal.RawIpfix.Builder builder) {
        LOG.debug("Assembling the packet with necessary templates and data records which include the templates needed.");
        for (TemplateKey templateKey : set2) {
            builder.putTemplates(templateKey.getTemplateId(), ByteString.copyFrom(((ShallowTemplateSet.Record) this.templateCache.getIfPresent(templateKey)).getRecordBytes()));
        }
        LOG.debug("IPFIX data set has been processed for the same template id, adding data set to IPFIX journal.");
        for (ShallowDataSet shallowDataSet : set) {
            builder.addDataSets(IpfixJournal.DataSet.newBuilder().setTemplateId(shallowDataSet.templateId()).setTimestampEpochSeconds(shallowDataSet.epochSeconds()).setDataRecords(ByteString.copyFrom(shallowDataSet.content())).m155build());
        }
    }

    public CodecAggregator.Result getCompleteResult(IpfixJournal.RawIpfix rawIpfix) {
        LOG.debug("Raw ipfix object complete, returning result.");
        return new CodecAggregator.Result(Unpooled.wrappedBuffer(rawIpfix.toByteArray()), true);
    }

    public void handleBufferedPackets(Set<ShallowDataSet> set, Set<Integer> set2, Set<Integer> set3, Queue<ShallowDataSet> queue) {
        if (queue != null) {
            LOG.debug("Buffered packets detected in the packet cache.");
            ArrayList arrayList = new ArrayList(queue.size());
            int i = 0;
            while (true) {
                ShallowDataSet poll = queue.poll();
                if (null == poll) {
                    break;
                }
                if (set3.contains(Integer.valueOf(poll.templateId()))) {
                    LOG.debug("Packet contains template id from a known template, adding to packets to send set.");
                    set.add(poll);
                    set2.add(Integer.valueOf(poll.templateId()));
                    i++;
                } else {
                    LOG.debug("Packet contains unknown template id, adding to temporary queue.");
                    arrayList.add(poll);
                }
            }
            LOG.debug("Processing [{}] previously buffered packets, [{}] packets require more templates.", Integer.valueOf(i), Integer.valueOf(arrayList.size()));
            if (arrayList.isEmpty()) {
                return;
            }
            LOG.debug("Buffered packets could not be processed, adding to temporary queue to wait for more templates.");
            queue.addAll(arrayList);
        }
    }

    public void collectAllTemplateIds(@Nullable SocketAddress socketAddress, long j, Set<Integer> set) {
        LOG.debug("Collecting all templateIds from templateKeys stored in templateCache.");
        for (TemplateKey templateKey : this.templateCache.asMap().keySet()) {
            if (templateKey.getRemoteAddress() == socketAddress && templateKey.getObservationDomainId() == j) {
                set.add(Integer.valueOf(templateKey.getTemplateId()));
            }
        }
    }

    public void addTemplateKeyInCache(@Nullable SocketAddress socketAddress, IpfixParser.MessageDescription messageDescription, long j) {
        for (Integer num : messageDescription.declaredTemplateIds()) {
            TemplateKey templateKey = new TemplateKey(socketAddress, j, num.intValue());
            LOG.debug("Created template key with remote address [{}], observation domain ID [{}] and template ID [{}].", new Object[]{templateKey.getRemoteAddress(), Long.valueOf(templateKey.getObservationDomainId()), Integer.valueOf(templateKey.getTemplateId())});
            this.templateCache.put(templateKey, messageDescription.getTemplateRecord(num.intValue()));
            LOG.debug("Saving templates key (raw bytes) in template cache to combine in new message later.");
        }
    }
}
