/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.proxy.internal;

import io.kroxylicious.proxy.frame.Frame;
import io.kroxylicious.proxy.frame.RequestFrame;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Duplex channel handler that makes outbound {@link Frame}s leave in the same
 * order in which their correlation ids were recorded on the inbound path.
 *
 * <p>Inbound: the correlation id of every {@link RequestFrame} that reports
 * {@code hasResponse()}, and of every other {@link Frame}, is appended to a
 * FIFO of in-flight ids before the message is passed on.
 *
 * <p>Outbound: a {@link Frame} whose correlation id matches the head of that
 * FIFO is written immediately; any other {@link Frame} is parked, together with
 * its promise, until its id reaches the head. Non-{@link Frame} messages are
 * passed through untouched.
 *
 * <p>This class does no synchronization of its own.
 */
public class ResponseOrderer
extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(ResponseOrderer.class);

    /** Correlation ids recorded on the inbound path, oldest first. */
    Deque<Integer> inflightCorrelationIds = new ArrayDeque<>();

    /** Outbound frames that arrived ahead of their turn, keyed by correlation id. */
    Map<Integer, QueuedResponse> queuedResponses = new HashMap<>();

    /**
     * Records the correlation id of the inbound message, when applicable, and
     * then forwards the message to the next inbound handler.
     *
     * @param ctx the handler context
     * @param msg the inbound message; only {@link RequestFrame} and {@link Frame}
     *            instances are tracked
     * @throws Exception if the superclass implementation throws
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RequestFrame request) {
            // A request that reports no response is not tracked, so nothing waits on it.
            if (request.hasResponse()) {
                inflightCorrelationIds.addLast(request.correlationId());
            }
        } else if (msg instanceof Frame other) {
            inflightCorrelationIds.addLast(other.correlationId());
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Writes the outbound message immediately if it is next in line, otherwise
     * parks it until the frames ahead of it have been written.
     *
     * @param ctx     the handler context
     * @param msg     the outbound message
     * @param promise the promise associated with this write; it is stored with
     *                the message when the write is deferred
     * @throws Exception if the superclass implementation throws
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof Frame response)) {
            // Not a frame: ordering does not apply.
            super.write(ctx, msg, promise);
            return;
        }

        Integer head = inflightCorrelationIds.peekFirst();
        if (head == null) {
            logger.warn("Handling a Frame {}, but we have no inflight correlation ids, continuing to write", msg);
            super.write(ctx, msg, promise);
            return;
        }

        if (head.intValue() != response.correlationId()) {
            // Out of turn: hold the frame and its promise until its id reaches the head.
            queuedResponses.put(response.correlationId(), new QueuedResponse(msg, promise));
            return;
        }

        // In turn: retire the id, write, then release whatever was waiting behind it.
        inflightCorrelationIds.removeFirst();
        super.write(ctx, msg, promise);
        drainQueue(ctx);
    }

    /**
     * Writes parked frames for as long as the oldest in-flight correlation id
     * has a parked frame waiting for it.
     *
     * @param ctx the handler context used for the deferred writes
     * @throws Exception if the superclass write throws
     */
    private void drainQueue(ChannelHandlerContext ctx) throws Exception {
        for (Integer head = inflightCorrelationIds.peekFirst();
             head != null && queuedResponses.containsKey(head);
             head = inflightCorrelationIds.peekFirst()) {
            Integer retired = inflightCorrelationIds.removeFirst();
            QueuedResponse ready = queuedResponses.remove(retired);
            super.write(ctx, ready.msg(), ready.promise());
        }
    }

    /**
     * @return the number of correlation ids currently recorded as in flight
     */
    int inFlightRequestCount() {
        return inflightCorrelationIds.size();
    }

    /**
     * @return the number of outbound frames currently parked
     */
    int queuedResponseCount() {
        return queuedResponses.size();
    }

    /**
     * A deferred outbound write: the message and the promise it was submitted with.
     *
     * @param msg     the parked outbound message
     * @param promise the promise passed to {@code write} alongside the message
     */
    record QueuedResponse(Object msg, ChannelPromise promise) {
    }
}

