/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

/**
 * Static helpers for working with {@link DataStream}s: collecting a stream's
 * elements back into the calling program, and re-interpreting an existing
 * stream as a {@link KeyedStream}.
 *
 * <p>The class is not instantiable.
 */
@Experimental
public final class DataStreamUtils {
    /**
     * Returns an iterator over the elements of the given stream.
     *
     * <p>The method attaches a {@link CollectSink} with parallelism 1 to the
     * stream, then starts a background thread that calls
     * {@link StreamExecutionEnvironment#execute()} on the stream's environment.
     * The sink is given the address chosen below and the port of the returned
     * {@link SocketStreamIterator}. Any {@link Throwable} thrown by
     * {@code execute()} is handed to the iterator via {@code notifyOfError}.
     *
     * <p>The address handed to the sink depends on the environment type:
     * <ul>
     *   <li>{@link RemoteStreamEnvironment}: the result of
     *       {@link ConnectionUtils#findConnectingAddress} for the environment's
     *       host and port;</li>
     *   <li>{@link LocalStreamEnvironment}: the loopback address;</li>
     *   <li>anything else: {@link InetAddress#getLocalHost()}.</li>
     * </ul>
     *
     * @param stream the stream whose elements should be collected
     * @param <OUT> the element type of the stream
     * @return an iterator over the stream's elements
     * @throws IOException if no client address could be determined (the
     *     underlying failure is attached as the cause)
     */
    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
        StreamExecutionEnvironment env = stream.getExecutionEnvironment();
        TypeSerializer<OUT> serializer = stream.getType().createSerializer(env.getConfig());
        SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);

        // The three cases are mutually exclusive. They must form a single
        // if / else-if / else chain, otherwise the address determined for a
        // remote environment is overwritten by the generic fallback.
        InetAddress clientAddress;
        if (env instanceof RemoteStreamEnvironment) {
            RemoteStreamEnvironment remoteEnv = (RemoteStreamEnvironment) env;
            String host = remoteEnv.getHost();
            int port = remoteEnv.getPort();
            try {
                // NOTE(review): 2000 / 400 are presumably a maximum wait and a
                // logging threshold in milliseconds -- confirm against ConnectionUtils.
                clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000L, 400L);
            }
            catch (Exception e) {
                throw new IOException("Could not determine an suitable network address to receive back data from the streaming program.", e);
            }
        } else if (env instanceof LocalStreamEnvironment) {
            clientAddress = InetAddress.getLoopbackAddress();
        } else {
            try {
                clientAddress = InetAddress.getLocalHost();
            }
            catch (UnknownHostException e) {
                throw new IOException("Could not determine this machines own local address to receive back data from the streaming program.", e);
            }
        }

        DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
        sink.setParallelism(1);

        // Run the job on a separate thread so the iterator can be returned immediately.
        new CallExecute(env, iter).start();
        return iter;
    }

    /**
     * Re-interprets the given stream as a {@link KeyedStream}, deriving the key
     * type information from the key selector and the stream's element type via
     * {@link TypeExtractor#getKeySelectorTypes}.
     *
     * @param stream the stream to re-interpret
     * @param keySelector function that extracts the key from an element
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return the stream wrapped as a {@link KeyedStream}
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> stream, KeySelector<T, K> keySelector) {
        return DataStreamUtils.reinterpretAsKeyedStream(stream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()));
    }

    /**
     * Re-interprets the given stream as a {@link KeyedStream} using explicitly
     * supplied key type information.
     *
     * <p>The stream's transformation is wrapped in a
     * {@link PartitionTransformation} with a {@link ForwardPartitioner}.
     * NOTE(review): this presumably means records are not re-partitioned by
     * key, so the input would need to be partitioned consistently with the key
     * selector already -- confirm against the partitioner's documentation.
     *
     * @param stream the stream to re-interpret
     * @param keySelector function that extracts the key from an element
     * @param typeInfo type information for the key type
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return the stream wrapped as a {@link KeyedStream}
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> stream, KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {
        PartitionTransformation<T> partitionTransformation = new PartitionTransformation<T>(stream.getTransformation(), new ForwardPartitioner<T>());
        return new KeyedStream<T, K>(stream, partitionTransformation, keySelector, typeInfo);
    }

    /** Private constructor: this class only exposes static methods. */
    private DataStreamUtils() {
    }

    /**
     * Thread that executes a {@link StreamExecutionEnvironment} and forwards any
     * failure of that execution to a {@link SocketStreamIterator}.
     */
    private static class CallExecute
    extends Thread {
        /** Environment whose {@code execute()} is invoked by this thread. */
        private final StreamExecutionEnvironment toTrigger;
        /** Iterator that is informed if the execution throws. */
        private final SocketStreamIterator<?> toNotify;

        private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
            this.toTrigger = toTrigger;
            this.toNotify = toNotify;
        }

        @Override
        public void run() {
            try {
                this.toTrigger.execute();
            }
            catch (Throwable t) {
                // Deliberately broad: every failure is reported to the iterator
                // rather than escaping from this thread.
                this.toNotify.notifyOfError(t);
            }
        }
    }
}

