/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source.extractor.extract;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.dataset.DatasetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class QueryBasedSource<S, D>
extends AbstractSource<S, D> {
    private static final Logger log = LoggerFactory.getLogger(QueryBasedSource.class);
    public static final String ENTITY_BLACKLIST = "entity.blacklist";
    public static final String ENTITY_WHITELIST = "entity.whitelist";
    public static final String SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE = "source.obtain_table_props_from_config_store";
    public static final boolean DEFAULT_SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE = false;
    private static final String QUERY_BASED_SOURCE = "query_based_source";
    public static final String WORK_UNIT_STATE_VERSION_KEY = "source.querybased.workUnitState.version";
    public static final Integer CURRENT_WORK_UNIT_STATE_VERSION = 3;
    protected Optional<LineageInfo> lineageInfo;

    public List<WorkUnit> getWorkunits(SourceState state) {
        QueryBasedSource.initLogger(state);
        this.lineageInfo = LineageInfo.getLineageInfo((SharedResourcesBroker)state.getBroker());
        ArrayList workUnits = Lists.newArrayList();
        Set<SourceEntity> entities = this.getFilteredSourceEntities(state);
        Map<SourceEntity, State> tableSpecificPropsMap = QueryBasedSource.shouldObtainTablePropsFromConfigStore(state) ? QueryBasedSource.getTableSpecificPropsFromConfigStore(entities, (State)state) : QueryBasedSource.getTableSpecificPropsFromState(entities, state);
        Map<SourceEntity, Long> prevWatermarksByTable = QueryBasedSource.getPreviousWatermarksForAllTables(state);
        UnmodifiableIterator unmodifiableIterator = Sets.union(entities, prevWatermarksByTable.keySet()).iterator();
        while (unmodifiableIterator.hasNext()) {
            long previousWatermark;
            SourceEntity sourceEntity;
            log.info("Source entity to be processed: {}, carry-over from previous state: {} ", (Object)sourceEntity, (Object)(!entities.contains(sourceEntity = (SourceEntity)unmodifiableIterator.next()) ? 1 : 0));
            SourceState combinedState = QueryBasedSource.getCombinedState(state, tableSpecificPropsMap.get(sourceEntity));
            long l = previousWatermark = prevWatermarksByTable.containsKey(sourceEntity) ? prevWatermarksByTable.get(sourceEntity) : -1L;
            if (!entities.contains(sourceEntity)) {
                combinedState.setProp("source.querybased.end.value", (Object)previousWatermark);
            }
            workUnits.addAll(this.generateWorkUnits(sourceEntity, combinedState, previousWatermark));
        }
        log.info("Total number of workunits for the current run: " + workUnits.size());
        List previousWorkUnits = this.getPreviousWorkUnitsForRetry(state);
        log.info("Total number of incomplete tasks from the previous run: " + previousWorkUnits.size());
        workUnits.addAll(previousWorkUnits);
        int numOfMultiWorkunits = state.getPropAsInt("mr.job.max.mappers", 100);
        return QueryBasedSource.pack(workUnits, numOfMultiWorkunits);
    }

    protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
        ArrayList workUnits = Lists.newArrayList();
        String nameSpaceName = state.getProp("extract.namespace");
        Extract.TableType tableType = Extract.TableType.valueOf((String)state.getProp("extract.table.type").toUpperCase());
        List<Partition> partitions = new Partitioner(state).getPartitionList(previousWatermark);
        Collections.sort(partitions, Partitioner.ascendingComparator);
        String outputTableName = sourceEntity.getDestTableName();
        log.info("Create extract output with table name is " + outputTableName);
        Extract extract = this.createExtract(tableType, nameSpaceName, outputTableName);
        if (Boolean.valueOf(state.getProp("extract.is.full")).booleanValue()) {
            extract.setFullTrue(System.currentTimeMillis());
        }
        Optional highestWaterMark = Optional.absent();
        Optional lowestWaterMark = Optional.absent();
        for (Partition partition : partitions) {
            WorkUnit workunit = WorkUnit.create((Extract)extract);
            workunit.setProp("source.entity", (Object)sourceEntity.getSourceEntityName());
            workunit.setProp("extract.table.name", (Object)sourceEntity.getDestTableName());
            workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, (Object)CURRENT_WORK_UNIT_STATE_VERSION);
            this.addLineageSourceInfo(state, sourceEntity, workunit);
            partition.serialize(workunit);
            workUnits.add(workunit);
            highestWaterMark = highestWaterMark.isPresent() ? highestWaterMark.transform(hw -> Math.max(hw, partition.getHighWatermark())) : Optional.of((Object)partition.getHighWatermark());
            lowestWaterMark = lowestWaterMark.isPresent() ? lowestWaterMark.transform(lw -> Math.min(lw, partition.getLowWatermark())) : Optional.of((Object)partition.getLowWatermark());
        }
        if (highestWaterMark.isPresent() && lowestWaterMark.isPresent()) {
            state.appendToListProp("highWatermark", String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark.get()));
            state.appendToListProp("lowWatermark", String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark.get()));
        }
        return workUnits;
    }

    protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
    }

    protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) {
        Set<SourceEntity> unfilteredEntities = this.getSourceEntities((State)state);
        return QueryBasedSource.getFilteredSourceEntitiesHelper(state, unfilteredEntities);
    }

    static Set<SourceEntity> getFilteredSourceEntitiesHelper(SourceState state, Iterable<SourceEntity> unfilteredEntities) {
        HashSet<SourceEntity> entities = new HashSet<SourceEntity>();
        List blacklist = DatasetFilterUtils.getPatternList((State)state, (String)ENTITY_BLACKLIST);
        List whitelist = DatasetFilterUtils.getPatternList((State)state, (String)ENTITY_WHITELIST);
        for (SourceEntity entity : unfilteredEntities) {
            if (!DatasetFilterUtils.survived((String)entity.getSourceEntityName(), (List)blacklist, (List)whitelist)) continue;
            entities.add(entity);
        }
        return entities;
    }

    public static Map<SourceEntity, State> getTableSpecificPropsFromState(Iterable<SourceEntity> entities, SourceState state) {
        HashMap<String, SourceEntity> sourceEntityByName = new HashMap<String, SourceEntity>();
        for (SourceEntity entity : entities) {
            sourceEntityByName.put(entity.getDatasetName(), entity);
        }
        Map datasetProps = DatasetUtils.getDatasetSpecificProps(sourceEntityByName.keySet(), (State)state);
        HashMap<SourceEntity, State> res = new HashMap<SourceEntity, State>();
        for (Map.Entry entry : datasetProps.entrySet()) {
            res.put((SourceEntity)sourceEntityByName.get(entry.getKey()), (State)entry.getValue());
        }
        return res;
    }

    protected Set<SourceEntity> getSourceEntities(State state) {
        return QueryBasedSource.getSourceEntitiesHelper(state);
    }

    static Set<SourceEntity> getSourceEntitiesHelper(State state) {
        if (state.contains("source.entities")) {
            log.info("Using entity names in source.entities");
            HashSet<SourceEntity> res = new HashSet<SourceEntity>();
            for (String sourceEntityName : state.getPropAsList("source.entities")) {
                res.add(SourceEntity.fromSourceEntityName(sourceEntityName));
            }
            return res;
        }
        if (state.contains("source.entity") || state.contains("extract.table.name")) {
            Optional<SourceEntity> sourceEntity = SourceEntity.fromState(state);
            log.info("Using entity name in " + sourceEntity.get());
            return ImmutableSet.of((Object)sourceEntity.get());
        }
        throw new IllegalStateException(String.format("One of the following properties must be specified: %s, %s.", "source.entities", "source.entity"));
    }

    private static boolean shouldObtainTablePropsFromConfigStore(SourceState state) {
        return state.getPropAsBoolean(SOURCE_OBTAIN_TABLE_PROPS_FROM_CONFIG_STORE, false);
    }

    private static Map<SourceEntity, State> getTableSpecificPropsFromConfigStore(Collection<SourceEntity> tables, State state) {
        ConfigClient client = ConfigClientCache.getClient((VersionStabilityPolicy)VersionStabilityPolicy.STRONG_LOCAL_STABILITY);
        String configStoreUri = state.getProp("gobblin.config.management.store.uri");
        Preconditions.checkNotNull((Object)configStoreUri);
        HashMap result = Maps.newHashMap();
        for (SourceEntity table : tables) {
            try {
                result.put(table, ConfigUtils.configToState((Config)client.getConfig(PathUtils.combinePaths((String[])new String[]{configStoreUri, QUERY_BASED_SOURCE, table.getDatasetName()}).toUri())));
            }
            catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException | VersionDoesNotExistException e) {
                throw new RuntimeException("Unable to get table config for " + table, e);
            }
        }
        return result;
    }

    private static SourceState getCombinedState(SourceState state, State tableSpecificState) {
        if (tableSpecificState == null) {
            return state;
        }
        SourceState combinedState = new SourceState((State)state, state.getPreviousDatasetStatesByUrns(), (Iterable)state.getPreviousWorkUnitStates());
        combinedState.addAll(tableSpecificState);
        return combinedState;
    }

    private static List<WorkUnit> pack(List<WorkUnit> workUnits, int numOfMultiWorkunits) {
        int i;
        Preconditions.checkArgument((numOfMultiWorkunits > 0 ? 1 : 0) != 0);
        if (workUnits.size() <= numOfMultiWorkunits) {
            return workUnits;
        }
        ArrayList result = Lists.newArrayListWithCapacity((int)numOfMultiWorkunits);
        for (i = 0; i < numOfMultiWorkunits; ++i) {
            result.add(MultiWorkUnit.createEmpty());
        }
        for (i = 0; i < workUnits.size(); ++i) {
            ((MultiWorkUnit)result.get(i % numOfMultiWorkunits)).addWorkUnit(workUnits.get(i));
        }
        return result;
    }

    public void shutdown(SourceState state) {
    }

    static Map<SourceEntity, Long> getPreviousWatermarksForAllTables(SourceState state) {
        HashMap result = Maps.newHashMap();
        HashMap prevLowWatermarksByTable = Maps.newHashMap();
        HashMap prevActualHighWatermarksByTable = Maps.newHashMap();
        HashSet tablesWithFailedTasks = Sets.newHashSet();
        HashSet tablesWithNoUpdatesOnPreviousRun = Sets.newHashSet();
        boolean commitOnFullSuccess = JobCommitPolicy.getCommitPolicy((State)state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
        for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
            Optional<SourceEntity> sourceEntity = SourceEntity.fromState((State)workUnitState);
            if (!sourceEntity.isPresent()) {
                log.warn("Missing source entity for WorkUnit state: " + workUnitState);
                continue;
            }
            SourceEntity table = (SourceEntity)sourceEntity.get();
            long lowWm = -1L;
            LongWatermark waterMarkObj = (LongWatermark)workUnitState.getWorkunit().getLowWatermark(LongWatermark.class);
            if (waterMarkObj != null) {
                lowWm = waterMarkObj.getValue();
            } else if (workUnitState.getProperties().containsKey("workunit.low.water.mark")) {
                lowWm = Long.parseLong(workUnitState.getProperties().getProperty("workunit.low.water.mark"));
                log.warn("can not find low water mark in json format, getting value from workunit.low.water.mark low water mark " + lowWm);
            }
            if (!prevLowWatermarksByTable.containsKey(table)) {
                prevLowWatermarksByTable.put(table, lowWm);
            } else {
                prevLowWatermarksByTable.put(table, Math.min((Long)prevLowWatermarksByTable.get(table), lowWm));
            }
            long highWm = -1L;
            waterMarkObj = (LongWatermark)workUnitState.getActualHighWatermark(LongWatermark.class);
            if (waterMarkObj != null) {
                highWm = waterMarkObj.getValue();
            } else if (workUnitState.getProperties().containsKey("workunit.state.runtime.high.water.mark")) {
                highWm = Long.parseLong(workUnitState.getProperties().getProperty("workunit.state.runtime.high.water.mark"));
                log.warn("can not find high water mark in json format, getting value from workunit.state.runtime.high.water.mark high water mark " + highWm);
            }
            if (!prevActualHighWatermarksByTable.containsKey(table)) {
                prevActualHighWatermarksByTable.put(table, highWm);
            } else {
                prevActualHighWatermarksByTable.put(table, Math.max((Long)prevActualHighWatermarksByTable.get(table), highWm));
            }
            if (commitOnFullSuccess && !QueryBasedSource.isSuccessfulOrCommited(workUnitState)) {
                tablesWithFailedTasks.add(table);
            }
            if (QueryBasedSource.isAnyDataProcessed(workUnitState)) continue;
            tablesWithNoUpdatesOnPreviousRun.add(table);
        }
        for (Map.Entry entry : prevLowWatermarksByTable.entrySet()) {
            if (tablesWithFailedTasks.contains(entry.getKey())) {
                log.info("Resetting low watermark to {} because previous run failed.", entry.getValue());
                result.put(entry.getKey(), entry.getValue());
                continue;
            }
            if (tablesWithNoUpdatesOnPreviousRun.contains(entry.getKey()) && state.getPropAsBoolean("source.querybased.resetEmptyPartitionWatermark", true)) {
                log.info("Resetting low watermakr to {} because previous run processed no data.", entry.getValue());
                result.put(entry.getKey(), entry.getValue());
                continue;
            }
            result.put(entry.getKey(), prevActualHighWatermarksByTable.get(entry.getKey()));
        }
        return result;
    }

    private static boolean isSuccessfulOrCommited(WorkUnitState wus) {
        return wus.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL || wus.getWorkingState() == WorkUnitState.WorkingState.COMMITTED;
    }

    private static boolean isAnyDataProcessed(WorkUnitState wus) {
        return wus.getPropAsLong("qualitychecker.rows.expected", 0L) > 0L;
    }

    private static void initLogger(SourceState state) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        sb.append(StringUtils.stripToEmpty((String)state.getProp("source.querybased.schema")));
        sb.append("_");
        sb.append(StringUtils.stripToEmpty((String)state.getProp("source.entity")));
        sb.append("]");
        MDC.put((String)"sourceInfo", (String)sb.toString());
    }

    public static final class SourceEntity {
        private final String sourceEntityName;
        private final String destTableName;

        public String getDatasetName() {
            return this.sourceEntityName;
        }

        static String sanitizeEntityName(String entity) {
            return Utils.escapeSpecialCharacters(entity, "$,&", "_");
        }

        public static SourceEntity fromSourceEntityName(String sourceEntityName) {
            return new SourceEntity(sourceEntityName, SourceEntity.sanitizeEntityName(sourceEntityName));
        }

        public static Optional<SourceEntity> fromState(State state) {
            String destTableName;
            String sourceEntityName;
            if (state.contains("source.entity")) {
                sourceEntityName = state.getProp("source.entity");
                destTableName = state.getProp("extract.table.name", SourceEntity.sanitizeEntityName(sourceEntityName));
            } else if (state.contains("extract.table.name")) {
                sourceEntityName = destTableName = state.getProp("extract.table.name");
            } else {
                return Optional.absent();
            }
            return Optional.of((Object)new SourceEntity(sourceEntityName, destTableName));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            SourceEntity other = (SourceEntity)obj;
            return !(this.getDatasetName() == null ? other.getDatasetName() != null : !this.getDatasetName().equals(other.getDatasetName()));
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (this.getDatasetName() == null ? 0 : this.getDatasetName().hashCode());
            return result;
        }

        public SourceEntity(String sourceEntityName, String destTableName) {
            this.sourceEntityName = sourceEntityName;
            this.destTableName = destTableName;
        }

        public String getSourceEntityName() {
            return this.sourceEntityName;
        }

        public String getDestTableName() {
            return this.destTableName;
        }

        public String toString() {
            return "QueryBasedSource.SourceEntity(sourceEntityName=" + this.getSourceEntityName() + ", destTableName=" + this.getDestTableName() + ")";
        }
    }
}

