package org.apache.hama.pipes.protocol;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/pipes/protocol/UplinkReader.class */
/**
 * Reader thread for the uplink side of a Hama Pipes binary protocol connection.
 *
 * <p>The thread repeatedly reads a command code from the input stream handed to
 * the constructor, runs the matching handler and, for most commands, writes a
 * response to the output stream obtained from the {@link BinaryProtocol}.
 * Commands that need a {@link BSPPeer} are only accepted when the reader was
 * created with one; without a peer they are rejected as bad command codes.</p>
 *
 * <p>Failures are handed to {@link #onError(Throwable)}, which records them on
 * the protocol and notifies a thread waiting on the protocol's task lock.</p>
 *
 * @param <KEYIN> input key type of the peer
 * @param <VALUEIN> input value type of the peer
 * @param <KEYOUT> output key type of the peer
 * @param <VALUEOUT> output value type of the peer
 * @param <M> message type exchanged between peers
 */
public class UplinkReader<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> extends Thread {
    private static final Log LOG = LogFactory.getLog(UplinkReader.class);

    /** Size in bytes of the buffer placed in front of the uplink input stream. */
    private static final int INPUT_BUFFER_SIZE = 128 * 1024;

    /** File id sent back by SEQFILE_OPEN when no sequence file could be opened. */
    private static final int INVALID_FILE_ID = -1;

    /** Number of leading characters of a value that are kept in debug output. */
    private static final int LOG_PREVIEW_LENGTH = 9;

    private final BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binProtocol;
    /** Peer used by the peer-bound commands; {@code null} when created without one. */
    private BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer;
    private final Configuration conf;
    private final FileSystem fs;
    protected DataInputStream inStream;
    protected DataOutputStream outStream;
    /** Open sequence file readers by file id, each with its reusable key/value pair. */
    private final Map<Integer, Map.Entry<SequenceFile.Reader, Map.Entry<Writable, Writable>>> sequenceFileReaders;
    /** Open sequence file writers by file id, each with its reusable key/value pair. */
    private final Map<Integer, Map.Entry<SequenceFile.Writer, Map.Entry<Writable, Writable>>> sequenceFileWriters;
    /** Paths that currently have an open writer. */
    private final Set<String> sequenceFileWriterPaths;
    /** Path of every open writer by file id, so the path can be released on close. */
    private final Map<Integer, String> sequenceFileWriterPathsById;

    /**
     * Creates a reader that is not bound to a peer.
     *
     * @param binaryProtocol protocol whose output stream receives the responses
     * @param configuration configuration used to obtain the file system and to
     *        instantiate key, value and message objects
     * @param inputStream stream the commands are read from
     * @throws IOException if the file system cannot be obtained
     */
    public UplinkReader(BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol, Configuration configuration, InputStream inputStream) throws IOException {
        this.peer = null;
        this.binProtocol = binaryProtocol;
        this.conf = configuration;
        this.fs = FileSystem.get(configuration);
        this.inStream = new DataInputStream(new BufferedInputStream(inputStream, INPUT_BUFFER_SIZE));
        this.outStream = this.binProtocol.getOutputStream();
        this.sequenceFileReaders = new HashMap<Integer, Map.Entry<SequenceFile.Reader, Map.Entry<Writable, Writable>>>();
        this.sequenceFileWriters = new HashMap<Integer, Map.Entry<SequenceFile.Writer, Map.Entry<Writable, Writable>>>();
        this.sequenceFileWriterPaths = new HashSet<String>();
        this.sequenceFileWriterPathsById = new HashMap<Integer, String>();
    }

    /**
     * Creates a reader bound to a peer, using the peer's configuration.
     *
     * @param binaryProtocol protocol whose output stream receives the responses
     * @param bSPPeer peer that serves the peer-bound commands
     * @param inputStream stream the commands are read from
     * @throws IOException if the file system cannot be obtained
     */
    public UplinkReader(BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol, BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bSPPeer, InputStream inputStream) throws IOException {
        this(binaryProtocol, bSPPeer.getConfiguration(), inputStream);
        this.peer = bSPPeer;
    }

