/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.containers.exporter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apiguardian.api.API;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@API(status=API.Status.INTERNAL)
final class RecordHandler
implements AsyncServerRequestHandler<Message<HttpRequest, byte[]>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordHandler.class);
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule((Module)new ZeebeProtocolModule());
    private final Consumer<Record<?>> recordConsumer;
    private final boolean autoAcknowledge;
    private final Map<Integer, Long> positions = new ConcurrentHashMap<Integer, Long>();

    RecordHandler(Consumer<Record<?>> recordConsumer, boolean autoAcknowledge) {
        this.recordConsumer = Objects.requireNonNull(recordConsumer, "must specify a record consumer");
        this.autoAcknowledge = autoAcknowledge;
    }

    void acknowledge(int partitionId, long position) {
        this.positions.merge(partitionId, position, Math::max);
    }

    public AsyncRequestConsumer<Message<HttpRequest, byte[]>> prepare(HttpRequest request, EntityDetails entityDetails, HttpContext context) {
        return new BasicRequestConsumer((AsyncEntityConsumer)new BasicAsyncEntityConsumer());
    }

    public void handle(Message<HttpRequest, byte[]> requestObject, AsyncServerRequestHandler.ResponseTrigger responseTrigger, HttpContext context) throws HttpException, IOException {
        List records;
        byte[] requestBody = (byte[])requestObject.getBody();
        if (requestBody == null || requestBody.length == 0) {
            BasicHttpResponse response = new BasicHttpResponse(400, "must send a list of records as body");
            responseTrigger.submitResponse((AsyncResponseProducer)new BasicResponseProducer((HttpResponse)response), context);
            return;
        }
        try {
            records = (List)MAPPER.readValue(requestBody, new TypeReference<List<Record<?>>>(){});
        }
        catch (IOException e) {
            BasicHttpResponse response = new BasicHttpResponse(400, "failed to deserialize records, see receiver logs for more");
            responseTrigger.submitResponse((AsyncResponseProducer)new BasicResponseProducer((HttpResponse)response), context);
            LOGGER.warn("Failed to deserialize exported records", (Throwable)e);
            return;
        }
        if (records.isEmpty()) {
            BasicHttpResponse response = new BasicHttpResponse(204, "no records given");
            responseTrigger.submitResponse((AsyncResponseProducer)new BasicResponseProducer((HttpResponse)response), context);
            return;
        }
        for (Record record : records) {
            this.recordConsumer.accept(record);
            if (!this.autoAcknowledge) continue;
            this.acknowledge(record.getPartitionId(), record.getPosition());
        }
        int partitionId = ((Record)records.get(0)).getPartitionId();
        AsyncResponseProducer responseProducer = this.createSuccessfulResponse(partitionId);
        responseTrigger.submitResponse(responseProducer, context);
    }

    private AsyncResponseProducer createSuccessfulResponse(int partitionId) throws JsonProcessingException {
        Long position = this.positions.get(partitionId);
        if (position == null) {
            BasicHttpResponse response = new BasicHttpResponse(204, "no acknowledged position for partition " + partitionId);
            return new BasicResponseProducer((HttpResponse)response);
        }
        BasicHttpResponse response = new BasicHttpResponse(200);
        response.setHeader("Content-Type", (Object)"application/json");
        byte[] responseBody = MAPPER.writeValueAsBytes(Collections.singletonMap("position", position));
        return new BasicResponseProducer((HttpResponse)response, (AsyncEntityProducer)new BasicAsyncEntityProducer(responseBody));
    }
}

