package org.wso2.carbon.analytics.spark.core.sources;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataService;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl;
import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsTableNotAvailableException;
import org.wso2.carbon.analytics.datasource.core.util.GenericUtils;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsCommonUtils;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsConstants;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/core/sources/AnalyticsWritingFunction.class */
public class AnalyticsWritingFunction extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    private static final long serialVersionUID = -1919222653470217466L;
    private static final Log log = LogFactory.getLog(AnalyticsWritingFunction.class);
    private int tId;
    private String tName;
    private StructType sch;
    private boolean globalTenantAccess;
    private String schemaString;
    private String primaryKeys;
    private boolean mergeFlag;
    private String recordStore;

    public AnalyticsWritingFunction(int i, String str, StructType structType, boolean z, String str2, String str3, boolean z2, String str4) {
        this.tId = i;
        this.tName = str;
        this.sch = structType;
        this.globalTenantAccess = z;
        this.schemaString = str2;
        this.primaryKeys = str3;
        this.mergeFlag = z2;
        this.recordStore = str4;
    }

    private void handleAnalyticsTableSchemaInvalidation() {
        AnalyticsDataServiceImpl analyticsDataService = ServiceHolder.getAnalyticsDataService();
        if (analyticsDataService instanceof AnalyticsDataServiceImpl) {
            analyticsDataService.invalidateAnalyticsTableInfo(this.globalTenantAccess ? -2000 : this.tId, this.tName);
        }
    }

    public BoxedUnit apply(Iterator<Row> iterator) {
        ArrayList arrayList = new ArrayList(AnalyticsConstants.MAX_RECORDS);
        handleAnalyticsTableSchemaInvalidation();
        while (iterator.hasNext()) {
            if (arrayList.size() < 1000) {
                arrayList.add((Row) iterator.next());
                if (arrayList.size() == 1000) {
                    recordsPut(arrayList);
                    arrayList.clear();
                }
            }
        }
        if (!arrayList.isEmpty()) {
            recordsPut(arrayList);
        }
        return BoxedUnit.UNIT;
    }

    private void recordsPut(List<Row> list) {
        if (this.globalTenantAccess) {
            recordsPutGlobal(list);
        } else {
            recordsPutNormal(list);
        }
    }

    private void recordsPutNormal(List<Row> list) {
        ArrayList arrayList = new ArrayList(list.size());
        java.util.Iterator<Row> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(convertRowAndSchemaToRecord(it.next(), this.sch, false));
        }
        try {
            ServiceHolder.getAnalyticsDataService().put(arrayList);
        } catch (AnalyticsException e) {
            String str = "Error while inserting data into table " + this.tName + ": " + e.getMessage();
            log.error(str, e);
            throw new RuntimeException(str, e);
        }
    }

    private void recordsPutGlobal(List<Row> list) {
        ArrayList arrayList = new ArrayList(list.size());
        java.util.Iterator<Row> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(convertRowAndSchemaToRecord(it.next(), this.sch, true));
        }
        AnalyticsDataService analyticsDataService = ServiceHolder.getAnalyticsDataService();
        try {
            for (List list2 : GenericUtils.generateRecordBatches(arrayList)) {
                try {
                    analyticsDataService.put(list2);
                } catch (AnalyticsTableNotAvailableException e) {
                    Record record = (Record) list2.get(0);
                    createTargetTableAndSetSchema(analyticsDataService, record.getTenantId(), record.getTableName());
                    analyticsDataService.put(list2);
                }
            }
        } catch (AnalyticsException e2) {
            String str = "Error while inserting data into table " + this.tName + ": " + e2.getMessage();
            log.error(str, e2);
            throw new RuntimeException(str, e2);
        }
    }

    protected void createTargetTableAndSetSchema(AnalyticsDataService analyticsDataService, int i, String str) throws AnalyticsException {
        AnalyticsCommonUtils.createTableIfNotExists(analyticsDataService, this.recordStore, i, str);
        analyticsDataService.setTableSchema(i, str, AnalyticsCommonUtils.createAnalyticsTableSchema(analyticsDataService, i, str, this.schemaString, this.primaryKeys, this.globalTenantAccess, this.mergeFlag, false));
    }

    private Record convertRowAndSchemaToRecord(Row row, StructType structType, boolean z) {
        String[] fieldNames = structType.fieldNames();
        HashMap hashMap = new HashMap();
        long j = -1;
        int i = this.tId;
        boolean z2 = false;
        for (int i2 = 0; i2 < row.length(); i2++) {
            if (fieldNames[i2].equals(AnalyticsConstants.TIMESTAMP_FIELD)) {
                j = row.getLong(i2);
            } else if (z && fieldNames[i2].equals(AnalyticsConstants.TENANT_ID_FIELD)) {
                i = row.getInt(i2);
                z2 = true;
            } else {
                hashMap.put(fieldNames[i2], row.get(i2));
            }
        }
        if (!z || z2) {
            return j < 0 ? new Record(i, this.tName, hashMap) : new Record(i, this.tName, hashMap, j);
        }
        throw new RuntimeException("The field '_tenantId' is not found in row: " + row + " with schema: " + structType + " when creating a global tenant access record");
    }
}