    private boolean isPeerAvailable() {
        return this.peer != null;
    }

    /**
     * Reads and handles commands until the DONE command arrives, the thread is
     * interrupted or a handler fails.
     *
     * <p>An interruption is reported through {@link #onError(Throwable)} and ends
     * the loop. Any other failure is reported the same way and then rethrown
     * wrapped in a {@link RuntimeException}.</p>
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                int command = readCommand();
                LOG.debug("Handling uplink command: " + command);
                if (!dispatch(command)) {
                    return;
                }
            } catch (InterruptedException e) {
                onError(e);
                // A blocking call cleared the flag when it threw; set it again.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                onError(th);
                throw new RuntimeException(th);
            }
        }
        // The loop only falls through when the interrupt flag is set; report it
        // exactly like an InterruptedException raised by one of the handlers.
        onError(new InterruptedException());
    }

    /**
     * Runs the handler that belongs to a command code.
     *
     * @param command command code read from the uplink
     * @return {@code false} when the DONE command was received and the read loop
     *         has to stop, {@code true} otherwise
     * @throws Exception whatever the selected handler throws, or an
     *         {@link IOException} when no handler accepts the code
     */
    private boolean dispatch(int command) throws Exception {
        final boolean hasPeer = isPeerAvailable();
        if (hasPeer && command == MessageType.WRITE_KEYVALUE.code) {
            writeKeyValue();
        } else if (hasPeer && command == MessageType.READ_KEYVALUE.code) {
            readKeyValue();
        } else if (hasPeer && command == MessageType.INCREMENT_COUNTER.code) {
            incrementCounter();
        } else if (hasPeer && command == MessageType.REGISTER_COUNTER.code) {
            // Accepted but intentionally without effect: no handler exists for it.
        } else if (command == MessageType.TASK_DONE.code) {
            synchronized (this.binProtocol.hasTaskLock) {
                this.binProtocol.setHasTask(false);
                LOG.debug("Got MessageType.TASK_DONE");
                this.binProtocol.hasTaskLock.notify();
            }
        } else if (command == MessageType.DONE.code) {
            LOG.debug("Pipe child done");
            return false;
        } else if (hasPeer && command == MessageType.SEND_MSG.code) {
            sendMessage();
        } else if (hasPeer && command == MessageType.GET_MSG_COUNT.code) {
            getMessageCount();
        } else if (hasPeer && command == MessageType.GET_MSG.code) {
            getMessage();
        } else if (hasPeer && command == MessageType.SYNC.code) {
            sync();
        } else if (hasPeer && command == MessageType.GET_ALL_PEERNAME.code) {
            getAllPeerNames();
        } else if (hasPeer && command == MessageType.GET_PEERNAME.code) {
            getPeerName();
        } else if (hasPeer && command == MessageType.GET_PEER_INDEX.code) {
            getPeerIndex();
        } else if (hasPeer && command == MessageType.GET_PEER_COUNT.code) {
            getPeerCount();
        } else if (hasPeer && command == MessageType.GET_SUPERSTEP_COUNT.code) {
            getSuperstepCount();
        } else if (hasPeer && command == MessageType.REOPEN_INPUT.code) {
            reopenInput();
        } else if (hasPeer && command == MessageType.CLEAR.code) {
            clear();
        } else if (command == MessageType.SEQFILE_OPEN.code) {
            seqFileOpen();
        } else if (command == MessageType.SEQFILE_READNEXT.code) {
            seqFileReadNext();
        } else if (command == MessageType.SEQFILE_APPEND.code) {
            seqFileAppend();
        } else if (command == MessageType.SEQFILE_CLOSE.code) {
            seqFileClose();
        } else if (command == MessageType.PARTITION_RESPONSE.code) {
            partitionResponse();
        } else {
            throw new IOException("Bad command code: " + command);
        }
        return true;
    }

