package org.apache.eagle.dataproc.impl.persist;

import com.typesafe.config.Config;
import java.text.MessageFormat;
import java.util.List;
import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor2;
import org.apache.eagle.datastream.core.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/eagle/dataproc/impl/persist/PersistExecutor.class */
public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class);
    private Config config;
    private IPersistService<AggregateEntity> persistService;
    private String persistExecutorId;
    private String persistType;

    public PersistExecutor(String str, String str2) {
        this.persistExecutorId = str;
        this.persistType = str2;
    }

    public void prepareConfig(Config config) {
        this.config = config;
    }

    public void init() {
        if (!this.persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) {
            throw new RuntimeException(String.format("Persist type '%s' not supported yet!", this.persistService));
        }
        this.persistService = new KafkaPersistService(this.config.getConfig("persistExecutorConfigs." + this.persistExecutorId));
    }

    public void flatMap(List<Object> list, Collector<Tuple2<String, AggregateEntity>> collector) {
        if (list.size() != 2) {
            LOG.error(String.format("Persist executor expect two elements per tuple. But actually got size %d lists!", Integer.valueOf(list.size())));
            return;
        }
        AggregateEntity aggregateEntity = (AggregateEntity) list.get(1);
        try {
            this.persistService.save("defaultOutput", aggregateEntity);
        } catch (Exception e) {
            LOG.error(MessageFormat.format("persist entity failed: {0}", aggregateEntity), e);
        }
    }
}
