package org.apache.sysds.runtime.controlprogram.federated;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.lops.Compression;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.parser.DataExpression;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionParser;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.matrix.operators.MultiThreadedOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataAll;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.privacy.DMLPrivacyException;
import org.apache.sysds.runtime.privacy.PrivacyMonitor;
import org.apache.sysds.utils.Statistics;
import org.apache.sysds.utils.stats.ParamServStatistics;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.class */
public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(FederatedWorkerHandler.class.getName());
    private final FederatedLookupTable _flt;
    private final FederatedReadCache _frc;
    private Timing _timing;
    private final FederatedWorkloadAnalyzer _fan;
    private String _remoteAddress;

    /* loaded from: input_file:org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler$CloseListener.class */
    private static class CloseListener implements ChannelFutureListener {
        private CloseListener() {
        }

        public void operationComplete(ChannelFuture channelFuture) throws InterruptedException {
            if (channelFuture.isSuccess()) {
                PrivacyMonitor.clearCheckedConstraints();
                channelFuture.channel().close().sync();
            } else {
                FederatedWorkerHandler.LOG.error("Federated Worker Write failed");
                channelFuture.channel().writeAndFlush(new FederatedResponse(FederatedResponse.ResponseType.ERROR, new FederatedWorkerHandlerException("Error while sending response."))).channel().close().sync();
            }
        }
    }

    public FederatedWorkerHandler(FederatedLookupTable federatedLookupTable, FederatedReadCache federatedReadCache, FederatedWorkloadAnalyzer federatedWorkloadAnalyzer) {
        this._timing = null;
        this._remoteAddress = FederatedLookupTable.NOHOST;
        this._flt = federatedLookupTable;
        this._frc = federatedReadCache;
        this._fan = federatedWorkloadAnalyzer;
    }

    public FederatedWorkerHandler(FederatedLookupTable federatedLookupTable, FederatedReadCache federatedReadCache, FederatedWorkloadAnalyzer federatedWorkloadAnalyzer, Timing timing) {
        this(federatedLookupTable, federatedReadCache, federatedWorkloadAnalyzer);
        this._timing = timing;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        channelHandlerContext.writeAndFlush(createResponse(obj, channelHandlerContext.channel().remoteAddress())).addListener(new CloseListener());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FederatedResponse createResponse(Object obj) {
        return createResponse(obj, FederatedLookupTable.NOHOST);
    }

    private FederatedResponse createResponse(Object obj, SocketAddress socketAddress) {
        String str;
        try {
            if (this._timing != null) {
                ParamServStatistics.accFedNetworkTime((long) this._timing.stop());
            }
        } catch (RuntimeException e) {
        }
        this._remoteAddress = socketAddress.toString();
        if (socketAddress instanceof InetSocketAddress) {
            str = ((InetSocketAddress) socketAddress).getHostString();
        } else if (socketAddress instanceof SocketAddress) {
            str = socketAddress.toString().split(":")[0].split(Lop.FILE_SEPARATOR)[1];
        } else {
            LOG.warn("Given remote address of coordinator is null. Continuing with nohost as host identifier.");
            str = FederatedLookupTable.NOHOST;
        }
        FederatedResponse createResponse = createResponse(obj, str);
        if (this._timing != null) {
            this._timing.start();
        }
        return createResponse;
    }

    private FederatedResponse createResponse(Object obj, String str) {
        if (!(obj instanceof FederatedRequest[])) {
            return new FederatedResponse(FederatedResponse.ResponseType.ERROR, new FederatedWorkerHandlerException("Received object of wrong instance 'FederatedRequest[]'."));
        }
        FederatedRequest[] federatedRequestArr = (FederatedRequest[]) obj;
        try {
            return createResponse(federatedRequestArr, str);
        } catch (FederatedWorkerHandlerException | DMLPrivacyException e) {
            LOG.error("Exception in FederatedWorkerHandler while processing requests:\n" + Arrays.toString(federatedRequestArr), e);
            return new FederatedResponse(FederatedResponse.ResponseType.ERROR, e);
        } catch (Exception e2) {
            String str2 = "Exception thrown while processing requests:\n" + Arrays.toString(federatedRequestArr);
            LOG.error(str2, e2);
            return new FederatedResponse(FederatedResponse.ResponseType.ERROR, new FederatedWorkerHandlerException(str2));
        }
    }

    private FederatedResponse createResponse(FederatedRequest[] federatedRequestArr, String str) throws DMLPrivacyException, FederatedWorkerHandlerException, Exception {
        FederatedResponse federatedResponse = null;
        boolean z = false;
        for (int i = 0; i < federatedRequestArr.length; i++) {
            FederatedRequest federatedRequest = federatedRequestArr[i];
            FederatedRequest.RequestType type = federatedRequest.getType();
            ExecutionContextMap ecm = this._flt.getECM(str, federatedRequest.getPID());
            logRequests(federatedRequest, i, federatedRequestArr.length);
            PrivacyMonitor.setCheckPrivacy(federatedRequest.checkPrivacy());
            PrivacyMonitor.clearCheckedConstraints();
            FederatedResponse executeCommand = executeCommand(federatedRequest, ecm);
            conditionalAddCheckedConstraints(federatedRequest, executeCommand);
            if (!executeCommand.isSuccessful()) {
                LOG.error("Command " + type + " resulted in error:\n" + executeCommand.getErrorMessage());
                return executeCommand;
            }
            if (type == FederatedRequest.RequestType.GET_VAR) {
                if (federatedResponse != null) {
                    LOG.error("Multiple GET_VAR are not supported in single batch of requests.");
                    throw new FederatedWorkerHandlerException("Multiple GET_VAR are not supported in single batch of requests.");
                }
                federatedResponse = executeCommand;
            } else if (federatedResponse == null && i == federatedRequestArr.length - 1) {
                federatedResponse = executeCommand;
            }
            if (type == FederatedRequest.RequestType.PUT_VAR || type == FederatedRequest.RequestType.EXEC_UDF) {
                for (int i2 = 0; i2 < federatedRequest.getNumParams(); i2++) {
                    FederatedStatistics.incFedTransfer(federatedRequest.getParam(i2), this._remoteAddress);
                }
            }
            if (type == FederatedRequest.RequestType.GET_VAR) {
                Object[] data = federatedResponse.getData();
                for (int i3 = 0; i3 < Arrays.stream(data).count(); i3++) {
                    FederatedStatistics.incFedTransfer(data[i3], this._remoteAddress);
                }
            }
            if (type == FederatedRequest.RequestType.CLEAR) {
                z = true;
            }
        }
        if (z) {
            this._flt.clear();
            printStatistics();
        }
        return federatedResponse;
    }

    private static void printStatistics() {
        if (DMLScript.STATISTICS && Statistics.allowWorkerStatistics) {
            System.out.println("Federated Worker " + Statistics.display());
            Statistics.reset();
        }
    }

    private static void logRequests(FederatedRequest federatedRequest, int i, int i2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Executing command " + (i + 1) + "/" + i2 + ": " + federatedRequest.getType().name());
            if (LOG.isTraceEnabled()) {
                LOG.trace("full command: " + federatedRequest.toString());
            }
        }
    }

    private static void conditionalAddCheckedConstraints(FederatedRequest federatedRequest, FederatedResponse federatedResponse) {
        if (federatedRequest.checkPrivacy()) {
            federatedResponse.setCheckedConstraints(PrivacyMonitor.getCheckedConstraints());
        }
    }

    private FederatedResponse executeCommand(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) throws DMLPrivacyException, FederatedWorkerHandlerException, Exception {
        FederatedRequest.RequestType type = federatedRequest.getType();
        switch (type) {
            case READ_VAR:
                return readData(federatedRequest, executionContextMap);
            case PUT_VAR:
                return putVariable(federatedRequest, executionContextMap);
            case GET_VAR:
                return getVariable(federatedRequest, executionContextMap);
            case EXEC_INST:
                return execInstruction(federatedRequest, executionContextMap);
            case EXEC_UDF:
                return execUDF(federatedRequest, executionContextMap);
            case CLEAR:
                return execClear(executionContextMap);
            case NOOP:
                return execNoop();
            default:
                throw new FederatedWorkerHandlerException(String.format("Method %s is not supported.", type));
        }
    }

    private FederatedResponse readData(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) {
        checkNumParams(federatedRequest.getNumParams(), 2, 3);
        return readData((String) federatedRequest.getParam(0), Types.DataType.valueOf((String) federatedRequest.getParam(1)), federatedRequest.getID(), federatedRequest.getTID(), executionContextMap, federatedRequest.getNumParams() == 2 ? null : (CacheBlock) federatedRequest.getParam(2));
    }

    private FederatedResponse readData(String str, Types.DataType dataType, long j, long j2, ExecutionContextMap executionContextMap, CacheBlock cacheBlock) {
        MatrixCharacteristics matrixCharacteristics = new MatrixCharacteristics();
        matrixCharacteristics.setBlocksize(ConfigurationManager.getBlocksize());
        if (dataType != Types.DataType.MATRIX && dataType != Types.DataType.FRAME) {
            throw new FederatedWorkerHandlerException("Could not recognize datatype");
        }
        ExecutionContext executionContext = executionContextMap.get(j2);
        LineageItem lineageItem = new LineageItem(str);
        CacheableData<?> cacheableData = null;
        String valueOf = String.valueOf(j);
        boolean z = !LineageCacheConfig.ReuseCacheType.isNone() && dataType == Types.DataType.MATRIX;
        if (!z || !LineageCache.reuseFedRead(valueOf, dataType, lineageItem, executionContext)) {
            cacheableData = this._frc.get(str, !z);
            try {
                if (cacheableData == null) {
                    cacheableData = cacheBlock == null ? readDataNoReuse(str, dataType, matrixCharacteristics) : ExecutionContext.createCacheableData(cacheBlock);
                    if (z) {
                        LineageCache.putFedReadObject(cacheableData, lineageItem, executionContext);
                    } else {
                        this._frc.setData(str, cacheableData);
                    }
                }
                executionContext.setVariable(valueOf, cacheableData);
            } catch (Exception e) {
                if (z) {
                    LineageCache.putFedReadObject(null, lineageItem, executionContext);
                } else {
                    this._frc.setInvalid(str);
                }
                throw e;
            }
        }
        if (shouldTryAsyncCompress()) {
            CompressedMatrixBlockFactory.compressAsync(executionContext, valueOf);
        }
        if (DMLScript.LINEAGE) {
            executionContext.getLineage().set(valueOf, lineageItem);
        }
        if (dataType != Types.DataType.FRAME) {
            return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, new Object[]{Long.valueOf(j), matrixCharacteristics});
        }
        FrameObject frameObject = (FrameObject) cacheableData;
        frameObject.acquireRead();
        frameObject.refreshMetaData();
        frameObject.release();
        return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, new Object[]{Long.valueOf(j), frameObject.getSchema(), matrixCharacteristics});
    }

    private CacheableData<?> readDataNoReuse(String str, Types.DataType dataType, MatrixCharacteristics matrixCharacteristics) {
        CacheableData frameObject;
        switch (dataType) {
            case MATRIX:
                frameObject = new MatrixObject(Types.ValueType.FP64, str);
                break;
            case FRAME:
                frameObject = new FrameObject(str);
                break;
            default:
                throw new FederatedWorkerHandlerException("Could not recognize datatype");
        }
        try {
            try {
                String mTDFileName = DataExpression.getMTDFileName(str);
                Path path = new Path(mTDFileName);
                FileSystem fileSystem = IOUtilFunctions.getFileSystem(mTDFileName);
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileSystem.open(path)));
                try {
                    MetaDataAll metaDataAll = new MetaDataAll(bufferedReader);
                    if (!metaDataAll.mtdExists()) {
                        throw new FederatedWorkerHandlerException("Could not parse metadata file");
                    }
                    matrixCharacteristics.setRows(metaDataAll.getDim1());
                    matrixCharacteristics.setCols(metaDataAll.getDim2());
                    matrixCharacteristics.setNonZeros(metaDataAll.getNnz());
                    boolean hasHeader = metaDataAll.getHasHeader();
                    CacheableData<?> parseAndSetPrivacyConstraint = metaDataAll.parseAndSetPrivacyConstraint(frameObject);
                    Types.FileFormat fileFormat = metaDataAll.getFileFormat();
                    String delim = metaDataAll.getDelim();
                    bufferedReader.close();
                    IOUtilFunctions.closeSilently((Closeable) fileSystem);
                    parseAndSetPrivacyConstraint.setMetaData(new MetaDataFormat(matrixCharacteristics, fileFormat));
                    if (fileFormat == Types.FileFormat.CSV) {
                        parseAndSetPrivacyConstraint.setFileFormatProperties(new FileFormatPropertiesCSV(hasHeader, delim, false));
                    }
                    parseAndSetPrivacyConstraint.enableCleanup(false);
                    return parseAndSetPrivacyConstraint;
                } catch (Throwable th) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } catch (FederatedWorkerHandlerException | DMLPrivacyException e) {
                throw e;
            } catch (Exception e2) {
                String str2 = "Exception of type " + e2.getClass() + " thrown when processing READ request";
                LOG.error(str2, e2);
                throw new DMLRuntimeException(str2);
            }
        } catch (Throwable th3) {
            IOUtilFunctions.closeSilently((Closeable) null);
            throw th3;
        }
    }

    private FederatedResponse putVariable(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) {
        Data createMatrixObject;
        checkNumParams(federatedRequest.getNumParams(), 1, 2);
        String valueOf = String.valueOf(federatedRequest.getID());
        ExecutionContext executionContext = executionContextMap.get(federatedRequest.getTID());
        if (executionContext.containsVariable(valueOf)) {
            Data removeVariable = executionContext.removeVariable(valueOf);
            if (removeVariable != null) {
                executionContext.cleanupDataObject(removeVariable);
            }
            LOG.warn("Variable" + federatedRequest.getID() + " already existing, fallback to overwritten.");
        }
        Object param = federatedRequest.getParam(0);
        if (param instanceof CacheBlock) {
            createMatrixObject = ExecutionContext.createCacheableData((CacheBlock) param);
        } else if (param instanceof ScalarObject) {
            createMatrixObject = (ScalarObject) param;
        } else if (param instanceof ListObject) {
            createMatrixObject = (ListObject) param;
        } else {
            if (federatedRequest.getNumParams() != 2) {
                throw new FederatedWorkerHandlerException("Unsupported object type, has to be of type CacheBlock or ScalarObject");
            }
            createMatrixObject = federatedRequest.getParam(1) == Types.DataType.MATRIX ? ExecutionContext.createMatrixObject((MatrixCharacteristics) param) : ExecutionContext.createFrameObject((MatrixCharacteristics) param);
        }
        executionContext.setVariable(valueOf, createMatrixObject);
        if (shouldTryAsyncCompress()) {
            CompressedMatrixBlockFactory.compressAsync(executionContext, valueOf);
        }
        if (DMLScript.LINEAGE) {
            if ((federatedRequest.getParam(0) instanceof CacheBlock) && federatedRequest.getLineageTrace() != null) {
                executionContext.getLineage().set(valueOf, Lineage.deserializeSingleTrace(federatedRequest.getLineageTrace()));
                if (DMLScript.STATISTICS) {
                    FederatedStatistics.aggFedPutLineage(federatedRequest.getLineageTrace());
                }
            } else if (federatedRequest.getParam(0) instanceof ScalarObject) {
                executionContext.getLineage().set(valueOf, new LineageItem(CPOperand.getLineageLiteral((ScalarObject) federatedRequest.getParam(0), true)));
            } else if (federatedRequest.getNumParams() == 1) {
                executionContext.getLineage().set(valueOf, new LineageItem(String.valueOf(federatedRequest.getChecksum(0))));
            }
        }
        return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
    }

    private FederatedResponse getVariable(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) {
        try {
            checkNumParams(federatedRequest.getNumParams(), 0);
            ExecutionContext executionContext = executionContextMap.get(federatedRequest.getTID());
            if (!executionContext.containsVariable(String.valueOf(federatedRequest.getID()))) {
                throw new FederatedWorkerHandlerException("Variable " + federatedRequest.getID() + " does not exist at federated worker.");
            }
            Data handlePrivacy = PrivacyMonitor.handlePrivacy(executionContext.getVariable(String.valueOf(federatedRequest.getID())));
            switch (handlePrivacy.getDataType()) {
                case MATRIX:
                case FRAME:
                case TENSOR:
                    return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, ((CacheableData) handlePrivacy).acquireReadAndRelease(), LineageCacheConfig.ReuseCacheType.isNone() ? null : executionContext.getLineage().get(String.valueOf(federatedRequest.getID())));
                case LIST:
                    return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, ((ListObject) handlePrivacy).getData());
                case SCALAR:
                    return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, handlePrivacy);
                default:
                    throw new FederatedWorkerHandlerException("Unsupported return datatype " + handlePrivacy.getDataType().name());
            }
        } catch (Exception e) {
            throw new FederatedWorkerHandlerException("Failed to getVariable ", e);
        }
    }

    private FederatedResponse execInstruction(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) throws Exception {
        Instruction parseSingleInstruction = InstructionParser.parseSingleInstruction((String) federatedRequest.getParam(0));
        long tid = federatedRequest.getTID();
        ExecutionContext contextForInstruction = getContextForInstruction(tid, parseSingleInstruction, executionContextMap);
        setThreads(parseSingleInstruction);
        exec(contextForInstruction, parseSingleInstruction);
        adaptToWorkload(contextForInstruction, this._fan, tid, parseSingleInstruction);
        return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
    }

    private static ExecutionContext getContextForInstruction(long j, Instruction instruction, ExecutionContextMap executionContextMap) {
        ExecutionContext executionContext = executionContextMap.get(j);
        if (instruction.getType() != Instruction.IType.SPARK || (executionContext instanceof SparkExecutionContext)) {
            return executionContext;
        }
        executionContextMap.convertToSparkCtx();
        return executionContextMap.get(j);
    }

    private static void setThreads(Instruction instruction) {
        Operator operator = instruction.getOperator();
        if (operator instanceof MultiThreadedOperator) {
            int intValue = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_INST);
            ((MultiThreadedOperator) operator).setNumThreads(intValue > 0 ? intValue : InfrastructureAnalyzer.getLocalParallelism());
        }
    }

    private static void exec(ExecutionContext executionContext, Instruction instruction) {
        BasicProgramBlock basicProgramBlock = new BasicProgramBlock(null);
        basicProgramBlock.getInstructions().clear();
        basicProgramBlock.getInstructions().add(instruction);
        if (DMLScript.LINEAGE) {
            LineageCacheConfig.setCompAssRW(false);
        }
        basicProgramBlock.execute(executionContext);
    }

    private static void adaptToWorkload(ExecutionContext executionContext, FederatedWorkloadAnalyzer federatedWorkloadAnalyzer, long j, Instruction instruction) {
        if (federatedWorkloadAnalyzer != null) {
            CompletableFuture.runAsync(() -> {
                federatedWorkloadAnalyzer.incrementWorkload(executionContext, j, instruction);
                federatedWorkloadAnalyzer.compressRun(executionContext, j);
            });
        }
    }

    private FederatedResponse execUDF(FederatedRequest federatedRequest, ExecutionContextMap executionContextMap) {
        checkNumParams(federatedRequest.getNumParams(), 1);
        ExecutionContext executionContext = executionContextMap.get(federatedRequest.getTID());
        try {
            try {
                FederatedUDF federatedUDF = (FederatedUDF) federatedRequest.getParam(0);
                Data[] dataArr = (Data[]) Arrays.stream(federatedUDF.getInputIDs()).mapToObj(j -> {
                    return executionContext.getVariable(String.valueOf(j));
                }).map(PrivacyMonitor::handlePrivacy).toArray(i -> {
                    return new Data[i];
                });
                if (DMLScript.LINEAGE) {
                    LineageItemUtils.traceFedUDF(executionContext, federatedUDF);
                }
                FederatedResponse reuse = LineageCache.reuse(federatedUDF, executionContext);
                if (reuse.isSuccessful()) {
                    return reuse;
                }
                long nanoTime = !LineageCacheConfig.ReuseCacheType.isNone() ? System.nanoTime() : 0L;
                FederatedResponse execute = federatedUDF.execute(executionContext, dataArr);
                LineageCache.putValue(federatedUDF, executionContext, (!LineageCacheConfig.ReuseCacheType.isNone() ? System.nanoTime() : 0L) - nanoTime);
                return execute;
            } catch (FederatedWorkerHandlerException | DMLPrivacyException e) {
                LOG.debug("FederatedWorkerHandler Privacy Constraint exception thrown when processing EXEC_UDF request ", e);
                throw e;
            }
        } catch (Exception e2) {
            String str = "Exception of type " + e2.getClass() + " thrown when processing EXEC_UDF request";
            LOG.error(str, e2);
            throw new FederatedWorkerHandlerException(str);
        }
    }

    private FederatedResponse execClear(ExecutionContextMap executionContextMap) {
        try {
            executionContextMap.clear();
            return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
        } catch (FederatedWorkerHandlerException | DMLPrivacyException e) {
            throw e;
        } catch (Exception e2) {
            String str = "Exception of type " + e2.getClass() + " thrown when processing CLEAR request";
            LOG.error(str, e2);
            throw new FederatedWorkerHandlerException(str);
        }
    }

    private static FederatedResponse execNoop() {
        return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
    }

    private static void checkNumParams(int i, int... iArr) {
        if (!Arrays.stream(iArr).anyMatch(i2 -> {
            return i2 == i;
        })) {
            throw new DMLRuntimeException("FederatedWorkerHandler: Received wrong amount of params: expected=" + Arrays.toString(iArr) + ", actual=" + i);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        th.printStackTrace();
        channelHandlerContext.close();
    }

    private boolean shouldTryAsyncCompress() {
        return Compression.CompressConfig.valueOf(ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase()) == Compression.CompressConfig.TRUE;
    }
}
