/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core.construction;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.repackaged.com.google.protobuf.ByteString;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nonnull;
import org.apache.beam.runners.direct.repackaged.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.java.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TestStreamTranslation {
    private static TestStream<?> testStreamFromProtoPayload(RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components) throws IOException {
        Coder<?> coder = components.getCoder(testStreamPayload.getCoderId());
        ArrayList events = new ArrayList();
        for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
            events.add(TestStreamTranslation.eventFromProto(event, coder));
        }
        return TestStream.fromRawEvents(coder, events);
    }

    public static <T> TestStream<T> getTestStream(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application) throws IOException {
        SdkComponents sdkComponents = SdkComponents.create();
        RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents);
        Preconditions.checkArgument("urn:beam:transform:teststream:v1".equals(transformProto.getSpec().getUrn()), "Attempt to get %s from a transform with wrong URN %s", (Object)TestStream.class.getSimpleName(), (Object)transformProto.getSpec().getUrn());
        RunnerApi.TestStreamPayload testStreamPayload = RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
        return TestStreamTranslation.testStreamFromProtoPayload(testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents()));
    }

    static <T> RunnerApi.TestStreamPayload.Event eventToProto(TestStream.Event<T> event, Coder<T> coder) throws IOException {
        switch (event.getType()) {
            case WATERMARK: {
                return RunnerApi.TestStreamPayload.Event.newBuilder().setWatermarkEvent(RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder().setNewWatermark(((TestStream.WatermarkEvent)event).getWatermark().getMillis())).build();
            }
            case PROCESSING_TIME: {
                return RunnerApi.TestStreamPayload.Event.newBuilder().setProcessingTimeEvent(RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder().setAdvanceDuration(((TestStream.ProcessingTimeEvent)event).getProcessingTimeAdvance().getMillis())).build();
            }
            case ELEMENT: {
                RunnerApi.TestStreamPayload.Event.AddElements.Builder builder = RunnerApi.TestStreamPayload.Event.AddElements.newBuilder();
                for (TimestampedValue element : ((TestStream.ElementEvent)event).getElements()) {
                    builder.addElements(RunnerApi.TestStreamPayload.TimestampedElement.newBuilder().setTimestamp(element.getTimestamp().getMillis()).setEncodedElement(ByteString.copyFrom(CoderUtils.encodeToByteArray(coder, (Object)element.getValue()))));
                }
                return RunnerApi.TestStreamPayload.Event.newBuilder().setElementEvent(builder).build();
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported type of %s: %s", TestStream.Event.class.getCanonicalName(), event.getType()));
    }

    static <T> TestStream.Event<T> eventFromProto(RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException {
        switch (protoEvent.getEventCase()) {
            case WATERMARK_EVENT: {
                return TestStream.WatermarkEvent.advanceTo((Instant)new Instant(protoEvent.getWatermarkEvent().getNewWatermark()));
            }
            case PROCESSING_TIME_EVENT: {
                return TestStream.ProcessingTimeEvent.advanceBy((Duration)Duration.millis((long)protoEvent.getProcessingTimeEvent().getAdvanceDuration()));
            }
            case ELEMENT_EVENT: {
                ArrayList<TimestampedValue> decodedElements = new ArrayList<TimestampedValue>();
                for (RunnerApi.TestStreamPayload.TimestampedElement element : protoEvent.getElementEvent().getElementsList()) {
                    decodedElements.add(TimestampedValue.of((Object)CoderUtils.decodeFromByteArray(coder, (byte[])element.getEncodedElement().toByteArray()), (Instant)new Instant(element.getTimestamp())));
                }
                return TestStream.ElementEvent.add(decodedElements);
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported type of %s: %s", RunnerApi.TestStreamPayload.Event.class.getCanonicalName(), protoEvent.getEventCase()));
    }

    static RunnerApi.TestStreamPayload payloadForTestStreamLike(TestStreamLike transform, SdkComponents components) throws IOException {
        return RunnerApi.TestStreamPayload.newBuilder().setCoderId(components.registerCoder(transform.getValueCoder())).addAllEvents(transform.getEvents()).build();
    }

    @VisibleForTesting
    static <T> RunnerApi.TestStreamPayload payloadForTestStream(final TestStream<T> testStream, SdkComponents components) throws IOException {
        return TestStreamTranslation.payloadForTestStreamLike(new TestStreamLike(){

            public Coder<T> getValueCoder() {
                return testStream.getValueCoder();
            }

            @Override
            public List<RunnerApi.TestStreamPayload.Event> getEvents() {
                try {
                    ArrayList<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<RunnerApi.TestStreamPayload.Event>();
                    for (TestStream.Event event : testStream.getEvents()) {
                        protoEvents.add(TestStreamTranslation.eventToProto(event, testStream.getValueCoder()));
                    }
                    return protoEvents;
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, components);
    }

    static class TestStreamTranslator
    implements PTransformTranslation.TransformPayloadTranslator<TestStream<?>> {
        TestStreamTranslator() {
        }

        @Override
        public String getUrn(TestStream<?> transform) {
            return "urn:beam:transform:teststream:v1";
        }

        @Override
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components) throws IOException {
            return this.translateTyped((TestStream)transform.getTransform(), components);
        }

        @Override
        public PTransformTranslation.RawPTransform<?, ?> rehydrate(RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) throws IOException {
            Preconditions.checkArgument(protoTransform.getSpec() != null, "%s received transform with null spec", (Object)this.getClass().getSimpleName());
            Preconditions.checkArgument(protoTransform.getSpec().getUrn().equals("urn:beam:transform:teststream:v1"));
            return new RawTestStream(RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()), rehydratedComponents);
        }

        private <T> RunnerApi.FunctionSpec translateTyped(TestStream<T> testStream, SdkComponents components) throws IOException {
            return RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:transform:teststream:v1").setPayload(TestStreamTranslation.payloadForTestStream(testStream, components).toByteString()).build();
        }

        @AutoService(value=TransformPayloadTranslatorRegistrar.class)
        public static class Registrar
        implements TransformPayloadTranslatorRegistrar {
            @Override
            public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
                return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
            }

            @Override
            public Map<String, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformRehydrators() {
                return Collections.singletonMap("urn:beam:transform:teststream:v1", new TestStreamTranslator());
            }
        }
    }

    @VisibleForTesting
    static class RawTestStream<T>
    extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
    implements TestStreamLike {
        private final transient RehydratedComponents rehydratedComponents;
        private final RunnerApi.TestStreamPayload payload;
        private final Coder<T> valueCoder;
        private final RunnerApi.FunctionSpec spec;

        public RawTestStream(RunnerApi.TestStreamPayload payload, RehydratedComponents rehydratedComponents) {
            this.payload = payload;
            this.spec = RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:transform:teststream:v1").setPayload(payload.toByteString()).build();
            this.rehydratedComponents = rehydratedComponents;
            try {
                this.valueCoder = rehydratedComponents.getCoder(payload.getCoderId());
            }
            catch (IOException exc) {
                throw new IllegalArgumentException(String.format("Failure extracting coder with id '%s' for %s", payload.getCoderId(), TestStream.class.getSimpleName()), exc);
            }
        }

        @Override
        public String getUrn() {
            return "urn:beam:transform:teststream:v1";
        }

        @Override
        @Nonnull
        public RunnerApi.FunctionSpec getSpec() {
            return this.spec;
        }

        @Override
        public RunnerApi.FunctionSpec migrate(SdkComponents components) throws IOException {
            return RunnerApi.FunctionSpec.newBuilder().setUrn("urn:beam:transform:teststream:v1").setPayload(TestStreamTranslation.payloadForTestStreamLike(this, components).toByteString()).build();
        }

        public Coder<T> getValueCoder() {
            return this.valueCoder;
        }

        @Override
        public List<RunnerApi.TestStreamPayload.Event> getEvents() {
            return this.payload.getEventsList();
        }
    }

    private static interface TestStreamLike {
        public Coder<?> getValueCoder();

        public List<RunnerApi.TestStreamPayload.Event> getEvents();
    }
}

