package org.wso2.carbon.analytics.spark.event;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/event/EventIteratorFunction.class */
/**
 * Function over an iterator of Spark {@link Row}s that converts every row into an
 * {@link EventRecord} and hands the records to {@link EventStreamDataStore} in batches.
 *
 * <p>Each column of a row is routed by its schema field name: columns whose name starts with
 * {@link EventingConstants#EVENT_META_DATA_PREFIX} or
 * {@link EventingConstants#EVENT_CORRELATION_DATA_PREFIX} go to the meta / correlation lists,
 * every other column goes to the first (payload) list. When global tenant access is enabled,
 * the {@code _tenantId} column supplies the tenant id of the record instead of being stored
 * as data.
 *
 * <p>NOTE(review): presumably used as a per-partition callback (e.g. {@code foreachPartition});
 * confirm against the caller.
 */
public class EventIteratorFunction extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    private static final long serialVersionUID = 4048303072566432397L;

    /** Number of records accumulated before they are flushed to the event store. */
    private static final int BATCH_SIZE = 1000;

    /** Name of the column that carries the per-row tenant id under global tenant access. */
    private static final String TENANT_ID_FIELD = "_tenantId";

    private final int tenantId;
    private final String streamId;
    private final StructType sch;
    private final boolean globalTenantAccess;

    /**
     * Creates the function.
     *
     * @param tenantId           tenant id assigned to every record, unless overridden per row
     *                           by the {@code _tenantId} column under global tenant access
     * @param streamId           stream id assigned to every created record
     * @param schema             schema whose field names are matched positionally against the
     *                           row columns
     * @param globalTenantAccess when {@code true}, every row must contain a {@code _tenantId}
     *                           column, which is used as the record's tenant id
     */
    public EventIteratorFunction(int tenantId, String streamId, StructType schema, boolean globalTenantAccess) {
        this.tenantId = tenantId;
        this.streamId = streamId;
        this.sch = schema;
        this.globalTenantAccess = globalTenantAccess;
    }

    /**
     * Drains the iterator, converting each row into an event record and writing the records to
     * the event store in batches of {@value #BATCH_SIZE}; the remaining partial batch is written
     * once the iterator is exhausted.
     *
     * @param iterator rows to publish
     * @return {@link BoxedUnit#UNIT}
     * @throws RuntimeException if the event store rejects a batch (wrapping the
     *                          {@link AnalyticsException}), or if a row lacks the
     *                          {@code _tenantId} column while global tenant access is enabled
     */
    public BoxedUnit apply(Iterator<Row> iterator) {
        // The field names do not change between rows, so resolve them once per call
        // instead of once per row.
        String[] fieldNames = this.sch.fieldNames();
        List<EventRecord> batch = new ArrayList<EventRecord>(BATCH_SIZE);
        try {
            while (iterator.hasNext()) {
                batch.add(createEventRecordFromRow(iterator.next(), fieldNames));
                if (batch.size() >= BATCH_SIZE) {
                    EventStreamDataStore.addToStore(batch);
                    // NOTE(review): reusing the list assumes addToStore does not retain a
                    // reference to it after returning - confirm.
                    batch.clear();
                }
            }
            // The trailing flush lives inside the same try block so that a store failure on
            // the last partial batch is reported exactly like a failure on a full batch.
            EventStreamDataStore.addToStore(batch);
        } catch (AnalyticsException e) {
            throw new RuntimeException("Error in writing event store entires: " + e.getMessage(), e);
        }
        return BoxedUnit.UNIT;
    }

    /**
     * Builds one event record from a row, routing each column by its field name.
     *
     * @param row        row to convert
     * @param fieldNames schema field names, index-aligned with the row's columns
     * @return the record carrying the resolved tenant id, the stream id and the three value lists
     * @throws RuntimeException if global tenant access is enabled and the row has no
     *                          {@code _tenantId} column
     */
    private EventRecord createEventRecordFromRow(Row row, String[] fieldNames) {
        int recordTenantId = this.tenantId;
        boolean tenantIdFound = false;
        List<Object> payloadData = new ArrayList<Object>();
        List<Object> metaData = new ArrayList<Object>();
        List<Object> correlationData = new ArrayList<Object>();

        for (int col = 0; col < row.length(); col++) {
            String fieldName = fieldNames[col];
            if (this.globalTenantAccess && TENANT_ID_FIELD.equals(fieldName)) {
                // The tenant column selects the owner of the record; it is not event data.
                recordTenantId = row.getInt(col);
                tenantIdFound = true;
            } else if (fieldName.startsWith(EventingConstants.EVENT_META_DATA_PREFIX)) {
                metaData.add(row.get(col));
            } else if (fieldName.startsWith(EventingConstants.EVENT_CORRELATION_DATA_PREFIX)) {
                correlationData.add(row.get(col));
            } else {
                payloadData.add(row.get(col));
            }
        }

        if (this.globalTenantAccess && !tenantIdFound) {
            throw new RuntimeException("The field '_tenantId' is not found in row: " + row + " with schema: " + this.sch + " when creating a global tenant access record");
        }
        return new EventRecord(recordTenantId, this.streamId, payloadData, metaData, correlationData);
    }
}
