/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.examples;

import java.util.ArrayList;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.examples.data.MusicProfilesData;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;

public class MusicProfiles
implements ProgramDescription {
    // Set to true by parseParameters when six arguments are supplied; selects file
    // input/output instead of the built-in data set and printing of results.
    private static boolean fileOutput = false;
    // args[0]: path of the tab-separated user/song/playcount triplets file.
    private static String userSongTripletsInputPath = null;
    // args[1]: path of the text file listing song mismatches, one per line.
    private static String mismatchesInputPath = null;
    // args[2]: path the per-user top track result is written to.
    private static String topTracksOutputPath = null;
    // args[3]: only edges whose play count is strictly greater than this value
    // are used when building the similar-users graph.
    private static int playcountThreshold = 0;
    // args[4]: path the label propagation result is written to.
    private static String communitiesOutputPath = null;
    // args[5]: iteration limit handed to LabelPropagation; 10 unless overridden.
    private static int maxIterations = 10;

    /**
     * Runs the Music Profiles example.
     *
     * <p>The program performs these steps:
     * <ol>
     *   <li>reads the user/song/playcount triplets and removes every triplet whose song id
     *       appears in the mismatches input;</li>
     *   <li>builds a user-song graph from the remaining triplets and emits, per vertex with
     *       outgoing edges of positive play count, the target with the highest play count;</li>
     *   <li>keeps the edges whose play count exceeds {@code playcountThreshold}, groups them by
     *       target and links every pair of sources in a group, giving an undirected
     *       similar-users graph;</li>
     *   <li>assigns each vertex of that graph a unique numeric label and runs
     *       {@link LabelPropagation} for at most {@code maxIterations} iterations.</li>
     * </ol>
     *
     * <p>In file mode both results are written as tab-separated files and the job is started
     * with {@code env.execute()}; otherwise both results are printed.
     *
     * @param args either no arguments (built-in data) or the six arguments described in the usage text
     * @throws Exception if building or executing the Flink program fails
     */
    public static void main(String[] args) throws Exception {
        if (!parseParameters(args)) {
            return;
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Input triplets, minus every song that is listed as a mismatch.
        DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env);
        DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds());
        DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
            .coGroup(mismatches).where(1).equalTo(0)
            .with(new FilterOutMismatches());

        // Graph with an edge user -> song carrying the play count.
        Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env);

        // Top track per vertex; vertices that produced no top track carry "" and are dropped.
        DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
            .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
            .filter(new FilterSongNodes());
        if (fileOutput) {
            usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t");
        } else {
            usersWithTopTrack.print();
        }

        // Link every pair of users that share a song played more often than the threshold.
        DataSet<Edge<String, NullValue>> similarUsers = userSongGraph.getEdges()
            .filter(new FilterFunction<Edge<String, Integer>>() {

                public boolean filter(Edge<String, Integer> edge) {
                    return edge.getValue() > playcountThreshold;
                }
            })
            .groupBy(1)
            .reduceGroup(new CreateSimilarUserEdges())
            .distinct();

        // Every vertex starts with the placeholder value 1; the real labels are joined in below.
        Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, new MapFunction<String, Long>() {

            public Long map(String value) {
                return 1L;
            }
        }, env).getUndirected();

        // zipWithUniqueId yields (id, vertex); swap to (vertex, id) so it can be joined on the vertex id.
        DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
            .zipWithUniqueId(similarUsersGraph.getVertexIds())
            .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {

                public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
                    return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
                }
            });

        // Replace the placeholder vertex values by the unique labels, then propagate labels.
        DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
            .joinWithVertices(idsWithInitialLabels, new VertexJoinFunction<Long, Long>() {

                public Long vertexJoin(Long vertexValue, Long inputValue) {
                    return inputValue;
                }
            })
            .run(new LabelPropagation<String, Long, NullValue>(maxIterations));
        if (fileOutput) {
            verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
            // The file sinks only run once the job is executed explicitly.
            env.execute();
        } else {
            verticesWithCommunity.print();
        }
    }

    /**
     * Returns the fixed, human-readable name of this example program.
     *
     * @return the string {@code "Music Profiles Example"}
     */
    public String getDescription() {
        return "Music Profiles Example";
    }

    /**
     * Parses the command-line arguments into the static configuration fields.
     *
     * <p>With no arguments the defaults are kept and a hint plus the usage text is printed to
     * standard output. With exactly six arguments file mode is enabled and the paths,
     * playcount threshold and iteration count are taken from the arguments. Any other number
     * of arguments, or a threshold / iteration count that is not an integer, prints the usage
     * text to standard error and leaves every configuration field untouched.
     *
     * @param args the program arguments
     * @return {@code true} if the program should run, {@code false} if the arguments were invalid
     */
    private static boolean parseParameters(String[] args) {
        String usage = "Usage: MusicProfiles <input user song triplets path> <input song mismatches path> <output top tracks path> <playcount threshold> <output communities path> <num iterations>";

        if (args.length == 0) {
            System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
            System.out.println("  Provide parameters to read input data from files.");
            System.out.println("  See the documentation for the correct format of input files.");
            System.out.println(usage);
            return true;
        }
        if (args.length != 6) {
            System.err.println(usage);
            return false;
        }

        // Parse the numeric arguments before touching any field so that a malformed value
        // cannot leave the configuration half-initialised.
        int threshold;
        int iterations;
        try {
            threshold = Integer.parseInt(args[3]);
            iterations = Integer.parseInt(args[5]);
        } catch (NumberFormatException e) {
            System.err.println("<playcount threshold> and <num iterations> must be integers: " + e.getMessage());
            System.err.println(usage);
            return false;
        }

        fileOutput = true;
        userSongTripletsInputPath = args[0];
        mismatchesInputPath = args[1];
        topTracksOutputPath = args[2];
        playcountThreshold = threshold;
        communitiesOutputPath = args[4];
        maxIterations = iterations;
        return true;
    }

    /**
     * Provides the user/song/playcount triplets.
     *
     * <p>Outside file mode the built-in sample from {@code MusicProfilesData} is returned.
     * In file mode the file at {@code userSongTripletsInputPath} is read as CSV with
     * newline-separated records and tab-separated fields of type String, String, Integer.
     *
     * @param env the execution environment used to create the data set
     * @return the triplets data set
     */
    private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
        if (!fileOutput) {
            return MusicProfilesData.getUserSongTriplets(env);
        }
        return env.readCsvFile(userSongTripletsInputPath)
            .lineDelimiter("\n")
            .fieldDelimiter("\t")
            .types(String.class, String.class, Integer.class);
    }

    /**
     * Provides the raw mismatch lines.
     *
     * <p>Outside file mode the built-in sample from {@code MusicProfilesData} is returned;
     * in file mode the text file at {@code mismatchesInputPath} is read.
     *
     * @param env the execution environment used to create the data set
     * @return a data set of mismatch lines
     */
    private static DataSet<String> getMismatchesData(ExecutionEnvironment env) {
        if (!fileOutput) {
            return MusicProfilesData.getMismatches(env);
        }
        return env.readTextFile(mismatchesInputPath);
    }

    /**
     * Group reducer that connects all sources of a group of edges with each other.
     *
     * <p>For one group of input edges the source ids are collected in iteration order and an
     * edge with a {@link NullValue} value is emitted for every pair {@code (i, j)} with
     * {@code i < j}. A group with fewer than two edges emits nothing. Source ids that occur
     * more than once in a group are not de-duplicated here.
     */
    private static final class CreateSimilarUserEdges
    implements GroupReduceFunction<Edge<String, Integer>, Edge<String, NullValue>> {
        private CreateSimilarUserEdges() {
        }

        /**
         * Emits one edge per unordered pair of edge sources in the group.
         *
         * @param edges the edges of one group
         * @param out collector receiving the pairwise edges
         */
        public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) {
            // Materialise the sources first: the iterable is traversed only once, while
            // pairing needs random access.
            ArrayList<String> listeners = new ArrayList<String>();
            for (Edge<String, Integer> edge : edges) {
                listeners.add(edge.getSource());
            }
            for (int i = 0; i < listeners.size() - 1; ++i) {
                for (int j = i + 1; j < listeners.size(); ++j) {
                    out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(j), NullValue.getInstance()));
                }
            }
        }
    }

    /**
     * Edge function that reports, for one vertex, the target of the edge with the highest value.
     *
     * <p>The running maximum starts at 0 and only strictly greater values replace it, so the
     * first edge wins on ties and edges with a value of 0 or less are never selected. If no
     * edge qualifies, the empty string is emitted as the top song.
     */
    private static final class GetTopSongPerUser
    implements EdgesFunctionWithVertexValue<String, NullValue, Integer, Tuple2<String, String>> {
        private GetTopSongPerUser() {
        }

        /**
         * Emits exactly one {@code (vertex id, top song)} pair for the given vertex.
         *
         * @param vertex the vertex whose edges are inspected
         * @param edges the edges of that vertex
         * @param out collector receiving the result pair
         * @throws Exception declared by the implemented interface
         */
        public void iterateEdges(Vertex<String, NullValue> vertex, Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
            int maxPlaycount = 0;
            String topSong = "";
            for (Edge<String, Integer> edge : edges) {
                // Unbox once; the value is needed for both the comparison and the update.
                int playcount = edge.getValue();
                if (playcount > maxPlaycount) {
                    maxPlaycount = playcount;
                    topSong = edge.getTarget();
                }
            }
            out.collect(new Tuple2<String, String>(vertex.getId(), topSong));
        }
    }

    /**
     * Filter that keeps only pairs whose second field is a non-empty string.
     */
    private static final class FilterSongNodes
    implements FilterFunction<Tuple2<String, String>> {
        private FilterSongNodes() {
        }

        /**
         * Tests whether the pair carries a non-empty second field.
         *
         * @param value the pair to test
         * @return {@code true} if {@code value.f1} is not the empty string
         * @throws Exception declared by the implemented interface
         */
        public boolean filter(Tuple2<String, String> value) throws Exception {
            String topSong = value.f1;
            return !topSong.isEmpty();
        }
    }

    /**
     * Co-group function that forwards the triplets of a key only when the second input
     * contributes no element for that key.
     */
    private static final class FilterOutMismatches
    implements CoGroupFunction<Tuple3<String, String, Integer>, Tuple1<String>, Tuple3<String, String, Integer>> {
        private FilterOutMismatches() {
        }

        /**
         * Emits every triplet of the group unless the group has at least one invalid-song entry.
         *
         * @param triplets the triplets of one key
         * @param invalidSongs the invalid-song entries of the same key
         * @param out collector receiving the surviving triplets
         */
        public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets, Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) {
            boolean flaggedAsMismatch = invalidSongs.iterator().hasNext();
            if (flaggedAsMismatch) {
                // The whole group is dropped.
                return;
            }
            for (Tuple3<String, String, Integer> validTriplet : triplets) {
                out.collect(validTriplet);
            }
        }
    }

    /**
     * Mapper that pulls the song id out of one mismatch line.
     *
     * <p>The line is split on runs of whitespace and the second token, minus its first
     * character, is returned. NOTE(review): the dropped character is presumably a leading
     * delimiter such as {@code '<'} in the mismatch file format; confirm against the input
     * data. A line with fewer than two tokens fails with
     * {@link ArrayIndexOutOfBoundsException}.
     */
    private static final class ExtractMismatchSongIds
    implements MapFunction<String, Tuple1<String>> {
        private ExtractMismatchSongIds() {
        }

        /**
         * Extracts the song id from a mismatch line.
         *
         * @param value one line of the mismatches input
         * @return a one-field tuple holding the song id
         */
        public Tuple1<String> map(String value) {
            String[] tokens = value.split("\\s+");
            // Second token without its first character.
            String songId = tokens[1].substring(1);
            return new Tuple1<String>(songId);
        }
    }
}

