/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.writer.CloseOnFlushWriterWrapper;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
import org.apache.gobblin.writer.MultiWriterWatermarkTracker;
import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.apache.gobblin.writer.WriterWrapper;
import org.apache.gobblin.writer.partitioner.WriterPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedDataWriter<S, D>
extends WriterWrapper<D>
implements FinalState,
SpeculativeAttemptAwareConstruct,
WatermarkAwareWriter<D> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(PartitionedDataWriter.class);
    /** Cache key used for the single writer when no partitioner is configured: an Avro record of the field-less schema "Dummy". */
    private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record((Schema)SchemaBuilder.record((String)"Dummy").fields().endRecord());
    /** Counter appended (after an underscore) to {@link #baseWriterId} to form the id of each writer that gets built. */
    private int writerIdSuffix = 0;
    /** Writer id reported by the builder at construction time; prefix of every underlying writer's id. */
    private final String baseWriterId;
    /** State passed to the constructor; the serialized partition list is written into it on close. */
    private final State state;
    /** Branch taken from the builder; used to derive the state key that holds the partition list. */
    private final int branchId;
    /** Partitioner instantiated from {@code writer.partitioner.class}; absent when that property is not set. */
    private final Optional<WriterPartitioner> partitioner;
    /** Cache mapping a partition record to the writer serving it; missing entries are created by the cache loader. */
    private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
    /** The constructor's builder viewed as a partition-aware builder; absent when not partitioning. */
    private final Optional<PartitionAwareDataWriterBuilder> builder;
    /** The builder exactly as passed to the constructor; receives schema updates from metadata control messages. */
    private final DataWriterBuilder writerBuilder;
    /** True when {@code writer.partitioner.class} was present in the state at construction time. */
    private final boolean shouldPartition;
    /** Collects every writer decorator created by this instance so they are all closed in {@code close()}. */
    private final Closer closer;
    /** Handler returned by {@code getMessageHandler()}; fans control messages out to the cached writers. */
    private final ControlMessageHandler controlMessageHandler;
    /** Starts as true; becomes false once an inspected underlying writer is not speculative-attempt safe. */
    private boolean isSpeculativeAttemptSafe;
    /** Starts as true; becomes false once an inspected underlying writer is not watermark capable. */
    private boolean isWatermarkCapable;

    /**
     * Creates a writer that routes records to underlying writers built from {@code builder}.
     *
     * <p>When {@code state} contains {@code writer.partitioner.class}, that class is instantiated
     * reflectively with {@code (state, builder.getBranches(), builder.getBranch())} and writers are
     * created per partition through the cache loader. Otherwise a single writer decorator is
     * registered in the cache under {@link #NON_PARTITIONED_WRITER_KEY}.
     *
     * @param builder builder for the underlying writers; must be a
     *        {@link PartitionAwareDataWriterBuilder} when a partitioner is configured
     * @param state configuration for the writers; also receives the partition info on {@link #close()}
     * @throws IOException if the partitioner class cannot be loaded or instantiated
     * @throws IllegalArgumentException if a partitioner is configured but the builder is not
     *         partition aware, or the builder rejects the partitioner's schema
     */
    public PartitionedDataWriter(final DataWriterBuilder<S, D> builder, final State state) throws IOException {
        this.state = state;
        this.branchId = builder.branch;
        this.isSpeculativeAttemptSafe = true;
        this.isWatermarkCapable = true;
        this.baseWriterId = builder.getWriterId();
        this.closer = Closer.create();
        this.writerBuilder = builder;
        this.controlMessageHandler = new PartitionDataWriterMessageHandler();
        this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>(){

            @Override
            public DataWriter<D> load(final GenericRecord key) throws Exception {
                // The supplier builds the writer for this partition; Supplier.get() cannot throw
                // checked exceptions, so an IOException is rethrown wrapped in a RuntimeException.
                Supplier<DataWriter<D>> writerSupplier = new Supplier<DataWriter<D>>(){

                    @Override
                    public DataWriter<D> get() {
                        try {
                            return PartitionedDataWriter.this.createPartitionWriter(key);
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Error creating writer", e);
                        }
                    }
                };
                // Register with the closer so the decorator is closed together with this writer.
                return (DataWriter)PartitionedDataWriter.this.closer.register((Closeable)new InstrumentedPartitionedDataWriterDecorator(new CloseOnFlushWriterWrapper(writerSupplier, state), state, key));
            }
        });
        if (state.contains("writer.partitioner.class")) {
            Preconditions.checkArgument(builder instanceof PartitionAwareDataWriterBuilder, String.format("%s was specified but the writer %s does not support partitioning.", "writer.partitioner.class", builder.getClass().getCanonicalName()));
            try {
                this.shouldPartition = true;
                this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
                this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils.invokeConstructor(Class.forName(state.getProp("writer.partitioner.class")), new Object[]{state, builder.getBranches(), builder.getBranch()})));
                // Report the partitioner's own class: this.partitioner is an Optional, so calling
                // getClass() on it directly would name the Optional implementation instead.
                Preconditions.checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String.format("Writer %s does not support schema from partitioner %s", builder.getClass().getCanonicalName(), this.partitioner.get().getClass().getCanonicalName()));
            }
            catch (ReflectiveOperationException roe) {
                throw new IOException(roe);
            }
        } else {
            this.shouldPartition = false;
            CloseOnFlushWriterWrapper closeOnFlushWriterWrapper = new CloseOnFlushWriterWrapper(new Supplier<DataWriter<D>>(){

                @Override
                public DataWriter<D> get() {
                    try {
                        return builder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_" + PartitionedDataWriter.this.writerIdSuffix++).build();
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Error creating writer", e);
                    }
                }
            }, state);
            // Inspect the writer behind the wrapper to seed the safety / watermark flags.
            DataWriter dataWriter = (DataWriter)closeOnFlushWriterWrapper.getDecoratedObject();
            InstrumentedDataWriterDecorator writer = (InstrumentedDataWriterDecorator)this.closer.register((Closeable)new InstrumentedDataWriterDecorator(closeOnFlushWriterWrapper, state));
            this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter);
            this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter);
            this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer);
            this.partitioner = Optional.absent();
            this.builder = Optional.absent();
        }
    }

    /**
     * Tells whether the given writer is watermark capable.
     *
     * @param dataWriter writer to inspect
     * @return {@code true} only if the writer is a {@link WatermarkAwareWriter} that reports itself
     *         as watermark capable
     */
    private boolean isDataWriterWatermarkCapable(DataWriter<D> dataWriter) {
        if (!(dataWriter instanceof WatermarkAwareWriter)) {
            return false;
        }
        WatermarkAwareWriter watermarkAware = (WatermarkAwareWriter)dataWriter;
        return watermarkAware.isWatermarkCapable();
    }

    /**
     * Writes the envelope with the writer selected for the envelope's record.
     *
     * @param recordEnvelope envelope to write
     * @throws IOException if the underlying write fails, or wrapping the {@link ExecutionException}
     *         raised while obtaining the writer from the cache
     */
    public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
        try {
            // Look up (or lazily create) the writer for this record, then delegate the write.
            this.getDataWriterForRecord(recordEnvelope.getRecord()).writeEnvelope(recordEnvelope);
        } catch (ExecutionException lookupFailure) {
            throw new IOException(lookupFailure);
        }
    }

    /**
     * Resolves the writer responsible for a record.
     *
     * @param record record about to be written
     * @return the cached writer for the record's partition, or for
     *         {@link #NON_PARTITIONED_WRITER_KEY} when partitioning is disabled
     * @throws ExecutionException if the cache fails to load the writer
     */
    private DataWriter<D> getDataWriterForRecord(D record) throws ExecutionException {
        GenericRecord partitionKey = NON_PARTITIONED_WRITER_KEY;
        if (this.shouldPartition) {
            partitionKey = this.partitioner.get().partitionForRecord(record);
        }
        return this.partitionWriters.get(partitionKey);
    }

    /**
     * Commits every cached partition writer, continuing past individual failures.
     *
     * @throws IOException if fewer writers committed successfully than are present in the cache
     */
    public void commit() throws IOException {
        int successfulCommits = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> partitionEntry : this.partitionWriters.asMap().entrySet()) {
            try {
                partitionEntry.getValue().commit();
                successfulCommits++;
            } catch (Throwable failure) {
                // Keep going so the remaining partitions still get a chance to commit.
                log.error(String.format("Failed to commit writer for partition %s.", partitionEntry.getKey()), failure);
            }
        }
        boolean everyWriterCommitted = successfulCommits >= this.partitionWriters.asMap().size();
        if (everyWriterCommitted) {
            return;
        }
        throw new IOException("Failed to commit all writers.");
    }

    /**
     * Cleans up every cached partition writer, continuing past individual failures.
     *
     * <p>Each failure is logged together with its cause, matching what {@code commit()} does.
     *
     * @throws IOException if fewer writers were cleaned up successfully than are present in the cache
     */
    public void cleanup() throws IOException {
        int writersCleanedUp = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            try {
                entry.getValue().cleanup();
                ++writersCleanedUp;
            }
            catch (Throwable throwable) {
                // Pass the throwable to the logger so the failure cause is not lost.
                log.error(String.format("Failed to cleanup writer for partition %s.", entry.getKey()), throwable);
            }
        }
        if (writersCleanedUp < this.partitionWriters.asMap().size()) {
            throw new IOException("Failed to clean up all writers.");
        }
    }

    /**
     * Sums the record counts reported by all cached partition writers.
     *
     * @return total number of records written across the cached writers
     */
    public long recordsWritten() {
        return this.partitionWriters.asMap().values().stream()
                .mapToLong(DataWriter::recordsWritten)
                .sum();
    }

    /**
     * Sums the byte counts reported by all cached partition writers.
     *
     * @return total number of bytes written across the cached writers
     * @throws IOException if any underlying writer fails to report its byte count
     */
    public long bytesWritten() throws IOException {
        long byteCount = 0L;
        for (DataWriter<D> partitionWriter : this.partitionWriters.asMap().values()) {
            byteCount += partitionWriter.bytesWritten();
        }
        return byteCount;
    }

    /**
     * Serializes the partition info into the state and then closes every registered writer.
     *
     * <p>Uses the {@link Closer#rethrow(Throwable)} idiom: if serializing the partition info fails,
     * that failure is what propagates, and any exception raised while closing the writers is
     * attached to it as suppressed instead of replacing it.
     *
     * @throws IOException if closing a registered writer fails
     */
    public void close() throws IOException {
        try {
            this.serializePartitionInfoToState();
        }
        catch (Throwable primaryFailure) {
            // Record the primary failure with the closer so close() below cannot mask it.
            throw this.closer.rethrow(primaryFailure);
        }
        finally {
            this.closer.close();
        }
    }

    /**
     * Builds the underlying writer for one partition and folds its capabilities into the
     * speculative-attempt and watermark flags of this writer.
     *
     * @param partition partition record the new writer will serve
     * @return the newly built writer
     * @throws IOException if no partition-aware builder is available or the build fails
     */
    private DataWriter<D> createPartitionWriter(GenericRecord partition) throws IOException {
        if (!this.builder.isPresent()) {
            throw new IOException("Writer builder not found. This is an error in the code.");
        }
        PartitionAwareDataWriterBuilder partitionAwareBuilder = this.builder.get();
        DataWriter partitionWriter = partitionAwareBuilder.forPartition(partition).withWriterId(this.baseWriterId + "_" + this.writerIdSuffix++).build();
        // Once a flag has dropped to false it stays false, and the new writer is not consulted for it.
        if (this.isSpeculativeAttemptSafe) {
            this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(partitionWriter);
        }
        if (this.isWatermarkCapable) {
            this.isWatermarkCapable = this.isDataWriterWatermarkCapable(partitionWriter);
        }
        return partitionWriter;
    }

    /**
     * Collects the final state of every cached writer that implements {@link FinalState}, and
     * adds the total {@code RecordsWritten} and {@code BytesWritten}.
     *
     * <p>When partitioning is enabled, each property of a partition's final state is additionally
     * stored under {@code <key>_<partition path>} before being merged into the result.
     *
     * <p>Any exception is logged (with its stack trace) and the state gathered so far is returned.
     *
     * @return the merged final state; possibly incomplete if an error occurred
     */
    public State getFinalState() {
        State finalState = new State();
        try {
            for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
                if (!(entry.getValue() instanceof FinalState)) {
                    continue;
                }
                State partitionFinalState = ((FinalState)entry.getValue()).getFinalState();
                if (this.shouldPartition) {
                    for (String key : partitionFinalState.getPropertyNames()) {
                        partitionFinalState.setProp(key + "_" + AvroUtils.serializeAsPath(entry.getKey(), false, true), partitionFinalState.getProp(key));
                    }
                }
                finalState.addAll(partitionFinalState);
            }
            finalState.setProp("RecordsWritten", this.recordsWritten());
            finalState.setProp("BytesWritten", this.bytesWritten());
        }
        catch (Exception exception) {
            // Best effort: log the full exception rather than only its message, then return what we have.
            log.warn("Failed to get final state.", exception);
        }
        return finalState;
    }

    /**
     * Reports the speculative-attempt safety flag maintained by this writer.
     *
     * @return the current value of the flag
     */
    @Override
    public boolean isSpeculativeAttemptSafe() {
        return isSpeculativeAttemptSafe;
    }

    /**
     * Tells whether the given writer is safe for speculative attempts.
     *
     * @param dataWriter writer to inspect
     * @return {@code true} only if the writer is a {@link SpeculativeAttemptAwareConstruct} that
     *         reports itself as speculative-attempt safe
     */
    private boolean isDataWriterForPartitionSafe(DataWriter dataWriter) {
        if (!(dataWriter instanceof SpeculativeAttemptAwareConstruct)) {
            return false;
        }
        SpeculativeAttemptAwareConstruct attemptAware = (SpeculativeAttemptAwareConstruct)dataWriter;
        return attemptAware.isSpeculativeAttemptSafe();
    }

    /**
     * Reports the watermark-capability flag maintained by this writer.
     *
     * @return the current value of the flag
     */
    public boolean isWatermarkCapable() {
        return isWatermarkCapable;
    }

    /**
     * Combines the committable and unacknowledged watermarks of every watermark-aware cached
     * writer in a {@link MultiWriterWatermarkTracker} and returns the tracker's committable set.
     *
     * @return the committable watermarks computed by the tracker
     */
    public Map<String, CheckpointableWatermark> getCommittableWatermark() {
        MultiWriterWatermarkTracker tracker = new MultiWriterWatermarkTracker();
        for (DataWriter<D> candidate : this.partitionWriters.asMap().values()) {
            if (candidate instanceof WatermarkAwareWriter) {
                WatermarkAwareWriter watermarkAware = (WatermarkAwareWriter)candidate;
                Map committable = watermarkAware.getCommittableWatermark();
                if (!committable.isEmpty()) {
                    tracker.committedWatermarks(committable);
                }
                Map unacknowledged = watermarkAware.getUnacknowledgedWatermark();
                if (!unacknowledged.isEmpty()) {
                    tracker.unacknowledgedWatermarks(unacknowledged);
                }
            }
        }
        return tracker.getAllCommitableWatermarks();
    }

    /**
     * Combines the unacknowledged watermarks of every watermark-aware cached writer in a
     * {@link MultiWriterWatermarkTracker} and returns the tracker's unacknowledged set.
     *
     * <p>Writers that are not {@link WatermarkAwareWriter}s are skipped, mirroring
     * {@code getCommittableWatermark()}, instead of failing with a {@link ClassCastException}.
     *
     * @return the unacknowledged watermarks computed by the tracker
     */
    public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
        MultiWriterWatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            if (!(entry.getValue() instanceof WatermarkAwareWriter)) {
                continue;
            }
            Map unacknowledgedWatermark = ((WatermarkAwareWriter)entry.getValue()).getUnacknowledgedWatermark();
            if (unacknowledgedWatermark.isEmpty()) {
                continue;
            }
            watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
        }
        return watermarkTracker.getAllUnacknowledgedWatermarks();
    }

    /**
     * Returns the control-message handler created for this writer in the constructor.
     *
     * @return this writer's control-message handler
     */
    public ControlMessageHandler getMessageHandler() {
        return controlMessageHandler;
    }

    /**
     * Builds the state property name that holds the serialized partition list of a branch.
     *
     * @param branchId branch whose key is wanted
     * @return the key, formatted from the template {@code writer.%d.partitions}
     */
    private static String getPartitionsKey(int branchId) {
        final String keyTemplate = "writer.%d.partitions";
        return String.format(keyTemplate, branchId);
    }

    /**
     * Gathers the {@link PartitionDescriptor} of every cached writer and stores them, as a JSON
     * list, in the state under this branch's partitions key.
     *
     * <p>Writers whose data descriptor is {@code null} or not a {@link PartitionDescriptor} are
     * skipped with a warning. Nothing is stored when no descriptor was collected.
     */
    private void serializePartitionInfoToState() {
        List<PartitionDescriptor> collected = new ArrayList<PartitionDescriptor>();
        for (DataWriter<D> partitionWriter : this.partitionWriters.asMap().values()) {
            Descriptor dataDescriptor = partitionWriter.getDataDescriptor();
            if (dataDescriptor instanceof PartitionDescriptor) {
                collected.add((PartitionDescriptor)dataDescriptor);
            } else if (dataDescriptor == null) {
                log.warn("Drop partition info as writer {} returns a null PartitionDescriptor", partitionWriter.toString());
            } else {
                log.warn("Drop partition info as writer {} does not return a PartitionDescriptor", partitionWriter.toString());
            }
        }
        if (collected.isEmpty()) {
            log.info("Partitions info not available. Will not serialize partitions");
            return;
        }
        String partitionsKey = PartitionedDataWriter.getPartitionsKey(this.branchId);
        this.state.setProp(partitionsKey, PartitionDescriptor.toPartitionJsonList(collected));
    }

    /**
     * Reads the serialized partition list of a branch from the state and removes the property.
     *
     * <p>When the property is missing or empty, an empty list is returned and the state is left
     * untouched.
     *
     * @param state state to read from and clean
     * @param branchId branch whose partition info is wanted
     * @return the deserialized partition descriptors, or a new empty list
     */
    public static List<PartitionDescriptor> getPartitionInfoAndClean(State state, int branchId) {
        final String key = PartitionedDataWriter.getPartitionsKey(branchId);
        final String serialized = state.getProp(key);
        if (!Strings.isNullOrEmpty(serialized)) {
            state.removeProp(key);
            return PartitionDescriptor.fromPartitionJsonList(serialized);
        }
        return Lists.newArrayList();
    }

    /**
     * Control-message handler of the enclosing writer: forwards a clone of each incoming control
     * message to the message handler of every cached partition writer.
     */
    private class PartitionDataWriterMessageHandler
    implements ControlMessageHandler {
        private PartitionDataWriterMessageHandler() {
        }

        /**
         * Applies schema updates to the enclosing writer's builder, then hands one clone of the
         * message to each cached writer's handler.
         *
         * <p>The cloner is closed in a {@code finally} block so it is released even when a
         * writer's handler throws.
         *
         * @param message control message to distribute
         */
        public void handleMessage(ControlMessage message) {
            StreamEntity.ForkCloner cloner = message.forkCloner();
            try {
                if (message instanceof MetadataUpdateControlMessage) {
                    // Hand the updated schema to the builder stored by the enclosing writer.
                    PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage)message).getGlobalMetadata().getSchema());
                }
                for (DataWriter<D> writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) {
                    ControlMessage cloned = (ControlMessage)cloner.getClone();
                    writer.getMessageHandler().handleMessage(cloned);
                }
            }
            finally {
                cloner.close();
            }
        }
    }
}

