package acromusashi.stream.bolt.hbase;

import acromusashi.stream.bolt.AmConfigurationBolt;
import acromusashi.stream.camel.CamelInitializer;
import acromusashi.stream.converter.AbstractMessageConverter;
import acromusashi.stream.entity.StreamMessage;
import acromusashi.stream.exception.ConvertFailException;
import acromusashi.stream.exception.InitFailException;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.hbase.model.HBaseCell;
import org.apache.camel.component.hbase.model.HBaseData;
import org.apache.camel.component.hbase.model.HBaseRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/bolt/hbase/CamelHbaseStoreBolt.class */
public class CamelHbaseStoreBolt extends AmConfigurationBolt {
    private static final long serialVersionUID = -668373233969623288L;
    private static final Logger logger = LoggerFactory.getLogger(CamelHbaseStoreBolt.class);
    private String contextUri;
    private String defaultEndPoint = "direct:hbase";
    private List<CellDefine> cellDefineList;
    private transient ProducerTemplate producerTemplate;
    protected AbstractMessageConverter converter;

    @Override // acromusashi.stream.bolt.AmConfigurationBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        try {
            this.producerTemplate = CamelInitializer.generateTemplete(this.contextUri);
        } catch (Exception e) {
            logger.error("Failure to get ProducerTemplate.", e);
            throw new InitFailException(e);
        }
    }

    @Override // acromusashi.stream.bolt.AmConfigurationBolt
    public void onMessage(StreamMessage streamMessage) {
        try {
            Map<String, Object> map = this.converter.toMap(streamMessage);
            HBaseData hBaseData = new HBaseData();
            HBaseRow hBaseRow = new HBaseRow();
            hBaseRow.setId(map.get("timestamp").toString() + "_" + map.get("source").toString());
            List list = (List) map.get("body");
            for (int i = 0; i < list.size(); i++) {
                CellDefine cellDefine = this.cellDefineList.get(i);
                HBaseCell hBaseCell = new HBaseCell();
                hBaseCell.setFamily(cellDefine.family);
                hBaseCell.setQualifier(cellDefine.qualifier);
                hBaseCell.setValue(list.get(i));
                hBaseRow.getCells().add(hBaseCell);
            }
            hBaseData.getRows().add(hBaseRow);
            insert(hBaseData);
            ack();
        } catch (ConvertFailException e) {
            logger.warn(MessageFormat.format("Fail convert to hbase record. Dispose received message. : Message={0}", streamMessage), e);
            ack();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    protected void insert(HBaseData hBaseData) {
        this.producerTemplate.sendBody(this.defaultEndPoint, hBaseData);
    }

    public void setApplicationContextUri(String str) {
        this.contextUri = str;
    }

    public void setCellDefineList(List<CellDefine> list) {
        this.cellDefineList = list;
    }

    public void setConverter(AbstractMessageConverter abstractMessageConverter) {
        this.converter = abstractMessageConverter;
    }
}
