/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.connectors;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.ICollectionTransformer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.kinesis.connectors.interfaces.IFilter;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformerBase;
import com.amazonaws.services.kinesis.model.Record;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Record processor that pushes each incoming Kinesis {@link Record} through a transformer and a
 * filter into a buffer, and hands the buffered items to an emitter once the buffer reports that it
 * should be flushed.
 *
 * <p>After a flush the buffer is cleared and, when the buffer knows a last sequence number, that
 * sequence number is checkpointed.
 *
 * @param <T> type a record is transformed into before it is filtered and buffered
 * @param <U> type a buffered item is transformed into before it is emitted
 */
public class KinesisConnectorRecordProcessor<T, U> implements IRecordProcessor {
    private static final Log LOG = LogFactory.getLog(KinesisConnectorRecordProcessor.class);

    private final IEmitter<U> emitter;
    private final ITransformerBase<T, U> transformer;
    private final IFilter<T> filter;
    private final IBuffer<T> buffer;
    /** Maximum number of emit attempts per flush; always at least 1. */
    private final int retryLimit;
    /** Time passed to {@link Thread#sleep(long)} between emit attempts; never negative. */
    private final long backoffInterval;
    private boolean isShutdown = false;
    /** Set by {@link #initialize(String)}; {@code null} until then. */
    private String shardId;

