package org.apache.hama.pipes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.pipes.BinaryProtocol;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/pipes/StreamingProtocol.class */
public class StreamingProtocol<K1 extends Writable, V1 extends Writable> extends BinaryProtocol<K1, V1, Text, Text> {
    /** Matches the "=" separator; {@code readCommand} splits each incoming line on it with a limit of 2. */
    private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("=");
    /**
     * Barrier created for two parties in the constructor. It is awaited by the
     * uplink reader when it reads a line starting with "ACK_" and by the
     * sending side (see {@code start()} and {@code waitOnAck()}).
     */
    private final CyclicBarrier ackBarrier;
    /** Set to {@code true} by the reader's {@code onError}; {@code waitOnAck()} skips the barrier once it is set. */
    private volatile boolean brokenBarrier;

    /* loaded from: input_file:org/apache/hama/pipes/StreamingProtocol$StreamingUplinkReaderThread.class */
    public class StreamingUplinkReaderThread extends BinaryProtocol<K1, V1, Text, Text>.UplinkReaderThread {
        private BufferedReader reader;

        public StreamingUplinkReaderThread(BSPPeer<K1, V1, Text, Text, BytesWritable> bSPPeer, InputStream inputStream) throws IOException {
            super(bSPPeer, inputStream);
            this.reader = new BufferedReader(new InputStreamReader(this.inStream));
        }

