package org.apache.hama.bsp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;

/* loaded from: input_file:org/apache/hama/bsp/BSPMessageBundle.class */
public class BSPMessageBundle<M extends Writable> implements Writable, Iterable<M> {
    public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
    private int bundleSize;
    private int bundleLength;
    ByteArrayOutputStream byteBuffer;
    DataOutputStream bufferDos;
    private byte[] compressed;
    private byte[] serialized;
    private BSPMessageCompressor<M> compressor = null;
    private long threshold = 128;
    private String className = null;
    ByteArrayInputStream bis = null;
    DataInputStream dis = null;
    ByteArrayOutputStream mbos = null;
    DataOutputStream mdos = null;
    ByteArrayInputStream mbis = null;
    DataInputStream mdis = null;

    public BSPMessageBundle() {
        this.bundleSize = 0;
        this.bundleLength = 0;
        this.byteBuffer = null;
        this.bufferDos = null;
        this.byteBuffer = new ByteArrayOutputStream();
        this.bufferDos = new DataOutputStream(this.byteBuffer);
        this.bundleSize = 0;
        this.bundleLength = 0;
    }

    public byte[] serialize(M m) throws IOException {
        this.mbos = new ByteArrayOutputStream();
        this.mdos = new DataOutputStream(this.mbos);
        m.write(this.mdos);
        return this.mbos.toByteArray();
    }

    public void addMessage(M m) {
        try {
            this.serialized = serialize(m);
            if (this.compressor == null || this.serialized.length <= this.threshold) {
                this.bufferDos.writeBoolean(false);
                this.bufferDos.write(this.serialized);
                this.bundleLength += this.serialized.length;
            } else {
                this.bufferDos.writeBoolean(true);
                this.compressed = this.compressor.compress(this.serialized);
                this.bufferDos.writeInt(this.compressed.length);
                this.bufferDos.write(this.compressed);
                this.bundleLength += this.compressed.length;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (this.className == null) {
            this.className = m.getClass().getName();
        }
        this.bundleSize++;
    }

    @Override // java.lang.Iterable
    public Iterator<M> iterator() {
        this.bis = new ByteArrayInputStream(this.byteBuffer.toByteArray());
        this.dis = new DataInputStream(this.bis);
        return (Iterator<M>) new Iterator<M>() { // from class: org.apache.hama.bsp.BSPMessageBundle.1
            M msg;
            byte[] decompressed;

            @Override // java.util.Iterator
            public boolean hasNext() {
                try {
                    return BSPMessageBundle.this.dis.available() > 0;
                } catch (IOException e) {
                    return false;
                }
            }

            @Override // java.util.Iterator
            public M next() {
                boolean z = false;
                try {
                    z = BSPMessageBundle.this.dis.readBoolean();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Class<?> cls = null;
                try {
                    cls = Class.forName(BSPMessageBundle.this.className);
                } catch (ClassNotFoundException e2) {
                    BSPMessageBundle.LOG.error("Class was not found.", e2);
                }
                this.msg = (M) ReflectionUtils.newInstance(cls, (Configuration) null);
                try {
                    if (z) {
                        BSPMessageBundle.this.compressed = new byte[BSPMessageBundle.this.dis.readInt()];
                        BSPMessageBundle.this.dis.readFully(BSPMessageBundle.this.compressed);
                        this.decompressed = BSPMessageBundle.this.compressor.decompress(BSPMessageBundle.this.compressed);
                        BSPMessageBundle.this.mbis = new ByteArrayInputStream(this.decompressed);
                        BSPMessageBundle.this.mdis = new DataInputStream(BSPMessageBundle.this.mbis);
                        this.msg.readFields(BSPMessageBundle.this.mdis);
                    } else {
                        this.msg.readFields(BSPMessageBundle.this.dis);
                    }
                } catch (IOException e3) {
                    e3.printStackTrace();
                }
                return this.msg;
            }

            @Override // java.util.Iterator
            public void remove() {
            }
        };
    }

    public int size() {
        return this.bundleSize;
    }

    public void setCompressor(BSPMessageCompressor<M> bSPMessageCompressor, long j) {
        this.compressor = bSPMessageCompressor;
        this.threshold = j;
    }

    public long getLength() throws IOException {
        return this.bundleLength;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.bundleSize);
        if (this.bundleSize > 0) {
            dataOutput.writeUTF(this.className);
            byte[] byteArray = this.byteBuffer.toByteArray();
            dataOutput.writeInt(byteArray.length);
            dataOutput.write(byteArray);
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        if (dataInput.readInt() > 0) {
            this.className = dataInput.readUTF();
            byte[] bArr = new byte[dataInput.readInt()];
            dataInput.readFully(bArr);
            this.bufferDos.write(bArr);
        }
    }
}
