package org.apache.hama.pipes;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/pipes/BinaryProtocol.class */
public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> implements DownwardProtocol<K1, V1> {
    /** Logger shared by the downlink methods and the uplink reader thread. */
    protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class.getName());
    /** Protocol version constant; its value is 0. */
    public static final int CURRENT_PROTOCOL_VERSION = 0;
    /** Size in bytes (128 KiB) of the buffers wrapped around both the downlink and uplink streams. */
    private static final int BUFFER_SIZE = 131072;
    /** Buffered downlink stream all commands and responses are written to. */
    protected final DataOutputStream stream;
    /** Reader thread created and started by the constructor; interrupted and joined in {@code close()}. */
    private BinaryProtocol<K1, V1, K2, V2>.UplinkReaderThread uplink;
    /** Peer whose configuration is sent by {@code start()} and which the uplink commands operate on. */
    protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
    /** Scratch buffer used by {@code writeObject} to serialize writables other than Text and BytesWritable. */
    protected final DataOutputBuffer buffer = new DataOutputBuffer();
    /**
     * True while a RUN_* command is outstanding. It is set by the thread calling the run methods,
     * cleared by the uplink thread on TASK_DONE and polled by {@code waitForFinish()}; it is
     * volatile so that the polling thread is guaranteed to observe the uplink thread's write.
     */
    private volatile boolean hasTask = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hama/pipes/BinaryProtocol$MessageType.class */
    /**
     * Commands exchanged over the pipe. Each constant carries the integer {@link #code} that is
     * written to, or compared against a value read from, the stream.
     */
    public enum MessageType {
        // Codes 0-5 are sent on the downlink by the enclosing protocol.
        START(0),
        SET_BSPJOB_CONF(1),
        SET_INPUT_TYPES(2),
        RUN_SETUP(3),
        RUN_BSP(4),
        RUN_CLEANUP(5),

        // Key/value and message commands.
        READ_KEYVALUE(6),
        WRITE_KEYVALUE(7),
        GET_MSG(8),
        GET_MSG_COUNT(9),
        SEND_MSG(10),
        SYNC(11),

        // Peer information queries.
        GET_ALL_PEERNAME(12),
        GET_PEERNAME(13),
        GET_PEER_INDEX(14),
        GET_PEER_COUNT(15),
        GET_SUPERSTEP_COUNT(16),

        // Input handling, lifecycle, counters and logging.
        REOPEN_INPUT(17),
        CLEAR(18),
        CLOSE(19),
        ABORT(20),
        DONE(21),
        TASK_DONE(22),
        REGISTER_COUNTER(23),
        INCREMENT_COUNTER(24),
        LOG(25);

        /** Integer value identifying this message type on the stream. */
        final int code;

        /**
         * @param wireCode integer value stored in {@link #code}
         */
        MessageType(int wireCode) {
            code = wireCode;
        }
    }

    /* loaded from: input_file:org/apache/hama/pipes/BinaryProtocol$TeeOutputStream.class */
    /**
     * Output stream that duplicates everything written to it: each write goes first to a file and
     * then to the wrapped stream.
     */
    private static class TeeOutputStream extends FilterOutputStream {
        /** File copy of the data; opened in the constructor and closed by {@link #close()}. */
        private final OutputStream file;

        /**
         * @param str path of the file that receives the copy
         * @param outputStream stream that receives the original data
         * @throws IOException if the file cannot be opened for writing
         */
        TeeOutputStream(String str, OutputStream outputStream) throws IOException {
            super(outputStream);
            this.file = new FileOutputStream(str);
        }

        /** Writes {@code i2} bytes of {@code bArr} starting at offset {@code i} to the file, then to the wrapped stream. */
        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.file.write(bArr, i, i2);
            this.out.write(bArr, i, i2);
        }

        /** Writes a single byte to the file, then to the wrapped stream. */
        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(int i) throws IOException {
            this.file.write(i);
            this.out.write(i);
        }

        /** Flushes the file, then the wrapped stream. */
        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.file.flush();
            this.out.flush();
        }

        /**
         * Flushes and closes both targets. The nested {@code finally} blocks make sure the file and
         * the wrapped stream are closed even when the flush or the first close fails, so neither
         * handle is leaked on error.
         *
         * @throws IOException if flushing or closing either target fails
         */
        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            try {
                flush();
            } finally {
                try {
                    this.file.close();
                } finally {
                    this.out.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hama/pipes/BinaryProtocol$UplinkReaderThread.class */
    /**
     * Thread that reads command codes from the uplink stream and executes them against the peer,
     * writing any response back through the enclosing protocol's downlink stream.
     */
    public class UplinkReaderThread extends Thread {
        /** Buffered stream the uplink commands and their payloads are read from. */
        protected DataInputStream inStream;
        /** Reusable key instance filled by {@link #writeKeyValue()}; class taken from "bsp.output.key.class". */
        protected K2 key;
        /** Reusable value instance filled by {@link #writeKeyValue()}; class taken from "bsp.output.value.class". */
        protected V2 value;
        /** Peer the received commands are executed on. */
        protected BSPPeer<K1, V1, K2, V2, BytesWritable> peer;

        /**
         * Wraps the input stream in a buffer and instantiates the reusable key/value objects from
         * the classes named in the peer's configuration (falling back to {@code Object}).
         *
         * @param bSPPeer peer the commands operate on
         * @param inputStream raw uplink stream
         * @throws IOException declared for subclasses; not thrown by the visible code itself
         */
        public UplinkReaderThread(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer, InputStream inputStream) throws IOException {
            this.inStream = new DataInputStream(new BufferedInputStream(inputStream, BinaryProtocol.BUFFER_SIZE));
            this.peer = bSPPeer;
            this.key = (K2) ReflectionUtils.newInstance(bSPPeer.getConfiguration().getClass("bsp.output.key.class", Object.class), bSPPeer.getConfiguration());
            this.value = (V2) ReflectionUtils.newInstance(bSPPeer.getConfiguration().getClass("bsp.output.value.class", Object.class), bSPPeer.getConfiguration());
        }

        /**
         * Closes the uplink input stream.
         *
         * @throws IOException if closing the stream fails
         */
        public void closeConnection() throws IOException {
            this.inStream.close();
        }

        /**
         * Reads and dispatches commands until the thread is interrupted or DONE is received.
         * A command value of -1 is skipped. An {@link InterruptedException} raised by a handler
         * ends the loop quietly; any other failure is passed to {@link #onError(Throwable)} and
         * rethrown wrapped in a {@link RuntimeException}, which terminates the thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    int cmd = readCommand();
                    if (cmd == -1) {
                        continue;
                    }
                    BinaryProtocol.LOG.debug("Handling uplink command " + cmd);
                    if (cmd == MessageType.WRITE_KEYVALUE.code) {
                        writeKeyValue();
                    } else if (cmd == MessageType.READ_KEYVALUE.code) {
                        readKeyValue();
                    } else if (cmd == MessageType.INCREMENT_COUNTER.code) {
                        incrementCounter();
                    } else if (cmd == MessageType.REGISTER_COUNTER.code) {
                        // Accepted but not acted upon; nothing further is read from the stream for it.
                    } else if (cmd == MessageType.TASK_DONE.code) {
                        BinaryProtocol.LOG.debug("Got MessageType.TASK_DONE");
                        BinaryProtocol.this.hasTask = false;
                    } else if (cmd == MessageType.DONE.code) {
                        BinaryProtocol.LOG.debug("Pipe child done");
                        return;
                    } else if (cmd == MessageType.SEND_MSG.code) {
                        sendMessage();
                    } else if (cmd == MessageType.GET_MSG_COUNT.code) {
                        getMessageCount();
                    } else if (cmd == MessageType.GET_MSG.code) {
                        getMessage();
                    } else if (cmd == MessageType.SYNC.code) {
                        sync();
                    } else if (cmd == MessageType.GET_ALL_PEERNAME.code) {
                        getAllPeerNames();
                    } else if (cmd == MessageType.GET_PEERNAME.code) {
                        getPeerName();
                    } else if (cmd == MessageType.GET_PEER_INDEX.code) {
                        getPeerIndex();
                    } else if (cmd == MessageType.GET_PEER_COUNT.code) {
                        getPeerCount();
                    } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code) {
                        getSuperstepCount();
                    } else if (cmd == MessageType.REOPEN_INPUT.code) {
                        reopenInput();
                    } else if (cmd == MessageType.CLEAR.code) {
                        BinaryProtocol.LOG.debug("Got MessageType.CLEAR");
                        this.peer.clear();
                    } else {
                        throw new IOException("Bad command code: " + cmd);
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th) {
                    onError(th);
                    throw new RuntimeException(th);
                }
            }
            // Interrupted: simply fall out of run(). A checked InterruptedException cannot be
            // thrown from Thread.run(), and the interrupt case is meant to end the thread quietly.
        }

        /**
         * Logs the stringified exception at error level.
         *
         * @param th failure raised while handling a command
         */
        public void onError(Throwable th) {
            BinaryProtocol.LOG.error(StringUtils.stringifyException(th));
        }

        /**
         * Reads one variable-length integer from the uplink stream. Used both for command codes
         * and for the integer arguments/length prefixes that follow them.
         *
         * @return the decoded integer
         * @throws IOException if the stream cannot be read
         */
        public int readCommand() throws IOException {
            return WritableUtils.readVInt(this.inStream);
        }

        /**
         * Handles REOPEN_INPUT by asking the peer to reopen its input.
         *
         * @throws IOException if the peer fails to reopen the input
         */
        public void reopenInput() throws IOException {
            BinaryProtocol.LOG.debug("Got MessageType.REOPEN_INPUT");
            this.peer.reopenInput();
        }

        /**
         * Responds to GET_SUPERSTEP_COUNT with the command code followed by the peer's superstep
         * count as a VLong.
         *
         * @throws IOException if writing the response fails
         */
        public void getSuperstepCount() throws IOException {
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_SUPERSTEP_COUNT.code);
            WritableUtils.writeVLong(BinaryProtocol.this.stream, this.peer.getSuperstepCount());
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: " + this.peer.getSuperstepCount());
        }

        /**
         * Responds to GET_PEER_COUNT with the command code followed by the number of peers.
         *
         * @throws IOException if writing the response fails
         */
        public void getPeerCount() throws IOException {
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_PEER_COUNT.code);
            WritableUtils.writeVInt(BinaryProtocol.this.stream, this.peer.getNumPeers());
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: " + this.peer.getNumPeers());
        }

        /**
         * Responds to GET_PEER_INDEX with the command code followed by this peer's index.
         *
         * @throws IOException if writing the response fails
         */
        public void getPeerIndex() throws IOException {
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_PEER_INDEX.code);
            WritableUtils.writeVInt(BinaryProtocol.this.stream, this.peer.getPeerIndex());
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_PEERNAME - PeerIndex: ".replace("GET_PEERNAME", "GET_PEER_INDEX") + this.peer.getPeerIndex());
        }

        /**
         * Reads a peer id and responds to GET_PEERNAME with the command code followed by a name:
         * the peer's own name for id -1, an empty string for ids below -1 or not below the peer
         * count, and otherwise the name of the peer with that id.
         *
         * @throws IOException if reading the id or writing the response fails
         */
        public void getPeerName() throws IOException {
            int peerId = readCommand();
            BinaryProtocol.LOG.debug("Got MessageType.GET_PEERNAME id: " + peerId);
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_PEERNAME.code);
            if (peerId == -1) {
                Text.writeString(BinaryProtocol.this.stream, this.peer.getPeerName());
                BinaryProtocol.LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: " + this.peer.getPeerName());
            } else if (peerId < -1 || peerId >= this.peer.getNumPeers()) {
                Text.writeString(BinaryProtocol.this.stream, "");
                BinaryProtocol.LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
            } else {
                Text.writeString(BinaryProtocol.this.stream, this.peer.getPeerName(peerId));
                BinaryProtocol.LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: " + this.peer.getPeerName(peerId));
            }
            BinaryProtocol.this.flush();
        }

        /**
         * Responds to GET_ALL_PEERNAME with the command code, the number of peer names and then
         * each name as a string.
         *
         * @throws IOException if writing the response fails
         */
        public void getAllPeerNames() throws IOException {
            BinaryProtocol.LOG.debug("Got MessageType.GET_ALL_PEERNAME");
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_ALL_PEERNAME.code);
            WritableUtils.writeVInt(BinaryProtocol.this.stream, this.peer.getAllPeerNames().length);
            for (String peerName : this.peer.getAllPeerNames()) {
                Text.writeString(BinaryProtocol.this.stream, peerName);
            }
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " + this.peer.getAllPeerNames().length);
        }

        /**
         * Handles SYNC by calling {@code sync()} on the peer. No response is written here.
         *
         * @throws IOException if the peer reports an I/O failure
         * @throws SyncException if the peer's synchronization fails
         * @throws InterruptedException if the peer's synchronization is interrupted
         */
        public void sync() throws IOException, SyncException, InterruptedException {
            BinaryProtocol.LOG.debug("Got MessageType.SYNC");
            this.peer.sync();
        }

        /**
         * Responds to GET_MSG with the command code followed by the peer's current message.
         * When the peer returns no message, only the command code is written.
         *
         * @throws IOException if fetching the message or writing the response fails
         */
        public void getMessage() throws IOException {
            BinaryProtocol.LOG.debug("Got MessageType.GET_MSG");
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_MSG.code);
            Writable message = (BytesWritable) this.peer.getCurrentMessage();
            if (message != null) {
                BinaryProtocol.this.writeObject(message);
            }
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");
        }

        /**
         * Responds to GET_MSG_COUNT with the command code followed by the number of current messages.
         *
         * @throws IOException if writing the response fails
         */
        public void getMessageCount() throws IOException {
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.GET_MSG_COUNT.code);
            WritableUtils.writeVInt(BinaryProtocol.this.stream, this.peer.getNumCurrentMessages());
            BinaryProtocol.this.flush();
            BinaryProtocol.LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: " + this.peer.getNumCurrentMessages());
        }

        /**
         * Handles SEND_MSG: reads the destination peer name and a length-prefixed byte payload,
         * then sends the payload to that peer.
         *
         * @throws IOException if reading the request or sending the message fails
         */
        public void sendMessage() throws IOException {
            String peerName = Text.readString(this.inStream);
            BytesWritable message = new BytesWritable();
            readObject(message);
            BinaryProtocol.LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName);
            this.peer.send(peerName, message);
        }

        /**
         * Handles INCREMENT_COUNTER: reads two strings and a VLong from the stream and forwards
         * them to the peer's {@code incrementCounter}.
         *
         * @throws IOException if reading the request fails
         */
        public void incrementCounter() throws IOException {
            String firstRead = Text.readString(this.inStream);
            String secondRead = Text.readString(this.inStream);
            long amount = WritableUtils.readVLong(this.inStream);
            // NOTE(review): the second string read is passed as the first argument and vice versa;
            // confirm this matches the parameter order of BSPPeer.incrementCounter.
            this.peer.incrementCounter(secondRead, firstRead, amount);
        }

        /**
         * Responds to READ_KEYVALUE with the command code followed by the next input pair. Two
         * empty strings are sent instead when no input format is configured, when it is
         * "org.apache.hama.bsp.NullInputFormat", or when the peer has no further pair.
         *
         * @throws IOException if reading from the peer or writing the response fails
         */
        public void readKeyValue() throws IOException {
            if (this.peer.getConfiguration().get("bsp.input.format.class") == null || this.peer.getConfiguration().get("bsp.input.format.class").equals("org.apache.hama.bsp.NullInputFormat")) {
                WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.READ_KEYVALUE.code);
                Text.writeString(BinaryProtocol.this.stream, "");
                Text.writeString(BinaryProtocol.this.stream, "");
                BinaryProtocol.this.flush();
                BinaryProtocol.LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
                return;
            }
            KeyValuePair<K1, V1> pair = this.peer.readNext();
            WritableUtils.writeVInt(BinaryProtocol.this.stream, MessageType.READ_KEYVALUE.code);
            if (pair != null) {
                BinaryProtocol.this.writeObject(pair.getKey());
                BinaryProtocol.this.writeObject(pair.getValue());
                BinaryProtocol.LOG.debug("Responded MessageType.READ_KEYVALUE - Key: " + pair.getKey() + " Value: " + pair.getValue());
            } else {
                Text.writeString(BinaryProtocol.this.stream, "");
                Text.writeString(BinaryProtocol.this.stream, "");
                BinaryProtocol.LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
            }
            BinaryProtocol.this.flush();
        }

        /**
         * Handles WRITE_KEYVALUE: reads a key and a value into the reusable instances and writes
         * them to the peer's output.
         *
         * @throws IOException if reading the pair or writing it to the peer fails
         */
        public void writeKeyValue() throws IOException {
            readObject(this.key);
            readObject(this.value);
            if (BinaryProtocol.LOG.isDebugEnabled()) {
                BinaryProtocol.LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + this.key + " Value: " + this.value);
            }
            this.peer.write(this.key, this.value);
        }

        /**
         * Reads a length prefix followed by that many bytes and stores them in the given writable.
         * Only {@link BytesWritable} and {@link Text} targets are supported; the length prefix is
         * consumed before the target type is checked.
         *
         * @param writable object that receives the bytes
         * @throws IOException if the target type is unsupported, the length prefix is negative,
         *     or the stream cannot be read
         */
        protected void readObject(Writable writable) throws IOException {
            int length = readCommand();
            if (writable instanceof BytesWritable) {
                ((BytesWritable) writable).set(readPayload(length), 0, length);
            } else if (writable instanceof Text) {
                ((Text) writable).set(readPayload(length));
            } else if (writable instanceof NullWritable) {
                throw new IOException("Cannot read data into NullWritable! Check OutputClasses!");
            } else {
                throw new IOException("Hama Pipes does only support Text as Key/Value output!");
            }
        }

        /** Reads exactly {@code length} bytes from the uplink stream, rejecting a negative length. */
        private byte[] readPayload(int length) throws IOException {
            if (length < 0) {
                // A corrupt prefix would otherwise surface as NegativeArraySizeException.
                throw new IOException("Invalid negative payload length: " + length);
            }
            byte[] payload = new byte[length];
            this.inStream.readFully(payload);
            return payload;
        }
    }

    /**
     * Sets up the buffered downlink stream and starts the uplink reader thread.
     * When {@code Submitter.getKeepCommandFile} returns true for the peer's configuration, the
     * downlink data is additionally copied to a file named "downlink.data".
     *
     * @param bSPPeer peer this protocol serves
     * @param outputStream raw downlink stream
     * @param inputStream raw uplink stream handed to the reader thread
     * @throws IOException if the copy file or the uplink reader cannot be created
     */
    public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer, OutputStream outputStream, InputStream inputStream) throws IOException {
        this.peer = bSPPeer;

        OutputStream downlink;
        if (Submitter.getKeepCommandFile(bSPPeer.getConfiguration())) {
            downlink = new TeeOutputStream("downlink.data", outputStream);
        } else {
            downlink = outputStream;
        }
        BufferedOutputStream buffered = new BufferedOutputStream(downlink, BUFFER_SIZE);
        this.stream = new DataOutputStream(buffered);

        this.uplink = getUplinkReader(bSPPeer, inputStream);
        this.uplink.setName("pipe-uplink-handler");
        this.uplink.start();
    }

    /**
     * Factory for the uplink reader thread; called from the constructor. The returned thread is
     * not started here.
     *
     * @param bSPPeer peer the reader executes commands on
     * @param inputStream raw uplink stream to read from
     * @return a new, unstarted reader thread
     * @throws IOException if the reader cannot be created
     */
    public BinaryProtocol<K1, V1, K2, V2>.UplinkReaderThread getUplinkReader(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer, InputStream inputStream) throws IOException {
        BinaryProtocol<K1, V1, K2, V2>.UplinkReaderThread reader = new UplinkReaderThread(bSPPeer, inputStream);
        return reader;
    }

    /**
     * Blocks until the outstanding task flag is cleared, polling it every 100 ms.
     *
     * <p>An interrupt is propagated to the caller instead of being logged and ignored, so a
     * waiting thread can actually be cancelled.
     *
     * @return the value of the task flag after the wait, normally {@code false}
     * @throws IOException declared by the interface; not thrown by this implementation
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public boolean waitForFinish() throws IOException, InterruptedException {
        while (this.hasTask) {
            Thread.sleep(100L);
        }
        return this.hasTask;
    }

    /**
     * Shuts the protocol down: sends CLOSE, interrupts the uplink thread and waits for it to end,
     * then closes the uplink and downlink streams.
     *
     * <p>The streams are closed in {@code finally} blocks so that they are released even when
     * sending CLOSE fails or the join is interrupted.
     *
     * @throws IOException if sending CLOSE or closing a stream fails
     * @throws InterruptedException if the calling thread is interrupted while joining the uplink thread
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void close() throws IOException, InterruptedException {
        LOG.debug("closing connection");
        try {
            endOfInput();
            this.uplink.interrupt();
            this.uplink.join();
        } finally {
            try {
                this.uplink.closeConnection();
            } finally {
                this.stream.close();
            }
        }
    }

    /**
     * Opens the conversation on the downlink: writes the START code followed by the protocol
     * version, flushes, and then sends the peer's configuration via {@code setBSPJob}.
     *
     * @throws IOException if writing to the downlink stream fails
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void start() throws IOException {
        LOG.debug("starting downlink");
        final DataOutputStream out = this.stream;
        WritableUtils.writeVInt(out, MessageType.START.code);
        // Same value (0) as the literal previously written here.
        WritableUtils.writeVInt(out, CURRENT_PROTOCOL_VERSION);
        flush();
        LOG.debug("Sent MessageType.START");
        Configuration jobConf = this.peer.getConfiguration();
        setBSPJob(jobConf);
    }

    /**
     * Sends the configuration on the downlink: the SET_BSPJOB_CONF code, the number of strings
     * that follow (two per entry), and then each entry's key and value in iteration order.
     *
     * @param configuration configuration whose entries are sent
     * @throws IOException if writing to the downlink stream fails
     */
    public void setBSPJob(Configuration configuration) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_BSPJOB_CONF.code);
        // Flatten to key, value, key, value, ... first because the count must precede the data.
        ArrayList<String> flattened = new ArrayList<String>();
        for (Map.Entry<String, String> entry : configuration) {
            flattened.add(entry.getKey());
            flattened.add(entry.getValue());
        }
        WritableUtils.writeVInt(this.stream, flattened.size());
        for (String item : flattened) {
            Text.writeString(this.stream, item);
        }
        flush();
        LOG.debug("Sent MessageType.SET_BSPJOB_CONF");
    }

    /**
     * Sends the SET_INPUT_TYPES code followed by the two given strings, then flushes.
     *
     * @param str first string sent (presumably the input key class name — TODO confirm)
     * @param str2 second string sent (presumably the input value class name — TODO confirm)
     * @throws IOException if writing to the downlink stream fails
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void setInputTypes(String str, String str2) throws IOException {
        final DataOutputStream out = this.stream;
        WritableUtils.writeVInt(out, MessageType.SET_INPUT_TYPES.code);
        Text.writeString(out, str);
        Text.writeString(out, str2);
        flush();
        LOG.debug("Sent MessageType.SET_INPUT_TYPES");
    }

    /**
     * Sends RUN_SETUP with the two flags encoded as 1/0 and marks a task as outstanding.
     *
     * <p>The task flag is raised before the command is written. Raising it after the flush would
     * let a fast TASK_DONE reply, handled on the uplink thread, be overwritten by the late
     * assignment, after which {@code waitForFinish()} would never return.
     *
     * @param z first flag sent (presumably "piped input" — TODO confirm)
     * @param z2 second flag sent (presumably "piped output" — TODO confirm)
     * @throws IOException if writing to the downlink stream fails; the task flag is then restored
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void runSetup(boolean z, boolean z2) throws IOException {
        final boolean previous = this.hasTask;
        this.hasTask = true;
        try {
            WritableUtils.writeVInt(this.stream, MessageType.RUN_SETUP.code);
            WritableUtils.writeVInt(this.stream, z ? 1 : 0);
            WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
            flush();
        } catch (IOException e) {
            // Nothing was started, so leave the flag as it was before the call.
            this.hasTask = previous;
            throw e;
        }
        LOG.debug("Sent MessageType.RUN_SETUP");
    }

    /**
     * Sends RUN_BSP with the two flags encoded as 1/0 and marks a task as outstanding.
     *
     * <p>The task flag is raised before the command is written. Raising it after the flush would
     * let a fast TASK_DONE reply, handled on the uplink thread, be overwritten by the late
     * assignment, after which {@code waitForFinish()} would never return.
     *
     * @param z first flag sent (presumably "piped input" — TODO confirm)
     * @param z2 second flag sent (presumably "piped output" — TODO confirm)
     * @throws IOException if writing to the downlink stream fails; the task flag is then restored
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void runBsp(boolean z, boolean z2) throws IOException {
        final boolean previous = this.hasTask;
        this.hasTask = true;
        try {
            WritableUtils.writeVInt(this.stream, MessageType.RUN_BSP.code);
            WritableUtils.writeVInt(this.stream, z ? 1 : 0);
            WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
            flush();
        } catch (IOException e) {
            // Nothing was started, so leave the flag as it was before the call.
            this.hasTask = previous;
            throw e;
        }
        LOG.debug("Sent MessageType.RUN_BSP");
    }

    /**
     * Sends RUN_CLEANUP with the two flags encoded as 1/0 and marks a task as outstanding.
     *
     * <p>The task flag is raised before the command is written. Raising it after the flush would
     * let a fast TASK_DONE reply, handled on the uplink thread, be overwritten by the late
     * assignment, after which {@code waitForFinish()} would never return.
     *
     * @param z first flag sent (presumably "piped input" — TODO confirm)
     * @param z2 second flag sent (presumably "piped output" — TODO confirm)
     * @throws IOException if writing to the downlink stream fails; the task flag is then restored
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void runCleanup(boolean z, boolean z2) throws IOException {
        final boolean previous = this.hasTask;
        this.hasTask = true;
        try {
            WritableUtils.writeVInt(this.stream, MessageType.RUN_CLEANUP.code);
            WritableUtils.writeVInt(this.stream, z ? 1 : 0);
            WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
            flush();
        } catch (IOException e) {
            // Nothing was started, so leave the flag as it was before the call.
            this.hasTask = previous;
            throw e;
        }
        LOG.debug("Sent MessageType.RUN_CLEANUP");
    }

    /**
     * Sends the CLOSE code on the downlink and flushes. Called first by {@code close()}.
     *
     * @throws IOException if writing to the downlink stream fails
     */
    public void endOfInput() throws IOException {
        final int closeCode = MessageType.CLOSE.code;
        WritableUtils.writeVInt(this.stream, closeCode);
        flush();
        // Both messages are emitted, as before.
        LOG.debug("Sent close command");
        LOG.debug("Sent MessageType.CLOSE");
    }

    /**
     * Sends the ABORT code on the downlink and flushes.
     *
     * @throws IOException if writing to the downlink stream fails
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void abort() throws IOException {
        final int abortCode = MessageType.ABORT.code;
        WritableUtils.writeVInt(this.stream, abortCode);
        flush();
        LOG.debug("Sent MessageType.ABORT");
    }

    /**
     * Flushes the buffered downlink stream so that pending bytes are handed to the underlying stream.
     *
     * @throws IOException if the flush fails
     */
    @Override // org.apache.hama.pipes.DownwardProtocol
    public void flush() throws IOException {
        final DataOutputStream downlink = this.stream;
        downlink.flush();
    }

    /**
     * Writes a writable to the downlink as a VInt length followed by that many bytes.
     * {@link Text} and {@link BytesWritable} contribute their backing bytes directly; any other
     * writable is first serialized into the shared scratch buffer. The stream is not flushed.
     *
     * @param writable object to send
     * @throws IOException if serializing the object or writing to the downlink stream fails
     */
    protected void writeObject(Writable writable) throws IOException {
        final byte[] payload;
        final int size;
        if (writable instanceof Text) {
            Text asText = (Text) writable;
            size = asText.getLength();
            payload = asText.getBytes();
        } else if (writable instanceof BytesWritable) {
            BytesWritable asBytes = (BytesWritable) writable;
            size = asBytes.getLength();
            payload = asBytes.getBytes();
        } else {
            // Generic path: serialize through the reusable scratch buffer.
            this.buffer.reset();
            writable.write(this.buffer);
            size = this.buffer.getLength();
            payload = this.buffer.getData();
        }
        // Only the first `size` bytes are valid; the backing arrays may be longer.
        WritableUtils.writeVInt(this.stream, size);
        this.stream.write(payload, 0, size);
    }
}
