package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpCloseStreamResponse;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.util.concurrent.Promise;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannelControlHandler.class */
/**
 * Control event handler that sits in front of the user-supplied {@link ControlEventHandler}.
 *
 * <p>Responses to stream-open, failover-log, close-stream and get-partition-seqnos requests,
 * as well as stream-end messages, are consumed here: the promise registered under the
 * message's opaque in {@code dcpChannel.outstandingPromises} is completed and the incoming
 * buffer is released. Every other control message is passed on unchanged to the handler
 * obtained from the channel's environment.
 *
 * <p>NOTE(review): a response whose opaque has no entry in {@code outstandingPromises} /
 * {@code outstandingVbucketInfos} would fail with a {@link NullPointerException} in the
 * filter methods below — confirm that callers always register both before sending.
 */
public class DcpChannelControlHandler implements ControlEventHandler {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpChannelControlHandler.class);

    /** Response status of a successfully opened stream. */
    private static final short STATUS_SUCCESS = 0x00;

    /**
     * Response status answered with a {@link NotMyVbucketException}.
     * NOTE(review): the decompiled source carried an unresolved symbolic label here; 0x07 is the
     * memcached binary protocol "not my vbucket" status — confirm against the original sources.
     */
    private static final short STATUS_NOT_MY_VBUCKET = 0x07;

    /** Response status answered with a {@link RollbackException} and a rollback message. */
    private static final short STATUS_ROLLBACK = 0x23;

    private final DcpChannel dcpChannel;
    private final ControlEventHandler controlEventHandler;
    private final EventBus eventBus;

    /**
     * Creates a handler bound to the given channel.
     *
     * @param dcpChannel channel whose outstanding promises are completed by this handler; the
     *                   downstream control handler and the event bus are taken from its environment.
     */
    public DcpChannelControlHandler(DcpChannel dcpChannel) {
        this.dcpChannel = dcpChannel;
        this.controlEventHandler = dcpChannel.env.controlEventHandler();
        this.eventBus = dcpChannel.env.eventBus();
    }

    /**
     * Dispatches a control message to the matching filter, or forwards it to the downstream
     * handler when no filter applies.
     *
     * @param channelFlowController flow controller passed along to the filters and downstream handler.
     * @param byteBuf               the control message; released by the filter that consumes it.
     */
    @Override
    public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
        if (DcpOpenStreamResponse.is(byteBuf)) {
            filterOpenStreamResponse(channelFlowController, byteBuf);
        } else if (DcpFailoverLogResponse.is(byteBuf)) {
            filterFailoverLogResponse(byteBuf);
        } else if (DcpStreamEndMessage.is(byteBuf)) {
            filterDcpStreamEndMessage(channelFlowController, byteBuf);
        } else if (DcpCloseStreamResponse.is(byteBuf)) {
            filterDcpCloseStreamResponse(channelFlowController, byteBuf);
        } else if (DcpGetPartitionSeqnosResponse.is(byteBuf)) {
            filterDcpGetPartitionSeqnosResponse(byteBuf);
        } else {
            this.controlEventHandler.onEvent(channelFlowController, byteBuf);
        }
    }

    /**
     * Completes the promise of an open-stream request according to the response status.
     *
     * <p>On success a failover log message is synthesized from the response content and handed
     * to the downstream handler; on rollback a rollback message is handed over instead.
     */
    private void filterOpenStreamResponse(ChannelFlowController flowController, ByteBuf buf) {
        try {
            final Integer opaque = Integer.valueOf(MessageUtil.getOpaque(buf));
            final Promise<?> promise = this.dcpChannel.outstandingPromises.remove(opaque);
            final short vbid = this.dcpChannel.outstandingVbucketInfos.remove(opaque).shortValue();
            final short status = MessageUtil.getStatus(buf);
            switch (status) {
                case STATUS_SUCCESS:
                    promise.setSuccess(null);
                    // Build a failover log message from the response and emit it downstream.
                    ByteBuf failoverLog = Unpooled.buffer();
                    DcpFailoverLogResponse.init(failoverLog);
                    DcpFailoverLogResponse.vbucket(failoverLog, DcpOpenStreamResponse.vbucket(buf));
                    copyContentWithVbid(buf, failoverLog, vbid);
                    this.controlEventHandler.onEvent(flowController, failoverLog);
                    break;
                case STATUS_NOT_MY_VBUCKET:
                    promise.setFailure(new NotMyVbucketException());
                    break;
                case STATUS_ROLLBACK:
                    promise.setFailure(new RollbackException());
                    // Build a rollback message carrying the requested seqno and emit it downstream.
                    ByteBuf rollback = Unpooled.buffer();
                    RollbackMessage.init(rollback, vbid, DcpOpenStreamResponse.rollbackSeqno(buf));
                    this.controlEventHandler.onEvent(flowController, rollback);
                    break;
                default:
                    promise.setFailure(new IllegalStateException("Unhandled Status: " + status));
                    break;
            }
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the promise of a get-partition-seqnos request with a copy of the response content.
     */
    private void filterDcpGetPartitionSeqnosResponse(ByteBuf buf) {
        try {
            final Integer opaque = Integer.valueOf(MessageUtil.getOpaque(buf));
            // The map does not carry the promise's result type; this request kind is completed with a ByteBuf.
            @SuppressWarnings("unchecked")
            final Promise<ByteBuf> promise = (Promise<ByteBuf>) this.dcpChannel.outstandingPromises.remove(opaque);
            promise.setSuccess(MessageUtil.getContent(buf).copy());
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the promise of a failover-log request with a rebuilt failover log message whose
     * content is the response content followed by the vbucket id registered for the request.
     */
    private void filterFailoverLogResponse(ByteBuf buf) {
        try {
            final Integer opaque = Integer.valueOf(MessageUtil.getOpaque(buf));
            // The map does not carry the promise's result type; this request kind is completed with a ByteBuf.
            @SuppressWarnings("unchecked")
            final Promise<ByteBuf> promise = (Promise<ByteBuf>) this.dcpChannel.outstandingPromises.remove(opaque);
            final short vbid = this.dcpChannel.outstandingVbucketInfos.remove(opaque).shortValue();

            ByteBuf failoverLog = Unpooled.buffer();
            DcpFailoverLogResponse.init(failoverLog);
            DcpFailoverLogResponse.vbucket(failoverLog, DcpFailoverLogResponse.vbucket(buf));
            copyContentWithVbid(buf, failoverLog, vbid);
            promise.setSuccess(failoverLog);
        } finally {
            buf.release();
        }
    }

    /**
     * Handles a server-initiated stream end: publishes a {@link StreamEndEvent} when an event bus
     * is present, resets the vbucket's entry in {@code openStreams} to 0, asks the conductor to
     * possibly move the partition, and acknowledges the message.
     */
    private void filterDcpStreamEndMessage(ChannelFlowController flowController, ByteBuf buf) {
        try {
            final short vbid = DcpStreamEndMessage.vbucket(buf);
            final StreamEndReason reason = DcpStreamEndMessage.reason(buf);
            LOGGER.debug("Server closed Stream on vbid {} with reason {}", vbid, reason);

            if (this.eventBus != null) {
                this.eventBus.publish(new StreamEndEvent(vbid, reason));
            }
            this.dcpChannel.openStreams.set(vbid, 0);
            this.dcpChannel.conductor.maybeMovePartition(vbid);
            flowController.ack(buf);
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the promise of a close-stream request and acknowledges the response.
     */
    private void filterDcpCloseStreamResponse(ChannelFlowController flowController, ByteBuf buf) {
        try {
            final Integer opaque = Integer.valueOf(MessageUtil.getOpaque(buf));
            final Promise<?> promise = this.dcpChannel.outstandingPromises.remove(opaque);
            promise.setSuccess(null);
            flowController.ack(buf);
        } finally {
            buf.release();
        }
    }

    /**
     * Sets the content of {@code target} to a copy of {@code source}'s content with {@code vbid}
     * appended as a short. The temporary copy is released even if setting the content fails.
     */
    private static void copyContentWithVbid(ByteBuf source, ByteBuf target, short vbid) {
        final ByteBuf content = MessageUtil.getContent(source).copy().writeShort(vbid);
        try {
            MessageUtil.setContent(content, target);
        } finally {
            content.release();
        }
    }
}
