package org.graylog.plugins.netflow.codecs;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
import org.graylog.plugins.netflow.v9.RawNetFlowV9Packet;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregator.class */
/**
 * Aggregates NetFlow V9 packets per exporter (remote address plus source id) so that a packet is
 * only handed on once every template it references has been seen.
 *
 * <p>Packets whose version field is not 9 are passed through prefixed with a single {@code 0}
 * byte. V9 packets are emitted as a {@code 1} byte followed by a serialized
 * {@link NetFlowV9Journal.RawNetflowV9} message carrying the packets and the templates they use.
 * V9 packets that reference a template which is not cached yet are kept in a per-exporter queue
 * and released when a later packet from the same exporter delivers templates.
 */
public class NetflowV9CodecAggregator implements RemoteAddressCodecAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(NetflowV9CodecAggregator.class);

    /** Value of the leading 16-bit version field that identifies a NetFlow V9 packet. */
    private static final int NETFLOW_V9_VERSION = 9;

    /** Single {@code 0} byte prepended to packets that are not NetFlow V9. */
    private static final ByteBuf PASSTHROUGH_MARKER = Unpooled.wrappedBuffer(new byte[]{0});

    /** Byte prepended to a serialized {@code RawNetflowV9} message. */
    private static final int V9_JOURNAL_MARKER = 1;

    /** Templates received so far, keyed by exporter and template id; holds at most 5000 entries. */
    private final Cache<TemplateKey, TemplateBytes> templateCache = CacheBuilder.newBuilder()
            .maximumSize(5000)
            .<TemplateKey, TemplateBytes>removalListener(notification ->
                    LOG.debug("Removed {} from template cache for reason {}", notification.getKey(), notification.getCause()))
            .recordStats()
            .build();

    /**
     * Packets waiting for templates, one queue per exporter. Entries expire one minute after they
     * were written.
     *
     * <p>NOTE(review): Guava documents that an entry's weight is measured when the entry is
     * inserted. The queues are inserted empty (see {@code bufferPacket}), so the
     * {@code maximumWeight} bound presumably never reflects packets added afterwards - confirm
     * whether the size limit is effective.
     */
    private final Cache<TemplateKey, Queue<PacketBytes>> packetCache = CacheBuilder.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .maximumWeight(Size.megabytes(1).toBytes())
            .<TemplateKey, Queue<PacketBytes>>removalListener(notification ->
                    LOG.debug("Removed {} from packet cache for reason {}", notification.getKey(), notification.getCause()))
            .<TemplateKey, Queue<PacketBytes>>weigher((exporterKey, queue) ->
                    queue.stream().mapToInt(PacketBytes::readableBytes).sum())
            .recordStats()
            .build();

    /* loaded from: input_file:org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregator$PacketBytes.class */
    /** Raw bytes of a buffered packet together with the ids of the templates it references. */
    public static class PacketBytes {
        private final byte[] bytes;
        private final Set<Integer> usedTemplates;

        /**
         * @param bArr raw packet bytes; the array is stored as-is, not copied
         * @param set  ids of the templates the packet references; stored as-is, not copied
         */
        public PacketBytes(byte[] bArr, Set<Integer> set) {
            this.bytes = bArr;
            this.usedTemplates = set;
        }

        /** @return the raw packet bytes (the internal array, not a copy) */
        public byte[] getBytes() {
            return this.bytes;
        }

        /** @return the ids of the templates this packet references */
        public Set<Integer> getUsedTemplates() {
            return this.usedTemplates;
        }

        /** @return the length of the packet in bytes */
        public int readableBytes() {
            return this.bytes.length;
        }
    }

    /* loaded from: input_file:org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregator$TemplateBytes.class */
    /** Raw bytes of a cached template plus a flag telling whether it is an option template. */
    private static final class TemplateBytes {
        private final byte[] bytes;
        private final boolean optionTemplate;

        /**
         * @param bArr raw template bytes; the array is stored as-is, not copied
         * @param z    {@code true} if the bytes describe an option template
         */
        public TemplateBytes(byte[] bArr, boolean z) {
            this.bytes = bArr;
            this.optionTemplate = z;
        }

        /** @return the raw template bytes (the internal array, not a copy) */
        public byte[] getBytes() {
            return this.bytes;
        }

        /** @return {@code true} if this entry was stored as an option template */
        public boolean isOptionTemplate() {
            return this.optionTemplate;
        }
    }

    @Inject
    public NetflowV9CodecAggregator() {
    }

    /**
     * Processes one incoming packet.
     *
     * @param byteBuf       the received packet; its reader index is not advanced by this method
     * @param socketAddress address of the sender, used together with the packet's source id to
     *                      identify the exporter
     * @return a result without message and flagged invalid if the packet is shorter than two
     *         bytes or processing failed; a result without message and flagged valid if nothing
     *         can be emitted yet; otherwise a valid result holding either the passthrough packet
     *         or the serialized V9 journal message
     */
    @Override // org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator
    @Nonnull
    public CodecAggregator.Result addChunk(ByteBuf byteBuf, SocketAddress socketAddress) {
        if (byteBuf.readableBytes() < 2) {
            // Not even the version field is present.
            return new CodecAggregator.Result(null, false);
        }
        try {
            if (byteBuf.getShort(0) != NETFLOW_V9_VERSION) {
                return new CodecAggregator.Result(Unpooled.copiedBuffer(PASSTHROUGH_MARKER, byteBuf), true);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received V9 packet:\n{}", ByteBufUtil.prettyHexDump(byteBuf));
            }
            final RawNetFlowV9Packet rawPacket = NetFlowV9Parser.parsePacketShallow(byteBuf);
            final long sourceId = rawPacket.header().sourceId();
            LOG.trace("Incoming NetFlow V9 packet contains: {}", rawPacket);

            final boolean receivedTemplates = cacheTemplates(rawPacket, socketAddress, sourceId);

            final Set<ByteBuf> packetsToSend = new HashSet<>();
            final Set<Integer> requiredTemplateIds = new HashSet<>();
            if (receivedTemplates) {
                // New templates may satisfy packets that were buffered earlier.
                releaseBufferedPackets(socketAddress, sourceId, packetsToSend, requiredTemplateIds);
            }
            requiredTemplateIds.addAll(rawPacket.usedTemplates());

            final Set<TemplateKey> templatesToSend = new HashSet<>();
            boolean packetBuffered = false;
            for (Integer templateId : requiredTemplateIds) {
                final TemplateKey templateKey = new TemplateKey(socketAddress, sourceId, templateId);
                if (this.templateCache.getIfPresent(templateKey) != null) {
                    templatesToSend.add(templateKey);
                } else if (!packetBuffered) {
                    // Queue the current packet once, no matter how many templates are missing.
                    packetBuffered = bufferPacket(byteBuf, rawPacket.usedTemplates(), socketAddress, sourceId);
                }
            }
            if (!packetBuffered && !templatesToSend.isEmpty()) {
                // A slice suffices: the bytes are copied while the result is serialized below.
                packetsToSend.add(byteBuf.slice());
            }
            if (packetsToSend.isEmpty()) {
                return new CodecAggregator.Result(null, true);
            }
            // Packets released from the buffer are emitted even when the current packet had to be
            // buffered itself; they were already removed from the queue and would be lost otherwise.
            return new CodecAggregator.Result(serialize(templatesToSend, packetsToSend), true);
        } catch (Exception e2) {
            LOG.error("Unexpected failure while aggregating NetFlowV9 packet, discarding packet.", ExceptionUtils.getRootCause(e2));
            return new CodecAggregator.Result(null, false);
        }
    }

    /**
     * Stores every template and the option template (if any) carried by the packet in the
     * template cache.
     *
     * @return {@code true} if the packet carried at least one template or an option template
     */
    private boolean cacheTemplates(RawNetFlowV9Packet rawPacket, SocketAddress socketAddress, long sourceId) {
        for (Map.Entry<Integer, byte[]> template : rawPacket.templates().entrySet()) {
            this.templateCache.put(
                    new TemplateKey(socketAddress, sourceId, template.getKey()),
                    new TemplateBytes(template.getValue(), false));
        }
        final Map.Entry<Integer, byte[]> optionTemplate = rawPacket.optionTemplate();
        if (optionTemplate != null) {
            this.templateCache.put(
                    new TemplateKey(socketAddress, sourceId, optionTemplate.getKey()),
                    new TemplateBytes(optionTemplate.getValue(), true));
        }
        return !rawPacket.templates().isEmpty() || optionTemplate != null;
    }

    /**
     * Drains the exporter's queue of buffered packets. Packets whose templates are all cached are
     * added to {@code packetsToSend} and their template ids to {@code requiredTemplateIds}; the
     * remaining packets are put back into the queue.
     */
    private void releaseBufferedPackets(SocketAddress socketAddress, long sourceId,
                                        Set<ByteBuf> packetsToSend, Set<Integer> requiredTemplateIds) {
        final Set<Integer> knownTemplateIds = new HashSet<>();
        for (TemplateKey templateKey : this.templateCache.asMap().keySet()) {
            // Compare addresses by value: an equal address is not necessarily the same instance.
            if (Objects.equals(templateKey.getRemoteAddress(), socketAddress) && templateKey.getSourceId() == sourceId) {
                knownTemplateIds.add(templateKey.getTemplateId());
            }
        }
        final Queue<PacketBytes> bufferedPackets = this.packetCache.getIfPresent(TemplateKey.idForExporter(socketAddress, sourceId));
        if (bufferedPackets == null) {
            return;
        }
        final List<PacketBytes> stillWaiting = new ArrayList<>(bufferedPackets.size());
        int released = 0;
        PacketBytes packet;
        while ((packet = bufferedPackets.poll()) != null) {
            if (knownTemplateIds.containsAll(packet.getUsedTemplates())) {
                packetsToSend.add(Unpooled.wrappedBuffer(packet.getBytes()));
                requiredTemplateIds.addAll(packet.getUsedTemplates());
                released++;
            } else {
                stillWaiting.add(packet);
            }
        }
        LOG.debug("Processing {} previously buffered packets, {} packets require more templates.", released, stillWaiting.size());
        if (!stillWaiting.isEmpty()) {
            bufferedPackets.addAll(stillWaiting);
        }
    }

    /**
     * Copies the packet's bytes into the exporter's queue, creating the queue if necessary.
     *
     * @return {@code true} if the packet was queued, {@code false} if the queue could not be obtained
     */
    private boolean bufferPacket(ByteBuf byteBuf, Set<Integer> usedTemplates, SocketAddress socketAddress, long sourceId) {
        try {
            final Queue<PacketBytes> bufferedPackets =
                    this.packetCache.get(TemplateKey.idForExporter(socketAddress, sourceId), ConcurrentLinkedQueue::new);
            bufferedPackets.add(new PacketBytes(ByteBufUtil.getBytes(byteBuf), usedTemplates));
            return true;
        } catch (ExecutionException e) {
            // The loader only instantiates a queue, so this is not expected; report it instead of
            // dropping the failure silently.
            LOG.warn("Unable to buffer NetFlow V9 packet from {} (source id {})", socketAddress, sourceId, e);
            return false;
        }
    }

    /**
     * Builds the journal message from the given templates and packets and prefixes it with the
     * V9 marker byte.
     */
    private ByteBuf serialize(Set<TemplateKey> templateKeys, Set<ByteBuf> packets) {
        final NetFlowV9Journal.RawNetflowV9.Builder builder = NetFlowV9Journal.RawNetflowV9.newBuilder();
        for (TemplateKey templateKey : templateKeys) {
            final TemplateBytes template = this.templateCache.getIfPresent(templateKey);
            if (template == null) {
                // Only the template is skipped here; the packets below are still written.
                LOG.warn("Template {} expired while processing, discarding netflow packet", templateKey);
            } else if (template.isOptionTemplate()) {
                LOG.debug("Writing options template flow {}", templateKey);
                // The option template is always stored under key 1.
                builder.putOptionTemplate(1, ByteString.copyFrom(template.getBytes()));
            } else {
                LOG.debug("Writing template {}", templateKey);
                builder.putTemplates(templateKey.getTemplateId(), ByteString.copyFrom(template.getBytes()));
            }
        }
        for (ByteBuf packet : packets) {
            builder.addPackets(ByteString.copyFrom(ByteBufUtil.getBytes(packet)));
        }
        final byte[] payload = builder.build().toByteArray();
        return Unpooled.buffer(payload.length + 1).writeByte(V9_JOURNAL_MARKER).writeBytes(payload);
    }
}
