package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.message.DcpCloseStreamResponse;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpNoopRequest;
import com.couchbase.client.dcp.message.DcpNoopResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerMessage;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import rx.subjects.Subject;

/* loaded from: input_file:com/couchbase/client/dcp/transport/netty/DcpMessageHandler.class */
public class DcpMessageHandler extends ChannelDuplexHandler {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpMessageHandler.class);
    private final DataEventHandler dataEventHandler;
    private final Subject<ByteBuf, ByteBuf> controlEvents;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DcpMessageHandler(DataEventHandler dataEventHandler, Subject<ByteBuf, ByteBuf> subject) {
        this.dataEventHandler = dataEventHandler;
        this.controlEvents = subject;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        ByteBuf byteBuf = (ByteBuf) obj;
        if (isDataMessage(byteBuf)) {
            this.dataEventHandler.onEvent(byteBuf);
            return;
        }
        if (isControlMessage(byteBuf)) {
            this.controlEvents.onNext(byteBuf);
            return;
        }
        if (!DcpNoopRequest.is(byteBuf)) {
            LOGGER.warn("Unknown DCP Message, ignoring. \n{}", MessageUtil.humanize(byteBuf));
            return;
        }
        ByteBuf buffer = channelHandlerContext.alloc().buffer();
        DcpNoopResponse.init(buffer);
        MessageUtil.setOpaque(MessageUtil.getOpaque(byteBuf), buffer);
        channelHandlerContext.writeAndFlush(buffer);
    }

    private static boolean isControlMessage(ByteBuf byteBuf) {
        return DcpOpenStreamResponse.is(byteBuf) || DcpStreamEndMessage.is(byteBuf) || DcpSnapshotMarkerMessage.is(byteBuf) || DcpFailoverLogResponse.is(byteBuf) || DcpCloseStreamResponse.is(byteBuf) || DcpGetPartitionSeqnosResponse.is(byteBuf);
    }

    private static boolean isDataMessage(ByteBuf byteBuf) {
        return DcpMutationMessage.is(byteBuf) || DcpDeletionMessage.is(byteBuf) || DcpExpirationMessage.is(byteBuf);
    }
}
