package org.apache.hadoop.mapred.pipes;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/hadoop-mapreduce-client-core-2.7.4.0.jar:org/apache/hadoop/mapred/pipes/BinaryProtocol.class */
public class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable, K2 extends WritableComparable, V2 extends Writable> implements DownwardProtocol<K1, V1> {
    public static final int CURRENT_PROTOCOL_VERSION = 0;
    private static final int BUFFER_SIZE = 131072;
    private DataOutputStream stream;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private static final Log LOG = LogFactory.getLog(BinaryProtocol.class.getName());
    private UplinkReaderThread uplink;

    /* loaded from: input_file:lib/hadoop-mapreduce-client-core-2.7.4.0.jar:org/apache/hadoop/mapred/pipes/BinaryProtocol$MessageType.class */
    private enum MessageType {
        START(0),
        SET_JOB_CONF(1),
        SET_INPUT_TYPES(2),
        RUN_MAP(3),
        MAP_ITEM(4),
        RUN_REDUCE(5),
        REDUCE_KEY(6),
        REDUCE_VALUE(7),
        CLOSE(8),
        ABORT(9),
        AUTHENTICATION_REQ(10),
        OUTPUT(50),
        PARTITIONED_OUTPUT(51),
        STATUS(52),
        PROGRESS(53),
        DONE(54),
        REGISTER_COUNTER(55),
        INCREMENT_COUNTER(56),
        AUTHENTICATION_RESP(57);

        final int code;

        MessageType(int i) {
            this.code = i;
        }
    }

    /* loaded from: input_file:lib/hadoop-mapreduce-client-core-2.7.4.0.jar:org/apache/hadoop/mapred/pipes/BinaryProtocol$TeeOutputStream.class */
    private static class TeeOutputStream extends FilterOutputStream {
        private OutputStream file;

        TeeOutputStream(String str, OutputStream outputStream) throws IOException {
            super(outputStream);
            this.file = new FileOutputStream(str);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.file.write(bArr, i, i2);
            this.out.write(bArr, i, i2);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(int i) throws IOException {
            this.file.write(i);
            this.out.write(i);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.file.flush();
            this.out.flush();
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            flush();
            this.file.close();
            this.out.close();
        }
    }

    /* loaded from: input_file:lib/hadoop-mapreduce-client-core-2.7.4.0.jar:org/apache/hadoop/mapred/pipes/BinaryProtocol$UplinkReaderThread.class */
    private static class UplinkReaderThread<K2 extends WritableComparable, V2 extends Writable> extends Thread {
        private DataInputStream inStream;
        private UpwardProtocol<K2, V2> handler;
        private K2 key;
        private V2 value;
        private boolean authPending = true;

        public UplinkReaderThread(InputStream inputStream, UpwardProtocol<K2, V2> upwardProtocol, K2 k2, V2 v2) throws IOException {
            this.inStream = new DataInputStream(new BufferedInputStream(inputStream, 131072));
            this.handler = upwardProtocol;
            this.key = k2;
            this.value = v2;
        }

        public void closeConnection() throws IOException {
            this.inStream.close();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    int readVInt = WritableUtils.readVInt(this.inStream);
                    BinaryProtocol.LOG.debug("Handling uplink command " + readVInt);
                    if (readVInt == MessageType.AUTHENTICATION_RESP.code) {
                        this.authPending = !this.handler.authenticate(Text.readString(this.inStream));
                    } else if (this.authPending) {
                        BinaryProtocol.LOG.warn("Message " + readVInt + " received before authentication is complete. Ignoring");
                    } else if (readVInt == MessageType.OUTPUT.code) {
                        readObject(this.key);
                        readObject(this.value);
                        this.handler.output(this.key, this.value);
                    } else if (readVInt == MessageType.PARTITIONED_OUTPUT.code) {
                        int readVInt2 = WritableUtils.readVInt(this.inStream);
                        readObject(this.key);
                        readObject(this.value);
                        this.handler.partitionedOutput(readVInt2, this.key, this.value);
                    } else if (readVInt == MessageType.STATUS.code) {
                        this.handler.status(Text.readString(this.inStream));
                    } else if (readVInt == MessageType.PROGRESS.code) {
                        this.handler.progress(this.inStream.readFloat());
                    } else if (readVInt == MessageType.REGISTER_COUNTER.code) {
                        this.handler.registerCounter(WritableUtils.readVInt(this.inStream), Text.readString(this.inStream), Text.readString(this.inStream));
                    } else {
                        if (readVInt != MessageType.INCREMENT_COUNTER.code) {
                            if (readVInt != MessageType.DONE.code) {
                                throw new IOException("Bad command code: " + readVInt);
                            }
                            BinaryProtocol.LOG.debug("Pipe child done");
                            this.handler.done();
                            return;
                        }
                        this.handler.incrementCounter(WritableUtils.readVInt(this.inStream), WritableUtils.readVLong(this.inStream));
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th) {
                    BinaryProtocol.LOG.error(StringUtils.stringifyException(th));
                    this.handler.failed(th);
                    return;
                }
            }
            throw new InterruptedException();
        }