        /**
         * Reads two lines from the uplink stream (the target peer name, then the
         * message text) and sends the message bytes to that peer.
         *
         * @throws IOException if the stream ends before both lines were read, or
         *         if reading or sending fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void sendMessage() throws IOException {
            String peerName = this.reader.readLine();
            String payload = this.reader.readLine();
            if (peerName == null || payload == null) {
                // readLine() returns null at end of stream; report that clearly
                // instead of failing with a NullPointerException on getBytes().
                throw new IOException("Unexpected end of stream while reading a message to send");
            }
            this.peer.send(peerName, new BytesWritable(payload.getBytes()));
        }

        /**
         * Writes the peer's current message as one line, or the sentinel
         * {@code %%-1%%} when the peer has no current message.
         *
         * @throws IOException if fetching the message or writing the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getMessage() throws IOException {
            BytesWritable currentMessage = this.peer.getCurrentMessage();
            if (currentMessage == null) {
                StreamingProtocol.this.writeLine("%%-1%%");
                return;
            }
            // getBytes() hands out the backing array, which may be larger than
            // the payload; only the first getLength() bytes are valid data.
            String text = new String(currentMessage.getBytes(), 0, currentMessage.getLength());
            StreamingProtocol.this.writeLine(text);
        }

        /**
         * Writes the peer's number of current messages as a single line.
         *
         * @throws IOException if writing the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getMessageCount() throws IOException {
            String count = String.valueOf(this.peer.getNumCurrentMessages());
            StreamingProtocol.this.writeLine(count);
        }

        /**
         * Writes the peer's superstep count as a single line.
         *
         * @throws IOException if writing the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getSuperstepCount() throws IOException {
            String supersteps = String.valueOf(this.peer.getSuperstepCount());
            StreamingProtocol.this.writeLine(supersteps);
        }

        /**
         * Reads a peer index from the next input line and writes the matching
         * peer name. An index of -1 selects this peer's own name.
         *
         * @throws IOException if reading the index or writing the name fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getPeerName() throws IOException {
            int index = Integer.parseInt(this.reader.readLine());
            String name = (index == -1) ? this.peer.getPeerName() : this.peer.getPeerName(index);
            StreamingProtocol.this.writeLine(name);
        }

        /**
         * Writes this peer's index as a single line.
         *
         * @throws IOException if writing the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getPeerIndex() throws IOException {
            String index = String.valueOf(this.peer.getPeerIndex());
            StreamingProtocol.this.writeLine(index);
        }

        /**
         * Writes the number of peers on one line, followed by one line per peer
         * name.
         *
         * @throws IOException if writing a line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getAllPeerNames() throws IOException {
            // Fetch the array once so the announced count always matches the
            // number of name lines that follow.
            String[] names = this.peer.getAllPeerNames();
            StreamingProtocol.this.writeLine(String.valueOf(names.length));
            for (String name : names) {
                StreamingProtocol.this.writeLine(name);
            }
        }

        /**
         * Writes the length of the peer-name array as a single line.
         *
         * @throws IOException if writing the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void getPeerCount() throws IOException {
            String peerCount = String.valueOf(this.peer.getAllPeerNames().length);
            StreamingProtocol.this.writeLine(peerCount);
        }

        /**
         * Runs the peer's sync and then writes the SYNC protocol string with the
         * suffix {@code _SUCCESS}.
         *
         * @throws IOException if writing the confirmation line fails
         * @throws SyncException propagated from the peer's sync
         * @throws InterruptedException propagated from the peer's sync
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void sync() throws IOException, SyncException, InterruptedException {
            this.peer.sync();
            String confirmation = StreamingProtocol.this.getProtocolString(BinaryProtocol.MessageType.SYNC) + "_SUCCESS";
            StreamingProtocol.this.writeLine(confirmation);
        }

        /**
         * Reads a key line and a value line from the uplink stream and writes
         * them to the peer's output as a {@code Text} pair.
         *
         * @throws IOException if the stream ends before both lines were read, or
         *         if reading or writing fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void writeKeyValue() throws IOException {
            String key = this.reader.readLine();
            String value = this.reader.readLine();
            if (key == null || value == null) {
                // readLine() returns null at end of stream; fail with a clear
                // message rather than handing null to the Text constructor.
                throw new IOException("Unexpected end of stream while reading a key/value pair");
            }
            this.peer.write(new Text(key), new Text(value));
        }

        /**
         * Fetches the next input pair from the peer and writes its key and value
         * on two lines. When there is no next pair, the sentinel
         * {@code %%-1%%} is written on both lines instead.
         *
         * @throws IOException if reading the pair or writing a line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void readKeyValue() throws IOException {
            KeyValuePair<K1, V1> pair = this.peer.readNext();
            if (pair != null) {
                StreamingProtocol.this.writeLine(String.valueOf(pair.getKey()));
                StreamingProtocol.this.writeLine(String.valueOf(pair.getValue()));
                return;
            }
            // No pair available: emit the sentinel once for the key and once for the value.
            for (int i = 0; i < 2; i++) {
                StreamingProtocol.this.writeLine("%%-1%%");
            }
        }

        /**
         * Delegates to the peer's {@code reopenInput()}; nothing is written back.
         *
         * @throws IOException propagated from the peer
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void reopenInput() throws IOException {
            this.peer.reopenInput();
        }

        /**
         * Reads one line from the uplink stream and turns it into a command code.
         *
         * <p>The line is split at the first "=" and all "%" characters are removed
         * from the part before it. Lines whose first part starts with "ACK_" are
         * consumed by {@code checkAcks}; LOG commands are written to the logger.
         *
         * @return the parsed command code; -1 when the line was null, empty, an
         *         acknowledgement or a LOG command; -2 when the first part is not
         *         an integer
         * @throws IOException if reading the line fails
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public int readCommand() throws IOException {
            String line = this.reader.readLine();
            if (line == null || line.isEmpty()) {
                return -1;
            }
            String[] parts = StreamingProtocol.PROTOCOL_STRING_PATTERN.split(line, 2);
            parts[0] = parts[0].replace("%", "");
            if (checkAcks(parts)) {
                return -1;
            }
            int code;
            try {
                code = Integer.parseInt(parts[0]);
            } catch (NumberFormatException e) {
                BinaryProtocol.LOG.error("Malformed command line: " + line, e);
                return -2;
            }
            if (code != BinaryProtocol.MessageType.LOG.code) {
                return code;
            }
            // A LOG command without "=" yields a single-element split; log an
            // empty message instead of indexing past the end of the array.
            BinaryProtocol.LOG.info(parts.length > 1 ? parts[1] : "");
            return -1;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Runs the superclass error handling, then marks the acknowledgement
         * barrier as unusable and resets it so a thread currently waiting on it
         * is released.
         *
         * @param th the error passed on to the superclass handler
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public void onError(Throwable th) {
            super.onError(th);
            // Publish the flag before resetting: a thread released by reset(), or
            // one that reaches waitOnAck() afterwards, must already see the flag
            // or it could wait on the fresh barrier.
            // NOTE(review): a thread that tested the flag just before this point
            // can still block on the reset barrier; closing that window needs a
            // different synchronization scheme.
            StreamingProtocol.this.brokenBarrier = true;
            StreamingProtocol.this.ackBarrier.reset();
        }

        /**
         * Handles an acknowledgement line: when the first element starts with
         * "ACK_", waits on the shared barrier.
         *
         * @param strArr the split command line; element 0 is inspected
         * @return {@code true} if the line was an acknowledgement (even when the
         *         wait failed), {@code false} otherwise
         */
        private boolean checkAcks(String[] strArr) {
            if (!strArr[0].startsWith("ACK_")) {
                return false;
            }
            try {
                StreamingProtocol.this.ackBarrier.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up can react to it.
                Thread.currentThread().interrupt();
                BinaryProtocol.LOG.error("Interrupted while waiting on the acknowledgement barrier", e);
            } catch (BrokenBarrierException e2) {
                BinaryProtocol.LOG.error("Acknowledgement barrier was broken while waiting", e2);
            }
            return true;
        }

