package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.CoreScheduler;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.metrics.NetworkLatencyMetricsIdentifier;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.deps.io.netty.handler.codec.DecoderException;
import com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec;
import com.couchbase.client.deps.io.netty.handler.codec.base64.Base64;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpRequest;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import javax.net.ssl.SSLHandshakeException;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subjects.Subject;

/* loaded from: input_file:com/couchbase/client/core/endpoint/AbstractGenericHandler.class */
public abstract class AbstractGenericHandler<RESPONSE, ENCODED, REQUEST extends CouchbaseRequest> extends MessageToMessageCodec<RESPONSE, REQUEST> {
    protected static final Charset CHARSET = CharsetUtil.UTF_8;
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) AbstractGenericHandler.class);
    protected static final byte[] EMPTY_BYTES = new byte[0];
    private final EventSink<ResponseEvent> responseBuffer;
    private final AbstractEndpoint endpoint;
    private final Queue<REQUEST> sentRequestQueue;
    private final Queue<Long> sentRequestTimings;
    private final boolean isTransient;
    private final boolean traceEnabled;
    private final boolean moveResponseOut;
    private final Map<Class<? extends CouchbaseRequest>, String> classNameCache;
    private REQUEST currentRequest;
    private DecodingState currentDecodingState;
    private long currentOpTime;
    private String remoteHostname;
    private ChannelPromise connectFuture;
    private String remoteHttpHost;
    private final int sentQueueLimit;
    private final boolean pipeline;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/couchbase/client/core/endpoint/AbstractGenericHandler$KeepAliveResponseAction.class */
    public class KeepAliveResponseAction extends Subscriber<CouchbaseResponse> {
        private final ChannelHandlerContext ctx;

        KeepAliveResponseAction(ChannelHandlerContext channelHandlerContext) {
            this.ctx = channelHandlerContext;
        }

        public void onCompleted() {
        }

        public void onError(Throwable th) {
            AbstractGenericHandler.LOGGER.warn(AbstractGenericHandler.logIdent(this.ctx, AbstractGenericHandler.this.endpoint) + "Got error while consuming KeepAliveResponse.", th);
        }

        public void onNext(CouchbaseResponse couchbaseResponse) {
            AbstractGenericHandler.this.onKeepAliveResponse(this.ctx, couchbaseResponse);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractGenericHandler(AbstractEndpoint abstractEndpoint, EventSink<ResponseEvent> eventSink, boolean z, boolean z2) {
        this(abstractEndpoint, eventSink, new ArrayDeque(), z, z2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractGenericHandler(AbstractEndpoint abstractEndpoint, EventSink<ResponseEvent> eventSink, Queue<REQUEST> queue, boolean z, boolean z2) {
        this.currentOpTime = -1L;
        this.pipeline = z2;
        this.endpoint = abstractEndpoint;
        this.responseBuffer = eventSink;
        this.sentRequestQueue = queue;
        this.currentDecodingState = DecodingState.INITIAL;
        this.isTransient = z;
        this.traceEnabled = LOGGER.isTraceEnabled();
        this.sentRequestTimings = new ArrayDeque();
        this.classNameCache = new IdentityHashMap();
        this.moveResponseOut = env() == null || !env().callbacksOnIoPool();
        this.sentQueueLimit = Integer.parseInt(System.getProperty("com.couchbase.sentRequestQueueLimit", "5120"));
    }

    protected abstract ENCODED encodeRequest(ChannelHandlerContext channelHandlerContext, REQUEST request) throws Exception;

    protected abstract CouchbaseResponse decodeResponse(ChannelHandlerContext channelHandlerContext, RESPONSE response) throws Exception;

    protected abstract ServiceType serviceType();

    @Override // com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec, com.couchbase.client.deps.io.netty.channel.ChannelDuplexHandler, com.couchbase.client.deps.io.netty.channel.ChannelOutboundHandler
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (!this.pipeline && (!this.sentRequestQueue.isEmpty() || this.currentDecodingState != DecodingState.INITIAL)) {
            if (this.traceEnabled) {
                LOGGER.trace("Rescheduling {} because pipelining disable and a request is in-flight.", obj);
            }
            RetryHelper.retryOrCancel(env(), (CouchbaseRequest) obj, this.responseBuffer);
        } else if (this.sentRequestQueue.size() < this.sentQueueLimit) {
            super.write(channelHandlerContext, obj, channelPromise);
        } else {
            LOGGER.debug("Rescheduling {} because sentRequestQueueLimit reached.", obj);
            RetryHelper.retryOrCancel(env(), (CouchbaseRequest) obj, this.responseBuffer);
        }
    }

    protected void encode(ChannelHandlerContext channelHandlerContext, REQUEST request, List<Object> list) throws Exception {
        ENCODED encodeRequest = encodeRequest(channelHandlerContext, request);
        this.sentRequestQueue.offer(request);
        list.add(encodeRequest);
        this.sentRequestTimings.offer(Long.valueOf(System.nanoTime()));
    }

    @Override // com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec
    protected void decode(ChannelHandlerContext channelHandlerContext, RESPONSE response, List<Object> list) throws Exception {
        if (this.currentDecodingState == DecodingState.INITIAL) {
            initialDecodeTasks(channelHandlerContext);
        }
        try {
            CouchbaseResponse decodeResponse = decodeResponse(channelHandlerContext, response);
            if (decodeResponse != null) {
                publishResponse(decodeResponse, this.currentRequest.observable());
                if (this.currentDecodingState == DecodingState.FINISHED) {
                    writeMetrics(decodeResponse);
                }
            }
        } catch (CouchbaseException e) {
            this.currentRequest.observable().onError(e);
        } catch (Exception e2) {
            this.currentRequest.observable().onError(new CouchbaseException(e2));
        }
        if (this.currentDecodingState == DecodingState.FINISHED) {
            resetStatesAfterDecode(channelHandlerContext);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void writeMetrics(CouchbaseResponse couchbaseResponse) {
        if (this.currentRequest == null || this.currentOpTime < 0 || env() == null || !env().networkLatencyMetricsCollector().isEnabled()) {
            return;
        }
        try {
            Class<?> cls = this.currentRequest.getClass();
            String str = this.classNameCache.get(cls);
            if (str == null) {
                str = cls.getSimpleName();
                this.classNameCache.put(cls, str);
            }
            env().networkLatencyMetricsCollector().record(new NetworkLatencyMetricsIdentifier(this.remoteHostname, serviceType().toString(), str, couchbaseResponse.status().toString()), this.currentOpTime);
        } catch (Throwable th) {
            LOGGER.warn("Could not collect latency metric for request + " + this.currentRequest + "(" + this.currentOpTime + ")", th);
        }
    }

    private void resetStatesAfterDecode(ChannelHandlerContext channelHandlerContext) {
        if (this.traceEnabled) {
            LOGGER.trace("{}Finished decoding of {}", logIdent(channelHandlerContext, this.endpoint), this.currentRequest);
        }
        this.currentRequest = null;
        this.currentDecodingState = DecodingState.INITIAL;
    }

    private void initialDecodeTasks(ChannelHandlerContext channelHandlerContext) {
        this.currentRequest = this.sentRequestQueue.poll();
        this.currentDecodingState = DecodingState.STARTED;
        if (this.currentRequest != null) {
            Long poll = this.sentRequestTimings.poll();
            if (poll != null) {
                this.currentOpTime = System.nanoTime() - poll.longValue();
            } else {
                this.currentOpTime = -1L;
            }
        }
        if (this.traceEnabled) {
            LOGGER.trace("{}Started decoding of {}", logIdent(channelHandlerContext, this.endpoint), this.currentRequest);
        }
    }

    protected void publishResponse(CouchbaseResponse couchbaseResponse, Subject<CouchbaseResponse, CouchbaseResponse> subject) {
        if (couchbaseResponse.status() == ResponseStatus.RETRY || subject == null) {
            this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, couchbaseResponse, subject);
            return;
        }
        if (!this.moveResponseOut) {
            completeResponse(couchbaseResponse, subject);
            return;
        }
        Scheduler scheduler = env().scheduler();
        if (scheduler instanceof CoreScheduler) {
            scheduleDirect((CoreScheduler) scheduler, couchbaseResponse, subject);
        } else {
            scheduleWorker(scheduler, couchbaseResponse, subject);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void completeResponse(CouchbaseResponse couchbaseResponse, Subject<CouchbaseResponse, CouchbaseResponse> subject) {
        try {
            subject.onNext(couchbaseResponse);
            subject.onCompleted();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while onNext on observable", (Throwable) e);
            subject.onError(e);
        }
    }

    private static void scheduleDirect(CoreScheduler coreScheduler, final CouchbaseResponse couchbaseResponse, final Subject<CouchbaseResponse, CouchbaseResponse> subject) {
        coreScheduler.scheduleDirect(new Action0() { // from class: com.couchbase.client.core.endpoint.AbstractGenericHandler.1
            public void call() {
                AbstractGenericHandler.completeResponse(CouchbaseResponse.this, subject);
            }
        });
    }

    private static void scheduleWorker(Scheduler scheduler, final CouchbaseResponse couchbaseResponse, final Subject<CouchbaseResponse, CouchbaseResponse> subject) {
        final Scheduler.Worker createWorker = scheduler.createWorker();
        createWorker.schedule(new Action0() { // from class: com.couchbase.client.core.endpoint.AbstractGenericHandler.2
            public void call() {
                try {
                    subject.onNext(couchbaseResponse);
                    subject.onCompleted();
                } catch (Exception e) {
                    AbstractGenericHandler.LOGGER.warn("Caught exception while onNext on observable", (Throwable) e);
                    subject.onError(e);
                } finally {
                    createWorker.unsubscribe();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void finishedDecoding() {
        this.currentDecodingState = DecodingState.FINISHED;
        if (this.isTransient) {
            this.endpoint.disconnect();
        }
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        LOGGER.debug(logIdent(channelHandlerContext, this.endpoint) + "Channel Inactive.");
        this.endpoint.notifyChannelInactive();
        channelHandlerContext.fireChannelInactive();
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        LOGGER.debug(logIdent(channelHandlerContext, this.endpoint) + "Channel Active.");
        SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        if (remoteAddress instanceof InetSocketAddress) {
            this.remoteHostname = ((InetSocketAddress) remoteAddress).getAddress().getHostAddress();
        } else {
            this.remoteHostname = remoteAddress.toString();
        }
        channelHandlerContext.fireChannelActive();
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelInboundHandler
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (!channelHandlerContext.channel().isWritable()) {
            channelHandlerContext.flush();
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelDuplexHandler, com.couchbase.client.deps.io.netty.channel.ChannelOutboundHandler
    public void connect(ChannelHandlerContext channelHandlerContext, SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) throws Exception {
        this.connectFuture = channelPromise;
        channelHandlerContext.connect(socketAddress, socketAddress2, channelPromise);
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelHandler, com.couchbase.client.deps.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (th instanceof IOException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(logIdent(channelHandlerContext, this.endpoint) + "Connection reset by peer: " + th.getMessage(), th);
            } else {
                LOGGER.info(logIdent(channelHandlerContext, this.endpoint) + "Connection reset by peer: " + th.getMessage());
            }
            handleOutstandingOperations(channelHandlerContext);
            return;
        }
        if (!(th instanceof DecoderException) || !(th.getCause() instanceof SSLHandshakeException)) {
            LOGGER.warn(logIdent(channelHandlerContext, this.endpoint) + "Caught unknown exception: " + th.getMessage(), th);
            channelHandlerContext.fireExceptionCaught(th);
        } else if (this.connectFuture.isDone()) {
            LOGGER.warn(logIdent(channelHandlerContext, this.endpoint) + "Caught SSL exception after being connected: " + th.getMessage(), th);
        } else {
            this.connectFuture.setFailure(th.getCause());
        }
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelHandler
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        handleOutstandingOperations(channelHandlerContext);
    }

    private void handleOutstandingOperations(ChannelHandlerContext channelHandlerContext) {
        if (this.sentRequestQueue.isEmpty()) {
            LOGGER.trace(logIdent(channelHandlerContext, this.endpoint) + "Not cancelling operations - sent queue is empty.");
            return;
        }
        LOGGER.debug(logIdent(channelHandlerContext, this.endpoint) + "Cancelling " + this.sentRequestQueue.size() + " outstanding requests.");
        while (!this.sentRequestQueue.isEmpty()) {
            REQUEST poll = this.sentRequestQueue.poll();
            try {
                sideEffectRequestToCancel(poll);
                poll.observable().onError(new RequestCancelledException("Request cancelled in-flight."));
            } catch (Exception e) {
                LOGGER.info("Exception thrown while cancelling outstanding operation: " + poll, (Throwable) e);
            }
        }
        this.sentRequestTimings.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sideEffectRequestToCancel(REQUEST request) {
    }

    @Override // com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter, com.couchbase.client.deps.io.netty.channel.ChannelInboundHandler
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        CouchbaseRequest createKeepAliveRequest;
        if (!(obj instanceof IdleStateEvent)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        if (shouldSendKeepAlive() && (createKeepAliveRequest = createKeepAliveRequest()) != null) {
            createKeepAliveRequest.observable().subscribe(new KeepAliveResponseAction(channelHandlerContext));
            onKeepAliveFired(channelHandlerContext, createKeepAliveRequest);
            Channel channel = channelHandlerContext.channel();
            if (channel.isActive() && channel.isWritable()) {
                channelHandlerContext.pipeline().writeAndFlush(createKeepAliveRequest);
            }
        }
    }

    private boolean shouldSendKeepAlive() {
        if (this.pipeline) {
            return true;
        }
        return this.sentRequestQueue.isEmpty() && this.currentDecodingState == DecodingState.INITIAL;
    }

    protected CouchbaseRequest createKeepAliveRequest() {
        return null;
    }

    protected void onKeepAliveFired(ChannelHandlerContext channelHandlerContext, CouchbaseRequest couchbaseRequest) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(channelHandlerContext, this.endpoint) + "KeepAlive fired");
        }
    }

    protected void onKeepAliveResponse(ChannelHandlerContext channelHandlerContext, CouchbaseResponse couchbaseResponse) {
        if (this.traceEnabled) {
            LOGGER.trace(logIdent(channelHandlerContext, this.endpoint) + "keepAlive was answered, status " + couchbaseResponse.status());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public REQUEST currentRequest() {
        return this.currentRequest;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void currentRequest(REQUEST request) {
        this.currentRequest = request;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String remoteHostname() {
        return this.remoteHostname;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CoreEnvironment env() {
        return this.endpoint.environment();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractEndpoint endpoint() {
        return this.endpoint;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String logIdent(ChannelHandlerContext channelHandlerContext, Endpoint endpoint) {
        return "[" + channelHandlerContext.channel().remoteAddress() + "][" + endpoint.getClass().getSimpleName() + "]: ";
    }

    public static void addHttpBasicAuth(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, String str, String str2) {
        String str3 = str2 == null ? "" : str2;
        ByteBuf buffer = channelHandlerContext.alloc().buffer(str.length() + str3.length() + 1);
        buffer.writeBytes((str + ":" + str3).getBytes(CHARSET));
        ByteBuf encode = Base64.encode(buffer, false);
        httpRequest.headers().add("Authorization", (Object) ("Basic " + encode.toString(CHARSET)));
        encode.release();
        buffer.release();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String remoteHttpHost(ChannelHandlerContext channelHandlerContext) {
        if (this.remoteHttpHost == null) {
            SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
            if (remoteAddress instanceof InetSocketAddress) {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
                this.remoteHttpHost = inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort();
            } else {
                this.remoteHttpHost = remoteAddress.toString();
            }
        }
        return this.remoteHttpHost;
    }

    public DecodingState getDecodingState() {
        return this.currentDecodingState;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec
    protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
        encode(channelHandlerContext, (ChannelHandlerContext) obj, (List<Object>) list);
    }
}
