package org.wso2.carbon.analytics.spark.core.sources;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.InsertableRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataService;
import org.wso2.carbon.analytics.datasource.commons.AnalyticsSchema;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsTableNotAvailableException;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.rdd.AnalyticsRDD;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsCommonUtils;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsConstants;
import org.wso2.carbon.analytics.spark.core.util.CarbonScalaUtils;
import org.wso2.carbon.analytics.spark.core.util.IncrementalUtils;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/core/sources/AnalyticsRelation.class */
public class AnalyticsRelation extends BaseRelation implements TableScan, InsertableRelation, Serializable {
    private static final long serialVersionUID = -7773419083178608517L;
    private static final Log log = LogFactory.getLog(AnalyticsRelation.class);
    private SQLContext sqlContext;
    private StructType schema;
    private int tenantId;
    private String tableName;
    private String recordStore;
    private boolean incEnable;
    private String incID;
    private int incBuffer;
    private boolean globalTenantAccess;
    private AnalyticsConstants.IncrementalWindowUnit windowUnit;
    private String schemaString;
    private String primaryKeys;
    private boolean mergeFlag;

    public AnalyticsRelation() {
    }

    public AnalyticsRelation(int i, String str, String str2, SQLContext sQLContext, StructType structType, String str3, boolean z, String str4, String str5, boolean z2) {
        this.tenantId = i;
        this.tableName = str2;
        this.recordStore = str;
        this.sqlContext = sQLContext;
        this.schema = structType;
        setIncParams(str3);
        this.globalTenantAccess = z;
        this.schemaString = str4;
        this.primaryKeys = str5;
        this.mergeFlag = z2;
    }

    private void setIncParams(String str) {
        if (str.isEmpty()) {
            logDebug("Incremental processing disabled");
            this.incEnable = false;
            return;
        }
        this.incEnable = true;
        logDebug("Incremental processing enabled. Setting incremental parameters " + str);
        String[] split = str.split("\\s*,\\s*");
        if (split.length == 1) {
            this.incID = split[0];
            return;
        }
        if (split.length == 2) {
            this.incID = split[0];
            this.windowUnit = AnalyticsConstants.IncrementalWindowUnit.valueOf(split[1].toUpperCase());
            this.incBuffer = 1;
        } else {
            if (split.length != 3) {
                String str2 = "Error while setting incremental processing parameters : " + str;
                log.error(str2);
                throw new RuntimeException(str2);
            }
            this.incID = split[0];
            this.windowUnit = AnalyticsConstants.IncrementalWindowUnit.valueOf(split[1].toUpperCase());
            this.incBuffer = Integer.parseInt(split[2]);
        }
    }

    public RDD<Row> buildScan() {
        long lastProcessedTimestamp;
        int i;
        if (AnalyticsCommonUtils.isEmptySchema(this.schema)) {
            String str = "Unable to scan through the table as the schema is unavailable for " + this.tableName;
            log.error(str);
            throw new RuntimeException(str);
        }
        if (this.incEnable) {
            try {
                lastProcessedTimestamp = ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(this.tenantId, this.incID, true);
                if (lastProcessedTimestamp > 0) {
                    lastProcessedTimestamp = this.windowUnit != null ? IncrementalUtils.getIncrementalStartTime(lastProcessedTimestamp, this.windowUnit, this.incBuffer) : lastProcessedTimestamp + 1;
                }
            } catch (AnalyticsException e) {
                throw new RuntimeException((Throwable) e);
            }
        } else {
            lastProcessedTimestamp = Long.MIN_VALUE;
        }
        if (!this.globalTenantAccess) {
            i = this.tenantId;
        } else {
            if (this.tenantId != -1234) {
                throw new RuntimeException("Global tenant read can only be done by the super tenant");
            }
            i = -2000;
        }
        return getAnalyticsRDD(i, this.tableName, new ArrayList(Arrays.asList(this.schema.fieldNames())), this.sqlContext.sparkContext(), (Seq) Seq$.MODULE$.empty(), ClassTag$.MODULE$.apply(Row.class), lastProcessedTimestamp, Long.MAX_VALUE, this.incEnable, this.incID);
    }

    private void logDebug(String str) {
        if (log.isDebugEnabled()) {
            log.debug(str);
        }
    }

    public SQLContext sqlContext() {
        return this.sqlContext;
    }

    public StructType schema() {
        if (AnalyticsCommonUtils.isEmptySchema(this.schema)) {
            log.warn("No schema is available for table " + this.tableName);
        }
        return this.schema;
    }

    public void insert(DataFrame dataFrame, boolean z) {
        int i;
        AnalyticsSchema analyticsSchema;
        AnalyticsDataService analyticsDataService = ServiceHolder.getAnalyticsDataService();
        if (!this.globalTenantAccess) {
            i = this.tenantId;
        } else {
            if (this.tenantId != -1234) {
                throw new RuntimeException("Global tenant write can only be done by the super tenant");
            }
            i = -2000;
        }
        try {
            try {
                analyticsSchema = analyticsDataService.getTableSchema(i, this.tableName);
            } catch (AnalyticsException e) {
                String str = "Error while inserting data into table " + this.tableName + " : " + e.getMessage();
                log.error(str, e);
                throw new RuntimeException(str, e);
            }
        } catch (AnalyticsTableNotAvailableException e2) {
            analyticsSchema = null;
        }
        if (z && !AnalyticsCommonUtils.isEmptyAnalyticsSchema(analyticsSchema)) {
            analyticsDataService.deleteTable(i, this.tableName);
            if (!analyticsDataService.listRecordStoreNames().contains(this.recordStore)) {
                throw new RuntimeException("Unknown record store name " + this.recordStore);
            }
            analyticsDataService.createTable(i, this.recordStore, this.tableName);
            analyticsDataService.setTableSchema(i, this.tableName, analyticsSchema);
        }
        writeDataFrameToDAL(dataFrame);
    }

    private void writeDataFrameToDAL(DataFrame dataFrame) {
        for (int i = 0; i < dataFrame.rdd().partitions().length; i++) {
            dataFrame.sqlContext().sparkContext().runJob(dataFrame.rdd(), new AnalyticsWritingFunction(this.tenantId, this.tableName, dataFrame.schema(), this.globalTenantAccess, this.schemaString, this.primaryKeys, this.mergeFlag, this.recordStore), CarbonScalaUtils.getNumberSeq(i, i + 1), false, ClassTag$.MODULE$.Unit());
        }
    }

    protected AnalyticsRDD getAnalyticsRDD(int i, String str, List<String> list, SparkContext sparkContext, Seq<Dependency<?>> seq, ClassTag<Row> classTag, long j, long j2, boolean z, String str2) {
        return new AnalyticsRDD(i, str, list, sparkContext, seq, classTag, j, j2, z, str2);
    }
}
