/*
 * Decompiled with CFR 0.152.
 */
package com.basho.riak.client.api.commands.mapreduce;

import com.basho.riak.client.api.RiakException;
import com.basho.riak.client.api.StreamableRiakCommand;
import com.basho.riak.client.api.commands.mapreduce.BinaryValueSerializer;
import com.basho.riak.client.api.commands.mapreduce.BucketInput;
import com.basho.riak.client.api.commands.mapreduce.BucketInputSerializer;
import com.basho.riak.client.api.commands.mapreduce.BucketKeyInput;
import com.basho.riak.client.api.commands.mapreduce.BucketKeyInputSerializer;
import com.basho.riak.client.api.commands.mapreduce.FunctionPhase;
import com.basho.riak.client.api.commands.mapreduce.FunctionPhaseSerializer;
import com.basho.riak.client.api.commands.mapreduce.IndexInput;
import com.basho.riak.client.api.commands.mapreduce.IndexInputSerializer;
import com.basho.riak.client.api.commands.mapreduce.LinkPhase;
import com.basho.riak.client.api.commands.mapreduce.LinkPhaseSerializer;
import com.basho.riak.client.api.commands.mapreduce.MapPhase;
import com.basho.riak.client.api.commands.mapreduce.MapReduceInput;
import com.basho.riak.client.api.commands.mapreduce.MapReducePhase;
import com.basho.riak.client.api.commands.mapreduce.MapReduceSpec;
import com.basho.riak.client.api.commands.mapreduce.ReducePhase;
import com.basho.riak.client.api.commands.mapreduce.SearchInput;
import com.basho.riak.client.api.commands.mapreduce.SearchInputSerializer;
import com.basho.riak.client.api.convert.ConversionException;
import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.StreamingRiakFuture;
import com.basho.riak.client.core.operations.MapReduceOperation;
import com.basho.riak.client.core.query.functions.Function;
import com.basho.riak.client.core.util.BinaryValue;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TransferQueue;

