package org.apache.beam.runners.flink;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.base.Splitter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkExecutionEnvironments.class */
/**
 * Static factory methods that build Flink batch and streaming execution environments from a
 * {@link FlinkPipelineOptions} instance.
 *
 * <p>The environment type is selected from {@code flinkPipelineOptions.getFlinkMaster()}:
 * {@code "[local]"}, {@code "[collection]"} (batch only), {@link FlinkPipelineOptions#AUTO}, or a
 * {@code host:port} address. Any other value logs a warning and is treated like the auto setting.
 */
public class FlinkExecutionEnvironments {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);

    /**
     * Shape of a remote master address: anything, a colon, then at least one digit. Requiring a
     * digit keeps inputs such as {@code "host:"} out of the remote branch, where parsing the empty
     * port would fail with a {@link NumberFormatException}.
     */
    private static final String HOST_AND_PORT_REGEX = ".*:\\d+";

    /**
     * Creates a batch execution environment for the given options.
     *
     * @param flinkPipelineOptions options to read the configuration from; its parallelism is
     *     overwritten with the value that was finally applied
     * @param list jar files handed to the remote environment when the master is {@code host:port}
     * @return the configured batch environment
     */
    public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list) {
        return createBatchExecutionEnvironment(flinkPipelineOptions, list, null);
    }

    /**
     * Creates a batch execution environment for the given options.
     *
     * @param flinkPipelineOptions options to read the configuration from; its parallelism is
     *     overwritten with the value that was finally applied
     * @param list jar files handed to the remote environment when the master is {@code host:port}
     * @param str argument for {@code GlobalConfiguration.loadConfiguration(String)} used when the
     *     parallelism has to be looked up in the Flink configuration; {@code null} loads the
     *     default configuration
     * @return the configured batch environment
     * @throws NumberFormatException if the port of a {@code host:port} master does not fit an int
     */
    @VisibleForTesting
    static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list, @Nullable String str) {
        // Declared as the common base type: the branches below produce different subclasses.
        ExecutionEnvironment executionEnvironment;
        LOG.info("Creating a Batch Execution Environment.");
        String flinkMaster = flinkPipelineOptions.getFlinkMaster();
        if ("[local]".equals(flinkMaster)) {
            executionEnvironment = ExecutionEnvironment.createLocalEnvironment();
        } else if ("[collection]".equals(flinkMaster)) {
            executionEnvironment = new CollectionEnvironment();
        } else if (FlinkPipelineOptions.AUTO.equals(flinkMaster)) {
            executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        } else if (flinkMaster.matches(HOST_AND_PORT_REGEX)) {
            executionEnvironment = ExecutionEnvironment.createRemoteEnvironment(
                    hostOf(flinkMaster), portOf(flinkMaster), list.toArray(new String[0]));
        } else {
            LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", flinkMaster);
            executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        }
        executionEnvironment.getConfig().setExecutionMode(flinkPipelineOptions.getExecutionModeForBatch());

        boolean isCollectionEnvironment = executionEnvironment instanceof CollectionEnvironment;
        // Apply an explicitly requested parallelism first (anything other than -1), so that the
        // environment sees the user-supplied value before the fallback resolution below.
        if (flinkPipelineOptions.getParallelism().intValue() != -1 && !isCollectionEnvironment) {
            executionEnvironment.setParallelism(flinkPipelineOptions.getParallelism().intValue());
        }
        // A collection environment always runs with parallelism 1.
        int parallelism = isCollectionEnvironment
                ? 1
                : determineParallelism(flinkPipelineOptions.getParallelism().intValue(), executionEnvironment.getParallelism(), str);
        executionEnvironment.setParallelism(parallelism);
        // Write the resolved value back so the options reflect what is actually used.
        flinkPipelineOptions.setParallelism(Integer.valueOf(parallelism));

        if (flinkPipelineOptions.getObjectReuse().booleanValue()) {
            executionEnvironment.getConfig().enableObjectReuse();
        } else {
            executionEnvironment.getConfig().disableObjectReuse();
        }
        applyLatencyTrackingInterval(executionEnvironment.getConfig(), flinkPipelineOptions);
        return executionEnvironment;
    }

    /**
     * Creates a streaming execution environment for the given options.
     *
     * @param flinkPipelineOptions options to read the configuration from; its parallelism is
     *     overwritten with the value that was finally applied
     * @param list jar files handed to the remote environment when the master is {@code host:port}
     * @return the configured streaming environment
     */
    public static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list) {
        return createStreamExecutionEnvironment(flinkPipelineOptions, list, null);
    }

    /**
     * Creates a streaming execution environment for the given options.
     *
     * <p>When the checkpointing interval is {@code -1}, checkpointing is left disabled and
     * {@code shutdownSourcesOnFinalWatermark} is set to {@code true} on the options. Otherwise
     * checkpointing is enabled and the optional checkpoint settings (timeout, externalized
     * checkpoints, minimum pause) are applied.
     *
     * @param flinkPipelineOptions options to read the configuration from; its parallelism (and
     *     possibly {@code shutdownSourcesOnFinalWatermark}) is modified
     * @param list jar files handed to the remote environment when the master is {@code host:port}
     * @param str argument for {@code GlobalConfiguration.loadConfiguration(String)} used when the
     *     parallelism has to be looked up in the Flink configuration; {@code null} loads the
     *     default configuration
     * @return the configured streaming environment
     * @throws IllegalArgumentException if the checkpointing interval is neither {@code -1} nor
     *     at least {@code 1}
     * @throws NumberFormatException if the port of a {@code host:port} master does not fit an int
     */
    @VisibleForTesting
    static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions flinkPipelineOptions, List<String> list, @Nullable String str) {
        // Declared as the common base type: the branches below produce different subclasses.
        StreamExecutionEnvironment executionEnvironment;
        LOG.info("Creating a Streaming Environment.");
        String flinkMaster = flinkPipelineOptions.getFlinkMaster();
        if ("[local]".equals(flinkMaster)) {
            executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        } else if (FlinkPipelineOptions.AUTO.equals(flinkMaster)) {
            executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        } else if (flinkMaster.matches(HOST_AND_PORT_REGEX)) {
            int port = portOf(flinkMaster);
            Configuration configuration = new Configuration();
            // The same port is also published as the REST port of the client configuration.
            configuration.setInteger(RestOptions.PORT, port);
            executionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
                    hostOf(flinkMaster), port, configuration, list.toArray(new String[0]));
        } else {
            LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", flinkMaster);
            executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        }

        int parallelism = determineParallelism(flinkPipelineOptions.getParallelism().intValue(), executionEnvironment.getParallelism(), str);
        executionEnvironment.setParallelism(parallelism);
        // Write the resolved value back so the options reflect what is actually used.
        flinkPipelineOptions.setParallelism(Integer.valueOf(parallelism));

        if (flinkPipelineOptions.getObjectReuse().booleanValue()) {
            executionEnvironment.getConfig().enableObjectReuse();
        } else {
            executionEnvironment.getConfig().disableObjectReuse();
        }
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // For the retry settings below, -1 means "not set": the environment keeps its own value.
        int numberOfExecutionRetries = flinkPipelineOptions.getNumberOfExecutionRetries().intValue();
        if (numberOfExecutionRetries != -1) {
            executionEnvironment.setNumberOfExecutionRetries(numberOfExecutionRetries);
        }
        long executionRetryDelay = flinkPipelineOptions.getExecutionRetryDelay().longValue();
        if (executionRetryDelay != -1) {
            executionEnvironment.getConfig().setExecutionRetryDelay(executionRetryDelay);
        }

        long checkpointingInterval = flinkPipelineOptions.getCheckpointingInterval().longValue();
        if (checkpointingInterval == -1) {
            // No checkpointing requested.
            flinkPipelineOptions.setShutdownSourcesOnFinalWatermark(true);
        } else {
            if (checkpointingInterval < 1) {
                throw new IllegalArgumentException("The checkpoint interval must be positive");
            }
            executionEnvironment.enableCheckpointing(checkpointingInterval, flinkPipelineOptions.getCheckpointingMode());
            if (flinkPipelineOptions.getCheckpointTimeoutMillis().longValue() != -1) {
                executionEnvironment.getCheckpointConfig().setCheckpointTimeout(flinkPipelineOptions.getCheckpointTimeoutMillis().longValue());
            }
            // Both options are read up front, as before, even if externalization is disabled.
            boolean externalizedCheckpointsEnabled = flinkPipelineOptions.isExternalizedCheckpointsEnabled().booleanValue();
            boolean retainOnCancellation = flinkPipelineOptions.getRetainExternalizedCheckpointsOnCancellation().booleanValue();
            if (externalizedCheckpointsEnabled) {
                executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(
                        retainOnCancellation
                                ? CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
                                : CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            }
            long minPauseBetweenCheckpoints = flinkPipelineOptions.getMinPauseBetweenCheckpoints().longValue();
            if (minPauseBetweenCheckpoints != -1) {
                executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
            }
        }

        applyLatencyTrackingInterval(executionEnvironment.getConfig(), flinkPipelineOptions);
        if (flinkPipelineOptions.getAutoWatermarkInterval() != null) {
            executionEnvironment.getConfig().setAutoWatermarkInterval(flinkPipelineOptions.getAutoWatermarkInterval().longValue());
        }
        StateBackend stateBackend = flinkPipelineOptions.getStateBackend();
        if (stateBackend != null) {
            executionEnvironment.setStateBackend(stateBackend);
        }
        return executionEnvironment;
    }

    /**
     * Returns the host part of a {@code host:port} address, i.e. everything before the last colon.
     * Splitting at the last colon keeps hosts that themselves contain colons in one piece.
     */
    private static String hostOf(String hostAndPort) {
        return hostAndPort.substring(0, hostAndPort.lastIndexOf(':'));
    }

    /**
     * Returns the port part of a {@code host:port} address, i.e. the digits after the last colon.
     *
     * @throws NumberFormatException if the text after the last colon is not a valid int
     */
    private static int portOf(String hostAndPort) {
        return Integer.parseInt(hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1));
    }

    /**
     * Picks the parallelism to use, taking the first positive value from: the pipeline options,
     * the environment, and the {@code CoreOptions.DEFAULT_PARALLELISM} entry of the loaded Flink
     * configuration. Falls back to 1 (with a warning) when none of them is positive.
     *
     * @param pipelineOptionsParallelism parallelism requested through the pipeline options
     * @param envParallelism parallelism currently reported by the environment
     * @param confDir argument for {@code GlobalConfiguration.loadConfiguration(String)};
     *     {@code null} loads the default configuration
     * @return a parallelism that is always at least 1
     */
    private static int determineParallelism(int pipelineOptionsParallelism, int envParallelism, @Nullable String confDir) {
        if (pipelineOptionsParallelism > 0) {
            return pipelineOptionsParallelism;
        }
        if (envParallelism > 0) {
            return envParallelism;
        }
        // The configuration is only loaded when neither of the cheaper sources had a value.
        Configuration flinkConfiguration = confDir == null
                ? GlobalConfiguration.loadConfiguration()
                : GlobalConfiguration.loadConfiguration(confDir);
        int configuredParallelism = flinkConfiguration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
        if (configuredParallelism > 0) {
            return configuredParallelism;
        }
        LOG.warn("No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism");
        return 1;
    }

    /** Copies the latency tracking interval from the pipeline options into the Flink config. */
    private static void applyLatencyTrackingInterval(ExecutionConfig executionConfig, FlinkPipelineOptions flinkPipelineOptions) {
        executionConfig.setLatencyTrackingInterval(flinkPipelineOptions.getLatencyTrackingInterval().longValue());
    }
}