    /**
     * Logs a failure, stores it on the protocol as uplink exception, marks the
     * task as finished and notifies a thread waiting on the task lock.
     *
     * @param th the failure to report
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void onError(Throwable th) {
        LOG.error(StringUtils.stringifyException(th));
        synchronized (this.binProtocol.hasTaskLock) {
            this.binProtocol.setUplinkException(th);
            this.binProtocol.setHasTask(false);
            this.binProtocol.hasTaskLock.notify();
        }
    }

    /**
     * Reads the next command code from the uplink.
     *
     * @return the command code, read as a variable-length int
     * @throws IOException if reading from the input stream fails
     */
    protected int readCommand() throws IOException {
        return WritableUtils.readVInt(this.inStream);
    }

    /**
     * Closes the input stream and every sequence file reader and writer that is
     * still registered.
     *
     * <p>All resources are attempted even when one of them fails to close; the
     * first failure is rethrown afterwards and later ones are logged.</p>
     *
     * @throws IOException the first failure raised while closing
     */
    public void closeConnection() throws IOException {
        IOException firstFailure = null;
        try {
            this.inStream.close();
        } catch (IOException e) {
            firstFailure = e;
        }
        for (Map.Entry<Integer, Map.Entry<SequenceFile.Reader, Map.Entry<Writable, Writable>>> registered : this.sequenceFileReaders.entrySet()) {
            LOG.debug("close SequenceFileReader: " + registered.getKey());
            try {
                registered.getValue().getKey().close();
            } catch (IOException e) {
                firstFailure = recordFailure(firstFailure, e, "SequenceFileReader " + registered.getKey());
            }
        }
        for (Map.Entry<Integer, Map.Entry<SequenceFile.Writer, Map.Entry<Writable, Writable>>> registered : this.sequenceFileWriters.entrySet()) {
            LOG.debug("close SequenceFileWriter: " + registered.getKey());
            try {
                registered.getValue().getKey().close();
            } catch (IOException e) {
                firstFailure = recordFailure(firstFailure, e, "SequenceFileWriter " + registered.getKey());
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Keeps the first close failure for rethrowing and logs every later one. */
    private static IOException recordFailure(IOException first, IOException current, String resource) {
        if (first == null) {
            return current;
        }
        LOG.warn("Additional failure while closing " + resource, current);
        return first;
    }

    /**
     * Reopens the peer's input and acknowledges with REOPEN_INPUT.
     *
     * @throws IOException if reopening or responding fails
     */
    public void reopenInput() throws IOException {
        LOG.debug("Got MessageType.REOPEN_INPUT");
        this.peer.reopenInput();
        WritableUtils.writeVInt(this.outStream, MessageType.REOPEN_INPUT.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.REOPEN_INPUT");
    }

    /**
     * Calls {@code clear()} on the peer and acknowledges with CLEAR.
     *
     * @throws IOException if responding fails
     */
    public void clear() throws IOException {
        LOG.debug("Got MessageType.CLEAR");
        this.peer.clear();
        WritableUtils.writeVInt(this.outStream, MessageType.CLEAR.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.CLEAR");
    }

    /**
     * Responds with the peer's superstep count.
     *
     * @throws IOException if responding fails
     */
    public void getSuperstepCount() throws IOException {
        long superstepCount = this.peer.getSuperstepCount();
        WritableUtils.writeVInt(this.outStream, MessageType.GET_SUPERSTEP_COUNT.code);
        WritableUtils.writeVLong(this.outStream, superstepCount);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: " + superstepCount);
    }

    /**
     * Responds with the number of peers.
     *
     * @throws IOException if responding fails
     */
    public void getPeerCount() throws IOException {
        int numPeers = this.peer.getNumPeers();
        WritableUtils.writeVInt(this.outStream, MessageType.GET_PEER_COUNT.code);
        WritableUtils.writeVInt(this.outStream, numPeers);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: " + numPeers);
    }

    /**
     * Responds with the index of this peer.
     *
     * @throws IOException if responding fails
     */
    public void getPeerIndex() throws IOException {
        int peerIndex = this.peer.getPeerIndex();
        WritableUtils.writeVInt(this.outStream, MessageType.GET_PEER_INDEX.code);
        WritableUtils.writeVInt(this.outStream, peerIndex);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: " + peerIndex);
    }

    /**
     * Reads a peer id and responds with the matching peer name.
     *
     * <p>Id {@code -1} selects this peer's own name. An id below {@code -1} or
     * not smaller than the number of peers is answered with an empty string.</p>
     *
     * @throws IOException if reading the id or responding fails
     */
    public void getPeerName() throws IOException {
        int id = WritableUtils.readVInt(this.inStream);
        LOG.debug("Got MessageType.GET_PEERNAME id: " + id);
        WritableUtils.writeVInt(this.outStream, MessageType.GET_PEERNAME.code);
        if (id == -1) {
            String ownName = this.peer.getPeerName();
            Text.writeString(this.outStream, ownName);
            LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: " + ownName);
        } else if (id < -1 || id >= this.peer.getNumPeers()) {
            Text.writeString(this.outStream, "");
            LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
        } else {
            String peerName = this.peer.getPeerName(id);
            Text.writeString(this.outStream, peerName);
            LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: " + peerName);
        }
        this.binProtocol.flush();
    }

    /**
     * Responds with the number of peer names followed by every peer name.
     *
     * @throws IOException if responding fails
     */
    public void getAllPeerNames() throws IOException {
        LOG.debug("Got MessageType.GET_ALL_PEERNAME");
        String[] peerNames = this.peer.getAllPeerNames();
        WritableUtils.writeVInt(this.outStream, MessageType.GET_ALL_PEERNAME.code);
        WritableUtils.writeVInt(this.outStream, peerNames.length);
        for (String peerName : peerNames) {
            Text.writeString(this.outStream, peerName);
        }
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " + peerNames.length);
    }

    /**
     * Calls {@code sync()} on the peer and acknowledges with SYNC once it returns.
     *
     * @throws IOException if responding fails
     * @throws SyncException if the peer's sync fails
     * @throws InterruptedException if the thread is interrupted during the sync
     */
    public void sync() throws IOException, SyncException, InterruptedException {
        LOG.debug("Got MessageType.SYNC");
        this.peer.sync();
        WritableUtils.writeVInt(this.outStream, MessageType.SYNC.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.SYNC");
    }

    /**
     * Responds with the peer's current message, or with END_OF_DATA when the
     * peer returns no message.
     *
     * @throws IOException if fetching the message or responding fails
     */
    public void getMessage() throws IOException {
        LOG.debug("Got MessageType.GET_MSG");
        M message = this.peer.getCurrentMessage();
        if (message != null) {
            WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code);
            this.binProtocol.writeObject(message);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responded MessageType.GET_MSG - Message: " + abbreviate(message));
            }
        } else {
            WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
            LOG.debug("Responded MessageType.END_OF_DATA");
        }
        this.binProtocol.flush();
    }

    /**
     * Responds with the number of current messages of the peer.
     *
     * @throws IOException if responding fails
     */
    public void getMessageCount() throws IOException {
        int messageCount = this.peer.getNumCurrentMessages();
        WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG_COUNT.code);
        WritableUtils.writeVInt(this.outStream, messageCount);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: " + messageCount);
    }

    /**
     * Reads a counter group, a counter name and an amount, increments that
     * counter on the peer and acknowledges with INCREMENT_COUNTER.
     *
     * @throws IOException if reading the arguments or responding fails
     */
    public void incrementCounter() throws IOException {
        LOG.debug("Got MessageType.INCREMENT_COUNTER");
        String group = Text.readString(this.inStream);
        String name = Text.readString(this.inStream);
        long amount = WritableUtils.readVLong(this.inStream);
        LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: " + name + " amount: " + amount);
        this.peer.incrementCounter(group, name, amount);
        WritableUtils.writeVInt(this.outStream, MessageType.INCREMENT_COUNTER.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.INCREMENT_COUNTER");
    }

    /**
     * Reads a target peer name and a message, sends the message through the peer
     * and acknowledges with SEND_MSG.
     *
     * <p>The message object is created from the class configured under
     * {@link Constants#MESSAGE_CLASS}, falling back to {@link BytesWritable}.</p>
     *
     * @throws IOException if reading the message, sending it or responding fails
     * @throws InstantiationException declared for compatibility with existing callers
     * @throws IllegalAccessException declared for compatibility with existing callers
     */
    public void sendMessage() throws IOException, InstantiationException, IllegalAccessException {
        String peerName = Text.readString(this.inStream);
        // Unchecked: the configured message class is trusted to match M.
        @SuppressWarnings("unchecked")
        M message = (M) ReflectionUtils.newInstance(this.conf.getClass(Constants.MESSAGE_CLASS, BytesWritable.class), this.conf);
        LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName + " messageClass: " + message.getClass().getName());
        readObject(message);
        this.peer.send(peerName, message);
        WritableUtils.writeVInt(this.outStream, MessageType.SEND_MSG.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.SEND_MSG");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sent message to peerName: " + peerName + " messageClass: " + message.getClass().getName() + " Message: " + abbreviate(message));
        }
    }

    /**
     * Responds with the next key/value pair of the peer's input, or with
     * END_OF_DATA.
     *
     * <p>END_OF_DATA is sent without reading when no input format is configured
     * or the configured one is {@code org.apache.hama.bsp.NullInputFormat}, and
     * also when the peer returns no further pair.</p>
     *
     * @throws IOException if reading from the peer or responding fails
     */
    public void readKeyValue() throws IOException {
        String inputFormat = this.peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS);
        boolean hasInput = inputFormat != null && !inputFormat.equals("org.apache.hama.bsp.NullInputFormat");
        KeyValuePair<KEYIN, VALUEIN> pair = hasInput ? this.peer.readNext() : null;
        if (pair != null) {
            WritableUtils.writeVInt(this.outStream, MessageType.READ_KEYVALUE.code);
            this.binProtocol.writeObject((Writable) pair.getKey());
            this.binProtocol.writeObject((Writable) pair.getValue());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responded MessageType.READ_KEYVALUE - Key: " + abbreviate(pair.getKey()) + " Value: " + abbreviate(pair.getValue()));
            }
        } else {
            WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
            LOG.debug("Responded MessageType.READ_KEYVALUE - END_OF_DATA");
        }
        this.binProtocol.flush();
    }

