package acromusashi.stream.bolt.hdfs;

import acromusashi.stream.bolt.AmConfigurationBolt;
import acromusashi.stream.entity.StreamMessage;
import acromusashi.stream.exception.InitFailException;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/bolt/hdfs/HdfsStoreBolt.class */
/**
 * Bolt that appends the string form of every received {@link StreamMessage} to HDFS
 * through an {@link HdfsOutputSwitcher}.
 *
 * <p>Configuration keys read from the topology configuration in {@link #prepare}:
 * <ul>
 *   <li>{@code hdfsstorebolt.outputuri} - output URI handed to {@link HdfsStoreConfig}</li>
 *   <li>{@code hdfsstorebolt.filenameheader} - file name header</li>
 *   <li>{@code hdfsstorebolt.interval} - file switch interval (required, numeric)</li>
 *   <li>{@code hdfsstorebolt.executepreprocess} - optional Boolean, defaults to {@code true};
 *       when true {@link HdfsPreProcessor} is run before the writer is initialized</li>
 * </ul>
 */
public class HdfsStoreBolt extends AmConfigurationBolt {
    private static final long serialVersionUID = -2877852415844943739L;
    private static final Logger logger = LoggerFactory.getLogger(HdfsStoreBolt.class);

    /** Configuration key holding the file switch interval. */
    private static final String INTERVAL_KEY = "hdfsstorebolt.interval";

    /** HDFS writer; created in {@link #prepare}, so it is null until prepare has succeeded. */
    private transient HdfsOutputSwitcher delegate = null;

    /**
     * Builds the {@link HdfsStoreConfig} from the topology configuration, optionally runs
     * {@link HdfsPreProcessor}, and initializes the HDFS writer.
     *
     * @param map topology configuration
     * @param topologyContext supplies the component id and task id used in the file name body
     * @param outputCollector collector passed through to the superclass
     * @throws InitFailException if the interval setting is missing or not numeric, or if
     *         obtaining the file system, pre-processing, or writer initialization fails
     */
    @Override // acromusashi.stream.bolt.AmConfigurationBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        String thisComponentId = topologyContext.getThisComponentId();
        int thisTaskId = topologyContext.getThisTaskId();

        HdfsStoreConfig hdfsStoreConfig = new HdfsStoreConfig();
        hdfsStoreConfig.setOutputUri((String) map.get("hdfsstorebolt.outputuri"));
        hdfsStoreConfig.setFileNameHeader((String) map.get("hdfsstorebolt.filenameheader"));
        hdfsStoreConfig.setFileSwitchIntarval(resolveSwitchInterval(map.get(INTERVAL_KEY)));
        // Component id and task id are embedded in the file name body.
        hdfsStoreConfig.setFileNameBody("_" + thisComponentId + "_" + thisTaskId + "_");

        // Pre-processing is on unless the setting is explicitly a Boolean false.
        boolean executePreProcess = true;
        Object preProcessSetting = map.get("hdfsstorebolt.executepreprocess");
        if (preProcessSetting instanceof Boolean) {
            executePreProcess = ((Boolean) preProcessSetting).booleanValue();
        }

        try {
            FileSystem fileSystem = new Path(hdfsStoreConfig.getOutputUri()).getFileSystem(new Configuration());
            if (executePreProcess) {
                HdfsPreProcessor.execute(fileSystem, hdfsStoreConfig.getOutputUri(), hdfsStoreConfig.getFileNameHeader() + hdfsStoreConfig.getFileNameBody(), hdfsStoreConfig.getTmpFileSuffix());
            }
            this.delegate = new HdfsOutputSwitcher();
            this.delegate.initialize(fileSystem, hdfsStoreConfig, System.currentTimeMillis());
        } catch (Exception e) {
            logger.warn("Failed to HDFS write initialize.", e);
            throw new InitFailException(e);
        }
    }

    /**
     * Converts the raw interval setting to an int.
     *
     * <p>Any {@link Number} is accepted rather than only {@code Long}, because the boxed type
     * of a numeric setting depends on how the configuration was loaded.
     *
     * @param rawInterval value stored under {@code hdfsstorebolt.interval}; may be null
     * @return the interval as an int
     * @throws InitFailException if the value is null or not a {@link Number}
     */
    private static int resolveSwitchInterval(Object rawInterval) {
        if (rawInterval instanceof Number) {
            return ((Number) rawInterval).intValue();
        }
        throw new InitFailException(new IllegalArgumentException(
                "Config \"" + INTERVAL_KEY + "\" must be a number but was: " + rawInterval));
    }

    /**
     * Appends the message's string form as one line and acks the message.
     *
     * <p>A write failure is logged and the message is dropped; it is still acked.
     *
     * @param streamMessage received message
     */
    @Override // acromusashi.stream.bolt.AmConfigurationBolt
    public void onMessage(StreamMessage streamMessage) {
        try {
            this.delegate.appendLine(streamMessage.toString(), System.currentTimeMillis());
        } catch (IOException e) {
            logger.warn(MessageFormat.format("Fail write to hdfs. Dispose received message. : Message={0}", streamMessage), e);
        }
        ack();
    }

    /**
     * Closes the HDFS writer. A close failure is logged and otherwise ignored.
     */
    public void cleanup() {
        logger.info("HDFSSinkBolt Cleanup Start.");
        // delegate is null when prepare never completed; there is nothing to close then.
        if (this.delegate != null) {
            try {
                this.delegate.close();
            } catch (IOException e) {
                logger.warn("Failed to HDFS write close. Skip close.", e);
            }
        }
        logger.info("HDFSSinkBolt Cleanup finished.");
    }

    /**
     * Declares no output fields.
     *
     * @param outputFieldsDeclarer unused
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
