package org.apache.flink.runtime.operators.chaining;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.BatchTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.class */
public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
    private static final Logger LOG = LoggerFactory.getLogger(ChainedAllReduceDriver.class);
    private ReduceFunction<IT> reducer;
    private TypeSerializer<IT> serializer;
    private IT base;

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void setup(AbstractInvokable abstractInvokable) {
        ReduceFunction<IT> reduceFunction = (ReduceFunction) BatchTask.instantiateUserCode(this.config, this.userCodeClassLoader, ReduceFunction.class);
        this.reducer = reduceFunction;
        FunctionUtils.setFunctionRuntimeContext(reduceFunction, getUdfRuntimeContext());
        this.serializer = this.config.getInputSerializer(0, this.userCodeClassLoader).getSerializer();
        if (LOG.isDebugEnabled()) {
            LOG.debug("ChainedAllReduceDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ScopeFormat.SCOPE_SEPARATOR);
        }
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void openTask() throws Exception {
        BatchTask.openUserCode(this.reducer, this.config.getStubParameters());
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void closeTask() throws Exception {
        BatchTask.closeUserCode(this.reducer);
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void cancelTask() {
        try {
            FunctionUtils.closeFunction(this.reducer);
        } catch (Throwable th) {
        }
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    /* renamed from: getStub */
    public Function mo2166getStub() {
        return this.reducer;
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public String getTaskName() {
        return this.taskName;
    }

    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void collect(IT it) {
        this.numRecordsIn.inc();
        try {
            if (this.base == null) {
                this.base = (IT) this.serializer.copy(it);
            } else {
                this.base = this.objectReuseEnabled ? (IT) this.reducer.reduce(this.base, it) : (IT) this.serializer.copy(this.reducer.reduce(this.base, it));
            }
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    public void close() {
        try {
            if (this.base != null) {
                this.outputCollector.collect(this.base);
                this.base = null;
            }
            this.outputCollector.close();
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }
}
