/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.proxy.internal;

import io.kroxylicious.proxy.frame.BareSaslRequest;
import io.kroxylicious.proxy.frame.BareSaslResponse;
import io.kroxylicious.proxy.frame.DecodedRequestFrame;
import io.kroxylicious.proxy.frame.DecodedResponseFrame;
import io.kroxylicious.proxy.internal.AuthenticationEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterClientQuotasRequestData;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteRecordsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DescribeAclsRequestData;
import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeDelegationTokenRequestData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeProducersRequestData;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EnvelopeRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListTransactionsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.StopReplicaRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
import org.apache.kafka.common.requests.AlterClientQuotasRequest;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
import org.apache.kafka.common.requests.BrokerRegistrationRequest;
import org.apache.kafka.common.requests.ControlledShutdownRequest;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
import org.apache.kafka.common.requests.DescribeClusterRequest;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest;
import org.apache.kafka.common.requests.ElectLeadersRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EnvelopeRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListTransactionsRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetDeleteRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.UnregisterBrokerRequest;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.plain.internals.PlainSaslServerProvider;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.scram.internals.ScramSaslServerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAuthnHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOG;
    private final List<String> enabledMechanisms;
    SaslServer saslServer;
    private final Map<String, AuthenticateCallbackHandler> mechanismHandlers;
    State lastSeen;

    public KafkaAuthnHandler(Channel ch, Map<SaslMechanism, AuthenticateCallbackHandler> mechanismHandlers) {
        this(ch, State.START, mechanismHandlers);
    }

    KafkaAuthnHandler(Channel ch, State init, Map<SaslMechanism, AuthenticateCallbackHandler> mechanismHandlers) {
        this.lastSeen = init;
        LOG.debug("{}: Initial state {}", (Object)ch, (Object)this.lastSeen);
        this.mechanismHandlers = mechanismHandlers.entrySet().stream().collect(Collectors.toMap(e -> ((SaslMechanism)((Object)((Object)e.getKey()))).mechanismName(), Map.Entry::getValue));
        this.enabledMechanisms = List.copyOf(this.mechanismHandlers.keySet());
    }

    private InvalidRequestException illegalTransition(State next) {
        InvalidRequestException e = new InvalidRequestException("Illegal state transition from " + String.valueOf((Object)this.lastSeen) + " to " + String.valueOf((Object)next));
        this.lastSeen = State.FAILED;
        return e;
    }

    private void doTransition(Channel channel, State next) {
        State previous = this.lastSeen;
        switch (next.ordinal()) {
            case 1: {
                if (previous == State.START) break;
                throw this.illegalTransition(next);
            }
            case 2: 
            case 3: {
                if (previous == State.START || previous == State.API_VERSIONS) break;
                throw this.illegalTransition(next);
            }
            case 4: {
                if (previous == State.START || previous == State.SASL_HANDSHAKE_v0 || previous == State.UNFRAMED_SASL_AUTHENTICATE) break;
                throw this.illegalTransition(next);
            }
            case 5: {
                if (previous == State.SASL_HANDSHAKE_v1_PLUS || previous == State.FRAMED_SASL_AUTHENTICATE) break;
                throw this.illegalTransition(next);
            }
            case 7: {
                if (previous == State.FRAMED_SASL_AUTHENTICATE || previous == State.UNFRAMED_SASL_AUTHENTICATE) break;
                throw this.illegalTransition(next);
            }
            case 6: {
                break;
            }
            default: {
                throw this.illegalTransition(next);
            }
        }
        LOG.debug("{}: Transition from {} to {}", new Object[]{channel, this.lastSeen, next});
        this.lastSeen = next;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof BareSaslRequest) {
            this.handleBareRequest(ctx, (BareSaslRequest)msg);
        } else if (msg instanceof DecodedRequestFrame) {
            this.handleFramedRequest(ctx, (DecodedRequestFrame)msg);
        } else if (this.lastSeen == State.AUTHN_SUCCESS) {
            ctx.fireChannelRead(msg);
        } else {
            throw new IllegalStateException("Unexpected message " + String.valueOf(msg.getClass()));
        }
    }

    private void handleFramedRequest(ChannelHandlerContext ctx, DecodedRequestFrame<?> frame) throws SaslException {
        switch (frame.apiKey()) {
            case API_VERSIONS: {
                if (this.lastSeen != State.AUTHN_SUCCESS) {
                    this.doTransition(ctx.channel(), State.API_VERSIONS);
                }
                ctx.fireChannelRead(frame);
                return;
            }
            case SASL_HANDSHAKE: {
                this.doTransition(ctx.channel(), frame.apiVersion() == 0 ? State.SASL_HANDSHAKE_v0 : State.SASL_HANDSHAKE_v1_PLUS);
                this.onSaslHandshakeRequest(ctx, frame);
                return;
            }
            case SASL_AUTHENTICATE: {
                this.doTransition(ctx.channel(), State.FRAMED_SASL_AUTHENTICATE);
                this.onSaslAuthenticateRequest(ctx, frame);
                return;
            }
        }
        if (this.lastSeen == State.AUTHN_SUCCESS) {
            ctx.fireChannelRead(frame);
        } else {
            KafkaAuthnHandler.writeFramedResponse(ctx, frame, this.errorResponse(frame, (Throwable)new IllegalSaslStateException("Not authenticated")));
        }
    }

    private void handleBareRequest(ChannelHandlerContext ctx, BareSaslRequest msg) throws SaslException {
        if (this.lastSeen != State.SASL_HANDSHAKE_v0 && this.lastSeen != State.UNFRAMED_SASL_AUTHENTICATE) {
            this.lastSeen = State.FAILED;
            throw new InvalidRequestException("Bare SASL bytes without GSSAPI support or prior SaslHandshake");
        }
        this.doTransition(ctx.channel(), State.UNFRAMED_SASL_AUTHENTICATE);
        this.writeBareResponse(ctx, this.doEvaluateResponse(ctx, msg.bytes()));
    }

    private void writeBareResponse(ChannelHandlerContext ctx, byte[] bytes) throws SaslException {
        ctx.writeAndFlush((Object)new BareSaslResponse(bytes));
    }

    private ApiMessage errorResponse(DecodedRequestFrame<?> frame, Throwable error) {
        Object reqBody = frame.body();
        short apiVersion = frame.apiVersion();
        return (switch (frame.apiKey()) {
            case ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE -> throw new IllegalStateException();
            case ApiKeys.PRODUCE -> new ProduceRequest((ProduceRequestData)reqBody, apiVersion);
            case ApiKeys.FETCH -> new FetchRequest((FetchRequestData)reqBody, apiVersion);
            case ApiKeys.LIST_OFFSETS -> {
                ListOffsetsRequestData listOffsetsRequestData = (ListOffsetsRequestData)reqBody;
                if (listOffsetsRequestData.replicaId() == -1) {
                    yield ListOffsetsRequest.Builder.forConsumer((boolean)true, (IsolationLevel)IsolationLevel.forId((byte)listOffsetsRequestData.isolationLevel()), (boolean)true).build(apiVersion);
                }
                yield ListOffsetsRequest.Builder.forReplica((short)apiVersion, (int)listOffsetsRequestData.replicaId()).build(apiVersion);
            }
            case ApiKeys.METADATA -> new MetadataRequest((MetadataRequestData)reqBody, apiVersion);
            case ApiKeys.OFFSET_COMMIT -> new OffsetCommitRequest((OffsetCommitRequestData)reqBody, apiVersion);
            case ApiKeys.OFFSET_FETCH -> {
                OffsetFetchRequestData offsetFetchRequestData = (OffsetFetchRequestData)reqBody;
                if (offsetFetchRequestData.groups() != null) {
                    yield new OffsetFetchRequest.Builder(offsetFetchRequestData.groups().stream().collect(Collectors.toMap(OffsetFetchRequestData.OffsetFetchRequestGroup::groupId, x -> x.topics().stream().flatMap(t -> t.partitionIndexes().stream().map(p -> new TopicPartition(t.name(), p.intValue()))).collect(Collectors.toList()))), true, false).build(apiVersion);
                }
                if (offsetFetchRequestData.topics() != null) {
                    yield new OffsetFetchRequest.Builder(offsetFetchRequestData.groupId(), offsetFetchRequestData.requireStable(), offsetFetchRequestData.topics().stream().flatMap(x -> x.partitionIndexes().stream().map(p -> new TopicPartition(x.name(), p.intValue()))).collect(Collectors.toList()), false).build(apiVersion);
                }
                throw new IllegalStateException();
            }
            case ApiKeys.FIND_COORDINATOR -> new FindCoordinatorRequest.Builder((FindCoordinatorRequestData)reqBody).build(apiVersion);
            case ApiKeys.JOIN_GROUP -> new JoinGroupRequest((JoinGroupRequestData)reqBody, apiVersion);
            case ApiKeys.HEARTBEAT -> new HeartbeatRequest.Builder((HeartbeatRequestData)reqBody).build(apiVersion);
            case ApiKeys.LEAVE_GROUP -> {
                LeaveGroupRequestData data = (LeaveGroupRequestData)reqBody;
                yield new LeaveGroupRequest.Builder(data.groupId(), data.members()).build(apiVersion);
            }
            case ApiKeys.SYNC_GROUP -> new SyncGroupRequest((SyncGroupRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_GROUPS -> new DescribeGroupsRequest.Builder((DescribeGroupsRequestData)reqBody).build(apiVersion);
            case ApiKeys.LIST_GROUPS -> new ListGroupsRequest((ListGroupsRequestData)reqBody, apiVersion);
            case ApiKeys.API_VERSIONS -> new ApiVersionsRequest((ApiVersionsRequestData)reqBody, apiVersion);
            case ApiKeys.CREATE_TOPICS -> new CreateTopicsRequest((CreateTopicsRequestData)reqBody, apiVersion);
            case ApiKeys.DELETE_TOPICS -> new DeleteTopicsRequest.Builder((DeleteTopicsRequestData)reqBody).build(apiVersion);
            case ApiKeys.DELETE_RECORDS -> new DeleteRecordsRequest.Builder((DeleteRecordsRequestData)reqBody).build(apiVersion);
            case ApiKeys.INIT_PRODUCER_ID -> new InitProducerIdRequest.Builder((InitProducerIdRequestData)reqBody).build(apiVersion);
            case ApiKeys.OFFSET_FOR_LEADER_EPOCH -> new OffsetsForLeaderEpochRequest((OffsetForLeaderEpochRequestData)reqBody, apiVersion);
            case ApiKeys.ADD_PARTITIONS_TO_TXN -> new AddPartitionsToTxnRequest((AddPartitionsToTxnRequestData)reqBody, apiVersion);
            case ApiKeys.ADD_OFFSETS_TO_TXN -> new AddOffsetsToTxnRequest((AddOffsetsToTxnRequestData)reqBody, apiVersion);
            case ApiKeys.END_TXN -> new EndTxnRequest.Builder((EndTxnRequestData)reqBody).build(apiVersion);
            case ApiKeys.WRITE_TXN_MARKERS -> new WriteTxnMarkersRequest.Builder((WriteTxnMarkersRequestData)reqBody).build(apiVersion);
            case ApiKeys.TXN_OFFSET_COMMIT -> new TxnOffsetCommitRequest((TxnOffsetCommitRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_ACLS -> {
                DescribeAclsRequestData d = (DescribeAclsRequestData)reqBody;
                yield new DescribeAclsRequest.Builder(new AclBindingFilter(new ResourcePatternFilter(ResourceType.fromCode((byte)d.resourceTypeFilter()), d.resourceNameFilter(), PatternType.fromCode((byte)d.patternTypeFilter())), new AccessControlEntryFilter(d.principalFilter(), d.hostFilter(), AclOperation.fromCode((byte)d.operation()), AclPermissionType.fromCode((byte)d.permissionType())))).build(apiVersion);
            }
            case ApiKeys.CREATE_ACLS -> new CreateAclsRequest.Builder((CreateAclsRequestData)reqBody).build(apiVersion);
            case ApiKeys.DELETE_ACLS -> new DeleteAclsRequest.Builder((DeleteAclsRequestData)reqBody).build(apiVersion);
            case ApiKeys.DESCRIBE_CONFIGS -> new DescribeConfigsRequest((DescribeConfigsRequestData)reqBody, apiVersion);
            case ApiKeys.ALTER_CONFIGS -> new AlterConfigsRequest((AlterConfigsRequestData)reqBody, apiVersion);
            case ApiKeys.ALTER_REPLICA_LOG_DIRS -> new AlterReplicaLogDirsRequest((AlterReplicaLogDirsRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_LOG_DIRS -> new DescribeLogDirsRequest((DescribeLogDirsRequestData)reqBody, apiVersion);
            case ApiKeys.CREATE_PARTITIONS -> new CreatePartitionsRequest.Builder((CreatePartitionsRequestData)reqBody).build(apiVersion);
            case ApiKeys.CREATE_DELEGATION_TOKEN -> new CreateDelegationTokenRequest.Builder((CreateDelegationTokenRequestData)reqBody).build(apiVersion);
            case ApiKeys.RENEW_DELEGATION_TOKEN -> new RenewDelegationTokenRequest.Builder((RenewDelegationTokenRequestData)reqBody).build(apiVersion);
            case ApiKeys.EXPIRE_DELEGATION_TOKEN -> new ExpireDelegationTokenRequest.Builder((ExpireDelegationTokenRequestData)reqBody).build(apiVersion);
            case ApiKeys.DESCRIBE_DELEGATION_TOKEN -> {
                DescribeDelegationTokenRequestData tokenRequestData = (DescribeDelegationTokenRequestData)reqBody;
                yield new DescribeDelegationTokenRequest.Builder(tokenRequestData.owners().stream().map(o -> new KafkaPrincipal(o.principalType(), o.principalName())).collect(Collectors.toList())).build(apiVersion);
            }
            case ApiKeys.DELETE_GROUPS -> new DeleteGroupsRequest((DeleteGroupsRequestData)reqBody, apiVersion);
            case ApiKeys.ELECT_LEADERS -> {
                ElectLeadersRequestData electLeaders = (ElectLeadersRequestData)reqBody;
                yield new ElectLeadersRequest.Builder(ElectionType.valueOf((byte)electLeaders.electionType()), (Collection)electLeaders.topicPartitions().stream().flatMap(t -> t.partitions().stream().map(p -> new TopicPartition(t.topic(), p.intValue()))).collect(Collectors.toList()), electLeaders.timeoutMs()).build(apiVersion);
            }
            case ApiKeys.INCREMENTAL_ALTER_CONFIGS -> new IncrementalAlterConfigsRequest((IncrementalAlterConfigsRequestData)reqBody, apiVersion);
            case ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> new AlterPartitionReassignmentsRequest.Builder((AlterPartitionReassignmentsRequestData)reqBody).build(apiVersion);
            case ApiKeys.LIST_PARTITION_REASSIGNMENTS -> new ListPartitionReassignmentsRequest.Builder((ListPartitionReassignmentsRequestData)reqBody).build(apiVersion);
            case ApiKeys.OFFSET_DELETE -> new OffsetDeleteRequest((OffsetDeleteRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_CLIENT_QUOTAS -> new DescribeClientQuotasRequest((DescribeClientQuotasRequestData)reqBody, apiVersion);
            case ApiKeys.ALTER_CLIENT_QUOTAS -> new AlterClientQuotasRequest((AlterClientQuotasRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS -> new DescribeUserScramCredentialsRequest.Builder((DescribeUserScramCredentialsRequestData)reqBody).build(apiVersion);
            case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS -> new AlterUserScramCredentialsRequest.Builder((AlterUserScramCredentialsRequestData)reqBody).build(apiVersion);
            case ApiKeys.DESCRIBE_QUORUM -> new DescribeClientQuotasRequest((DescribeClientQuotasRequestData)reqBody, apiVersion);
            case ApiKeys.ALTER_PARTITION -> new AlterPartitionRequest((AlterPartitionRequestData)reqBody, apiVersion);
            case ApiKeys.UPDATE_FEATURES -> new UpdateFeaturesRequest((UpdateFeaturesRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_CLUSTER -> new DescribeClusterRequest((DescribeClusterRequestData)reqBody, apiVersion);
            case ApiKeys.DESCRIBE_PRODUCERS -> new DescribeProducersRequest.Builder((DescribeProducersRequestData)reqBody).build(apiVersion);
            case ApiKeys.DESCRIBE_TRANSACTIONS -> new DescribeTransactionsRequest.Builder((DescribeTransactionsRequestData)reqBody).build(apiVersion);
            case ApiKeys.LIST_TRANSACTIONS -> new ListTransactionsRequest.Builder((ListTransactionsRequestData)reqBody).build(apiVersion);
            case ApiKeys.ALLOCATE_PRODUCER_IDS -> new AllocateProducerIdsRequest((AllocateProducerIdsRequestData)reqBody, apiVersion);
            case ApiKeys.VOTE -> new VoteRequest.Builder((VoteRequestData)reqBody).build(apiVersion);
            case ApiKeys.BEGIN_QUORUM_EPOCH -> new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData)reqBody).build(apiVersion);
            case ApiKeys.END_QUORUM_EPOCH -> new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData)reqBody).build(apiVersion);
            case ApiKeys.ENVELOPE -> new EnvelopeRequest((EnvelopeRequestData)reqBody, apiVersion);
            case ApiKeys.FETCH_SNAPSHOT -> new FetchSnapshotRequest((FetchSnapshotRequestData)reqBody, apiVersion);
            case ApiKeys.LEADER_AND_ISR -> {
                LeaderAndIsrRequestData lisr = (LeaderAndIsrRequestData)reqBody;
                yield new LeaderAndIsrRequest.Builder(apiVersion, lisr.controllerId(), lisr.controllerEpoch(), lisr.brokerEpoch(), lisr.ungroupedPartitionStates(), lisr.topicStates().stream().collect(Collectors.toMap(LeaderAndIsrRequestData.LeaderAndIsrTopicState::topicName, LeaderAndIsrRequestData.LeaderAndIsrTopicState::topicId)), (Collection)lisr.liveLeaders().stream().map(x -> new Node(x.brokerId(), x.hostName(), x.port())).collect(Collectors.toList())).build(apiVersion);
            }
            case ApiKeys.STOP_REPLICA -> {
                StopReplicaRequestData stopReplica = (StopReplicaRequestData)reqBody;
                yield new StopReplicaRequest.Builder(apiVersion, stopReplica.controllerId(), stopReplica.controllerEpoch(), stopReplica.brokerEpoch(), stopReplica.deletePartitions(), stopReplica.topicStates()).build(apiVersion);
            }
            case ApiKeys.UPDATE_METADATA -> new UpdateFeaturesRequest((UpdateFeaturesRequestData)reqBody, apiVersion);
            case ApiKeys.CONTROLLED_SHUTDOWN -> new ControlledShutdownRequest.Builder((ControlledShutdownRequestData)reqBody, apiVersion).build(apiVersion);
            case ApiKeys.BROKER_REGISTRATION -> new BrokerRegistrationRequest((BrokerRegistrationRequestData)reqBody, apiVersion);
            case ApiKeys.BROKER_HEARTBEAT -> new BrokerHeartbeatRequest((BrokerHeartbeatRequestData)reqBody, apiVersion);
            case ApiKeys.UNREGISTER_BROKER -> new UnregisterBrokerRequest((UnregisterBrokerRequestData)reqBody, apiVersion);
            default -> throw new IllegalStateException();
        }).getErrorResponse(error).data();
    }

    private void onSaslHandshakeRequest(ChannelHandlerContext ctx, DecodedRequestFrame<SaslHandshakeRequestData> data) throws SaslException {
        Errors error;
        String mechanism = ((SaslHandshakeRequestData)data.body()).mechanism();
        if (this.lastSeen == State.AUTHN_SUCCESS) {
            error = Errors.ILLEGAL_SASL_STATE;
        } else if (this.enabledMechanisms.contains(mechanism)) {
            AuthenticateCallbackHandler cbh = this.mechanismHandlers.get(mechanism);
            this.saslServer = Sasl.createSaslServer(mechanism, "kafka", null, null, (CallbackHandler)cbh);
            if (this.saslServer == null) {
                throw new IllegalStateException("SASL mechanism had no providers: " + mechanism);
            }
            error = Errors.NONE;
        } else {
            error = Errors.UNSUPPORTED_SASL_MECHANISM;
        }
        SaslHandshakeResponseData body = new SaslHandshakeResponseData().setMechanisms(this.enabledMechanisms).setErrorCode(error.code());
        KafkaAuthnHandler.writeFramedResponse(ctx, data, (ApiMessage)body);
        ctx.channel().read();
    }

    private void onSaslAuthenticateRequest(ChannelHandlerContext ctx, DecodedRequestFrame<SaslAuthenticateRequestData> data) {
        String errorMessage;
        Errors error;
        byte[] bytes = new byte[]{};
        try {
            bytes = this.doEvaluateResponse(ctx, ((SaslAuthenticateRequestData)data.body()).authBytes());
            error = Errors.NONE;
            errorMessage = null;
        }
        catch (SaslAuthenticationException e) {
            error = Errors.SASL_AUTHENTICATION_FAILED;
            errorMessage = e.getMessage();
        }
        catch (SaslException e) {
            error = Errors.SASL_AUTHENTICATION_FAILED;
            errorMessage = "An error occurred";
        }
        SaslAuthenticateResponseData body = new SaslAuthenticateResponseData().setErrorCode(error.code()).setErrorMessage(errorMessage).setAuthBytes(bytes);
        KafkaAuthnHandler.writeFramedResponse(ctx, data, (ApiMessage)body);
        ctx.channel().read();
    }

    private static void writeFramedResponse(ChannelHandlerContext ctx, DecodedRequestFrame<?> data, ApiMessage body) {
        ctx.writeAndFlush(new DecodedResponseFrame<ApiMessage>(data.apiVersion(), data.correlationId(), new ResponseHeaderData().setCorrelationId(data.correlationId()), body));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private byte[] doEvaluateResponse(ChannelHandlerContext ctx, byte[] authBytes) throws SaslException {
        byte[] bytes;
        try {
            bytes = this.saslServer.evaluateResponse(authBytes);
        }
        catch (SaslAuthenticationException e) {
            LOG.debug("{}: Authentication failed", (Object)ctx.channel());
            this.doTransition(ctx.channel(), State.FAILED);
            this.saslServer.dispose();
            throw e;
        }
        catch (Exception e) {
            LOG.debug("{}: Authentication failed", (Object)ctx.channel());
            this.doTransition(ctx.channel(), State.FAILED);
            this.saslServer.dispose();
            throw new SaslAuthenticationException(e.getMessage());
        }
        if (this.saslServer.isComplete()) {
            try {
                String authorizationId = this.saslServer.getAuthorizationID();
                Map<String, Object> properties = SaslMechanism.fromMechanismName(this.saslServer.getMechanismName()).negotiatedProperties(this.saslServer);
                this.doTransition(ctx.channel(), State.AUTHN_SUCCESS);
                LOG.debug("{}: Authentication successful, authorizationId={}, negotiatedProperties={}", new Object[]{ctx.channel(), authorizationId, properties});
                ctx.fireUserEventTriggered((Object)new AuthenticationEvent(authorizationId, properties));
            }
            finally {
                this.saslServer.dispose();
            }
        }
        return bytes;
    }

    static {
        PlainSaslServerProvider.initialize();
        ScramSaslServerProvider.initialize();
        LOG = LoggerFactory.getLogger(KafkaAuthnHandler.class);
    }

    static enum State {
        START,
        API_VERSIONS,
        SASL_HANDSHAKE_v0,
        SASL_HANDSHAKE_v1_PLUS,
        UNFRAMED_SASL_AUTHENTICATE,
        FRAMED_SASL_AUTHENTICATE,
        FAILED,
        AUTHN_SUCCESS;

    }

    /*
     * Uses 'sealed' constructs - enablewith --sealed true
     */
    public static enum SaslMechanism {
        PLAIN("PLAIN", null){

            @Override
            public Map<String, Object> negotiatedProperties(SaslServer saslServer) {
                return Map.of();
            }
        }
        ,
        SCRAM_SHA_256("SCRAM-SHA-256", ScramMechanism.SCRAM_SHA_256){

            @Override
            public Map<String, Object> negotiatedProperties(SaslServer saslServer) {
                Object lifetime = saslServer.getNegotiatedProperty("CREDENTIAL.LIFETIME.MS");
                return lifetime == null ? Map.of() : Map.of("CREDENTIAL.LIFETIME.MS", lifetime);
            }
        }
        ,
        SCRAM_SHA_512("SCRAM-SHA-512", ScramMechanism.SCRAM_SHA_512){

            @Override
            public Map<String, Object> negotiatedProperties(SaslServer saslServer) {
                Object lifetime = saslServer.getNegotiatedProperty("CREDENTIAL.LIFETIME.MS");
                return lifetime == null ? Map.of() : Map.of("CREDENTIAL.LIFETIME.MS", lifetime);
            }
        };

        private final String name;
        private final ScramMechanism scramMechanism;

        private SaslMechanism(String saslName, ScramMechanism scramMechanism) {
            this.name = saslName;
            this.scramMechanism = scramMechanism;
        }

        public String mechanismName() {
            return this.name;
        }

        static SaslMechanism fromMechanismName(String mechanismName) {
            switch (mechanismName) {
                case "PLAIN": {
                    return PLAIN;
                }
                case "SCRAM-SHA-256": {
                    return SCRAM_SHA_256;
                }
                case "SCRAM-SHA-512": {
                    return SCRAM_SHA_512;
                }
            }
            throw new UnsupportedSaslMechanismException(mechanismName);
        }

        public ScramMechanism scramMechanism() {
            return this.scramMechanism;
        }

        public abstract Map<String, Object> negotiatedProperties(SaslServer var1);
    }
}