    /**
     * Creates a processor from its pipeline components.
     *
     * @param buffer holds transformed records until they are flushed
     * @param filter decides which transformed records are buffered
     * @param emitter receives the buffered items on flush
     * @param transformer converts records to {@code T} and buffered items to {@code U}
     * @param configuration supplies {@code RETRY_LIMIT} (values &lt;= 0 are treated as 1) and
     *        {@code BACKOFF_INTERVAL} (negative values are treated as 0)
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public KinesisConnectorRecordProcessor(IBuffer<T> buffer, IFilter<T> filter, IEmitter<U> emitter, ITransformerBase<T, U> transformer, KinesisConnectorConfiguration configuration) {
        if (buffer == null || filter == null || emitter == null || transformer == null) {
            throw new IllegalArgumentException("buffer, filter, emitter, and transformer must not be null");
        }
        if (configuration == null) {
            throw new IllegalArgumentException("configuration must not be null");
        }
        this.buffer = buffer;
        this.filter = filter;
        this.emitter = emitter;
        this.transformer = transformer;
        this.retryLimit = configuration.RETRY_LIMIT <= 0 ? 1 : configuration.RETRY_LIMIT;
        // Thread.sleep rejects negative values, so normalise here the same way RETRY_LIMIT is.
        this.backoffInterval = Math.max(0L, configuration.BACKOFF_INTERVAL);
    }

    /**
     * Remembers the shard this processor is responsible for.
     *
     * @param shardId identifier of the shard; must be set before records are processed
     */
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    /**
     * Transforms, filters and buffers the given records, then flushes the buffer to the emitter if
     * the buffer says it should be flushed.
     *
     * <p>A record whose transformation throws {@link IOException} is logged and skipped. Calls made
     * after {@link #shutdown} has completed are logged and ignored.
     *
     * @param records records to process
     * @param checkpointer used to checkpoint after a flush
     * @throws IllegalStateException if {@link #initialize(String)} has not been called
     * @throws RuntimeException if the transformer is neither an {@link ITransformer} nor an
     *         {@link ICollectionTransformer}
     */
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        if (this.isShutdown) {
            LOG.warn("processRecords called on shutdown record processor for shardId: " + this.shardId);
            return;
        }
        if (this.shardId == null) {
            throw new IllegalStateException("Record processor not initialized");
        }
        for (Record record : records) {
            try {
                if (this.transformer instanceof ITransformer) {
                    ITransformer<T, U> singleTransformer = (ITransformer<T, U>) this.transformer;
                    this.filterAndBufferRecord(singleTransformer.toClass(record), record);
                } else if (this.transformer instanceof ICollectionTransformer) {
                    ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) this.transformer;
                    Collection<T> transformedRecords = listTransformer.toClass(record);
                    for (T transformedRecord : transformedRecords) {
                        this.filterAndBufferRecord(transformedRecord, record);
                    }
                } else {
                    throw new RuntimeException("Transformer must implement ITransformer or ICollectionTransformer");
                }
            } catch (IOException e) {
                // Skip the offending record but keep the stack trace for diagnosis.
                LOG.error("Failed to transform record with sequence number " + record.getSequenceNumber()
                        + " for shardId: " + this.shardId, e);
            }
        }
        if (this.buffer.shouldFlush()) {
            List<U> emitItems = this.transformToOutput(this.buffer.getRecords());
            this.emit(checkpointer, emitItems);
        }
    }

    /**
     * Adds {@code transformedRecord} to the buffer if the filter keeps it, passing along the length
     * of the record's backing data array and the record's sequence number.
     */
    private void filterAndBufferRecord(T transformedRecord, Record record) {
        if (this.filter.keepRecord(transformedRecord)) {
            this.buffer.consumeRecord(transformedRecord, record.getData().array().length, record.getSequenceNumber());
        }
    }

    /**
     * Converts buffered items to the emitter's type. Items whose conversion throws
     * {@link IOException} are logged and left out of the result.
     */
    private List<U> transformToOutput(List<T> items) {
        List<U> emitItems = new ArrayList<U>();
        for (T item : items) {
            try {
                emitItems.add(this.transformer.fromClass(item));
            } catch (IOException e) {
                LOG.error("Failed to transform record " + item + " to output type", e);
            }
        }
        return emitItems;
    }

    /**
     * Emits the items with up to {@code retryLimit} attempts, sleeping {@code backoffInterval}
     * between attempts, then reports leftovers to {@link IEmitter#fail}, clears the buffer and
     * checkpoints the buffer's last sequence number when it has one.
     *
     * <p>If emitting or checkpointing throws, the error is logged and the still-unprocessed items
     * are reported as failed unless that was already done. An interrupt received while backing off
     * does not cut the retries short; the thread's interrupt status is restored before returning.
     */
    private void emit(IRecordProcessorCheckpointer checkpointer, List<U> emitItems) {
        List<U> unprocessed = new ArrayList<U>(emitItems);
        boolean interrupted = false;
        boolean failureReported = false;
        try {
            for (int numTries = 0; numTries < this.retryLimit; numTries++) {
                unprocessed = this.emitter.emit(new UnmodifiableBuffer<U>(this.buffer, unprocessed));
                if (unprocessed.isEmpty()) {
                    break;
                }
                // Back off only when another attempt will actually follow.
                if (numTries + 1 < this.retryLimit) {
                    try {
                        Thread.sleep(this.backoffInterval);
                    } catch (InterruptedException e) {
                        // Remember the interrupt; it is re-asserted in the finally block.
                        interrupted = true;
                    }
                }
            }
            if (!unprocessed.isEmpty()) {
                this.emitter.fail(unprocessed);
                failureReported = true;
            }
            String lastSequenceNumberProcessed = this.buffer.getLastSequenceNumber();
            this.buffer.clear();
            if (lastSequenceNumberProcessed != null) {
                checkpointer.checkpoint(lastSequenceNumberProcessed);
            }
        } catch (InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException | IOException e) {
            LOG.error("Failed to emit or checkpoint records for shardId: " + this.shardId, e);
            // Do not hand the same items to fail() a second time when the checkpoint is what threw.
            if (!failureReported) {
                this.emitter.fail(unprocessed);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Shuts the processor down. For {@code TERMINATE} the buffered records are emitted and a
     * checkpoint is taken; for {@code ZOMBIE} nothing is flushed. In both cases the emitter is shut
     * down afterwards. Repeated calls after a completed shutdown are logged and ignored.
     *
     * @param checkpointer used to checkpoint on {@code TERMINATE}
     * @param reason why the processor is being shut down
     * @throws IllegalStateException if {@code reason} is neither {@code TERMINATE} nor {@code ZOMBIE}
     */
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor with shardId: " + this.shardId + " with reason " + reason);
        if (this.isShutdown) {
            LOG.warn("Record processor for shardId: " + this.shardId + " has been shutdown multiple times.");
            return;
        }
        switch (reason) {
            case TERMINATE: {
                this.emit(checkpointer, this.transformToOutput(this.buffer.getRecords()));
                try {
                    checkpointer.checkpoint();
                } catch (InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException e) {
                    LOG.error("Failed to checkpoint during shutdown for shardId: " + this.shardId, e);
                }
                break;
            }
            case ZOMBIE: {
                break;
            }
            default: {
                throw new IllegalStateException("invalid shutdown reason");
            }
        }
        this.emitter.shutdown();
        this.isShutdown = true;
    }
}

