package org.apache.camel.component.dataset;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.EventDrivenPollingConsumer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/camel/component/dataset/DataSetEndpoint.class */
public class DataSetEndpoint extends MockEndpoint implements Service {
    private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
    private DataSet dataSet;
    private AtomicInteger receivedCounter;
    private long produceDelay;
    private long consumeDelay;
    private long startTime;

    public DataSetEndpoint(String str, Component component, DataSet dataSet) {
        super(str, component);
        this.receivedCounter = new AtomicInteger();
        this.produceDelay = -1L;
        this.consumeDelay = -1L;
        this.dataSet = dataSet;
    }

    public static void assertEquals(String str, Object obj, Object obj2, Exchange exchange) {
        if (!ObjectHelper.equal(obj, obj2)) {
            throw new AssertionError(str + " does not match. Expected: " + obj + " but was: " + obj2 + " on  " + exchange + " with headers: " + exchange.getIn().getHeaders());
        }
    }

    @Override // org.apache.camel.impl.DefaultEndpoint, org.apache.camel.Endpoint
    public PollingConsumer<Exchange> createPollingConsumer() throws Exception {
        return new EventDrivenPollingConsumer(this);
    }

    @Override // org.apache.camel.component.mock.MockEndpoint, org.apache.camel.Endpoint
    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
        return new DataSetConsumer(this, processor);
    }

    @Override // org.apache.camel.component.mock.MockEndpoint
    public void reset() {
        super.reset();
        this.receivedCounter.set(0);
    }

    @Override // org.apache.camel.component.mock.MockEndpoint
    public int getReceivedCounter() {
        return this.receivedCounter.get();
    }

    public Exchange createExchange(long j) throws Exception {
        Exchange createExchange = createExchange();
        getDataSet().populateMessage(createExchange, j);
        createExchange.getIn().setHeader(DataSet.INDEX_HEADER, Long.valueOf(j));
        return createExchange;
    }

    @Override // org.apache.camel.component.mock.MockEndpoint
    protected void waitForCompleteLatch() throws InterruptedException {
        setResultWaitTime(getDataSet().getSize() * 4000);
        super.waitForCompleteLatch();
    }

    public DataSet getDataSet() {
        return this.dataSet;
    }

    public void setDataSet(DataSet dataSet) {
        this.dataSet = dataSet;
    }

    public long getConsumeDelay() {
        return this.consumeDelay;
    }

    public void setConsumeDelay(long j) {
        this.consumeDelay = j;
    }

    public long getProduceDelay() {
        return this.produceDelay;
    }

    public void setProduceDelay(long j) {
        this.produceDelay = j;
    }

    @Override // org.apache.camel.component.mock.MockEndpoint
    protected void performAssertions(Exchange exchange) throws Exception {
        if (this.startTime == 0) {
            this.startTime = System.currentTimeMillis();
        }
        int incrementAndGet = this.receivedCounter.incrementAndGet();
        long j = incrementAndGet - 1;
        Exchange createExchange = createExchange(j);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received message: " + j + " = " + exchange);
        }
        assertMessageExpected(j, createExchange, exchange);
        if (this.consumeDelay > 0) {
            Thread.sleep(this.consumeDelay);
        }
        if (incrementAndGet % getDataSet().getReportCount() == 0) {
            reportProgress(exchange, incrementAndGet);
        }
    }

    protected void reportProgress(Exchange exchange, int i) {
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - this.startTime;
        this.startTime = currentTimeMillis;
        LOG.info("Received: " + i + " messages so far. Last group took: " + j + " millis");
    }

    protected void assertMessageExpected(long j, Exchange exchange, Exchange exchange2) throws Exception {
        assertEquals("Header: camelDataSetIndex", Long.valueOf(j), Long.valueOf(((Long) ExchangeHelper.getMandatoryHeader(exchange2, DataSet.INDEX_HEADER, Long.class)).longValue()), exchange2);
        getDataSet().assertMessageExpected(this, exchange, exchange2, j);
    }

    @Override // org.apache.camel.Service
    public void start() throws Exception {
        expectedMessageCount((int) getDataSet().getSize());
    }

    @Override // org.apache.camel.Service
    public void stop() throws Exception {
    }
}
