package org.wso2.carbon.analytics.spark.event;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.databridge.agent.DataPublisher;
import org.wso2.carbon.databridge.commons.Event;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/event/EventIteratorFunction.class */
public class EventIteratorFunction extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    private static final Log log = LogFactory.getLog(EventIteratorFunction.class);
    private static final long serialVersionUID = 4048303072566432397L;
    private int tenantId;
    private StructType schema;
    private String streamId;
    private String receiverURLSet;
    private String authURLSet;
    private String username;
    private String password;
    private DataPublisher dataPublisher;

    public EventIteratorFunction(int i, String str, StructType structType, String str2, String str3, String str4, String str5) {
        this.tenantId = i;
        this.streamId = str;
        this.schema = structType;
        this.receiverURLSet = str2;
        this.authURLSet = str3;
        this.username = str4;
        this.password = str5;
    }

    public BoxedUnit apply(Iterator<Row> iterator) {
        this.dataPublisher = DataPublisherHolder.getInstance().getDataPublisher(this.receiverURLSet, this.authURLSet, this.username, this.password);
        while (iterator.hasNext()) {
            Row row = (Row) iterator.next();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < row.length(); i++) {
                arrayList.add(row.get(i));
            }
            if (this.dataPublisher != null) {
                this.dataPublisher.publish(buildEvent(arrayList));
            } else if (log.isDebugEnabled()) {
                log.debug("Publisher is not initialized properly. Couldn't publish events to streamId: " + this.streamId);
            }
            if (log.isDebugEnabled()) {
                log.debug("Published event to streamId: " + this.streamId);
            }
        }
        return BoxedUnit.UNIT;
    }

    private Event buildEvent(List<Object> list) {
        Event event = new Event();
        event.setTimeStamp(System.currentTimeMillis());
        event.setStreamId(this.streamId);
        event.setPayloadData(list.toArray());
        return event;
    }
}
