/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.connectors.dynamodb;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DynamoDBEmitter
implements IEmitter<Map<String, AttributeValue>> {
    private static final Log LOG = LogFactory.getLog(DynamoDBEmitter.class);
    protected final String dynamoDBEndpoint;
    protected final String dynamoDBTableName;
    protected final AmazonDynamoDBClient dynamoDBClient;

    public DynamoDBEmitter(KinesisConnectorConfiguration configuration) {
        this.dynamoDBEndpoint = configuration.DYNAMODB_ENDPOINT;
        this.dynamoDBTableName = configuration.DYNAMODB_DATA_TABLE_NAME;
        this.dynamoDBClient = new AmazonDynamoDBClient(configuration.AWS_CREDENTIALS_PROVIDER);
        this.dynamoDBClient.setEndpoint(this.dynamoDBEndpoint);
    }

    @Override
    public List<Map<String, AttributeValue>> emit(UnmodifiableBuffer<Map<String, AttributeValue>> buffer) throws IOException {
        List<Map<String, AttributeValue>> resultList;
        HashMap<WriteRequest, Map<String, AttributeValue>> requestMap = new HashMap<WriteRequest, Map<String, AttributeValue>>();
        ArrayList<Map<String, AttributeValue>> unproc = new ArrayList<Map<String, AttributeValue>>();
        ArrayList<WriteRequest> rList = new ArrayList<WriteRequest>();
        Set<Map<String, AttributeValue>> uniqueItems = this.uniqueItems(buffer.getRecords());
        for (Map<String, AttributeValue> item : uniqueItems) {
            WriteRequest wr = new WriteRequest().withPutRequest(new PutRequest().withItem(item));
            requestMap.put(wr, item);
            rList.add(wr);
            if (rList.size() != 16) continue;
            resultList = this.performBatchRequest(rList, requestMap);
            unproc.addAll(resultList);
            rList.clear();
        }
        resultList = this.performBatchRequest(rList, requestMap);
        unproc.addAll(resultList);
        LOG.info((Object)("Successfully emitted " + (buffer.getRecords().size() - unproc.size()) + " records into DynamoDB."));
        return unproc;
    }

    @Override
    public void fail(List<Map<String, AttributeValue>> records) {
        for (Map<String, AttributeValue> record : records) {
            LOG.error((Object)("Could not emit record: " + record));
        }
    }

    private List<Map<String, AttributeValue>> performBatchRequest(List<WriteRequest> rList, Map<WriteRequest, Map<String, AttributeValue>> requestMap) throws IOException {
        HashMap<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
        if (rList.isEmpty()) {
            return Collections.emptyList();
        }
        requestItems.put(this.dynamoDBTableName, rList);
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest().withRequestItems(requestItems);
        try {
            BatchWriteItemResult result = this.dynamoDBClient.batchWriteItem(batchWriteItemRequest);
            return this.unproccessedItems(result, requestMap);
        }
        catch (AmazonClientException e) {
            String message = "Amazon DynamoDB Client could not perform batch request";
            LOG.error((Object)message, (Throwable)e);
            throw new IOException(message, e);
        }
        catch (Exception e) {
            String message = "Unexpected Exception while performing batch request";
            LOG.error((Object)message, (Throwable)e);
            throw new IOException(message, e);
        }
    }

    private List<Map<String, AttributeValue>> unproccessedItems(BatchWriteItemResult result, Map<WriteRequest, Map<String, AttributeValue>> requestMap) {
        Collection items = result.getUnprocessedItems().values();
        ArrayList<Map<String, AttributeValue>> unprocessed = new ArrayList<Map<String, AttributeValue>>();
        for (List list : items) {
            for (WriteRequest request : list) {
                unprocessed.add(requestMap.get(request));
            }
        }
        return unprocessed;
    }

    public Set<Map<String, AttributeValue>> uniqueItems(List<Map<String, AttributeValue>> items) {
        return new HashSet<Map<String, AttributeValue>>(items);
    }

    @Override
    public void shutdown() {
        this.dynamoDBClient.shutdown();
    }
}

