package org.apache.hama.bsp.message.io;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.util.ReflectionUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.class */
public class CombineSpilledDataProcessor<M extends Writable> extends WriteSpilledDataProcessor {
    public static Log LOG = LogFactory.getLog(CombineSpilledDataProcessor.class);
    Combiner<M> combiner;
    M writableObject;
    ReusableByteBuffer<M> iterator;
    DirectByteBufferOutputStream combineOutputBuffer;
    ByteBuffer combineBuffer;

    public CombineSpilledDataProcessor(String str) throws FileNotFoundException {
        super(str);
    }

    @Override // org.apache.hama.bsp.message.io.WriteSpilledDataProcessor, org.apache.hama.bsp.message.io.SpilledDataProcessor
    public boolean init(Configuration configuration) {
        if (!super.init(configuration)) {
            return false;
        }
        String str = configuration.get(Constants.COMBINER_CLASS);
        if (str == null) {
            return true;
        }
        try {
            this.combiner = (Combiner) ReflectionUtils.newInstance(str);
            String str2 = configuration.get(Constants.MESSAGE_CLASS);
            if (str2 != null) {
                try {
                    this.writableObject = (M) ReflectionUtils.newInstance(str2);
                    this.iterator = new ReusableByteBuffer<>(this.writableObject);
                } catch (ClassNotFoundException e) {
                    LOG.error("Error combining the records.", e);
                    return false;
                }
            }
            this.combineOutputBuffer = new DirectByteBufferOutputStream();
            this.combineBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_DEFAULT_SIZE);
            this.combineOutputBuffer.setBuffer(this.combineBuffer);
            return true;
        } catch (ClassNotFoundException e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // org.apache.hama.bsp.message.io.WriteSpilledDataProcessor, org.apache.hama.bsp.message.io.SpilledDataProcessor
    public boolean handleSpilledBuffer(SpilledByteBuffer spilledByteBuffer) {
        if (this.combiner == null || this.writableObject == null) {
            return super.handleSpilledBuffer(spilledByteBuffer);
        }
        try {
            this.iterator.set(spilledByteBuffer);
            M combine = this.combiner.combine(this.iterator);
            try {
                this.iterator.prepareForNext();
                try {
                    combine.write(this.combineOutputBuffer);
                    try {
                        this.combineOutputBuffer.flush();
                        this.combineOutputBuffer.getBuffer().flip();
                        try {
                            boolean handleSpilledBuffer = super.handleSpilledBuffer(new SpilledByteBuffer(this.combineOutputBuffer.getBuffer(), this.combineOutputBuffer.getBuffer().remaining()));
                            this.combineOutputBuffer.clear();
                            return handleSpilledBuffer;
                        } catch (Throwable th) {
                            this.combineOutputBuffer.clear();
                            throw th;
                        }
                    } catch (IOException e) {
                        LOG.error("Error flushing the combiner output.", e);
                        return false;
                    }
                } catch (IOException e2) {
                    LOG.error("Error writing the combiner output.", e2);
                    return false;
                }
            } catch (IOException e3) {
                LOG.error("Error preparing for next buffer.", e3);
                return false;
            }
        } catch (IOException e4) {
            LOG.error("Error setting buffer for combining data", e4);
            return false;
        }
    }

    @Override // org.apache.hama.bsp.message.io.WriteSpilledDataProcessor, org.apache.hama.bsp.message.io.SpilledDataProcessor
    public boolean close() {
        return true;
    }
}
