package com.espertech.esperio;

import com.espertech.esper.adapter.AdapterState;
import com.espertech.esper.adapter.AdapterStateManager;
import com.espertech.esper.client.EPException;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.time.CurrentTimeEvent;
import com.espertech.esper.core.EPServiceProviderSPI;
import com.espertech.esper.core.EPStatementHandle;
import com.espertech.esper.core.EPStatementHandleCallback;
import com.espertech.esper.core.ExtensionServicesContext;
import com.espertech.esper.core.StatementFilterVersion;
import com.espertech.esper.schedule.ScheduleHandleCallback;
import com.espertech.esper.schedule.ScheduleSlot;
import com.espertech.esper.schedule.SchedulingService;
import com.espertech.esper.util.ExecutionPathDebugLog;
import com.espertech.esper.util.ManagedLockImpl;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:production/esperio-csv/com/espertech/esperio/AbstractCoordinatedAdapter.class */
public abstract class AbstractCoordinatedAdapter implements CoordinatedAdapter {
    private static final Log log = LogFactory.getLog(AbstractCoordinatedAdapter.class);
    protected ScheduleSlot scheduleSlot;
    private EPServiceProvider epService;
    private EPRuntime runtime;
    private SchedulingService schedulingService;
    private boolean usingEngineThread;
    private boolean usingExternalTimer;
    private long startTime;
    private AbstractSender sender;
    protected final AdapterStateManager stateManager = new AdapterStateManager();
    protected final SortedSet<SendableEvent> eventsToSend = new TreeSet(new SendableEventComparator());
    private long currentTime = 0;
    private long lastEventTime = 0;

    public AbstractCoordinatedAdapter(EPServiceProvider ePServiceProvider, boolean z, boolean z2) {
        this.usingEngineThread = z;
        this.usingExternalTimer = z2;
        setSender(new DirectSender());
        if (ePServiceProvider == null) {
            return;
        }
        if (!(ePServiceProvider instanceof EPServiceProviderSPI)) {
            throw new IllegalArgumentException("Invalid epService provided");
        }
        this.epService = ePServiceProvider;
        this.runtime = ePServiceProvider.getEPRuntime();
        this.schedulingService = ((EPServiceProviderSPI) ePServiceProvider).getSchedulingService();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public AdapterState getState() {
        return this.stateManager.getState();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public void start() throws EPException {
        if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
            log.debug(".start");
        }
        if (this.runtime == null) {
            throw new EPException("Attempting to start an Adapter that hasn't had the epService provided");
        }
        this.startTime = getCurrentTime();
        if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
            log.debug(".start startTime==" + this.startTime);
        }
        this.stateManager.start();
        this.sender.setRuntime(this.runtime);
        continueSendingEvents();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public void pause() throws EPException {
        this.stateManager.pause();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public void resume() throws EPException {
        this.stateManager.resume();
        continueSendingEvents();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public void destroy() throws EPException {
        if (this.sender != null) {
            this.sender.onFinish();
        }
        this.stateManager.destroy();
        close();
    }

    @Override // com.espertech.esper.adapter.Adapter
    public void stop() throws EPException {
        if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
            log.debug(".stop");
        }
        this.stateManager.stop();
        this.eventsToSend.clear();
        this.currentTime = 0L;
        reset();
    }

    @Override // com.espertech.esperio.CoordinatedAdapter
    public void disallowStateTransitions() {
        this.stateManager.disallowStateTransitions();
    }

    @Override // com.espertech.esperio.CoordinatedAdapter
    public void setUsingEngineThread(boolean z) {
        this.usingEngineThread = z;
    }

    @Override // com.espertech.esperio.CoordinatedAdapter
    public void setUsingExternalTimer(boolean z) {
        this.usingExternalTimer = z;
    }

    @Override // com.espertech.esperio.CoordinatedAdapter
    public void setScheduleSlot(ScheduleSlot scheduleSlot) {
        this.scheduleSlot = scheduleSlot;
    }

    @Override // com.espertech.esperio.CoordinatedAdapter
    public void setEPService(EPServiceProvider ePServiceProvider) {
        if (ePServiceProvider == null) {
            throw new NullPointerException("epService cannot be null");
        }
        if (!(ePServiceProvider instanceof EPServiceProviderSPI)) {
            throw new IllegalArgumentException("Invalid type of EPServiceProvider");
        }
        EPServiceProviderSPI ePServiceProviderSPI = (EPServiceProviderSPI) ePServiceProvider;
        this.runtime = ePServiceProviderSPI.getEPRuntime();
        this.schedulingService = ePServiceProviderSPI.getSchedulingService();
        this.sender.setRuntime(this.runtime);
    }

    protected abstract void close();

    protected abstract void replaceFirstEventToSend();

    protected abstract void reset();

    /* JADX INFO: Access modifiers changed from: private */
    public void continueSendingEvents() {
        boolean z = true;
        while (true) {
            boolean z2 = z;
            if (this.stateManager.getState() != AdapterState.STARTED || !z2) {
                return;
            }
            this.currentTime = getCurrentTime();
            if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
                log.debug(".continueSendingEvents currentTime==" + this.currentTime);
            }
            fillEventsToSend();
            sendSoonestEvents();
            z = waitToSendEvents();
        }
    }

    private boolean waitToSendEvents() {
        if (this.usingExternalTimer) {
            return false;
        }
        if (this.usingEngineThread) {
            scheduleNextCallback();
            return false;
        }
        try {
            Thread.sleep(this.eventsToSend.isEmpty() ? 100L : this.eventsToSend.first().getSendTime() - (this.currentTime - this.startTime));
            return true;
        } catch (InterruptedException e) {
            throw new EPException(e);
        }
    }

    private long getCurrentTime() {
        return this.usingEngineThread ? this.schedulingService.getTime() : System.currentTimeMillis();
    }

    private void fillEventsToSend() {
        SendableEvent read;
        if (!this.eventsToSend.isEmpty() || (read = read()) == null) {
            return;
        }
        this.eventsToSend.add(read);
    }

    private void sendSoonestEvents() {
        if (!this.usingExternalTimer) {
            while (!this.eventsToSend.isEmpty() && this.eventsToSend.first().getSendTime() <= this.currentTime - this.startTime) {
                sendFirstEvent();
            }
            return;
        }
        while (!this.eventsToSend.isEmpty()) {
            long sendTime = this.eventsToSend.first().getSendTime();
            if (sendTime > this.lastEventTime) {
                this.sender.sendEvent(null, new CurrentTimeEvent(this.lastEventTime));
                this.lastEventTime = sendTime;
            }
            sendFirstEvent();
        }
        this.sender.sendEvent(null, new CurrentTimeEvent(this.lastEventTime));
    }

    private void sendFirstEvent() {
        if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
            log.debug(".sendFirstEvent currentTime==" + this.currentTime);
            log.debug(".sendFirstEvent sending event " + this.eventsToSend.first() + ", its sendTime==" + this.eventsToSend.first().getSendTime());
        }
        this.sender.setRuntime(this.runtime);
        this.eventsToSend.first().send(this.sender);
        replaceFirstEventToSend();
    }

