package org.elasticsearch.hadoop.cascading;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.StringUtils;

/* loaded from: input_file:org/elasticsearch/hadoop/cascading/EsHadoopScheme.class */
/**
 * Cascading {@link Scheme} for the Hadoop (old {@code mapred} API) platform that reads from
 * {@link EsInputFormat} and writes through {@link EsOutputFormat}.
 *
 * <p>The source-side call context is an {@code Object[]} of {@link #SRC_CTX_SIZE} slots, addressed
 * through the {@code SRC_CTX_*} constants. The sink-side context is a single-element array holding
 * the result of {@code CascadingUtils.fieldToAlias} for the sink fields.
 *
 * <p>The raw {@code RecordReader}/{@code OutputCollector} type arguments are kept as declared,
 * hence the class-wide {@code rawtypes} suppression.
 */
@SuppressWarnings("rawtypes")
class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    private static final long serialVersionUID = 4304172465362298925L;

    /** Number of slots in the source call context. */
    private static final int SRC_CTX_SIZE = 4;
    /** Slot holding the reusable key created by the record reader. */
    private static final int SRC_CTX_KEY = 0;
    /** Slot holding the reusable value created by the record reader. */
    private static final int SRC_CTX_VALUE = 1;
    /** Slot holding the {@link FieldAlias} used to translate field names. */
    private static final int SRC_CTX_ALIASES = 2;
    /** Slot holding the {@link Boolean} "output as JSON" flag. */
    private static final int SRC_CTX_OUTPUT_JSON = 3;

    private static final Log log = LogFactory.getLog(EsHadoopScheme.class);

    private final String index;
    private final String query;
    private final String nodes;
    private final int port;
    private final Properties props;

    /**
     * Creates the scheme.
     *
     * @param nodes  value handed to {@code CascadingUtils.init} as the nodes setting
     * @param port   value handed to {@code CascadingUtils.init} as the port setting
     * @param index  value handed to {@code CascadingUtils.init} as the index/resource setting
     * @param query  value handed to {@code CascadingUtils.init} as the query setting
     * @param fields when non-null, used as both the sink and the source fields
     * @param props  extra properties merged over the job configuration in {@link #loadSettings}
     */
    public EsHadoopScheme(String nodes, int port, String index, String query, Fields fields, Properties props) {
        this.index = index;
        this.query = query;
        this.nodes = nodes;
        this.port = port;
        if (fields != null) {
            setSinkFields(fields);
            setSourceFields(fields);
        }
        this.props = props;
    }

    /**
     * Builds the source context: a reusable key/value pair from the record reader, the field
     * aliases and the JSON-output flag, each stored in its {@code SRC_CTX_*} slot.
     */
    @Override
    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        super.sourcePrepare(flowProcess, sourceCall);

        Settings settings = loadSettings(flowProcess.getConfigCopy(), true);

        Object[] context = new Object[SRC_CTX_SIZE];
        context[SRC_CTX_KEY] = sourceCall.getInput().createKey();
        context[SRC_CTX_VALUE] = sourceCall.getInput().createValue();
        context[SRC_CTX_ALIASES] = CascadingUtils.alias(settings);
        context[SRC_CTX_OUTPUT_JSON] = Boolean.valueOf(settings.getOutputAsJson());
        sourceCall.setContext(context);
    }

    /** Drops the source context after delegating to the superclass. */
    @Override
    public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        super.sourceCleanup(flowProcess, sourceCall);
        sourceCall.setContext(null);
    }

    /** Stores the sink field-to-alias mapping as the only element of the sink context. */
    @Override
    public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        super.sinkPrepare(flowProcess, sinkCall);

        Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
        sinkCall.setContext(new Object[] { CascadingUtils.fieldToAlias(settings, getSinkFields()) });
    }

    /** Drops the sink context after delegating to the superclass. */
    @Override
    public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        super.sinkCleanup(flowProcess, sinkCall);
        sinkCall.setContext(null);
    }

    /**
     * Configures the job for reading: installs {@link EsInputFormat} and records the aliased
     * source field names under {@code INTERNAL_ES_TARGET_FIELDS}.
     */
    @Override
    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setInputFormat(EsInputFormat.class);

        Settings settings = loadSettings(jobConf, true);
        String targetFields = StringUtils.concatenate(CascadingUtils.fieldToAlias(settings, getSourceFields()));
        jobConf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, targetFields);

        if (log.isTraceEnabled()) {
            log.trace("Initialized (source) configuration " + HadoopCfgUtils.asProperties(jobConf));
        }
    }

    /**
     * Configures the job for writing: installs {@link EsOutputFormat}, fills in the Cascading
     * writer/reader/converter/extractor defaults, and sets the file output directory and the
     * output committer on the job.
     */
    @Override
    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setOutputFormat(EsOutputFormat.class);

        Settings settings = loadSettings(jobConf, false);

        // NOTE: sink initialization logs under the EsTap category rather than this class' logger;
        // kept as-is so existing logging configuration continues to apply.
        Log tapLog = LogFactory.getLog(EsTap.class);
        InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, tapLog);
        InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, tapLog);
        InitializationUtils.setBytesConverterIfNeeded(settings, CascadingLocalBytesConverter.class, tapLog);
        InitializationUtils.setFieldExtractorIfNotSet(settings, CascadingFieldExtractor.class, tapLog);

        HadoopCfgUtils.setFileOutputFormatDir(jobConf, settings.getResourceWrite());
        HadoopCfgUtils.setOutputCommitterClass(jobConf, EsOutputFormat.EsOldAPIOutputCommitter.class.getName());

        if (tapLog.isTraceEnabled()) {
            tapLog.trace("Initialized (sink) configuration " + HadoopCfgUtils.asProperties(jobConf));
        }
    }

    /**
     * Loads settings from {@code cfg}, merges this scheme's properties over them and initializes
     * the result with the scheme's nodes, port, index and query.
     *
     * @param cfg  configuration object accepted by {@code HadoopSettingsManager.loadFrom}
     * @param read forwarded to {@code CascadingUtils.init}; callers in this class pass
     *             {@code true} on source paths and {@code false} on sink paths
     * @return the initialized settings
     */
    private Settings loadSettings(Object cfg, boolean read) {
        Settings merged = HadoopSettingsManager.loadFrom(cfg).merge(this.props);
        return CascadingUtils.init(merged, this.nodes, this.port, this.index, this.query, read);
    }

    /**
     * Reads the next record into the incoming tuple entry.
     *
     * <p>When the JSON flag is set, the raw value is exposed as a single-entry map keyed by
     * {@code Text("data")}; otherwise the value itself is treated as the map. If the entry's
     * fields are defined, each field is resolved through its alias, walking nested maps one path
     * segment at a time; otherwise the tuple is replaced with the map's values.
     *
     * @return {@code false} when the record reader has no more records, {@code true} otherwise
     */
    // Raw Map/collection use mirrors the untyped values produced by the record reader.
    @SuppressWarnings("unchecked")
    @Override
    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Object[] context = sourceCall.getContext();

        if (!sourceCall.getInput().next(context[SRC_CTX_KEY], context[SRC_CTX_VALUE])) {
            return false;
        }

        boolean outputAsJson = ((Boolean) context[SRC_CTX_OUTPUT_JSON]).booleanValue();
        TupleEntry entry = sourceCall.getIncomingEntry();

        Map data;
        if (outputAsJson) {
            // single entry: the raw value under the "data" key
            data = new HashMap(1);
            data.put(new Text("data"), context[SRC_CTX_VALUE]);
        } else {
            data = (Map) context[SRC_CTX_VALUE];
        }

        FieldAlias aliases = (FieldAlias) context[SRC_CTX_ALIASES];

        if (entry.getFields().isDefined()) {
            // one Text instance is reused as the lookup key for every segment
            Text lookupKey = new Text();
            for (Comparable<?> field : entry.getFields()) {
                Object current = data;
                // the alias may name a nested path; descend one level per token
                for (String level : StringUtils.tokenize(aliases.toES(field.toString()), StringUtils.PATH_CURRENT)) {
                    lookupKey.set(level);
                    current = ((Map) current).get(lookupKey);
                    if (current == null) {
                        break;
                    }
                }
                CascadingUtils.setObject(entry, field, current);
            }
        } else {
            // no declared fields: hand over the map values as the tuple content
            List<Object> elements = Tuple.elements(entry.getTuple());
            elements.clear();
            elements.addAll(data.values());
        }

        return true;
    }

    /**
     * Emits the sink call itself as the value (with a {@code null} key) to the output collector.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        sinkCall.getOutput().collect(null, sinkCall);
    }
}
