package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.AvailableImageHandler;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.CommonContext;
import io.aeron.ConcurrentPublication;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.driver.media.UdpChannel;
import io.aeron.logbuffer.LogBufferDescriptor;
import java.io.File;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.agrona.CloseHelper;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/aeron/archive/ArchiveConductor.class */
public abstract class ArchiveConductor extends SessionWorker<Session> implements AvailableImageHandler {
    // Term length and MTU applied to control response publications whose channel
    // does not already carry a term-length parameter (see newControlSession).
    private static final int CONTROL_TERM_LENGTH = AeronArchive.Configuration.controlTermBufferLength();
    private static final int CONTROL_MTU = AeronArchive.Configuration.controlMtuLength();
    // Single reusable builder; it is cleared at the start of every strippedChannelBuilder call.
    private final ChannelUriStringBuilder channelBuilder;
    // Active replay sessions keyed by ReplaySession.sessionId().
    private final Long2ObjectHashMap<ReplaySession> replaySessionByIdMap;
    // Active recording sessions keyed by recording id.
    private final Long2ObjectHashMap<RecordingSession> recordingSessionByIdMap;
    // Recording subscriptions keyed by makeKey(streamId, strippedChannel).
    private final Map<String, Subscription> recordingSubscriptionMap;
    // Scratch buffer the catalog wraps over a recording descriptor when listing recordings.
    private final UnsafeBuffer descriptorBuffer;
    private final RecordingDescriptorDecoder recordingDescriptorDecoder;
    // Reusable summary populated from the catalog in startReplay.
    private final RecordingSummary recordingSummary;
    // Scratch buffer handed to RecordingPos.allocate when allocating a position counter.
    private final UnsafeBuffer tempBuffer;
    private final Aeron aeron;
    // Non-null only when the archive context owns the Aeron client.
    private final AgentInvoker aeronAgentInvoker;
    // May be null; invokeDriverConductor checks before invoking.
    private final AgentInvoker driverAgentInvoker;
    private final EpochClock epochClock;
    // Refreshed from epochClock in preWork whenever the time has changed.
    private final CachedEpochClock cachedEpochClock;
    private final File archiveDir;
    private final FileChannel archiveDirChannel;
    private final Subscription controlSubscription;
    private final Subscription localControlSubscription;
    private final Catalog catalog;
    private final ArchiveMarkFile markFile;
    private final RecordingEventsProxy recordingEventsProxy;
    private final int maxConcurrentRecordings;
    private final int maxConcurrentReplays;
    protected final Archive.Context ctx;
    protected final ControlResponseProxy controlResponseProxy;
    // Both workers are created in onStart through the abstract factory methods.
    protected SessionWorker<ReplaySession> replayer;
    protected SessionWorker<RecordingSession> recorder;
    // Seeded with a random int in the constructor and incremented for each new control session.
    private long nextControlSessionId;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the conductor, wiring up the control subscriptions, the recording
     * events publication and the collaborators supplied by the archive context.
     *
     * <p>This instance is registered as the available-image handler of both
     * control subscriptions, so {@link #onAvailableImage(Image)} is invoked for
     * each image that becomes available on them.
     *
     * @param aeron   client used for all subscriptions, publications and counters.
     * @param context archive configuration and shared resources.
     */
    public ArchiveConductor(Aeron aeron, Archive.Context context) {
        super("archive-conductor", context.countedErrorHandler());
        this.channelBuilder = new ChannelUriStringBuilder();
        this.replaySessionByIdMap = new Long2ObjectHashMap<>();
        this.recordingSessionByIdMap = new Long2ObjectHashMap<>();
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.recordingSubscriptionMap = new HashMap<>();
        this.descriptorBuffer = new UnsafeBuffer();
        this.recordingDescriptorDecoder = new RecordingDescriptorDecoder();
        this.recordingSummary = new RecordingSummary();
        this.tempBuffer = new UnsafeBuffer(new byte[CountersReader.METADATA_LENGTH]);
        this.cachedEpochClock = new CachedEpochClock();
        // Random starting point for control session ids.
        this.nextControlSessionId = ThreadLocalRandom.current().nextInt();
        this.aeron = aeron;
        this.ctx = context;
        // The client conductor is only driven from here when the archive owns the client.
        this.aeronAgentInvoker = context.ownsAeronClient() ? aeron.conductorAgentInvoker() : null;
        this.driverAgentInvoker = context.mediaDriverAgentInvoker();
        this.epochClock = context.epochClock();
        this.archiveDir = context.archiveDir();
        this.archiveDirChannel = context.archiveDirChannel();
        this.controlResponseProxy = new ControlResponseProxy();
        this.maxConcurrentRecordings = context.maxConcurrentRecordings();
        this.maxConcurrentReplays = context.maxConcurrentReplays();
        this.controlSubscription = aeron.addSubscription(
            context.controlChannel(), context.controlStreamId(), this, null);
        this.localControlSubscription = aeron.addSubscription(
            context.localControlChannel(), context.localControlStreamId(), this, null);
        this.recordingEventsProxy = new RecordingEventsProxy(
            context.idleStrategy(),
            aeron.addExclusivePublication(context.recordingEventsChannel(), context.recordingEventsStreamId()));
        this.cachedEpochClock.update(this.epochClock.time());
        this.catalog = context.catalog();
        this.markFile = context.archiveMarkFile();
    }

    /**
     * Creates the replay and recording workers through the subclass factory
     * methods; the replayer is created first, then the recorder.
     */
    @Override
    public void onStart() {
        replayer = newReplayer();
        recorder = newRecorder();
    }

    /**
     * Wraps a newly available image in a {@link ControlSessionDemuxer} and adds
     * it to this worker's sessions.
     *
     * @param image the image that has become available.
     */
    @Override
    public void onAvailableImage(Image image) {
        final ControlSessionDemuxer demuxer = new ControlSessionDemuxer(image, this);
        addSession(demuxer);
    }

    /**
     * Factory for the worker that receives recording sessions; called once from
     * {@link #onStart()} and the result is stored in {@code recorder}.
     *
     * @return the worker to which new {@link RecordingSession}s are added.
     */
    protected abstract SessionWorker<RecordingSession> newRecorder();

    /**
     * Factory for the worker that receives replay sessions; called once from
     * {@link #onStart()} and the result is stored in {@code replayer}.
     *
     * @return the worker to which new {@link ReplaySession}s are added.
     */
    protected abstract SessionWorker<ReplaySession> newReplayer();

    /**
     * Delegates to {@link #closeSessionWorkers()} so the subclass can close its
     * workers; final so subclasses customise only through that hook.
     */
    @Override // io.aeron.archive.SessionWorker
    protected final void preSessionsClose() {
        closeSessionWorkers();
    }

    /**
     * Hook invoked from {@link #preSessionsClose()}; implementations are expected
     * to close the recorder and replayer workers (NOTE(review): inferred from the
     * name — confirm against the subclasses).
     */
    protected abstract void closeSessionWorkers();

    /**
     * Closes the recording subscriptions and both control subscriptions, but
     * only when the archive does not own the Aeron client. When it does own
     * the client nothing is closed here.
     */
    @Override
    protected void postSessionsClose() {
        if (!ctx.ownsAeronClient()) {
            for (final Subscription subscription : recordingSubscriptionMap.values()) {
                subscription.close();
            }

            CloseHelper.close(localControlSubscription);
            CloseHelper.close(controlSubscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Refreshes the cached clock (and the mark file activity timestamp) when the
     * epoch clock has moved on, then invokes the driver and client conductors.
     *
     * @return the sum of the work counts reported by the two invokers.
     */
    @Override
    public int preWork() {
        final long now = epochClock.time();
        if (now != cachedEpochClock.time()) {
            cachedEpochClock.update(now);
            markFile.updateActivityTimestamp(now);
        }

        int workCount = invokeDriverConductor();
        workCount += invokeClientConductor();

        return workCount;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Invokes the media driver agent invoker if one was configured.
     *
     * @return the invoker's work count, or 0 when there is no driver invoker.
     */
    public final int invokeDriverConductor() {
        return null == driverAgentInvoker ? 0 : driverAgentInvoker.invoke();
    }

    /**
     * Invokes the Aeron client conductor invoker if this archive holds one.
     *
     * @return the invoker's work count, or 0 when there is no client invoker.
     */
    private int invokeClientConductor() {
        return null == aeronAgentInvoker ? 0 : aeronAgentInvoker.invoke();
    }

    /**
     * @return the catalog obtained from the archive context at construction.
     */
    Catalog catalog() {
        return catalog;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a subscription whose images will each be recorded, unless one is
     * already registered for the same stream and stripped channel.
     *
     * <p>An OK or ERROR response is sent to the control session in every case.
     * Any exception is reported to the error handler and answered with an ERROR
     * response carrying the exception message.
     *
     * @param j              identifier echoed in the response (presumably the request correlation id).
     * @param controlSession session that issued the request and receives the response.
     * @param i              stream id to subscribe to.
     * @param str            channel as supplied by the client.
     * @param sourceLocation when {@code LOCAL} and the channel contains the UDP media id,
     *                       the subscription is made with the spy prefix.
     */
    public void startRecordingSubscription(long j, ControlSession controlSession, int i, String str, SourceLocation sourceLocation) {
        if (recordingSessionByIdMap.size() >= maxConcurrentRecordings) {
            final String limitMessage = "Max concurrent recordings reached: " + maxConcurrentRecordings;
            controlSession.sendResponse(j, ControlResponseCode.ERROR, limitMessage, controlResponseProxy);
            return;
        }

        try {
            final String strippedChannel = strippedChannelBuilder(str).build();
            final String key = makeKey(i, strippedChannel);

            if (null != recordingSubscriptionMap.get(key)) {
                final String duplicateMessage = "Recording already setup for subscription: " + key;
                controlSession.sendResponse(j, ControlResponseCode.ERROR, duplicateMessage, controlResponseProxy);
                return;
            }

            final boolean isLocalUdp = str.contains(UdpChannel.MEDIA_ID) && sourceLocation == SourceLocation.LOCAL;
            final String subscriptionChannel = isLocalUdp ? CommonContext.SPY_PREFIX + strippedChannel : strippedChannel;
            final Subscription subscription = aeron.addSubscription(
                subscriptionChannel,
                i,
                image -> startRecordingSession(controlSession, j, strippedChannel, str, image),
                null);

            recordingSubscriptionMap.put(key, subscription);
            controlSession.sendOkResponse(j, controlResponseProxy);
        } catch (Exception e) {
            errorHandler.onError(e);
            controlSession.sendResponse(j, ControlResponseCode.ERROR, e.getMessage(), controlResponseProxy);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes and closes the recording subscription registered for the given
     * stream and channel, replying OK on success or ERROR when none is found.
     *
     * <p>Any exception is reported to the error handler and answered with an
     * ERROR response carrying the exception message.
     *
     * @param j              identifier echoed in the response (presumably the request correlation id).
     * @param controlSession session that issued the request and receives the response.
     * @param i              stream id of the subscription.
     * @param str            channel as supplied by the client; it is stripped before lookup.
     */
    public void stopRecording(long j, ControlSession controlSession, int i, String str) {
        try {
            final String strippedChannel = strippedChannelBuilder(str).build();
            final String key = makeKey(i, strippedChannel);
            final Subscription subscription = recordingSubscriptionMap.remove(key);

            if (null == subscription) {
                final String notFoundMessage = "No recording subscription found for: " + key;
                controlSession.sendResponse(j, ControlResponseCode.ERROR, notFoundMessage, controlResponseProxy);
                return;
            }

            subscription.close();
            controlSession.sendOkResponse(j, controlResponseProxy);
        } catch (Exception e) {
            errorHandler.onError(e);
            controlSession.sendResponse(j, ControlResponseCode.ERROR, e.getMessage(), controlResponseProxy);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a session that lists recordings from the catalog, sharing this
     * conductor's response proxy and descriptor buffer.
     *
     * @param j              first constructor argument (presumably the request correlation id).
     * @param j2             second constructor argument (presumably the first recording id to list).
     * @param i              third constructor argument (presumably the maximum record count).
     * @param controlSession session on whose behalf the listing is performed.
     * @return the new listing session.
     */
    public ListRecordingsSession newListRecordingsSession(long j, long j2, int i, ControlSession controlSession) {
        return new ListRecordingsSession(
            j,
            j2,
            i,
            catalog,
            controlResponseProxy,
            controlSession,
            descriptorBuffer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a session that lists recordings from the catalog filtered by a
     * channel string and an int value, sharing this conductor's response proxy,
     * descriptor buffer and descriptor decoder.
     *
     * <p>Note that {@code str} is passed to the session before {@code i2}.
     *
     * @param j              first constructor argument (presumably the request correlation id).
     * @param j2             second constructor argument (presumably the first recording id to list).
     * @param i              third constructor argument (presumably the maximum record count).
     * @param i2             fifth constructor argument (presumably the stream id filter).
     * @param str            fourth constructor argument (presumably the channel filter).
     * @param controlSession session on whose behalf the listing is performed.
     * @return the new listing session.
     */
    public ListRecordingsForUriSession newListRecordingsForUriSession(long j, long j2, int i, int i2, String str, ControlSession controlSession) {
        return new ListRecordingsForUriSession(
            j,
            j2,
            i,
            str,
            i2,
            catalog,
            controlResponseProxy,
            controlSession,
            descriptorBuffer,
            recordingDescriptorDecoder);
    }

    /**
     * Sends the descriptor of a single recording, or a "recording unknown"
     * response when the catalog cannot wrap and validate it.
     *
     * @param j              identifier echoed in the response (presumably the request correlation id).
     * @param controlSession session that receives the response.
     * @param j2             id of the recording to describe.
     */
    public void listRecording(long j, ControlSession controlSession, long j2) {
        if (!catalog.wrapAndValidateDescriptor(j2, descriptorBuffer)) {
            controlSession.sendRecordingUnknown(j, j2, controlResponseProxy);
            return;
        }

        controlSession.sendDescriptor(j, descriptorBuffer, controlResponseProxy);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts a replay of a recording onto a new exclusive publication.
     *
     * <p>An ERROR response is sent and nothing is started when the replay limit
     * has been reached, the recording is unknown, or the requested position
     * fails {@code validateReplayPosition}. If the replay publication cannot be
     * created, an ERROR response is sent and the exception is rethrown.
     *
     * @param j              identifier echoed in responses (presumably the request correlation id).
     * @param controlSession session that issued the request.
     * @param j2             id of the recording to replay.
     * @param j3             requested start position, or -1 to start from the recording's start position.
     * @param j4             passed to the replay session as its second argument (presumably the replay length).
     * @param i              stream id for the replay publication.
     * @param str            channel for the replay publication.
     */
    public void startReplay(long j, ControlSession controlSession, long j2, long j3, long j4, int i, String str) {
        if (this.replaySessionByIdMap.size() >= this.maxConcurrentReplays) {
            controlSession.sendResponse(j, ControlResponseCode.ERROR, "Max concurrent replays reached: " + this.maxConcurrentReplays, this.controlResponseProxy);
            return;
        }
        if (!this.catalog.hasRecording(j2)) {
            controlSession.sendResponse(j, ControlResponseCode.ERROR, "Unknown recording : " + j2, this.controlResponseProxy);
            return;
        }
        this.catalog.recordingSummary(j2, this.recordingSummary);

        long replayPosition = this.recordingSummary.startPosition;
        if (j3 != -1) {
            if (!validateReplayPosition(j, controlSession, j3, this.recordingSummary)) {
                return;
            }
            replayPosition = j3;
        }

        ExclusivePublication replayPublication = newReplayPublication(j, controlSession, str, i, replayPosition, this.recordingSummary);
        // When the recording is still active its live position is handed to the replay session.
        RecordingSession recordingSession = this.recordingSessionByIdMap.get(j2);
        ReplaySession replaySession = new ReplaySession(replayPosition, j4, this.catalog, controlSession, this.archiveDir, this.controlResponseProxy, j, this.cachedEpochClock, replayPublication, this.recordingSummary, null == recordingSession ? null : recordingSession.recordingPosition());
        // The value is an object; the former "(long)" cast on it was invalid Java.
        this.replaySessionByIdMap.put(replaySession.sessionId(), replaySession);
        this.replayer.addSession(replaySession);
    }

    /**
     * Checks that a requested replay start position is a multiple of 32, is not
     * before the recording's start position, and is before the stop position
     * when one is set (stop position other than -1).
     *
     * <p>On failure an ERROR response describing the violated rule is sent.
     *
     * @param j                identifier echoed in the response (presumably the request correlation id).
     * @param controlSession   session that receives any error response.
     * @param j2               requested replay start position.
     * @param recordingSummary summary of the recording being replayed.
     * @return {@code true} if the position is acceptable, otherwise {@code false}.
     */
    private boolean validateReplayPosition(long j, ControlSession controlSession, long j2, RecordingSummary recordingSummary) {
        final boolean isAligned = (j2 & 31) == 0;
        if (!isAligned) {
            final String alignmentMessage =
                "requested replay start position(=" + j2 + ") is not a multiple of FRAME_ALIGNMENT (=32)";
            controlSession.sendResponse(j, ControlResponseCode.ERROR, alignmentMessage, controlResponseProxy);
            return false;
        }

        final long startPosition = recordingSummary.startPosition;
        if (j2 - startPosition < 0) {
            final String tooEarlyMessage =
                "requested replay start position(=" + j2 + ") is before recording start position(=" + startPosition + ")";
            controlSession.sendResponse(j, ControlResponseCode.ERROR, tooEarlyMessage, controlResponseProxy);
            return false;
        }

        final long stopPosition = recordingSummary.stopPosition;
        if (stopPosition != -1 && j2 >= stopPosition) {
            final String tooLateMessage =
                "requested replay start position(=" + j2 + ") must be before current highest recorded position(=" + stopPosition + ")";
            controlSession.sendResponse(j, ControlResponseCode.ERROR, tooLateMessage, controlResponseProxy);
            return false;
        }

        return true;
    }

    /**
     * Aborts the replay session with the given id and replies OK, or replies
     * ERROR when no such replay session is registered.
     *
     * @param j              identifier echoed in the response (presumably the request correlation id).
     * @param controlSession session that receives the response.
     * @param j2             id of the replay session to stop.
     */
    public void stopReplay(long j, ControlSession controlSession, long j2) {
        final ReplaySession session = replaySessionByIdMap.get(j2);
        if (session != null) {
            session.abort();
            controlSession.sendOkResponse(j, controlResponseProxy);
            return;
        }

        final String unknownMessage = "Replay session not known: id=" + j2;
        controlSession.sendResponse(j, ControlResponseCode.ERROR, unknownMessage, controlResponseProxy);
    }

    /**
     * Adds a subscription whose image will extend an existing, currently
     * inactive recording.
     *
     * <p>An ERROR response is sent when the recording limit has been reached,
     * the recording is unknown or still active, the channel lacks a session-id
     * matching the recording, or a subscription already exists for the same
     * stream and stripped channel. Otherwise an OK response is sent.
     *
     * <p>Channel parsing and session-id parsing happen inside the try block so
     * that a malformed channel or a non-numeric session-id from the client is
     * reported to the error handler and answered with an ERROR response, in the
     * same way as {@code startRecordingSubscription}, rather than escaping from
     * this method with no reply.
     *
     * @param j              identifier echoed in the response (presumably the request correlation id).
     * @param controlSession session that issued the request and receives the response.
     * @param j2             id of the recording to extend.
     * @param i              stream id to subscribe to.
     * @param str            channel as supplied by the client; must carry the recording's session-id.
     * @param sourceLocation when {@code LOCAL} and the channel contains the UDP media id,
     *                       the subscription is made with the spy prefix.
     */
    public void extendRecording(long j, ControlSession controlSession, long j2, int i, String str, SourceLocation sourceLocation) {
        if (this.recordingSessionByIdMap.size() >= this.maxConcurrentRecordings) {
            controlSession.sendResponse(j, ControlResponseCode.ERROR, "Max concurrent recordings reached: " + this.maxConcurrentRecordings, this.controlResponseProxy);
            return;
        }
        if (!this.catalog.hasRecording(j2)) {
            controlSession.sendResponse(j, ControlResponseCode.ERROR, "Unknown recording : " + j2, this.controlResponseProxy);
            return;
        }
        if (this.recordingSessionByIdMap.containsKey(j2)) {
            controlSession.sendResponse(j, ControlResponseCode.ERROR, "Can not extend active recording : " + j2, this.controlResponseProxy);
            return;
        }

        // A fresh summary per request because it is captured by the image handler below.
        RecordingSummary originalSummary = new RecordingSummary();
        this.catalog.recordingSummary(j2, originalSummary);

        try {
            ChannelUri channelUri = ChannelUri.parse(str);
            String sessionIdStr = channelUri.get(CommonContext.SESSION_ID_PARAM_NAME);
            if (null == sessionIdStr || originalSummary.sessionId != Integer.parseInt(sessionIdStr)) {
                controlSession.sendResponse(j, ControlResponseCode.ERROR, "Extend recording channel must contain correct sessionId: " + j2, this.controlResponseProxy);
                return;
            }

            String strippedChannel = strippedChannelBuilder(channelUri).build();
            String key = makeKey(i, strippedChannel);
            if (this.recordingSubscriptionMap.get(key) == null) {
                String subscriptionChannel = (str.contains(UdpChannel.MEDIA_ID) && sourceLocation == SourceLocation.LOCAL)
                    ? CommonContext.SPY_PREFIX + strippedChannel
                    : strippedChannel;
                this.recordingSubscriptionMap.put(key, this.aeron.addSubscription(subscriptionChannel, i, image -> {
                    extendRecordingSession(controlSession, j, j2, strippedChannel, str, originalSummary, image);
                }, null));
                controlSession.sendOkResponse(j, this.controlResponseProxy);
            } else {
                controlSession.sendResponse(j, ControlResponseCode.ERROR, "Recording already setup for subscription: " + key, this.controlResponseProxy);
            }
        } catch (Exception e) {
            this.errorHandler.onError(e);
            controlSession.sendResponse(j, ControlResponseCode.ERROR, e.getMessage(), this.controlResponseProxy);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a control session with its own response publication and adds it
     * to this worker's sessions.
     *
     * <p>If the channel already contains a term-length parameter it is used
     * unchanged; otherwise it is stripped and given the configured control term
     * length and MTU. The session id is taken from a counter that is then
     * incremented.
     *
     * @param j                     second constructor argument of the session (presumably the correlation id).
     * @param i                     stream id for the response publication.
     * @param str                   channel for the response publication.
     * @param controlSessionDemuxer demuxer that the new session is associated with.
     * @return the newly created and registered control session.
     */
    public ControlSession newControlSession(long j, int i, String str, ControlSessionDemuxer controlSessionDemuxer) {
        final String responseChannel;
        if (str.contains(CommonContext.TERM_LENGTH_PARAM_NAME)) {
            responseChannel = str;
        } else {
            responseChannel = strippedChannelBuilder(str)
                .termLength(Integer.valueOf(CONTROL_TERM_LENGTH))
                .mtu(Integer.valueOf(CONTROL_MTU))
                .build();
        }

        final ConcurrentPublication responsePublication = aeron.addPublication(responseChannel, i);
        final long controlSessionId = nextControlSessionId++;
        final ControlSession session = new ControlSession(
            controlSessionId,
            j,
            controlSessionDemuxer,
            responsePublication,
            this,
            cachedEpochClock,
            controlResponseProxy);
        addSession(session);

        return session;
    }

    /**
     * Resets the shared channel builder and populates it with only the media,
     * endpoint, interface, control endpoint and session-id of the given URI.
     *
     * <p>The returned builder is the single shared instance, so its contents
     * are overwritten by the next call.
     *
     * @param channelUri parsed channel to copy the retained parameters from.
     * @return the shared builder, ready for further parameters or {@code build()}.
     */
    ChannelUriStringBuilder strippedChannelBuilder(ChannelUri channelUri) {
        channelBuilder
            .clear()
            .media(channelUri.media())
            .endpoint(channelUri.get(CommonContext.ENDPOINT_PARAM_NAME))
            .networkInterface(channelUri.get(CommonContext.INTERFACE_PARAM_NAME))
            .controlEndpoint(channelUri.get(CommonContext.MDC_CONTROL_PARAM_NAME))
            .sessionId(ChannelUriStringBuilder.integerValueOf(channelUri.get(CommonContext.SESSION_ID_PARAM_NAME)));

        return channelBuilder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Parses the channel string and delegates to
     * {@link #strippedChannelBuilder(ChannelUri)}.
     *
     * @param str channel string to parse.
     * @return the shared builder populated with the retained parameters.
     */
    public ChannelUriStringBuilder strippedChannelBuilder(String str) {
        final ChannelUri parsedUri = ChannelUri.parse(str);
        return strippedChannelBuilder(parsedUri);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deregisters a recording session, records its final position and the
     * cached time in the catalog, then closes the session.
     *
     * @param recordingSession the session to close; its session id is used as the catalog key.
     */
    public void closeRecordingSession(RecordingSession recordingSession) {
        final long recordingId = recordingSession.sessionId();
        recordingSessionByIdMap.remove(recordingId);

        final long stoppedPosition = recordingSession.recordingPosition().get();
        final long stoppedTime = cachedEpochClock.time();
        catalog.recordingStopped(recordingId, stoppedPosition, stoppedTime);

        closeSession(recordingSession);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deregisters a replay session by its session id and closes it.
     *
     * @param replaySession the session to close.
     */
    public void closeReplaySession(ReplaySession replaySession) {
        final long replaySessionId = replaySession.sessionId();
        replaySessionByIdMap.remove(replaySessionId);
        closeSession(replaySession);
    }

    /**
     * Image handler for recording subscriptions: registers a new recording in
     * the catalog for the image, allocates its position counter, and hands a
     * new recording session to the recorder.
     *
     * @param controlSession session that requested the recording.
     * @param j              identifier passed to the position counter and any error response
     *                       (presumably the request correlation id).
     * @param str            stripped channel stored in the catalog and used for the counter.
     * @param str2           channel as originally supplied by the client.
     * @param image          the newly available image to record.
     * @throws IllegalStateException if the concurrent recording limit has been reached.
     */
    private void startRecordingSession(ControlSession controlSession, long j, String str, String str2, Image image) {
        validateMaxConcurrentRecordings(controlSession, j, str2, image);

        int sessionId = image.sessionId();
        int streamId = image.subscription().streamId();
        String sourceIdentity = image.sourceIdentity();
        int termBufferLength = image.termBufferLength();
        int mtuLength = image.mtuLength();
        int initialTermId = image.initialTermId();
        long startPosition = image.joinPosition();

        long recordingId = this.catalog.addNewRecording(startPosition, this.cachedEpochClock.time(), initialTermId, this.ctx.segmentFileLength(), termBufferLength, mtuLength, sessionId, streamId, str, str2, sourceIdentity);

        Counter position = RecordingPos.allocate(this.aeron, this.tempBuffer, recordingId, controlSession.sessionId(), j, sessionId, streamId, str);
        // The recording position starts at the image's join position.
        position.setOrdered(startPosition);

        RecordingSession recordingSession = new RecordingSession(recordingId, startPosition, str2, this.recordingEventsProxy, image, position, this.archiveDirChannel, this.ctx);
        // The value is an object; the former "(long)" cast on it was invalid Java.
        this.recordingSessionByIdMap.put(recordingId, recordingSession);
        this.recorder.addSession(recordingSession);
    }

    /**
     * Image handler for extend-recording subscriptions: validates the image
     * against the existing recording, allocates a position counter, marks the
     * recording as extended in the catalog, and hands a new recording session
     * to the recorder.
     *
     * @param controlSession   session that requested the extension.
     * @param j                identifier passed to the position counter and any error response
     *                         (presumably the request correlation id).
     * @param j2               id of the recording being extended.
     * @param str              stripped channel used for the counter.
     * @param str2             channel as originally supplied by the client.
     * @param recordingSummary summary of the recording taken when the extension was requested.
     * @param image            the newly available image to append to the recording.
     * @throws IllegalStateException if the concurrent recording limit has been reached or the
     *                               image does not match the recording.
     */
    private void extendRecordingSession(ControlSession controlSession, long j, long j2, String str, String str2, RecordingSummary recordingSummary, Image image) {
        validateMaxConcurrentRecordings(controlSession, j, str2, image);
        validateImageForExtendRecording(j, controlSession, image, recordingSummary);

        int sessionId = image.sessionId();
        int streamId = image.subscription().streamId();
        long joinPosition = image.joinPosition();

        Counter position = RecordingPos.allocate(this.aeron, this.tempBuffer, j2, controlSession.sessionId(), j, sessionId, streamId, str);
        position.setOrdered(joinPosition);

        // The session keeps the recording's original start position, not the image join position.
        RecordingSession recordingSession = new RecordingSession(j2, recordingSummary.startPosition, str2, this.recordingEventsProxy, image, position, this.archiveDirChannel, this.ctx);
        this.catalog.extendRecording(j2);
        // The value is an object; the former "(long)" cast on it was invalid Java.
        this.recordingSessionByIdMap.put(j2, recordingSession);
        this.recorder.addSession(recordingSession);
    }

    /**
     * Adds an exclusive publication for a replay, configured from the stripped
     * channel plus the recording's MTU, term length and initial term id, with
     * term id and term offset derived from the replay start position.
     *
     * @param j                identifier echoed in any error response (presumably the correlation id).
     * @param controlSession   session that receives an ERROR response on failure.
     * @param str              replay channel supplied by the client.
     * @param i                stream id for the replay publication.
     * @param j2               position at which the replay starts.
     * @param recordingSummary summary of the recording being replayed.
     * @return the new exclusive publication.
     */
    private ExclusivePublication newReplayPublication(long j, ControlSession controlSession, String str, int i, long j2, RecordingSummary recordingSummary) {
        final int initialTermId = recordingSummary.initialTermId;
        final int termLength = recordingSummary.termBufferLength;

        try {
            final ChannelUriStringBuilder builder = strippedChannelBuilder(str)
                .mtu(Integer.valueOf(recordingSummary.mtuLength))
                .termLength(Integer.valueOf(termLength))
                .initialTermId(Integer.valueOf(initialTermId));

            final int positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
            final int termId = LogBufferDescriptor.computeTermIdFromPosition(j2, positionBitsToShift, initialTermId);
            // Offset within the term: the position masked by (termLength - 1).
            final int termOffset = (int) (j2 & (termLength - 1));

            final String replayChannel = builder
                .termId(Integer.valueOf(termId))
                .termOffset(Integer.valueOf(termOffset))
                .build();

            return aeron.addExclusivePublication(replayChannel, i);
        } catch (Exception e) {
            final String failureMessage = "Failed to create replay publication - " + e;
            controlSession.sendResponse(j, ControlResponseCode.ERROR, failureMessage, controlResponseProxy);
            throw e;
        }
    }

    /**
     * Rejects a new recording session when the number of active recording
     * sessions has reached the configured maximum.
     *
     * <p>On rejection an error response is attempted on the control session
     * before the exception is thrown.
     *
     * @param controlSession session that receives the attempted error response.
     * @param j              identifier echoed in the response (presumably the correlation id).
     * @param str            channel included in the error message.
     * @param image          image whose subscription stream id is included in the error message.
     * @throws IllegalStateException if the limit has been reached.
     */
    private void validateMaxConcurrentRecordings(ControlSession controlSession, long j, String str, Image image) {
        if (recordingSessionByIdMap.size() < maxConcurrentRecordings) {
            return;
        }

        final int streamId = image.subscription().streamId();
        final String message = "Max concurrent recordings reached, can't record: " + streamId + ":" + str;
        controlSession.attemptErrorResponse(j, message, controlResponseProxy);

        throw new IllegalStateException(message);
    }

    /**
     * Verifies that an image can be appended to an existing recording: its join
     * position must equal the recording's stop position, and its term buffer
     * length and MTU length must equal the recording's.
     *
     * <p>The checks run in that order; on the first mismatch an error response
     * is attempted on the control session and the exception is thrown.
     *
     * @param j                identifier echoed in the response (presumably the correlation id).
     * @param controlSession   session that receives the attempted error response.
     * @param image            image offered for extending the recording.
     * @param recordingSummary summary of the recording to be extended.
     * @throws IllegalStateException if any of the three properties does not match.
     */
    private void validateImageForExtendRecording(long j, ControlSession controlSession, Image image, RecordingSummary recordingSummary) {
        final String prefix = "Can't extend recording: " + recordingSummary.recordingId;

        final long joinPosition = image.joinPosition();
        if (joinPosition != recordingSummary.stopPosition) {
            final String message = prefix + ": image joinPosition=" + joinPosition
                + " not equal to recording stopPosition=" + recordingSummary.stopPosition;
            controlSession.attemptErrorResponse(j, message, controlResponseProxy);
            throw new IllegalStateException(message);
        }

        final int termBufferLength = image.termBufferLength();
        if (termBufferLength != recordingSummary.termBufferLength) {
            final String message = prefix + ": image termBufferLength=" + termBufferLength
                + " not equal to recording termBufferLength=" + recordingSummary.termBufferLength;
            controlSession.attemptErrorResponse(j, message, controlResponseProxy);
            throw new IllegalStateException(message);
        }

        final int mtuLength = image.mtuLength();
        if (mtuLength != recordingSummary.mtuLength) {
            final String message = prefix + ": image mtuLength=" + mtuLength
                + " not equal to recording mtuLength=" + recordingSummary.mtuLength;
            controlSession.attemptErrorResponse(j, message, controlResponseProxy);
            throw new IllegalStateException(message);
        }
    }

    /**
     * Builds the key used for the recording subscription map: the stream id and
     * the channel joined by a colon.
     *
     * @param i   stream id.
     * @param str channel string (callers pass the stripped channel).
     * @return {@code i + ":" + str}.
     */
    private static String makeKey(int i, String str) {
        final StringBuilder key = new StringBuilder();
        key.append(i).append(':').append(str);
        return key.toString();
    }
}
