/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.messaging.netty;

import java.io.IOException;
import org.apache.storm.messaging.netty.ControlMessage;
import org.apache.storm.messaging.netty.ISaslServer;
import org.apache.storm.messaging.netty.SaslMessageToken;
import org.apache.storm.messaging.netty.SaslNettyServer;
import org.apache.storm.messaging.netty.SaslNettyServerState;
import org.apache.storm.shade.io.netty.channel.Channel;
import org.apache.storm.shade.io.netty.channel.ChannelHandler;
import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslStormServerHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(SaslStormServerHandler.class);
    private final ISaslServer server;
    private byte[] token;
    private String topologyName;

    public SaslStormServerHandler(ISaslServer server) throws IOException {
        this.server = server;
        this.getSASLCredentials();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg == null) {
            return;
        }
        Channel channel = ctx.channel();
        if (msg instanceof ControlMessage && msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
            SaslNettyServer saslNettyServer = (SaslNettyServer)channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
            if (saslNettyServer == null) {
                LOG.debug("No saslNettyServer for " + channel + " yet; creating now, with topology token: " + this.topologyName);
                try {
                    saslNettyServer = new SaslNettyServer(this.topologyName, this.token);
                    LOG.debug("SaslNettyServer for " + channel + "created with topology token: " + this.topologyName);
                }
                catch (IOException e) {
                    LOG.error("Error occurred while creating saslNettyServer on server " + channel.localAddress() + " for client " + channel.remoteAddress());
                    throw new IllegalStateException("Failed to set SaslNettyServerState.SASL_NETTY_SERVER");
                }
                channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).set((Object)saslNettyServer);
            } else {
                LOG.debug("Found existing saslNettyServer on server:" + channel.localAddress() + " for client " + channel.remoteAddress());
            }
            LOG.debug("processToken:  With nettyServer: " + saslNettyServer + " and token length: " + this.token.length);
            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(saslNettyServer.response(new byte[0]));
            channel.writeAndFlush((Object)saslTokenMessageRequest, channel.voidPromise());
            return;
        }
        if (msg instanceof SaslMessageToken) {
            SaslNettyServer saslNettyServer = (SaslNettyServer)channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
            if (saslNettyServer == null) {
                throw new Exception("saslNettyServer was unexpectedly null for channel: " + channel);
            }
            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(saslNettyServer.response(((SaslMessageToken)msg).getSaslToken()));
            channel.writeAndFlush((Object)saslTokenMessageRequest, channel.voidPromise());
            if (saslNettyServer.isComplete()) {
                LOG.debug("SASL authentication is complete for client with username: " + saslNettyServer.getUserName());
                channel.writeAndFlush((Object)ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
                ctx.pipeline().remove((ChannelHandler)this);
                this.server.authenticated(channel);
            }
        } else {
            LOG.warn("Sending upstream an unexpected non-SASL message :  " + msg);
            ctx.fireChannelRead(msg);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    private void getSASLCredentials() throws IOException {
        this.topologyName = this.server.name();
        String secretKey = this.server.secretKey();
        if (secretKey != null) {
            this.token = secretKey.getBytes();
        }
        LOG.debug("SASL credentials for storm topology {} is {}", (Object)this.topologyName, (Object)secretKey);
    }
}

