/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.publisher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.publisher.TimePartitionedDataPublisher;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimePartitionedStreamingDataPublisher
extends TimePartitionedDataPublisher {
    private static final Logger log = LoggerFactory.getLogger(TimePartitionedStreamingDataPublisher.class);
    private final MetricContext metricContext;

    public TimePartitionedStreamingDataPublisher(State state) throws IOException {
        super(state);
        this.metricContext = Instrumented.getMetricContext((State)state, TimePartitionedStreamingDataPublisher.class);
    }

    @Override
    protected void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved) throws IOException {
        state.setProp("data.publisher.dataset.dir", (Object)this.getPublisherOutputDir(state, branchId).toString());
        super.publishMultiTaskData(state, branchId, writerOutputPathsMoved);
    }

    @Override
    protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException {
        Path writerOutputDir = WriterUtils.getWriterOutputDir((State)state, (int)this.numBranches, (int)branchId);
        if (!((FileSystem)this.writerFileSystemByBranches.get(branchId)).exists(writerOutputDir)) {
            log.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
            return;
        }
        Path publisherOutputDir = this.getPublisherOutputDir(state, branchId);
        if (!((FileSystem)this.publisherFileSystemByBranches.get(branchId)).exists(publisherOutputDir)) {
            WriterUtils.mkdirsWithRecursivePermissionWithRetry((FileSystem)((FileSystem)this.publisherFileSystemByBranches.get(branchId)), (Path)publisherOutputDir, (FsPermission)((FsPermission)this.permissions.get(branchId)), (Config)this.retryerConfig);
        }
        super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
    }

    @Override
    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        this.publishDataImpl(states);
        this.wusCleanUp(states);
    }

    public void publishDataImpl(Collection<? extends WorkUnitState> states) throws IOException {
        HashSet writerOutputPathsMoved = Sets.newHashSet();
        for (WorkUnitState workUnitState : states) {
            for (int i = 0; i < this.numBranches; ++i) {
                this.publishMultiTaskData(workUnitState, i, writerOutputPathsMoved);
            }
        }
        for (ParallelRunner parallelRunner : this.parallelRunners.values()) {
            parallelRunner.waitForTasks();
        }
        for (WorkUnitState workUnitState : states) {
            workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
        }
        ArrayList statesWithLineage = Lists.newArrayList();
        for (WorkUnitState workUnitState : states) {
            if (!LineageInfo.hasLineageInfo((State)workUnitState)) continue;
            statesWithLineage.add(workUnitState);
        }
        long l = System.currentTimeMillis();
        this.submitLineageEvents(statesWithLineage);
        log.info("Emitting lineage events took {} millis", (Object)(System.currentTimeMillis() - l));
    }

    private void submitLineageEvents(Collection<? extends WorkUnitState> states) {
        for (Map.Entry entry : LineageInfo.aggregateByLineageEvent(states).entrySet()) {
            LineageInfo.submitLineageEvent((String)((String)entry.getKey()), (Collection)((Collection)entry.getValue()), (MetricContext)this.metricContext);
        }
    }

    protected void wusCleanUp(Collection<? extends WorkUnitState> states) {
        WorkUnitState wuState = states.stream().findFirst().get();
        int numBranches = wuState.getPropAsInt("fork.branches", 1);
        for (WorkUnitState workUnitState : states) {
            for (int branchId = 0; branchId < numBranches; ++branchId) {
                String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch((String)"writer.final.output.file.paths", (int)numBranches, (int)branchId);
                if (workUnitState.contains(outputFilePropName)) {
                    workUnitState.removeProp(outputFilePropName);
                }
                LineageInfo.removeDestinationProp((State)workUnitState, (int)branchId);
            }
        }
    }

    @VisibleForTesting
    Set<Path> getPublishOutputDirs() {
        return this.publisherOutputDirs;
    }
}

