package org.apache.sysds.runtime.controlprogram.federated;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.antlr.v4.runtime.atn.PredictionContext;
import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.paramserv.NetworkTrafficCounter;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.lineage.LineageItem;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.class */
public class FederatedWorker {
    protected static Logger log = Logger.getLogger(FederatedWorker.class);
    private final int _port;
    private final FederatedWorkloadAnalyzer _fan;
    private final boolean _debug;
    private Timing networkTimer = new Timing();
    private final FederatedLookupTable _flt = new FederatedLookupTable();
    private final FederatedReadCache _frc = new FederatedReadCache();

    /* loaded from: input_file:org/apache/sysds/runtime/controlprogram/federated/FederatedWorker$FederatedResponseEncoder.class */
    public static class FederatedResponseEncoder extends ObjectEncoder {
        /* JADX INFO: Access modifiers changed from: protected */
        public ByteBuf allocateBuffer(ChannelHandlerContext channelHandlerContext, Serializable serializable, boolean z) throws Exception {
            int i = 256;
            if (serializable instanceof FederatedResponse) {
                try {
                    i = Math.toIntExact(((FederatedResponse) serializable).estimateSerializationBufferSize());
                } catch (ArithmeticException e) {
                    i = Integer.MAX_VALUE;
                }
            }
            return z ? channelHandlerContext.alloc().ioBuffer(i) : channelHandlerContext.alloc().heapBuffer(i);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void encode(ChannelHandlerContext channelHandlerContext, Serializable serializable, ByteBuf byteBuf) throws Exception {
            LineageItem lineageItem = null;
            boolean z = !LineageCacheConfig.ReuseCacheType.isNone() && (serializable instanceof FederatedResponse);
            if (z) {
                FederatedResponse federatedResponse = (FederatedResponse) serializable;
                if (federatedResponse.getData() != null && federatedResponse.getData().length != 0 && (federatedResponse.getData()[0] instanceof CacheBlock)) {
                    lineageItem = federatedResponse.getLineageItem();
                    byte[] reuseSerialization = LineageCache.reuseSerialization(lineageItem);
                    if (reuseSerialization != null) {
                        byteBuf.writeBytes(reuseSerialization);
                        return;
                    }
                }
            }
            boolean z2 = z & (lineageItem != null);
            int writerIndex = z2 ? byteBuf.writerIndex() : 0;
            long nanoTime = z2 ? System.nanoTime() : 0L;
            super.encode(channelHandlerContext, serializable, byteBuf);
            long nanoTime2 = z2 ? System.nanoTime() : 0L;
            if (z2) {
                byteBuf.readerIndex(writerIndex);
                byte[] bArr = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(bArr);
                LineageCache.putSerializedObject(bArr, lineageItem, nanoTime2 - nanoTime);
                byteBuf.resetReaderIndex();
            }
        }
    }

    public FederatedWorker(int i, boolean z) {
        if (ConfigurationManager.getCompressConfig().isWorkload()) {
            this._fan = new FederatedWorkloadAnalyzer();
        } else {
            this._fan = null;
        }
        this._port = i == -1 ? DMLConfig.DEFAULT_FEDERATED_PORT : i;
        this._debug = z;
        LineageCacheConfig.setConfig(DMLScript.LINEAGE_REUSE);
        LineageCacheConfig.setCachePolicy(DMLScript.LINEAGE_POLICY);
        LineageCacheConfig.setEstimator(DMLScript.LINEAGE_ESTIMATE);
        run();
    }

    private void run() {
        log.info("Setting up Federated Worker on port " + this._port);
        int intValue = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
        int localParallelism = intValue > 0 ? intValue : InfrastructureAnalyzer.getLocalParallelism();
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup(localParallelism, new ThreadPoolExecutor(1, PredictionContext.EMPTY_RETURN_STATE, 10L, TimeUnit.SECONDS, new SynchronousQueue(true)));
        boolean isFederatedSSL = ConfigurationManager.isFederatedSSL();
        try {
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(nioEventLoopGroup, nioEventLoopGroup2);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(createChannel(isFederatedSSL));
                serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
                serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                log.info("Starting Federated Worker server at port: " + this._port);
                ChannelFuture sync = serverBootstrap.bind(this._port).sync();
                log.info("Started Federated Worker at port: " + this._port);
                sync.channel().closeFuture().sync();
                log.info("Federated Worker Shutting down.");
                nioEventLoopGroup2.shutdownGracefully();
                nioEventLoopGroup.shutdownGracefully();
            } catch (Exception e) {
                log.info("Federated worker interrupted");
                if (this._debug) {
                    log.error(e.getMessage());
                    e.printStackTrace();
                }
                log.info("Federated Worker Shutting down.");
                nioEventLoopGroup2.shutdownGracefully();
                nioEventLoopGroup.shutdownGracefully();
            }
        } catch (Throwable th) {
            log.info("Federated Worker Shutting down.");
            nioEventLoopGroup2.shutdownGracefully();
            nioEventLoopGroup.shutdownGracefully();
            throw th;
        }
    }

    private ChannelInitializer<SocketChannel> createChannel(final boolean z) {
        try {
            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
            final SslContext build = SslContextBuilder.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()).build();
            return new ChannelInitializer<SocketChannel>() { // from class: org.apache.sysds.runtime.controlprogram.federated.FederatedWorker.1
                public void initChannel(SocketChannel socketChannel) {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    if (ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION)) {
                        pipeline.addLast(new ChannelHandler[]{build.newHandler(socketChannel.alloc())});
                    }
                    if (z) {
                        pipeline.addLast(new ChannelHandler[]{build.newHandler(socketChannel.alloc())});
                    }
                    pipeline.addLast("NetworkTrafficCounter", new NetworkTrafficCounter((v0, v1) -> {
                        FederatedStatistics.logWorkerTraffic(v0, v1);
                    }));
                    pipeline.addLast("ObjectDecoder", new ObjectDecoder(PredictionContext.EMPTY_RETURN_STATE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
                    pipeline.addLast("ObjectEncoder", new ObjectEncoder());
                    pipeline.addLast(new ChannelHandler[]{FederationUtils.decoder(), new FederatedResponseEncoder()});
                    pipeline.addLast(new ChannelHandler[]{new FederatedWorkerHandler(FederatedWorker.this._flt, FederatedWorker.this._frc, FederatedWorker.this._fan, FederatedWorker.this.networkTimer)});
                }
            };
        } catch (CertificateException | SSLException e) {
            throw new DMLRuntimeException("Failed creating channel SSL", e);
        }
    }
}
