package org.wso2.choreo.connect.enforcer.analytics;

import io.envoyproxy.envoy.service.accesslog.v3.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
import io.grpc.BindableService;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.choreo.connect.enforcer.commons.logging.ErrorDetails;
import org.wso2.choreo.connect.enforcer.commons.logging.LoggingConstants;
import org.wso2.choreo.connect.enforcer.config.ConfigHolder;
import org.wso2.choreo.connect.enforcer.config.dto.AnalyticsReceiverConfigDTO;
import org.wso2.choreo.connect.enforcer.metrics.MetricsUtils;
import org.wso2.choreo.connect.enforcer.server.Constants;
import org.wso2.choreo.connect.enforcer.server.EnforcerThreadPoolExecutor;
import org.wso2.choreo.connect.enforcer.server.NativeThreadFactory;
import org.wso2.choreo.connect.enforcer.util.TLSUtils;

/* loaded from: input_file:org/wso2/choreo/connect/enforcer/analytics/AccessLoggingService.class */
/**
 * gRPC access log service that receives access log entries streamed by the router and forwards
 * each message to the analytics filter and/or the metrics publisher, depending on which of the
 * two is enabled in the enforcer configuration.
 */
public class AccessLoggingService extends AccessLogServiceGrpc.AccessLogServiceImplBase {
    private static final Logger logger = LogManager.getLogger(AccessLoggingService.class);

    /**
     * Prepares the service and starts the access log gRPC server.
     *
     * <p>When analytics is enabled, {@code AnalyticsFilter.getInstance()} is invoked before the
     * server starts (presumably so the filter is initialized before the first message arrives).
     *
     * @throws IOException if the access log server cannot be started
     */
    public void init() throws IOException {
        if (ConfigHolder.getInstance().getConfig().getAnalyticsConfig().isEnabled()) {
            AnalyticsFilter.getInstance();
        }
        startAccessLoggingServer();
    }

    /**
     * Creates the observer that consumes the access log stream of a single call.
     *
     * @param streamObserver observer used to send the response back to the caller
     * @return observer that handles the incoming {@link StreamAccessLogsMessage}s
     */
    @Override
    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(
            final StreamObserver<StreamAccessLogsResponse> streamObserver) {
        return new StreamObserver<StreamAccessLogsMessage>() {
            /**
             * Forwards the message to analytics and/or metrics; the configuration is consulted on
             * every message.
             */
            @Override
            public void onNext(StreamAccessLogsMessage streamAccessLogsMessage) {
                if (ConfigHolder.getInstance().getConfig().getAnalyticsConfig().isEnabled()) {
                    AnalyticsFilter.getInstance().handleGRPCLogMsg(streamAccessLogsMessage);
                }
                if (ConfigHolder.getInstance().getConfig().getMetricsConfig().isMetricsEnabled()) {
                    MetricsUtils.handlePublishingMetrics(streamAccessLogsMessage);
                }
            }

            /** Logs the stream failure and completes the response side of the call. */
            @Override
            public void onError(Throwable th) {
                logger.error("Error while receiving access log entries from router. " + th.getMessage(),
                        ErrorDetails.errorLog(LoggingConstants.Severity.CRITICAL, 5101));
                // NOTE(review): no response message is sent before completing here, and the call
                // may already be terminated when onError is delivered - confirm this is intended.
                streamObserver.onCompleted();
            }

            /** Sends an empty response and completes the call once the caller finishes the stream. */
            @Override
            public void onCompleted() {
                streamObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
                streamObserver.onCompleted();
                logger.info("Access Log processing is completed.");
            }
        };
    }

    /**
     * Builds and starts the Netty based gRPC server that exposes this service, using the port,
     * keep-alive time, message size limit and thread pool settings of the analytics receiver
     * configuration.
     *
     * <p>If startup fails for any reason, the event loop groups created here are shut down before
     * the failure is propagated, so they are not leaked.
     *
     * @throws IOException if the server cannot be started
     */
    private void startAccessLoggingServer() throws IOException {
        AnalyticsReceiverConfigDTO serverConfig =
                ConfigHolder.getInstance().getConfig().getAnalyticsConfig().getServerConfig();
        int processors = Runtime.getRuntime().availableProcessors();

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(processors);
        NioEventLoopGroup workerGroup = null;
        boolean started = false;
        try {
            workerGroup = new NioEventLoopGroup(processors * 2);
            NettyServerBuilder.forPort(serverConfig.getPort())
                    .keepAliveTime(serverConfig.getKeepAliveTime(), TimeUnit.SECONDS)
                    .maxInboundMessageSize(serverConfig.getMaxMessageSize())
                    .bossEventLoopGroup(bossGroup)
                    .workerEventLoopGroup(workerGroup)
                    .addService(this)
                    .sslContext(TLSUtils.buildGRPCServerSSLContext())
                    .channelType(NioServerSocketChannel.class)
                    .executor(buildRequestExecutor(serverConfig))
                    .build()
                    .start();
            started = true;
        } finally {
            if (!started) {
                // Startup did not complete: release the event loop groups instead of leaking them.
                bossGroup.shutdownGracefully();
                if (workerGroup != null) {
                    workerGroup.shutdownGracefully();
                }
            }
        }
        logger.info("Access log Receiver started Listening in port : " + serverConfig.getPort());
    }

    /** Creates the executor handed to the gRPC server, sized from the configured thread pool settings. */
    private Executor buildRequestExecutor(AnalyticsReceiverConfigDTO serverConfig) {
        return new EnforcerThreadPoolExecutor(
                serverConfig.getThreadPoolConfig().getCoreSize(),
                serverConfig.getThreadPoolConfig().getMaxSize(),
                serverConfig.getThreadPoolConfig().getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(serverConfig.getThreadPoolConfig().getQueueSize()),
                new NativeThreadFactory(new ThreadGroup(Constants.ANALYTICS_THREAD_GROUP),
                        Constants.ANALYTICS_THREAD_ID));
    }
}
