/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.source.extractor.partition;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.ExtractType;
import org.apache.gobblin.source.extractor.partition.AppendMaxLimitType;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.DateWatermark;
import org.apache.gobblin.source.extractor.watermark.HourWatermark;
import org.apache.gobblin.source.extractor.watermark.SimpleWatermark;
import org.apache.gobblin.source.extractor.watermark.TimestampWatermark;
import org.apache.gobblin.source.extractor.watermark.WatermarkPredicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Derives low/high watermark boundaries from the properties of a {@link SourceState} and turns
 * them into one or more {@link Partition}s.
 *
 * <p>Instances are stateful: computing a high watermark may set
 * {@link #hasUserSpecifiedHighWatermark}, which is subsequently copied into the partitions that
 * are created. The flag is never reset by this class.
 */
public class Partitioner {
    private static final Logger LOG = LoggerFactory.getLogger(Partitioner.class);

    /** Pattern used whenever a date/time watermark is rendered as (or parsed from) a number. */
    public static final String WATERMARKTIMEFORMAT = "yyyyMMddHHmmss";
    /** Boolean property: when true, partitions are built from {@link #USER_SPECIFIED_PARTITIONS}. */
    public static final String HAS_USER_SPECIFIED_PARTITIONS = "partitioner.hasUserSpecifiedPartitions";
    /** List property holding the user supplied partition boundary points. */
    public static final String USER_SPECIFIED_PARTITIONS = "partitioner.userSpecifiedPartitions";

    /**
     * Orders partitions by ascending low watermark; {@code null} partitions sort first and two
     * {@code null}s compare as equal.
     */
    public static final Comparator<Partition> ascendingComparator = new Comparator<Partition>() {
        @Override
        public int compare(Partition p1, Partition p2) {
            if (p1 == null) {
                return p2 == null ? 0 : -1;
            }
            if (p2 == null) {
                return 1;
            }
            return Long.compare(p1.getLowWatermark(), p2.getLowWatermark());
        }
    };

    // Property keys read from the source state. Values are kept exactly as they were inlined.
    private static final String KEY_EXTRACT_TYPE = "source.querybased.extract.type";
    private static final String KEY_WATERMARK_TYPE = "source.querybased.watermark.type";
    private static final String KEY_PARTITION_INTERVAL = "source.querybased.partition.interval";
    private static final String KEY_MAX_PARTITIONS = "source.max.number.of.partitions";
    private static final String KEY_TIMEZONE = "source.timezone";
    private static final String KEY_START_VALUE = "source.querybased.start.value";
    private static final String KEY_END_VALUE = "source.querybased.end.value";
    private static final String KEY_LOW_WATERMARK_BACKUP_SECS = "source.querybased.low.watermark.backup.secs";
    private static final String KEY_APPEND_MAX_WATERMARK_LIMIT = "source.querybased.append.max.watermark.limit";
    private static final String KEY_DELTA_FIELDS = "extract.delta.fields";
    private static final String KEY_IS_FULL = "extract.is.full";
    private static final String KEY_IS_WATERMARK_OVERRIDE = "source.querybased.is.watermark.override";

    private static final String DEFAULT_WATERMARK_TYPE = "timestamp";
    private static final String DEFAULT_TIMEZONE = "America/Los_Angeles";
    private static final int DEFAULT_MAX_PARTITIONS = 20;
    /** Sentinel meaning "no watermark". */
    private static final long NO_WATERMARK = -1L;

    private final SourceState state;

    /** Set to true once a high watermark has been derived from user configuration. */
    @VisibleForTesting
    protected boolean hasUserSpecifiedHighWatermark;

    /**
     * @param state source state whose properties drive all watermark computations
     */
    public Partitioner(SourceState state) {
        this.state = state;
        this.hasUserSpecifiedHighWatermark = false;
    }

    /**
     * Builds a single partition spanning the computed low and high watermark.
     *
     * @param previousWatermark watermark of the previous run, or {@code -1} if there is none
     * @return one partition covering the whole range
     * @throws NullPointerException if the extract type property is not set
     * @throws IllegalArgumentException if the extract or watermark type is not a known enum value
     */
    public Partition getGlobalPartition(long previousWatermark) {
        ExtractType extractType = this.readExtractType();
        WatermarkType watermarkType = this.readWatermarkType();
        WatermarkPredicate watermark = new WatermarkPredicate(null, watermarkType);
        int deltaForNextWatermark = watermark.getDeltaNumForNextWatermark();

        long lowWatermark = this.getLowWatermark(extractType, watermarkType, previousWatermark, deltaForNextWatermark);
        // getHighWatermark may flip hasUserSpecifiedHighWatermark, so it must run before the flag is read.
        long highWatermark = this.getHighWatermark(extractType, watermarkType);
        return new Partition(lowWatermark, highWatermark, true, this.hasUserSpecifiedHighWatermark);
    }

    /**
     * Computes partitions as a map from low watermark to high watermark.
     *
     * <p>A single default entry is returned when no watermark column/type is configured
     * ({@code -1 -> -1}) or when either boundary could not be determined.
     *
     * @param previousWatermark watermark of the previous run, or {@code -1} if there is none
     * @return map of partition low watermark to partition high watermark
     * @throws NullPointerException if a watermark is configured but the extract type property is not set
     */
    @Deprecated
    public HashMap<Long, Long> getPartitions(long previousWatermark) {
        HashMap<Long, Long> defaultPartition = Maps.newHashMap();
        if (!this.isWatermarkExists()) {
            defaultPartition.put(NO_WATERMARK, NO_WATERMARK);
            LOG.info("Watermark column or type not found - Default partition with low watermark and high watermark as -1");
            return defaultPartition;
        }

        ExtractType extractType = this.readExtractType();
        WatermarkType watermarkType = this.readWatermarkType();
        int interval = Partitioner.getUpdatedInterval(
            this.state.getPropAsInt(KEY_PARTITION_INTERVAL, 0), extractType, watermarkType);
        int sourceMaxAllowedPartitions = this.state.getPropAsInt(KEY_MAX_PARTITIONS, 0);
        // A configured value of 0 means "not set" and falls back to the default.
        int maxPartitions = sourceMaxAllowedPartitions != 0 ? sourceMaxAllowedPartitions : DEFAULT_MAX_PARTITIONS;

        WatermarkPredicate watermark = new WatermarkPredicate(null, watermarkType);
        int deltaForNextWatermark = watermark.getDeltaNumForNextWatermark();

        LOG.info("is watermark override: {}", this.isWatermarkOverride());
        LOG.info("is full extract: {}", this.isFullDump());

        long lowWatermark = this.getLowWatermark(extractType, watermarkType, previousWatermark, deltaForNextWatermark);
        long highWatermark = this.getHighWatermark(extractType, watermarkType);

        if (lowWatermark == NO_WATERMARK || highWatermark == NO_WATERMARK) {
            LOG.info("Low watermark or high water mark is not found. Hence cannot generate partitions - Default partition with low watermark:  {} and high watermark: {}",
                lowWatermark, highWatermark);
            defaultPartition.put(lowWatermark, highWatermark);
            return defaultPartition;
        }

        LOG.info("Generate partitions with low watermark: {}; high watermark: {}; partition interval in hours: {}; Maximum number of allowed partitions: {}",
            lowWatermark, highWatermark, interval, maxPartitions);
        return watermark.getPartitions(lowWatermark, highWatermark, interval, maxPartitions);
    }

    /**
     * Computes the partitions for this run.
     *
     * <p>If {@link #HAS_USER_SPECIFIED_PARTITIONS} is true the partitions come from the user
     * supplied boundary points; otherwise they are derived from {@link #getPartitions(long)}.
     * Every partition whose high watermark equals the largest high watermark is created with
     * {@code true} as its third constructor argument and carries the
     * {@link #hasUserSpecifiedHighWatermark} flag; all others are created with {@code false}.
     *
     * @param previousWatermark watermark of the previous run, or {@code -1} if there is none
     * @return the partitions, in no guaranteed order; empty if no partition could be computed
     */
    public List<Partition> getPartitionList(long previousWatermark) {
        if (this.state.getPropAsBoolean(HAS_USER_SPECIFIED_PARTITIONS)) {
            return this.createUserSpecifiedPartitions();
        }

        List<Partition> partitions = new ArrayList<>();
        HashMap<Long, Long> partitionMap = this.getPartitions(previousWatermark);
        if (partitionMap.isEmpty()) {
            // Collections.max throws NoSuchElementException on an empty collection.
            return partitions;
        }

        Long highestWatermark = Collections.max(partitionMap.values());
        for (Map.Entry<Long, Long> entry : partitionMap.entrySet()) {
            Long partitionHighWatermark = entry.getValue();
            if (partitionHighWatermark.equals(highestWatermark)) {
                partitions.add(new Partition(entry.getKey(), partitionHighWatermark, true, this.hasUserSpecifiedHighWatermark));
            } else {
                partitions.add(new Partition(entry.getKey(), partitionHighWatermark, false));
            }
        }
        return partitions;
    }

    /**
     * Builds partitions from the boundary points listed under {@link #USER_SPECIFIED_PARTITIONS}.
     *
     * <ul>
     *   <li>No points: a single {@code [-1, -1]} partition.</li>
     *   <li>One point: a single partition from that point to the current time (or {@code -1} for
     *       simple watermarks).</li>
     *   <li>N points: N-1 consecutive partitions; the final constructor flag of the last one is
     *       {@code false} for full dumps and snapshot extracts and {@code true} otherwise.</li>
     * </ul>
     */
    private List<Partition> createUserSpecifiedPartitions() {
        List<Partition> partitions = new ArrayList<>();
        List<String> watermarkPoints = this.state.getPropAsList(USER_SPECIFIED_PARTITIONS);

        if (watermarkPoints == null || watermarkPoints.isEmpty()) {
            LOG.info("There should be some partition points");
            partitions.add(new Partition(NO_WATERMARK, NO_WATERMARK, true, true));
            return partitions;
        }

        WatermarkType watermarkType = this.readWatermarkType();
        long lowWatermark = Partitioner.adjustWatermark(watermarkPoints.get(0), watermarkType);
        long highWatermark = NO_WATERMARK;

        if (watermarkPoints.size() == 1) {
            // Only a start point was given: the range is open ended up to "now".
            if (watermarkType != WatermarkType.SIMPLE) {
                String timeZone = this.state.getProp(KEY_TIMEZONE);
                String currentTime = Utils.dateTimeToString(this.getCurrentTime(timeZone), WATERMARKTIMEFORMAT, timeZone);
                highWatermark = Partitioner.adjustWatermark(currentTime, watermarkType);
            }
            partitions.add(new Partition(lowWatermark, highWatermark, true, false));
            return partitions;
        }

        // Every point except the first and the last closes one partition and opens the next.
        int lastIndex = watermarkPoints.size() - 1;
        for (int i = 1; i < lastIndex; i++) {
            highWatermark = Partitioner.adjustWatermark(watermarkPoints.get(i), watermarkType);
            partitions.add(new Partition(lowWatermark, highWatermark, true));
            lowWatermark = highWatermark;
        }

        highWatermark = Partitioner.adjustWatermark(watermarkPoints.get(lastIndex), watermarkType);
        ExtractType extractType = this.readExtractType();
        boolean lastFlag = !(this.isFullDump() || Partitioner.isSnapshot(extractType));
        partitions.add(new Partition(lowWatermark, highWatermark, true, lastFlag));
        return partitions;
    }

    /**
     * Delegates to the {@code adjustWatermark(value, 0)} method of the watermark class matching
     * {@code watermarkType}.
     *
     * @return the adjusted watermark, or {@code -1} for a type not handled by the switch
     */
    private static long adjustWatermark(String baseWatermark, WatermarkType watermarkType) {
        long result = NO_WATERMARK;
        switch (watermarkType) {
            case SIMPLE:
                result = SimpleWatermark.adjustWatermark(baseWatermark, 0);
                break;
            case DATE:
                result = DateWatermark.adjustWatermark(baseWatermark, 0);
                break;
            case HOUR:
                result = HourWatermark.adjustWatermark(baseWatermark, 0);
                break;
            case TIMESTAMP:
                result = TimestampWatermark.adjustWatermark(baseWatermark, 0);
                break;
            default:
                // Unhandled types keep the "no watermark" sentinel.
                break;
        }
        return result;
    }

    /**
     * Scales the configured partition interval: multiplied by 24 for snapshot extracts with a
     * date watermark and for daily appends (the latter with a minimum input of 1); otherwise
     * returned unchanged.
     */
    private static int getUpdatedInterval(int inputInterval, ExtractType extractType, WatermarkType watermarkType) {
        LOG.debug("Getting updated interval");
        if (extractType == ExtractType.SNAPSHOT && watermarkType == WatermarkType.DATE) {
            return inputInterval * 24;
        }
        if (extractType == ExtractType.APPEND_DAILY) {
            return Math.max(inputInterval, 1) * 24;
        }
        return inputInterval;
    }

    /**
     * Computes the low watermark for this run.
     *
     * <p>For full dumps and watermark overrides the configured start value is used; otherwise the
     * value is derived from the previous watermark according to the extract type.
     *
     * @return the low watermark; a computed value of {@code 0} is reported as {@code -1}
     */
    @VisibleForTesting
    protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
        long lowWatermark;
        if (this.isFullDump() || this.isWatermarkOverride()) {
            String timeZone = this.state.getProp(KEY_TIMEZONE, DEFAULT_TIMEZONE);
            lowWatermark = Utils.getLongWithCurrentDate(this.state.getProp(KEY_START_VALUE), timeZone);
            LOG.info("Overriding low water mark with the given start value: {}", lowWatermark);
        } else if (Partitioner.isSnapshot(extractType)) {
            lowWatermark = this.getSnapshotLowWatermark(watermarkType, previousWatermark, deltaForNextWatermark);
        } else {
            lowWatermark = this.getAppendLowWatermark(watermarkType, previousWatermark, deltaForNextWatermark);
        }
        return lowWatermark == 0L ? NO_WATERMARK : lowWatermark;
    }

    /**
     * Low watermark for snapshot extracts: the previous watermark advanced by
     * {@code deltaForNextWatermark} and moved back by the configured backup seconds, or the
     * configured start value when there is no previous watermark.
     */
    private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
        LOG.debug("Getting snapshot low water mark");
        String timeZone = this.state.getProp(KEY_TIMEZONE, DEFAULT_TIMEZONE);

        if (!Partitioner.isPreviousWatermarkExists(previousWatermark)) {
            long startValue = Utils.getLongWithCurrentDate(this.state.getProp(KEY_START_VALUE), timeZone);
            LOG.info("Overriding low water mark with the given start value: {}", startValue);
            return startValue;
        }

        int backupSecs = this.state.getPropAsInt(KEY_LOW_WATERMARK_BACKUP_SECS, 0);
        if (Partitioner.isSimpleWatermark(watermarkType)) {
            // Simple watermarks are plain numbers, so the offsets are applied arithmetically.
            return previousWatermark + (long) deltaForNextWatermark - (long) backupSecs;
        }
        DateTime wm = Utils.toDateTime(previousWatermark, WATERMARKTIMEFORMAT, timeZone)
            .plusSeconds(deltaForNextWatermark - backupSecs);
        return Long.parseLong(Utils.dateTimeToString(wm, WATERMARKTIMEFORMAT, timeZone));
    }

    /**
     * Low watermark for append extracts: the previous watermark advanced by
     * {@code deltaForNextWatermark}, or the configured start value when there is no previous
     * watermark.
     */
    private long getAppendLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
        LOG.debug("Getting append low water mark");
        // NOTE(review): unlike the snapshot path no default time zone is supplied here, so a
        // missing property is passed on to Utils as null - confirm that Utils handles it.
        String timeZone = this.state.getProp(KEY_TIMEZONE);

        if (!Partitioner.isPreviousWatermarkExists(previousWatermark)) {
            LOG.info("Overriding low water mark with start value: source.querybased.start.value");
            return Utils.getLongWithCurrentDate(this.state.getProp(KEY_START_VALUE), timeZone);
        }

        if (Partitioner.isSimpleWatermark(watermarkType)) {
            return previousWatermark + (long) deltaForNextWatermark;
        }
        DateTime wm = Utils.toDateTime(previousWatermark, WATERMARKTIMEFORMAT, timeZone).plusSeconds(deltaForNextWatermark);
        return Long.parseLong(Utils.dateTimeToString(wm, WATERMARKTIMEFORMAT, timeZone));
    }

    /**
     * Computes the high watermark for this run.
     *
     * <p>With watermark override enabled the configured end value is used (falling back to the
     * current time when it is {@code 0} or absent); otherwise the value depends on the extract
     * type. Sets {@link #hasUserSpecifiedHighWatermark} when the value comes from configuration.
     *
     * @return the high watermark; a computed value of {@code 0} is reported as {@code -1}
     */
    @VisibleForTesting
    protected long getHighWatermark(ExtractType extractType, WatermarkType watermarkType) {
        LOG.debug("Getting high watermark");
        String timeZone = this.state.getProp(KEY_TIMEZONE);
        long highWatermark;

        if (this.isWatermarkOverride()) {
            highWatermark = this.state.getPropAsLong(KEY_END_VALUE, 0L);
            if (highWatermark == 0L) {
                highWatermark = Long.parseLong(
                    Utils.dateTimeToString(this.getCurrentTime(timeZone), WATERMARKTIMEFORMAT, timeZone));
            } else {
                this.hasUserSpecifiedHighWatermark = true;
            }
            LOG.info("Overriding high water mark with the given end value:{}", highWatermark);
        } else if (Partitioner.isSnapshot(extractType)) {
            highWatermark = this.getSnapshotHighWatermark(watermarkType);
        } else {
            highWatermark = this.getAppendHighWatermark(extractType);
        }
        return highWatermark == 0L ? NO_WATERMARK : highWatermark;
    }

    /**
     * High watermark for snapshot extracts: {@code -1} for simple watermarks, otherwise the
     * current time rendered with {@link #WATERMARKTIMEFORMAT}.
     */
    private long getSnapshotHighWatermark(WatermarkType watermarkType) {
        LOG.debug("Getting snapshot high water mark");
        if (Partitioner.isSimpleWatermark(watermarkType)) {
            return NO_WATERMARK;
        }
        String timeZone = this.state.getProp(KEY_TIMEZONE);
        return Long.parseLong(Utils.dateTimeToString(this.getCurrentTime(timeZone), WATERMARKTIMEFORMAT, timeZone));
    }

    /**
     * High watermark for append extracts: the configured end value for full dumps (possibly
     * {@code 0}), otherwise the append cutoff.
     */
    private long getAppendHighWatermark(ExtractType extractType) {
        LOG.debug("Getting append high water mark");
        if (!this.isFullDump()) {
            return this.getAppendWatermarkCutoff(extractType);
        }

        LOG.info("Overriding high water mark with end value:source.querybased.end.value");
        long highWatermark = this.state.getPropAsLong(KEY_END_VALUE, 0L);
        if (highWatermark != 0L) {
            this.hasUserSpecifiedHighWatermark = true;
        }
        return highWatermark;
    }

    /**
     * Computes the cutoff high watermark for append extracts from the append max-watermark limit.
     *
     * <p>Returns {@code -1} when no limit type applies. With a delta of {@code 0} the cutoff is
     * the current time. Otherwise the current time is moved back by the delta (in units of the
     * limit type), truncated to that unit and advanced to the last second of the unit; in that
     * case {@link #hasUserSpecifiedHighWatermark} is set.
     */
    private long getAppendWatermarkCutoff(ExtractType extractType) {
        LOG.debug("Getting append water mark cutoff");
        String timeZone = this.state.getProp(KEY_TIMEZONE);
        String maxLimit = this.state.getProp(KEY_APPEND_MAX_WATERMARK_LIMIT);

        AppendMaxLimitType limitType = Partitioner.getAppendLimitType(extractType, maxLimit);
        if (limitType == null) {
            LOG.debug("Limit type is not found");
            return NO_WATERMARK;
        }

        int limitDelta = Partitioner.getAppendLimitDelta(maxLimit);
        if (limitDelta == 0) {
            return Long.parseLong(Utils.dateTimeToString(this.getCurrentTime(timeZone), WATERMARKTIMEFORMAT, timeZone));
        }

        // "seconds" is the offset from the start of the truncated unit to its last second.
        int seconds = 3599;
        String format = null;
        switch (limitType) {
            case CURRENTDATE:
                format = "yyyyMMdd";
                limitDelta = limitDelta * 24 * 60 * 60;
                seconds = 86399;
                break;
            case CURRENTHOUR:
                format = "yyyyMMddHH";
                limitDelta = limitDelta * 60 * 60;
                seconds = 3599;
                break;
            case CURRENTMINUTE:
                format = "yyyyMMddHHmm";
                limitDelta *= 60;
                seconds = 59;
                break;
            case CURRENTSECOND:
                format = WATERMARKTIMEFORMAT;
                seconds = 0;
                break;
            default:
                // NOTE(review): any other limit type leaves format null - confirm none exist.
                break;
        }

        DateTime deltaTime = this.getCurrentTime(timeZone).minusSeconds(limitDelta);
        // Formatting and re-parsing with the coarser pattern truncates to the start of the unit.
        String truncated = Utils.dateTimeToString(deltaTime, format, timeZone);
        DateTime previousTime = Utils.toDateTime(truncated, format, timeZone).plusSeconds(seconds);
        long highWatermark = Long.parseLong(Utils.dateTimeToString(previousTime, WATERMARKTIMEFORMAT, timeZone));
        this.hasUserSpecifiedHighWatermark = true;
        return highWatermark;
    }

    /**
     * Determines the append limit type: the part of {@code maxLimit} before the first {@code '-'}
     * when configured, otherwise a default derived from the extract type (daily and hourly
     * appends only).
     *
     * @return the limit type, or {@code null} if none applies
     * @throws IllegalArgumentException if the configured name is not an {@link AppendMaxLimitType}
     */
    private static AppendMaxLimitType getAppendLimitType(ExtractType extractType, String maxLimit) {
        LOG.debug("Getting append limit type");
        AppendMaxLimitType limitType;
        switch (extractType) {
            case APPEND_DAILY:
                limitType = AppendMaxLimitType.CURRENTDATE;
                break;
            case APPEND_HOURLY:
                limitType = AppendMaxLimitType.CURRENTHOUR;
                break;
            default:
                limitType = null;
                break;
        }

        if (!Strings.isNullOrEmpty(maxLimit)) {
            LOG.debug("Getting append limit type from the config");
            String[] limitParams = maxLimit.split("-");
            // split() yields an empty array for input made only of separators, e.g. "-".
            if (limitParams.length >= 1) {
                limitType = AppendMaxLimitType.valueOf(limitParams[0]);
            }
        }
        return limitType;
    }

    /**
     * Extracts the numeric delta from a limit of the form {@code TYPE-DELTA}.
     *
     * @return the delta, or {@code 0} when {@code maxLimit} is empty or has no second part
     * @throws NumberFormatException if the second part is not an integer
     */
    private static int getAppendLimitDelta(String maxLimit) {
        LOG.debug("Getting append limit delta");
        if (Strings.isNullOrEmpty(maxLimit)) {
            return 0;
        }
        String[] limitParams = maxLimit.split("-");
        return limitParams.length >= 2 ? Integer.parseInt(limitParams[1]) : 0;
    }

    /** Reads the mandatory extract type property, parsing it case-insensitively. */
    private ExtractType readExtractType() {
        String configured = Objects.requireNonNull(
            this.state.getProp(KEY_EXTRACT_TYPE), "Missing required property: " + KEY_EXTRACT_TYPE);
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
        return ExtractType.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    /** Reads the watermark type property (default {@code timestamp}), parsing it case-insensitively. */
    private WatermarkType readWatermarkType() {
        String configured = this.state.getProp(KEY_WATERMARK_TYPE, DEFAULT_WATERMARK_TYPE);
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
        return WatermarkType.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    /** A previous watermark exists unless it is the {@code -1} sentinel. */
    private static boolean isPreviousWatermarkExists(long previousWatermark) {
        return previousWatermark != NO_WATERMARK;
    }

    /** True when both the delta fields and the watermark type properties are non-empty. */
    private boolean isWatermarkExists() {
        return !Strings.isNullOrEmpty(this.state.getProp(KEY_DELTA_FIELDS))
            && !Strings.isNullOrEmpty(this.state.getProp(KEY_WATERMARK_TYPE));
    }

    private static boolean isSnapshot(ExtractType extractType) {
        return extractType == ExtractType.SNAPSHOT;
    }

    private static boolean isSimpleWatermark(WatermarkType watermarkType) {
        return watermarkType == WatermarkType.SIMPLE;
    }

    /**
     * @return true if the {@code extract.is.full} property is {@code "true"} (ignoring case)
     */
    public boolean isFullDump() {
        return Boolean.parseBoolean(this.state.getProp(KEY_IS_FULL));
    }

    /**
     * @return true if the {@code source.querybased.is.watermark.override} property is
     *         {@code "true"} (ignoring case)
     */
    public boolean isWatermarkOverride() {
        return Boolean.parseBoolean(this.state.getProp(KEY_IS_WATERMARK_OVERRIDE));
    }

    /**
     * Returns the current time via {@link Utils#getCurrentTime(String)}; a separate method so
     * tests can override it.
     *
     * @param timeZone time zone id passed through to {@code Utils}; may be {@code null} when the
     *        property is not configured
     */
    @VisibleForTesting
    public DateTime getCurrentTime(String timeZone) {
        return Utils.getCurrentTime(timeZone);
    }
}