public abstract class MapReduce
extends StreamableRiakCommand.StreamableRiakCommandWithSameInfo<Response, BinaryValue, MapReduceOperation.Response> {
    private final MapReduceSpec spec;
    static ObjectMapper mrObjectMapper = MapReduce.initializeMRObjectMapper();

    protected MapReduce(MapReduceInput input, Builder builder) {
        this.spec = new MapReduceSpec(input, builder.phases, builder.timeout);
    }

    protected MapReduceOperation buildCoreOperation(boolean streamResults) {
        BinaryValue jobSpec;
        try {
            String spec = this.writeSpec();
            jobSpec = BinaryValue.create(spec);
        }
        catch (RiakException e) {
            throw new RuntimeException(e);
        }
        return new MapReduceOperation.Builder(jobSpec).streamResults(streamResults).build();
    }

    @Override
    protected Response convertResponse(FutureOperation<MapReduceOperation.Response, ?, BinaryValue> request, MapReduceOperation.Response coreResponse) {
        return new Response(coreResponse.getResults());
    }

    @Override
    protected Response createResponse(int timeout, StreamingRiakFuture<MapReduceOperation.Response, BinaryValue> coreFuture) {
        return new Response(coreFuture, timeout);
    }

    String writeSpec() throws RiakException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            JsonGenerator jg = new JsonFactory().createGenerator((OutputStream)out, JsonEncoding.UTF8);
            jg.setCodec((ObjectCodec)mrObjectMapper);
            List<MapReducePhase> phases = this.spec.getPhases();
            phases.get(phases.size() - 1).setKeep(true);
            jg.writeObject((Object)this.spec);
            jg.flush();
            return out.toString("UTF-8");
        }
        catch (IOException e) {
            throw new RiakException(e);
        }
    }

    private static ObjectMapper initializeMRObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        SimpleModule specModule = new SimpleModule("SpecModule", Version.unknownVersion());
        specModule.addSerializer(LinkPhase.class, (JsonSerializer)new LinkPhaseSerializer());
        specModule.addSerializer(FunctionPhase.class, (JsonSerializer)new FunctionPhaseSerializer());
        specModule.addSerializer(BucketInput.class, (JsonSerializer)new BucketInputSerializer());
        specModule.addSerializer(SearchInput.class, (JsonSerializer)new SearchInputSerializer());
        specModule.addSerializer(BucketKeyInput.class, (JsonSerializer)new BucketKeyInputSerializer());
        specModule.addSerializer(IndexInput.class, (JsonSerializer)new IndexInputSerializer());
        specModule.addSerializer(BinaryValue.class, (JsonSerializer)new BinaryValueSerializer());
        objectMapper.registerModule((Module)specModule);
        return objectMapper;
    }

    public static class Response
    extends StreamableRiakCommand.StreamableResponse<Response, BinaryValue> {
        private final Map<Integer, ArrayNode> results;
        private final MapReduceResponseIterator responseIterator;

        Response(StreamingRiakFuture<MapReduceOperation.Response, BinaryValue> coreFuture, int pollTimeout) {
            this.responseIterator = new MapReduceResponseIterator(coreFuture, pollTimeout);
            this.results = null;
        }

        public Response(Map<Integer, ArrayNode> results) {
            this.results = results;
            this.responseIterator = null;
        }

        @Override
        public boolean isStreaming() {
            return this.responseIterator != null;
        }

        public boolean hasResultForPhase(int i) {
            return this.results.containsKey(i);
        }

        public ArrayNode getResultForPhase(int i) {
            return this.results.get(i);
        }

        public ArrayNode getResultsFromAllPhases() {
            return this.flattenResults();
        }

        public <T> Collection<T> getResultsFromAllPhases(Class<T> resultType) {
            ArrayNode flat = this.flattenResults();
            ObjectMapper mapper = new ObjectMapper();
            try {
                return (Collection)mapper.readValue(flat.toString(), (JavaType)mapper.getTypeFactory().constructCollectionType(Collection.class, resultType));
            }
            catch (IOException ex) {
                throw new ConversionException("Could not convert Mapreduce response", ex);
            }
        }

        private ArrayNode flattenResults() {
            JsonNodeFactory factory = JsonNodeFactory.instance;
            ArrayNode flatArray = factory.arrayNode();
            for (Map.Entry<Integer, ArrayNode> entry : this.results.entrySet()) {
                flatArray.addAll(entry.getValue());
            }
            return flatArray;
        }

        @Override
        public Iterator<Response> iterator() {
            if (this.isStreaming()) {
                return this.responseIterator;
            }
            throw new UnsupportedOperationException("Iterating is only supported for streamable response.");
        }

        private class MapReduceResponseIterator
        implements Iterator<Response> {
            final StreamingRiakFuture<MapReduceOperation.Response, BinaryValue> coreFuture;
            final TransferQueue<MapReduceOperation.Response> resultsQueue;
            private final int pollTimeout;

            MapReduceResponseIterator(StreamingRiakFuture<MapReduceOperation.Response, BinaryValue> coreFuture, int pollTimeout) {
                this.coreFuture = coreFuture;
                this.resultsQueue = coreFuture.getResultsQueue();
                this.pollTimeout = pollTimeout;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean hasNext() {
                boolean interrupted = Thread.interrupted();
                try {
                    boolean interruptedLastLoop;
                    boolean foundEntry = false;
                    do {
                        interruptedLastLoop = false;
                        try {
                            foundEntry = this.peekWaitForNextQueueEntry();
                        }
                        catch (InterruptedException e) {
                            interrupted = true;
                            interruptedLastLoop = true;
                        }
                    } while (interruptedLastLoop);
                    boolean bl = foundEntry;
                    return bl;
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            private boolean peekWaitForNextQueueEntry() throws InterruptedException {
                while (this.resultsQueue.isEmpty() && !this.coreFuture.isDone()) {
                    if (!this.resultsQueue.isEmpty()) continue;
                    Thread.sleep(this.pollTimeout);
                }
                return !this.resultsQueue.isEmpty();
            }

            @Override
            public Response next() {
                MapReduceOperation.Response responseChunk = (MapReduceOperation.Response)this.resultsQueue.remove();
                return new Response(responseChunk.getResults());
            }
        }
    }

    protected static abstract class Builder<T extends Builder<T>> {
        protected final List<MapReducePhase> phases = new LinkedList<MapReducePhase>();
        protected Long timeout;

        protected Builder() {
        }

        public T timeout(long timeout) {
            this.timeout = timeout;
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withMapPhase(Function phaseFunction, boolean keep) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new MapPhase(phaseFunction, keep));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withMapPhase(Function phaseFunction, Object arg, boolean keep) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new MapPhase(phaseFunction, arg, keep));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withMapPhase(Function phaseFunction, Object arg) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new MapPhase(phaseFunction, arg));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withMapPhase(Function phaseFunction) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new MapPhase(phaseFunction));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withReducePhase(Function phaseFunction, boolean keep) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new ReducePhase(phaseFunction, keep));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withReducePhase(Function phaseFunction, Object arg, boolean keep) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new ReducePhase(phaseFunction, arg, keep));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withReducePhase(Function phaseFunction, Object arg) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new ReducePhase(phaseFunction, arg));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withReducePhase(Function phaseFunction) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new ReducePhase(phaseFunction));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withLinkPhase(String bucket, String tag, boolean keep) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new LinkPhase(bucket, tag, keep));
            }
            return this.self();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public T withLinkPhase(String bucket, String tag) {
            List<MapReducePhase> list = this.phases;
            synchronized (list) {
                this.phases.add(new LinkPhase(bucket, tag));
            }
            return this.self();
        }

        protected abstract T self();
    }
}