        private void readObject(Writable writable) throws IOException {
            int readVInt = WritableUtils.readVInt(this.inStream);
            if (writable instanceof BytesWritable) {
                byte[] bArr = new byte[readVInt];
                this.inStream.readFully(bArr);
                ((BytesWritable) writable).set(bArr, 0, readVInt);
            } else {
                if (!(writable instanceof Text)) {
                    writable.readFields(this.inStream);
                    return;
                }
                byte[] bArr2 = new byte[readVInt];
                this.inStream.readFully(bArr2);
                ((Text) writable).set(bArr2);
            }
        }
    }

    public BinaryProtocol(Socket socket, UpwardProtocol<K2, V2> upwardProtocol, K2 k2, V2 v2, JobConf jobConf) throws IOException {
        OutputStream outputStream = socket.getOutputStream();
        this.stream = new DataOutputStream(new BufferedOutputStream(Submitter.getKeepCommandFile(jobConf) ? new TeeOutputStream("downlink.data", outputStream) : outputStream, 131072));
        this.uplink = new UplinkReaderThread(socket.getInputStream(), upwardProtocol, k2, v2);
        this.uplink.setName("pipe-uplink-handler");
        this.uplink.start();
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void close() throws IOException, InterruptedException {
        LOG.debug("closing connection");
        this.stream.close();
        this.uplink.closeConnection();
        this.uplink.interrupt();
        this.uplink.join();
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void authenticate(String str, String str2) throws IOException {
        LOG.debug("Sending AUTHENTICATION_REQ, digest=" + str + ", challenge=" + str2);
        WritableUtils.writeVInt(this.stream, MessageType.AUTHENTICATION_REQ.code);
        Text.writeString(this.stream, str);
        Text.writeString(this.stream, str2);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void start() throws IOException {
        LOG.debug("starting downlink");
        WritableUtils.writeVInt(this.stream, MessageType.START.code);
        WritableUtils.writeVInt(this.stream, 0);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void setJobConf(JobConf jobConf) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_JOB_CONF.code);
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, String>> it = jobConf.iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> next = it.next();
            arrayList.add(next.getKey());
            arrayList.add(next.getValue());
        }
        WritableUtils.writeVInt(this.stream, arrayList.size());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Text.writeString(this.stream, (String) it2.next());
        }
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void setInputTypes(String str, String str2) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.SET_INPUT_TYPES.code);
        Text.writeString(this.stream, str);
        Text.writeString(this.stream, str2);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void runMap(InputSplit inputSplit, int i, boolean z) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_MAP.code);
        writeObject(inputSplit);
        WritableUtils.writeVInt(this.stream, i);
        WritableUtils.writeVInt(this.stream, z ? 1 : 0);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void mapItem(WritableComparable writableComparable, Writable writable) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.MAP_ITEM.code);
        writeObject(writableComparable);
        writeObject(writable);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void runReduce(int i, boolean z) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.RUN_REDUCE.code);
        WritableUtils.writeVInt(this.stream, i);
        WritableUtils.writeVInt(this.stream, z ? 1 : 0);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void reduceKey(WritableComparable writableComparable) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.REDUCE_KEY.code);
        writeObject(writableComparable);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void reduceValue(Writable writable) throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.REDUCE_VALUE.code);
        writeObject(writable);
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void endOfInput() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.CLOSE.code);
        LOG.debug("Sent close command");
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void abort() throws IOException {
        WritableUtils.writeVInt(this.stream, MessageType.ABORT.code);
        LOG.debug("Sent abort command");
    }

    @Override // org.apache.hadoop.mapred.pipes.DownwardProtocol
    public void flush() throws IOException {
        this.stream.flush();
    }

    private void writeObject(Writable writable) throws IOException {
        if (writable instanceof Text) {
            Text text = (Text) writable;
            int length = text.getLength();
            WritableUtils.writeVInt(this.stream, length);
            this.stream.write(text.getBytes(), 0, length);
            return;
        }
        if (writable instanceof BytesWritable) {
            BytesWritable bytesWritable = (BytesWritable) writable;
            int length2 = bytesWritable.getLength();
            WritableUtils.writeVInt(this.stream, length2);
            this.stream.write(bytesWritable.getBytes(), 0, length2);
            return;
        }
        this.buffer.reset();
        writable.write(this.buffer);
        int length3 = this.buffer.getLength();
        WritableUtils.writeVInt(this.stream, length3);
        this.stream.write(this.buffer.getData(), 0, length3);
    }
}