        /**
         * Delegates unchanged to the superclass implementation.
         *
         * @throws IOException propagated from the superclass
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public /* bridge */ /* synthetic */ void incrementCounter() throws IOException {
            super.incrementCounter();
        }

        /** Delegates unchanged to the superclass implementation. */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread, java.lang.Thread, java.lang.Runnable
        public /* bridge */ /* synthetic */ void run() {
            super.run();
        }

        /**
         * Delegates unchanged to the superclass implementation.
         *
         * @throws IOException propagated from the superclass
         */
        @Override // org.apache.hama.pipes.BinaryProtocol.UplinkReaderThread
        public /* bridge */ /* synthetic */ void closeConnection() throws IOException {
            super.closeConnection();
        }
    }

    public StreamingProtocol(BSPPeer<K1, V1, Text, Text, BytesWritable> bSPPeer, OutputStream outputStream, InputStream inputStream) throws IOException {
        super(bSPPeer, outputStream, inputStream);
        this.ackBarrier = new CyclicBarrier(2);
        this.brokenBarrier = false;
    }

    /**
     * Sends the START command followed by the line "0", transmits the job
     * configuration and then waits for an acknowledgement.
     *
     * @throws IOException if writing to the stream fails
     */
    @Override // org.apache.hama.pipes.BinaryProtocol, org.apache.hama.pipes.DownwardProtocol
    public void start() throws IOException {
        writeLine(BinaryProtocol.MessageType.START, null);
        writeLine("0");
        setBSPJob(this.peer.getConfiguration());
        // Use the shared helper, like runSetup/runBsp/runCleanup do, so the wait
        // is skipped once the reader thread has reported an error.
        waitOnAck();
    }

    /**
     * Sends the job configuration: the SET_BSPJOB_CONF command, then the number
     * of lines that follow, then every key and value on its own line (key
     * first), and finally flushes.
     *
     * @param configuration the configuration whose entries are transmitted
     * @throws IOException if writing to the stream fails
     */
    @Override // org.apache.hama.pipes.BinaryProtocol
    public void setBSPJob(Configuration configuration) throws IOException {
        writeLine(BinaryProtocol.MessageType.SET_BSPJOB_CONF, null);
        // Flatten the entries first because the line count has to be sent
        // before the entries themselves.
        ArrayList<String> flattened = new ArrayList<String>();
        for (Map.Entry<String, String> entry : configuration) {
            flattened.add(entry.getKey());
            flattened.add(entry.getValue());
        }
        writeLine(flattened.size());
        for (String item : flattened) {
            writeLine(item);
        }
        flush();
    }

    /**
     * Sends the RUN_SETUP command and waits for an acknowledgement.
     *
     * @param z not used by this implementation
     * @param z2 not used by this implementation
     * @throws IOException if writing the command fails
     */
    @Override // org.apache.hama.pipes.BinaryProtocol, org.apache.hama.pipes.DownwardProtocol
    public void runSetup(boolean z, boolean z2) throws IOException {
        writeLine(BinaryProtocol.MessageType.RUN_SETUP, null);
        waitOnAck();
    }

