/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.ChunkedByteBuf;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that answers KvState queries.
 *
 * <p>Each inbound message is expected to be a {@link ByteBuf}. Its header is decoded with
 * {@link KvStateRequestSerializer}; for {@link KvStateRequestType#REQUEST} messages the
 * requested state instance is looked up in the {@link KvStateRegistry} and the actual query
 * is handed to the query executor as an {@link AsyncKvStateQueryTask}. Every other outcome
 * (unknown state ID, unexpected message type, any exception) is answered with a serialized
 * failure message.
 *
 * <p>The handler is annotated as sharable and only holds final references to its
 * collaborators.
 */
@ChannelHandler.Sharable
class KvStateServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);

    /** Registry used to resolve a KvState ID to a state instance. */
    private final KvStateRegistry registry;

    /** Executor on which the state queries are run. */
    private final ExecutorService queryExecutor;

    /** Sink for connection and request statistics. */
    private final KvStateRequestStats stats;

    /**
     * Creates the handler.
     *
     * @param kvStateRegistry registry to look up state instances in
     * @param queryExecutor executor that runs the query tasks
     * @param stats statistics collector
     * @throws NullPointerException if any argument is {@code null}
     */
    KvStateServerHandler(KvStateRegistry kvStateRegistry, ExecutorService queryExecutor, KvStateRequestStats stats) {
        this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
        this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool");
        this.stats = Objects.requireNonNull(stats, "KvStateRequestStats");
    }

    /**
     * Records a newly active connection. The event is not forwarded to further handlers.
     *
     * @param ctx context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        stats.reportActiveConnection();
    }

    /**
     * Records a connection that became inactive. The event is not forwarded to further
     * handlers.
     *
     * @param ctx context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        stats.reportInactiveConnection();
    }

    /**
     * Decodes an inbound message and either schedules a state query or responds with a
     * failure message.
     *
     * <p>The inbound message is released in all cases before this method returns.
     *
     * @param ctx context used to allocate buffers and write responses
     * @param msg inbound message; cast to {@link ByteBuf}
     * @throws Exception if serializing or writing the failure response itself fails
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Stays null until the request body was decoded; decides which kind of failure
        // message (request-scoped vs. server-scoped) is sent from the catch block.
        KvStateRequest request = null;
        try {
            ByteBuf buf = (ByteBuf) msg;
            KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf);
            if (msgType == KvStateRequestType.REQUEST) {
                request = KvStateRequestSerializer.deserializeKvStateRequest(buf);
                stats.reportRequest();

                InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
                if (kvState != null) {
                    // The returned Future is intentionally not tracked; the task reports
                    // its outcome to the client and to the stats itself.
                    queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats));
                } else {
                    ByteBuf unknown = KvStateRequestSerializer.serializeKvStateRequestFailure(
                            ctx.alloc(),
                            request.getRequestId(),
                            new UnknownKvStateID(request.getKvStateId()));
                    ctx.writeAndFlush(unknown);
                    stats.reportFailedRequest();
                }
            } else {
                ByteBuf failure = KvStateRequestSerializer.serializeServerFailure(
                        ctx.alloc(),
                        new IllegalArgumentException("Unexpected message type " + msgType
                                + ". KvStateServerHandler expects " + KvStateRequestType.REQUEST + " messages."));
                ctx.writeAndFlush(failure);
            }
        } catch (Throwable t) {
            String stringifiedCause = ExceptionUtils.stringifyException(t);
            ByteBuf err;
            if (request != null) {
                String errMsg = "Failed to handle incoming request with ID " + request.getRequestId()
                        + ". Caused by: " + stringifiedCause;
                err = KvStateRequestSerializer.serializeKvStateRequestFailure(
                        ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
                stats.reportFailedRequest();
            } else {
                String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause;
                err = KvStateRequestSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
            }
            ctx.writeAndFlush(err);
        } finally {
            // The request content has been consumed (or decoding failed); either way this
            // handler is done with the inbound buffer.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Sends a server failure message describing {@code cause} and closes the channel once
     * that write has completed.
     *
     * @param ctx context used to allocate the buffer and write the response
     * @param cause exception raised in the pipeline
     * @throws Exception if the failure message cannot be serialized
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String stringifiedCause = ExceptionUtils.stringifyException(cause);
        String msg = "Exception in server pipeline. Caused by: " + stringifiedCause;
        ByteBuf err = KvStateRequestSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
        ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Task that queries a single state instance and writes the result (or a failure
     * message) back to the channel.
     */
    private static class AsyncKvStateQueryTask implements Runnable {
        private final ChannelHandlerContext ctx;
        private final KvStateRequest request;
        private final InternalKvState<?> kvState;
        private final KvStateRequestStats stats;

        /** {@link System#nanoTime()} at construction; start point of the reported duration. */
        private final long creationNanos;

        /**
         * @param ctx context of the channel the request arrived on
         * @param request decoded request
         * @param kvState state instance to query
         * @param stats statistics collector
         * @throws NullPointerException if any argument is {@code null}
         */
        public AsyncKvStateQueryTask(ChannelHandlerContext ctx, KvStateRequest request, InternalKvState<?> kvState, KvStateRequestStats stats) {
            this.ctx = Objects.requireNonNull(ctx, "Channel handler context");
            this.request = Objects.requireNonNull(request, "State query");
            this.kvState = Objects.requireNonNull(kvState, "KvState");
            this.stats = Objects.requireNonNull(stats, "State query stats");
            this.creationNanos = System.nanoTime();
        }

        /**
         * Runs the query.
         *
         * <p>If the channel is no longer active nothing is written. A {@code null} result
         * from the state is answered with an {@link UnknownKeyOrNamespace} failure. Both of
         * these cases, as well as any exception, are counted as a failed request. A result
         * that was handed to the channel is counted later by the write listener, depending
         * on whether the write succeeded.
         */
        @Override
        public void run() {
            // True only once a result write has been issued; from then on the write
            // listener is responsible for reporting success or failure.
            boolean success = false;
            try {
                if (!ctx.channel().isActive()) {
                    return;
                }

                byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
                byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
                if (serializedResult != null) {
                    ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
                            ctx.alloc(), request.getRequestId(), serializedResult);

                    // Responses larger than the channel's write-buffer high water mark are
                    // written as a ChunkedByteBuf with that chunk size.
                    int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
                    ChannelFuture write;
                    if (buf.readableBytes() <= highWatermark) {
                        write = ctx.writeAndFlush(buf);
                    } else {
                        write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark));
                    }
                    write.addListener(new QueryResultWriteListener());
                    success = true;
                } else {
                    ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(
                            ctx.alloc(), request.getRequestId(), new UnknownKeyOrNamespace());
                    ctx.writeAndFlush(unknownKey);
                }
            } catch (Throwable t) {
                try {
                    String stringifiedCause = ExceptionUtils.stringifyException(t);
                    String errMsg = "Failed to query state backend for query " + request.getRequestId()
                            + ". Caused by: " + stringifiedCause;
                    ByteBuf err = KvStateRequestSerializer.serializeKvStateRequestFailure(
                            ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg));
                    ctx.writeAndFlush(err);
                } catch (Throwable e) {
                    // This task is submitted to an executor and its Future is discarded, so
                    // anything escaping here would vanish without a trace. Log every
                    // failure of the error response, not only I/O errors.
                    LOG.error("Failed to respond with the error after failed to query state backend", e);
                }
            } finally {
                if (!success) {
                    stats.reportFailedRequest();
                }
            }
        }

        @Override
        public String toString() {
            return "AsyncKvStateQueryTask{request=" + request + ", creationNanos=" + creationNanos + '}';
        }

        /**
         * Listener on the result write that reports the request as successful or failed,
         * together with the time elapsed since the task was created.
         */
        private class QueryResultWriteListener implements ChannelFutureListener {
            private QueryResultWriteListener() {
            }

            /**
             * Reports the final outcome of the result write to the stats.
             *
             * @param future completed future of the result write
             */
            public void operationComplete(ChannelFuture future) throws Exception {
                // Measured from task creation, so it includes time spent queued in the executor.
                long durationNanos = System.nanoTime() - creationNanos;
                long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
                if (future.isSuccess()) {
                    stats.reportSuccessfulRequest(durationMillis);
                } else {
                    LOG.debug("Query {} failed after {} ms", request, durationMillis, future.cause());
                    stats.reportFailedRequest();
                }
            }
        }
    }
}