    /**
     * Reads a key and a value, writes the pair to the peer's output and
     * acknowledges with WRITE_KEYVALUE.
     *
     * <p>The key and value objects are created from the classes configured under
     * {@code bsp.output.key.class} and {@code bsp.output.value.class}; both have
     * to be {@link Writable} implementations, otherwise the cast fails.</p>
     *
     * @throws IOException if reading the pair, writing it or responding fails
     */
    public void writeKeyValue() throws IOException {
        // Unchecked: the configured output classes are trusted to match the type parameters.
        @SuppressWarnings("unchecked")
        KEYOUT keyOut = (KEYOUT) ReflectionUtils.newInstance(this.conf.getClass("bsp.output.key.class", Object.class), this.conf);
        @SuppressWarnings("unchecked")
        VALUEOUT valueOut = (VALUEOUT) ReflectionUtils.newInstance(this.conf.getClass("bsp.output.value.class", Object.class), this.conf);
        LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: " + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName());
        readObject((Writable) keyOut);
        readObject((Writable) valueOut);
        this.peer.write(keyOut, valueOut);
        WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.WRITE_KEYVALUE");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done MessageType.WRITE_KEYVALUE - Key: " + abbreviate(keyOut) + " Value: " + abbreviate(valueOut));
        }
    }

    /**
     * Reads a path, an option ({@code "r"} or {@code "w"}), a key class name and
     * a value class name, opens the sequence file accordingly and responds with
     * its file id, or with {@code -1} when it could not be opened.
     *
     * @throws IOException if reading the arguments or responding fails
     */
    public void seqFileOpen() throws IOException {
        String path = Text.readString(this.inStream);
        String option = Text.readString(this.inStream);
        String keyClassName = Text.readString(this.inStream);
        String valueClassName = Text.readString(this.inStream);
        LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path);
        LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
        LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClassName);
        LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClassName);
        int fileId;
        if (option.equals("r")) {
            fileId = openSequenceFileReader(path, keyClassName, valueClassName);
        } else if (option.equals("w")) {
            fileId = openSequenceFileWriter(path, keyClassName, valueClassName);
        } else {
            LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'");
            fileId = INVALID_FILE_ID;
        }
        WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
        WritableUtils.writeVInt(this.outStream, fileId);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.SEQFILE_OPEN - FileID: " + fileId);
    }

    /**
     * Opens a sequence file for reading and registers it when its key and value
     * classes match the requested ones.
     *
     * @return the file id of the registered reader, or {@code -1} on any failure
     */
    private int openSequenceFileReader(String path, String keyClassName, String valueClassName) {
        SequenceFile.Reader reader = null;
        boolean registered = false;
        try {
            reader = new SequenceFile.Reader(this.fs, new Path(path), this.conf);
            if (!reader.getKeyClassName().equals(keyClassName)) {
                LOG.error("SEQFILE_OPEN - Wrong KeyClass: " + keyClassName + " File KeyClass: " + reader.getKeyClassName());
                return INVALID_FILE_ID;
            }
            if (!reader.getValueClassName().equals(valueClassName)) {
                LOG.error("SEQFILE_OPEN - Wrong ValueClass: " + valueClassName + " File ValueClass: " + reader.getValueClassName());
                return INVALID_FILE_ID;
            }
            Class<?> keyClass = this.conf.getClassLoader().loadClass(keyClassName);
            Class<?> valueClass = this.conf.getClassLoader().loadClass(valueClassName);
            Writable key = (Writable) ReflectionUtils.newInstance(keyClass, this.conf);
            Writable value = (Writable) ReflectionUtils.newInstance(valueClass, this.conf);
            // NOTE(review): hashCode() is not guaranteed to be unique, so two open
            // files could in theory receive the same id - confirm this is acceptable.
            int fileId = reader.hashCode();
            this.sequenceFileReaders.put(Integer.valueOf(fileId),
                    new AbstractMap.SimpleEntry<SequenceFile.Reader, Map.Entry<Writable, Writable>>(reader,
                            new AbstractMap.SimpleEntry<Writable, Writable>(key, value)));
            registered = true;
            return fileId;
        } catch (IOException e) {
            LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
            return INVALID_FILE_ID;
        } catch (ClassNotFoundException e) {
            LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
            return INVALID_FILE_ID;
        } finally {
            // A reader that was opened but never registered would otherwise leak.
            if (!registered && reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    LOG.warn("SEQFILE_OPEN - could not close rejected reader for " + path, e);
                }
            }
        }
    }

    /**
     * Opens a sequence file for writing and registers it, unless the path already
     * has an open writer.
     *
     * @return the file id of the registered writer, or {@code -1} on any failure
     */
    private int openSequenceFileWriter(String path, String keyClassName, String valueClassName) {
        if (this.sequenceFileWriterPaths.contains(path)) {
            LOG.error("SEQFILE_OPEN - Path: " + path + " is already used by another Writer!");
            return INVALID_FILE_ID;
        }
        try {
            Class<?> keyClass = this.conf.getClassLoader().loadClass(keyClassName);
            Class<?> valueClass = this.conf.getClassLoader().loadClass(valueClassName);
            Writable key = (Writable) ReflectionUtils.newInstance(keyClass, this.conf);
            Writable value = (Writable) ReflectionUtils.newInstance(valueClass, this.conf);
            SequenceFile.Writer writer = new SequenceFile.Writer(this.fs, this.conf, new Path(path), keyClass, valueClass);
            int fileId = writer.hashCode();
            this.sequenceFileWriters.put(Integer.valueOf(fileId),
                    new AbstractMap.SimpleEntry<SequenceFile.Writer, Map.Entry<Writable, Writable>>(writer,
                            new AbstractMap.SimpleEntry<Writable, Writable>(key, value)));
            this.sequenceFileWriterPaths.add(path);
            this.sequenceFileWriterPathsById.put(Integer.valueOf(fileId), path);
            return fileId;
        } catch (IOException e) {
            LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
            return INVALID_FILE_ID;
        } catch (ClassNotFoundException e) {
            LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
            return INVALID_FILE_ID;
        }
    }

    /**
     * Reads a file id and responds with the next key/value pair of that reader,
     * or with END_OF_DATA when the id is unknown or the file has no further pair.
     *
     * @throws IOException if reading the file or responding fails
     */
    public void seqFileReadNext() throws IOException {
        int fileId = WritableUtils.readVInt(this.inStream);
        LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileId);
        Map.Entry<SequenceFile.Reader, Map.Entry<Writable, Writable>> registered = this.sequenceFileReaders.get(Integer.valueOf(fileId));
        if (registered == null) {
            LOG.error("MessageType.SEQFILE_READNEXT: FileID " + fileId + " not found!");
            WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
            LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
            this.binProtocol.flush();
            return;
        }
        Writable key = registered.getValue().getKey();
        Writable value = registered.getValue().getValue();
        if (registered.getKey().next(key, value)) {
            WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_READNEXT.code);
            this.binProtocol.writeObject(key);
            this.binProtocol.writeObject(value);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responded MessageType.SEQFILE_READNEXT - Key: " + abbreviate(key) + " Value: " + abbreviate(value));
            }
        } else {
            WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
            LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
        }
        this.binProtocol.flush();
    }

    /**
     * Reads a file id followed by a key/value pair, appends the pair to that
     * writer and responds with {@code 1} on success or {@code 0} when the id is
     * unknown.
     *
     * @throws IOException if reading the pair, appending it or responding fails
     */
    public void seqFileAppend() throws IOException {
        int fileId = WritableUtils.readVInt(this.inStream);
        LOG.debug("GOT MessageType.SEQFILE_APPEND - FileID: " + fileId);
        boolean appended = false;
        Map.Entry<SequenceFile.Writer, Map.Entry<Writable, Writable>> registered = this.sequenceFileWriters.get(Integer.valueOf(fileId));
        if (registered != null) {
            Writable key = registered.getValue().getKey();
            Writable value = registered.getValue().getValue();
            readObject(key);
            readObject(value);
            registered.getKey().append(key, value);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stored data: Key: " + abbreviate(key) + " Value: " + abbreviate(value));
            }
            appended = true;
        } else {
            // The types of the pending key/value are unknown without a writer, so
            // the bytes that are currently available are dropped instead.
            int available = this.inStream.available();
            this.inStream.skip(available);
            LOG.debug("MessageType.SEQFILE_APPEND: skip " + available + " bytes");
            LOG.error("MessageType.SEQFILE_APPEND: FileID " + fileId + " not found!");
        }
        WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_APPEND.code);
        WritableUtils.writeVInt(this.outStream, appended ? 1 : 0);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.SEQFILE_APPEND - Result: " + appended);
    }

    /**
     * Reads a file id, closes and unregisters the matching reader or writer and
     * responds with {@code 1} on success or {@code 0} when the id is unknown.
     *
     * <p>Closing a writer also releases its path, so the same path can be opened
     * for writing again afterwards.</p>
     *
     * @throws IOException if closing the file or responding fails
     */
    public void seqFileClose() throws IOException {
        int fileId = WritableUtils.readVInt(this.inStream);
        LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileId);
        boolean closed = false;
        Integer id = Integer.valueOf(fileId);
        Map.Entry<SequenceFile.Reader, Map.Entry<Writable, Writable>> reader = this.sequenceFileReaders.remove(id);
        if (reader != null) {
            reader.getKey().close();
            closed = true;
        } else {
            Map.Entry<SequenceFile.Writer, Map.Entry<Writable, Writable>> writer = this.sequenceFileWriters.remove(id);
            if (writer != null) {
                String path = this.sequenceFileWriterPathsById.remove(id);
                if (path != null) {
                    this.sequenceFileWriterPaths.remove(path);
                }
                writer.getKey().close();
                closed = true;
            } else {
                LOG.error("MessageType.SEQFILE_CLOSE: FileID " + fileId + " not found!");
            }
        }
        WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_CLOSE.code);
        WritableUtils.writeVInt(this.outStream, closed ? 1 : 0);
        this.binProtocol.flush();
        LOG.debug("Responded MessageType.SEQFILE_CLOSE - Result: " + closed);
    }

    /**
     * Reads a partition result, stores it on the protocol and notifies a thread
     * waiting on the protocol's result lock.
     *
     * @throws IOException if reading the result fails
     */
    public void partitionResponse() throws IOException {
        int result = WritableUtils.readVInt(this.inStream);
        synchronized (this.binProtocol.resultLock) {
            this.binProtocol.setResult(result);
            LOG.debug("Received MessageType.PARTITION_RESPONSE - Result: " + result);
            this.binProtocol.resultLock.notify();
        }
    }

    /**
     * Fills a writable with the next object from the uplink.
     *
     * <p>{@link Text} and {@link BytesWritable} are read as a variable-length
     * size followed by that many bytes, {@link IntWritable} and
     * {@link LongWritable} as variable-length numbers. Every other type reads
     * itself through {@link Writable#readFields(java.io.DataInput)}.</p>
     *
     * @param writable the object to fill
     * @throws IOException if reading fails or a negative size is received
     */
    protected void readObject(Writable writable) throws IOException {
        if (writable instanceof Text) {
            ((Text) writable).set(readSizedBytes());
        } else if (writable instanceof BytesWritable) {
            byte[] bytes = readSizedBytes();
            ((BytesWritable) writable).set(bytes, 0, bytes.length);
        } else if (writable instanceof IntWritable) {
            ((IntWritable) writable).set(WritableUtils.readVInt(this.inStream));
        } else if (writable instanceof LongWritable) {
            ((LongWritable) writable).set(WritableUtils.readVLong(this.inStream));
        } else {
            try {
                LOG.debug("reading type: " + writable.getClass().getName());
                writable.readFields(this.inStream);
            } catch (IOException e) {
                throw new IOException("Hama Pipes is not able to read " + writable.getClass().getName(), e);
            }
        }
    }

    /** Reads a variable-length size followed by exactly that many bytes. */
    private byte[] readSizedBytes() throws IOException {
        int size = WritableUtils.readVInt(this.inStream);
        if (size < 0) {
            throw new IOException("Received negative payload size: " + size);
        }
        byte[] bytes = new byte[size];
        this.inStream.readFully(bytes);
        return bytes;
    }

    /** Shortens the string form of a value to a short preview for debug output. */
    private static String abbreviate(Object value) {
        String text = value.toString();
        return text.length() <= LOG_PREVIEW_LENGTH ? text : text.substring(0, LOG_PREVIEW_LENGTH) + "...";
    }
}