    /**
     * Sends the RUN_BSP command and waits for an acknowledgement.
     *
     * @param z not used by this implementation
     * @param z2 not used by this implementation
     * @throws IOException if writing the command fails
     */
    @Override // org.apache.hama.pipes.BinaryProtocol, org.apache.hama.pipes.DownwardProtocol
    public void runBsp(boolean z, boolean z2) throws IOException {
        writeLine(BinaryProtocol.MessageType.RUN_BSP, null);
        waitOnAck();
    }

    /**
     * Waits on the acknowledgement barrier unless the reader thread has already
     * flagged it as broken. Failures while waiting are logged, not thrown.
     */
    public void waitOnAck() {
        if (this.brokenBarrier) {
            return;
        }
        try {
            this.ackBarrier.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status so the caller can still observe it.
            Thread.currentThread().interrupt();
            BinaryProtocol.LOG.error("Interrupted while waiting for an acknowledgement", e);
        } catch (BrokenBarrierException e2) {
            BinaryProtocol.LOG.error("Acknowledgement barrier was broken while waiting", e2);
        }
    }

    /**
     * Sends the RUN_CLEANUP command and waits for an acknowledgement.
     *
     * @param z not used by this implementation
     * @param z2 not used by this implementation
     * @throws IOException if writing the command fails
     */
    @Override // org.apache.hama.pipes.BinaryProtocol, org.apache.hama.pipes.DownwardProtocol
    public void runCleanup(boolean z, boolean z2) throws IOException {
        writeLine(BinaryProtocol.MessageType.RUN_CLEANUP, null);
        waitOnAck();
    }

    /**
     * Creates the line-based uplink reader used by this protocol.
     *
     * @param bSPPeer peer handed to the new reader
     * @param inputStream stream handed to the new reader
     * @return a new {@code StreamingUplinkReaderThread}
     * @throws IOException propagated from the reader's constructor
     */
    @Override // org.apache.hama.pipes.BinaryProtocol
    public BinaryProtocol<K1, V1, Text, Text>.UplinkReaderThread getUplinkReader(BSPPeer<K1, V1, Text, Text, BytesWritable> bSPPeer, InputStream inputStream) throws IOException {
        return new StreamingUplinkReaderThread(bSPPeer, inputStream);
    }

    /**
     * Writes the decimal representation of {@code i} as one line.
     *
     * @param i the number to write
     * @throws IOException if writing fails
     */
    public void writeLine(int i) throws IOException {
        writeLine(Integer.toString(i));
    }

    /**
     * Writes {@code str} followed by a newline to the stream and flushes it.
     * The bytes are produced with the platform default charset.
     *
     * @param str the line content, without the trailing newline
     * @throws IOException if writing or flushing fails
     */
    public void writeLine(String str) throws IOException {
        String line = str + "\n";
        byte[] encoded = line.getBytes();
        this.stream.write(encoded);
        this.stream.flush();
    }

    /**
     * Writes the protocol string of {@code messageType}, then {@code str} (an
     * empty string when {@code str} is null) and a newline, and flushes.
     *
     * @param messageType the command whose protocol prefix starts the line
     * @param str optional text after the prefix; may be null
     * @throws IOException if writing or flushing fails
     */
    public void writeLine(BinaryProtocol.MessageType messageType, String str) throws IOException {
        String prefix = getProtocolString(messageType);
        String payload = (str != null) ? str : "";
        String line = prefix + payload + "\n";
        this.stream.write(line.getBytes());
        this.stream.flush();
    }

    /**
     * Builds the line prefix for a command: "%", the message type's code, then
     * "%=".
     *
     * @param messageType the command to encode
     * @return the protocol prefix for {@code messageType}
     */
    public String getProtocolString(BinaryProtocol.MessageType messageType) {
        StringBuilder prefix = new StringBuilder("%");
        prefix.append(messageType.code);
        prefix.append("%=");
        return prefix.toString();
    }
}