    private void scheduleNextCallback() {
        EPStatementHandleCallback ePStatementHandleCallback = new EPStatementHandleCallback(new EPStatementHandle("AbstractCoordinatedAdapter", "AbstractCoordinatedAdapter", null, new ManagedLockImpl("CSV"), "AbstractCoordinatedAdapter", false, ((EPServiceProviderSPI) this.epService).getMetricReportingService().getStatementHandle("AbstractCoordinatedAdapter", "AbstractCoordinatedAdapter"), 0, false, new StatementFilterVersion()), new ScheduleHandleCallback() { // from class: com.espertech.esperio.AbstractCoordinatedAdapter.1
            @Override // com.espertech.esper.schedule.ScheduleHandleCallback
            public void scheduledTrigger(ExtensionServicesContext extensionServicesContext) {
                AbstractCoordinatedAdapter.this.continueSendingEvents();
            }
        });
        if (this.eventsToSend.isEmpty()) {
            if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
                log.debug(".scheduleNextCallback no events to send, scheduling callback in 100 ms");
            }
            this.schedulingService.add(100L, ePStatementHandleCallback, new ScheduleSlot(0, 0));
            return;
        }
        long sendTime = this.eventsToSend.first().getSendTime() - (this.currentTime - this.startTime);
        ScheduleSlot scheduleSlot = this.eventsToSend.first().getScheduleSlot();
        if (ExecutionPathDebugLog.isDebugEnabled && log.isDebugEnabled()) {
            log.debug(".scheduleNextCallback schedulingCallback in " + sendTime + " milliseconds");
        }
        this.schedulingService.add(sendTime, ePStatementHandleCallback, scheduleSlot);
    }

    public EPRuntime getRuntime() {
        return this.runtime;
    }

    public void setSender(AbstractSender abstractSender) {
        this.sender = abstractSender;
        this.sender.setRuntime(this.runtime);
    }
}
