/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.publisher;

import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.hive.HiveRegProps;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="hivereg")
public class HiveRegistrationPublisher
extends DataPublisher {
    private static final Logger log = LoggerFactory.getLogger(HiveRegistrationPublisher.class);
    private static final String DATA_PUBLISH_TIME = HiveRegistrationPublisher.class.getName() + ".lastDataPublishTime";
    private static final Splitter LIST_SPLITTER_COMMA = Splitter.on((String)",").trimResults().omitEmptyStrings();
    public static final String HIVE_SPEC_COMPUTATION_TIMER = "hiveSpecComputationTimer";
    private static final String PATH_DEDUPE_ENABLED = "hive.registration.path.dedupe.enabled";
    private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true;
    private final Closer closer = Closer.create();
    private final HiveRegister hiveRegister;
    private final ExecutorService hivePolicyExecutor;
    private final MetricContext metricContext;
    private boolean isPathDedupeEnabled;
    private static Set<String> pathsToRegisterFromSingleState = Sets.newHashSet();

    public HiveRegistrationPublisher(State state) {
        super(state);
        this.hiveRegister = (HiveRegister)this.closer.register((Closeable)HiveRegister.get((State)state));
        this.hivePolicyExecutor = ExecutorsUtils.loggingDecorator((ExecutorService)Executors.newFixedThreadPool(new HiveRegProps(state).getNumThreads(), ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)log), (Optional)Optional.of((Object)"HivePolicyExecutor-%d"))));
        this.metricContext = Instrumented.getMetricContext((State)state, HiveRegistrationPublisher.class);
        this.isPathDedupeEnabled = state.getPropAsBoolean(PATH_DEDUPE_ENABLED, true);
    }

    public void close() throws IOException {
        try {
            ExecutorsUtils.shutdownExecutorService((ExecutorService)this.hivePolicyExecutor, (Optional)Optional.of((Object)log));
        }
        finally {
            this.closer.close();
        }
    }

    @Deprecated
    public void initialize() throws IOException {
    }

    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        ExecutorCompletionService<Collection<HiveSpec>> completionService = new ExecutorCompletionService<Collection<HiveSpec>>(this.hivePolicyExecutor);
        int toRegisterPathCount = 0;
        Iterator<? extends WorkUnitState> iterator = states.iterator();
        while (iterator.hasNext()) {
            State state;
            State taskSpecificState = state = (State)iterator.next();
            if (!state.contains("data.publisher.output.dirs")) continue;
            if (this.hiveRegister.getProps().getUpstreamDataAttrName().isPresent()) {
                for (String attrName : LIST_SPLITTER_COMMA.splitToList((CharSequence)this.hiveRegister.getProps().getUpstreamDataAttrName().get())) {
                    if (!state.contains(attrName)) continue;
                    taskSpecificState.appendToListProp("runtime.props", attrName + ":" + state.getProp(attrName));
                }
            }
            final HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy((State)taskSpecificState);
            for (final String path : state.getPropAsList("data.publisher.output.dirs")) {
                if (this.isPathDedupeEnabled && pathsToRegisterFromSingleState.contains(path)) continue;
                pathsToRegisterFromSingleState.add(path);
                ++toRegisterPathCount;
                completionService.submit(new Callable<Collection<HiveSpec>>(){

                    @Override
                    public Collection<HiveSpec> call() throws Exception {
                        try (Timer.Context context = HiveRegistrationPublisher.this.metricContext.timer(HiveRegistrationPublisher.HIVE_SPEC_COMPUTATION_TIMER).time();){
                            Collection collection = policy.getHiveSpecs(new Path(path));
                            return collection;
                        }
                    }
                });
            }
        }
        for (int i = 0; i < toRegisterPathCount; ++i) {
            try {
                for (HiveSpec spec : (Collection)completionService.take().get()) {
                    this.hiveRegister.register(spec);
                }
                continue;
            }
            catch (InterruptedException | ExecutionException exception) {
                log.info("Failed to generate HiveSpec", (Throwable)exception);
                throw new IOException(exception);
            }
        }
        log.info("Finished registering all HiveSpecs");
    }

    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    }

    private static void addRuntimeHiveRegistrationProperties(State state) {
        state.appendToListProp("hive.table.partition.props", String.format("%s:%d", DATA_PUBLISH_TIME, TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)));
    }
}

