package org.apache.hama.pipes.protocol;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.pipes.Submitter;

/* loaded from: input_file:org/apache/hama/pipes/protocol/BinaryProtocol.class */
public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> implements DownwardProtocol<K1, V1, K2, V2> {
    protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class.getName());
    public static final int CURRENT_PROTOCOL_VERSION = 0;
    protected static final int BUFFER_SIZE = 131072;
    protected final DataOutputStream stream;
    protected final DataOutputBuffer buffer;
    private UplinkReader<K1, V1, K2, V2> uplink;
    public final Object hasTaskLock;
    private boolean hasTask;
    public final Object resultLock;
    private Integer resultInt;
    protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
    private Configuration conf;

    /* loaded from: input_file:org/apache/hama/pipes/protocol/BinaryProtocol$TeeOutputStream.class */
    private static class TeeOutputStream extends FilterOutputStream {
        private OutputStream file;

        TeeOutputStream(String str, OutputStream outputStream) throws IOException {
            super(outputStream);
            this.file = new FileOutputStream(str);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.file.write(bArr, i, i2);
            this.out.write(bArr, i, i2);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(int i) throws IOException {
            this.file.write(i);
            this.out.write(i);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.file.flush();
            this.out.flush();
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            flush();
            this.file.close();
            this.out.close();
        }
    }

    public BinaryProtocol(Configuration configuration, OutputStream outputStream, InputStream inputStream) throws IOException {
        this.buffer = new DataOutputBuffer();
        this.hasTaskLock = new Object();
        this.hasTask = false;
        this.resultLock = new Object();
        this.resultInt = null;
        this.conf = configuration;
        this.peer = null;
        this.stream = new DataOutputStream(new BufferedOutputStream(Submitter.getKeepCommandFile(configuration) ? new TeeOutputStream("downlink.data", outputStream) : outputStream, BUFFER_SIZE));
        this.uplink = new UplinkReader<>(this, configuration, inputStream);
        this.uplink.setName("pipe-uplink-handler");
        this.uplink.start();
    }

    public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer, OutputStream outputStream, InputStream inputStream) throws IOException {
        this.buffer = new DataOutputBuffer();
        this.hasTaskLock = new Object();
        this.hasTask = false;
        this.resultLock = new Object();
        this.resultInt = null;
        this.peer = bSPPeer;
        this.conf = bSPPeer.getConfiguration();
        this.stream = new DataOutputStream(new BufferedOutputStream(Submitter.getKeepCommandFile(this.conf) ? new TeeOutputStream("downlink.data", outputStream) : outputStream, BUFFER_SIZE));
        this.uplink = getUplinkReader(bSPPeer, inputStream);
        this.uplink.setName("pipe-uplink-handler");
        this.uplink.start();
    }

    public UplinkReader<K1, V1, K2, V2> getUplinkReader(BSPPeer<K1, V1, K2, V2, BytesWritable> bSPPeer, InputStream inputStream) throws IOException {
        return new UplinkReader<>(this, bSPPeer, inputStream);
    }

    public boolean isHasTask() {
        return this.hasTask;
    }

    public synchronized void setHasTask(boolean z) {
        this.hasTask = z;
    }

    public synchronized void setResult(int i) {
        this.resultInt = Integer.valueOf(i);
    }

    public DataOutputStream getStream() {
        return this.stream;
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void start() throws IOException {
        LOG.debug("starting downlink");
        WritableUtils.writeVInt(this.stream, MessageType.START.code);
        WritableUtils.writeVInt(this.stream, 0);
        flush();
        LOG.debug("Sent MessageType.START");
        setBSPJobConf(this.conf);
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void setBSPJobConf(Configuration configuration) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_BSPJOB_CONF.code);
        ArrayList<Map.Entry> arrayList = new ArrayList();
        Iterator it = configuration.iterator();
        while (it.hasNext()) {
            arrayList.add((Map.Entry) it.next());
        }
        WritableUtils.writeVInt(this.stream, arrayList.size());
        for (Map.Entry entry : arrayList) {
            Text.writeString(this.stream, (String) entry.getKey());
            Text.writeString(this.stream, (String) entry.getValue());
        }
        flush();
        LOG.debug("Sent MessageType.SET_BSPJOB_CONF including " + arrayList.size() + " entries.");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void setInputTypes(String str, String str2) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_INPUT_TYPES.code);
        Text.writeString(this.stream, str);
        Text.writeString(this.stream, str2);
        flush();
        LOG.debug("Sent MessageType.SET_INPUT_TYPES");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void runSetup(boolean z, boolean z2) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_SETUP.code);
        WritableUtils.writeVInt(this.stream, z ? 1 : 0);
        WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
        flush();
        setHasTask(true);
        LOG.debug("Sent MessageType.RUN_SETUP");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void runBsp(boolean z, boolean z2) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_BSP.code);
        WritableUtils.writeVInt(this.stream, z ? 1 : 0);
        WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
        flush();
        setHasTask(true);
        LOG.debug("Sent MessageType.RUN_BSP");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void runCleanup(boolean z, boolean z2) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_CLEANUP.code);
        WritableUtils.writeVInt(this.stream, z ? 1 : 0);
        WritableUtils.writeVInt(this.stream, z2 ? 1 : 0);
        flush();
        setHasTask(true);
        LOG.debug("Sent MessageType.RUN_CLEANUP");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public int getPartition(String str, String str2, int i) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.PARTITION_REQUEST.code);
        Text.writeString(this.stream, str);
        Text.writeString(this.stream, str2);
        WritableUtils.writeVInt(this.stream, i);
        flush();
        LOG.debug("Sent MessageType.PARTITION_REQUEST - key: " + str + " value: " + str2.substring(0, 10) + "... numTasks: " + i);
        int i2 = 0;
        synchronized (this.resultLock) {
            while (this.resultInt == null) {
                try {
                    this.resultLock.wait();
                } catch (InterruptedException e) {
                    LOG.error(e);
                }
            }
            i2 = this.resultInt.intValue();
            this.resultInt = null;
        }
        return i2;
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void abort() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.ABORT.code);
        flush();
        LOG.debug("Sent MessageType.ABORT");
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void flush() throws IOException {
        this.stream.flush();
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public void close() throws IOException, InterruptedException {
        LOG.debug("closing connection");
        endOfInput();
        this.uplink.interrupt();
        this.uplink.join();
        this.uplink.closeConnection();
        this.stream.close();
    }

    @Override // org.apache.hama.pipes.protocol.DownwardProtocol
    public boolean waitForFinish() throws IOException, InterruptedException {
        synchronized (this.hasTaskLock) {
            while (this.hasTask) {
                try {
                    this.hasTaskLock.wait();
                } catch (InterruptedException e) {
                    LOG.error(e);
                }
            }
        }
        return this.hasTask;
    }

    public void endOfInput() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.CLOSE.code);
        flush();
        LOG.debug("Sent close command");
        LOG.debug("Sent MessageType.CLOSE");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeObject(Writable writable) throws IOException {
        if (writable instanceof Text) {
            Text text = (Text) writable;
            int length = text.getLength();
            WritableUtils.writeVInt(this.stream, length);
            this.stream.write(text.getBytes(), 0, length);
            return;
        }
        if (writable instanceof BytesWritable) {
            BytesWritable bytesWritable = (BytesWritable) writable;
            int length2 = bytesWritable.getLength();
            WritableUtils.writeVInt(this.stream, length2);
            this.stream.write(bytesWritable.getBytes(), 0, length2);
            return;
        }
        this.buffer.reset();
        writable.write(this.buffer);
        int length3 = this.buffer.getLength();
        WritableUtils.writeVInt(this.stream, length3);
        this.stream.write(this.buffer.getData(), 0, length3);
    }
}
