package org.red5.server.stream.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.io.IStreamableFile;
import org.red5.io.ITag;
import org.red5.io.ITagWriter;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IClientStream;
import org.red5.server.api.stream.IStreamableFileFactory;
import org.red5.server.messaging.IMessage;
import org.red5.server.messaging.IMessageComponent;
import org.red5.server.messaging.IPipe;
import org.red5.server.messaging.IPipeConnectionListener;
import org.red5.server.messaging.IPushableConsumer;
import org.red5.server.messaging.OOBControlMessage;
import org.red5.server.messaging.PipeConnectionEvent;
import org.red5.server.net.rtmp.event.FlexStreamSend;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.net.rtmp.message.Constants;
import org.red5.server.stream.IStreamData;
import org.red5.server.stream.StreamableFileFactory;
import org.red5.server.stream.message.RTMPMessage;
import org.red5.server.stream.message.ResetMessage;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:org/red5/server/stream/consumer/FileConsumer.class */
public class FileConsumer implements Constants, IPushableConsumer, IPipeConnectionListener, InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(FileConsumer.class);
    private ScheduledExecutorService scheduledExecutorService;
    private int schedulerThreadSize;
    private PriorityQueue<QueuedData> queue;
    private ReentrantReadWriteLock reentrantLock;
    private volatile Lock writeLock;
    private volatile Lock readLock;
    private IScope scope;
    private File file;
    private ITagWriter writer;
    private String mode;
    private int startTimestamp;
    private int lastTimestamp;
    private ITag videoConfigurationTag;
    private ITag audioConfigurationTag;
    private int queueThreshold;
    private int percentage;
    private boolean delayWrite;
    private volatile int lastWrittenTs;
    private volatile Future<?> writerFuture;
    private AtomicBoolean gotVideoKeyFrame;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/red5/server/stream/consumer/FileConsumer$QueuedData.class */
    public static final class QueuedData implements Comparable<QueuedData> {
        ITag tag;

        QueuedData(int i, byte b) {
            this.tag = ImmutableTag.build(b, i);
        }

        QueuedData(int i, byte b, IStreamData iStreamData) {
            this.tag = ImmutableTag.build(b, i, iStreamData.getData());
        }

        public int getTimestamp() {
            return this.tag.getTimestamp();
        }

        public byte getDataType() {
            return this.tag.getDataType();
        }

        public ITag getData() {
            return this.tag;
        }

        public boolean hasData() {
            return this.tag != null;
        }

        public int hashCode() {
            return (31 * ((31 * 1) + this.tag.getDataType())) + this.tag.getTimestamp();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            QueuedData queuedData = (QueuedData) obj;
            return this.tag.getDataType() == queuedData.getDataType() && this.tag.getTimestamp() == queuedData.getTimestamp();
        }

        @Override // java.lang.Comparable
        public int compareTo(QueuedData queuedData) {
            if (this.tag.getTimestamp() > queuedData.getTimestamp()) {
                return 1;
            }
            return this.tag.getTimestamp() < queuedData.getTimestamp() ? -1 : 0;
        }

        public void dispose() {
            this.tag = null;
        }
    }

    public FileConsumer() {
        this.schedulerThreadSize = 1;
        this.startTimestamp = -1;
        this.queueThreshold = -1;
        this.percentage = 25;
        this.delayWrite = false;
        this.lastWrittenTs = -1;
        this.gotVideoKeyFrame = new AtomicBoolean(false);
    }

    public FileConsumer(IScope iScope, File file) {
        this();
        this.scope = iScope;
        this.file = file;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.delayWrite) {
            this.scheduledExecutorService = Executors.newScheduledThreadPool(this.schedulerThreadSize, new CustomizableThreadFactory("FileConsumerExecutor-"));
        }
    }

    @Override // org.red5.server.messaging.IPushableConsumer
    public void pushMessage(IPipe iPipe, IMessage iMessage) throws IOException {
        QueuedData queuedData;
        if (!(iMessage instanceof RTMPMessage)) {
            if (iMessage instanceof ResetMessage) {
                this.startTimestamp = -1;
                return;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Ignoring pushed message: {}", iMessage);
                    return;
                }
                return;
            }
        }
        IRTMPEvent body = ((RTMPMessage) iMessage).getBody();
        byte dataType = body.getDataType();
        int timestamp = body.getTimestamp();
        log.debug("Data type: {} timestamp: {}", Byte.valueOf(dataType), Integer.valueOf(timestamp));
        if (!(body instanceof FlexStreamSend)) {
            this.lastTimestamp = timestamp;
        }
        if ((body instanceof VideoData) && !this.gotVideoKeyFrame.get()) {
            if (((VideoData) body).getFrameType() != VideoData.FrameType.KEYFRAME) {
                log.debug("Skipping video data since keyframe has not been written yet");
                return;
            } else {
                log.debug("Got our first keyframe");
                this.gotVideoKeyFrame.set(true);
            }
        }
        if (this.writer == null) {
            init();
        }
        if (!this.delayWrite) {
            write(timestamp, body);
            return;
        }
        if (body instanceof IStreamData) {
            if (log.isTraceEnabled()) {
                log.trace("Stream data, body saved. Data type: {} class type: {}", Byte.valueOf(dataType), body.getClass().getName());
            }
            queuedData = new QueuedData(timestamp, dataType, (IStreamData) body);
        } else {
            if (log.isTraceEnabled()) {
                log.trace("Non-stream data, body not saved. Data type: {} class type: {}", Byte.valueOf(dataType), body.getClass().getName());
            }
            queuedData = new QueuedData(timestamp, dataType);
        }
        this.writeLock.lock();
        try {
            this.queue.add(queuedData);
            this.writeLock.unlock();
            this.readLock.lock();
            try {
                int size = this.queue.size();
                this.readLock.unlock();
                if (body instanceof VideoData) {
                    writeQueuedDataSlice(createTimestampLimitedSlice(body.getTimestamp()));
                } else {
                    if (this.queueThreshold < 0 || size < this.queueThreshold) {
                        return;
                    }
                    writeQueuedDataSlice(createFixedLengthSlice(this.queueThreshold / (100 / this.percentage)));
                }
            } catch (Throwable th) {
                this.readLock.unlock();
                throw th;
            }
        } catch (Throwable th2) {
            this.writeLock.unlock();
            throw th2;
        }
    }

    private void writeQueuedDataSlice(final QueuedData[] queuedDataArr) {
        if (acquireWriteFuture(queuedDataArr.length)) {
            this.writerFuture = this.scheduledExecutorService.submit(new Runnable() { // from class: org.red5.server.stream.consumer.FileConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    FileConsumer.log.trace("Spawning queue writer thread");
                    FileConsumer.this.doWrites(queuedDataArr);
                }
            });
            return;
        }
        this.writeLock.lock();
        try {
            for (QueuedData queuedData : Arrays.asList(queuedDataArr)) {
                if (queuedData.hasData()) {
                    this.queue.add(queuedData);
                }
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    private QueuedData[] createFixedLengthSlice(int i) {
        log.debug("Creating data slice to write of length {}.", Integer.valueOf(i));
        QueuedData[] queuedDataArr = new QueuedData[i];
        log.trace("Slice length: {}", Integer.valueOf(queuedDataArr.length));
        this.writeLock.lock();
        try {
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", Integer.valueOf(this.queue.size()));
            }
            for (int i2 = 0; i2 < i; i2++) {
                queuedDataArr[i2] = this.queue.remove();
            }
            if (log.isTraceEnabled()) {
                log.trace("Queue length (after removal): {}", Integer.valueOf(this.queue.size()));
            }
            return queuedDataArr;
        } finally {
            this.writeLock.unlock();
        }
    }

    private QueuedData[] createTimestampLimitedSlice(int i) {
        log.debug("Creating data slice up until timestamp {}.", Integer.valueOf(i));
        ArrayList arrayList = new ArrayList();
        this.writeLock.lock();
        try {
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", Integer.valueOf(this.queue.size()));
            }
            if (!this.queue.isEmpty()) {
                while (!this.queue.isEmpty() && this.queue.peek().getTimestamp() <= i) {
                    arrayList.add(this.queue.remove());
                }
                if (log.isTraceEnabled()) {
                    log.trace("Queue length (after removal): {}", Integer.valueOf(this.queue.size()));
                }
            }
            return (QueuedData[]) arrayList.toArray(new QueuedData[arrayList.size()]);
        } finally {
            this.writeLock.unlock();
        }
    }

    private boolean acquireWriteFuture(int i) {
        if (i <= 0) {
            return false;
        }
        Object obj = null;
        int i2 = i * 500;
        if (this.writerFuture != null) {
            try {
                obj = this.writerFuture.get(i2, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.warn("Exception waiting for write result. Timeout: {}ms", Integer.valueOf(i2), e);
                return false;
            }
        }
        log.debug("Write future result (expect null): {}", obj);
        return true;
    }

    @Override // org.red5.server.messaging.IMessageComponent
    public void onOOBControlMessage(IMessageComponent iMessageComponent, IPipe iPipe, OOBControlMessage oOBControlMessage) {
    }

    @Override // org.red5.server.messaging.IPipeConnectionListener
    public void onPipeConnectionEvent(PipeConnectionEvent pipeConnectionEvent) {
        Map<String, Object> paramMap;
        switch (pipeConnectionEvent.getType()) {
            case CONSUMER_CONNECT_PUSH:
                if (pipeConnectionEvent.getConsumer() != this || (paramMap = pipeConnectionEvent.getParamMap()) == null) {
                    return;
                }
                this.mode = (String) paramMap.get("mode");
                return;
            default:
                return;
        }
    }

    private void init() throws IOException {
        log.debug("Init");
        if (this.file == null) {
            log.warn("Consumer is uninitialized");
            return;
        }
        if (this.delayWrite) {
            this.queue = new PriorityQueue<>(this.queueThreshold <= 0 ? 11 : this.queueThreshold);
            this.reentrantLock = new ReentrantReadWriteLock();
            this.writeLock = this.reentrantLock.writeLock();
            this.readLock = this.reentrantLock.readLock();
        }
        IStreamableFileFactory iStreamableFileFactory = (IStreamableFileFactory) ScopeUtils.getScopeService(this.scope, (Class<?>) IStreamableFileFactory.class, (Class<?>) StreamableFileFactory.class);
        File parentFile = this.file.getParentFile();
        if (!parentFile.exists() && !parentFile.mkdirs()) {
            throw new IOException("Could not create parent folder");
        }
        if (!this.file.isFile()) {
            this.file.createNewFile();
        } else if (!this.file.canWrite()) {
            throw new IOException("The file is read-only");
        }
        IStreamableFile streamableFile = iStreamableFileFactory.getService(this.file).getStreamableFile(this.file);
        if (this.mode != null && !this.mode.equals(IClientStream.MODE_RECORD)) {
            if (!this.mode.equals(IClientStream.MODE_APPEND)) {
                throw new IllegalStateException(String.format("Illegal mode type: %s", this.mode));
            }
            this.writer = streamableFile.getAppendWriter();
            return;
        }
        this.writer = streamableFile.getWriter();
        if (this.videoConfigurationTag != null) {
            this.writer.writeTag(this.videoConfigurationTag);
            this.videoConfigurationTag = null;
        }
        if (this.audioConfigurationTag != null) {
            this.writer.writeTag(this.audioConfigurationTag);
            this.audioConfigurationTag = null;
        }
    }

    public void uninit() {
        log.debug("Uninit");
        if (this.writer != null) {
            if (this.writerFuture != null) {
                try {
                    this.writerFuture.get();
                } catch (Exception e) {
                    log.warn("Exception waiting for write result on uninit", e);
                }
                if (this.writerFuture.cancel(false)) {
                    log.debug("Future completed");
                }
            }
            this.writerFuture = null;
            if (this.delayWrite) {
                doWrites();
                this.queue.clear();
                this.queue = null;
            }
            this.writer.close();
            this.writer = null;
        }
        this.file = null;
    }

    public final void doWrites() {
        this.writeLock.lock();
        try {
            QueuedData[] queuedDataArr = (QueuedData[]) this.queue.toArray(new QueuedData[0]);
            if (this.queue.removeAll(Arrays.asList(queuedDataArr))) {
                log.debug("Queued writes transfered, count: {}", Integer.valueOf(queuedDataArr.length));
            }
            Arrays.sort(queuedDataArr);
            doWrites(queuedDataArr);
        } finally {
            this.writeLock.unlock();
        }
    }

    public final void doWrites(QueuedData[] queuedDataArr) {
        for (QueuedData queuedData : queuedDataArr) {
            int timestamp = queuedData.getTimestamp();
            if (this.lastWrittenTs > timestamp) {
                queuedData.dispose();
            } else if (queuedData.hasData()) {
                write(queuedData);
                this.lastWrittenTs = timestamp;
                queuedData.dispose();
            } else if (log.isTraceEnabled()) {
                log.trace("Queued data was not available");
            }
        }
    }

    private final void write(int i, IRTMPEvent iRTMPEvent) {
        int i2;
        byte dataType = iRTMPEvent.getDataType();
        log.debug("Write - timestamp: {} type: {}", Integer.valueOf(i), Byte.valueOf(dataType));
        IoBuffer data = ((IStreamData) iRTMPEvent).getData();
        if (data != null) {
            if (this.startTimestamp == -1) {
                this.startTimestamp = i;
                i2 = 0;
            } else {
                i2 = i - this.startTimestamp;
            }
            ImmutableTag build = ImmutableTag.build(dataType, i2, data, 0);
            if (build.getBodySize() > 0 || dataType == 8) {
                try {
                    try {
                        if (i2 < 0) {
                            log.warn("Skipping message with negative timestamp.");
                        } else if (!this.writer.writeTag(build)) {
                            log.warn("Tag was not written");
                        }
                        if (data != null) {
                            data.clear();
                            data.free();
                        }
                    } catch (IOException e) {
                        log.error("Error writing tag", e);
                        if (data != null) {
                            data.clear();
                            data.free();
                        }
                    }
                } catch (Throwable th) {
                    if (data != null) {
                        data.clear();
                        data.free();
                    }
                    throw th;
                }
            }
        }
    }

    private final void write(QueuedData queuedData) {
        int i;
        byte dataType = queuedData.getDataType();
        int timestamp = queuedData.getTimestamp();
        log.debug("Write - timestamp: {} type: {}", Integer.valueOf(timestamp), Byte.valueOf(dataType));
        ITag data = queuedData.getData();
        if (data != null) {
            if (data.getBodySize() > 0 || dataType == 8) {
                if (this.startTimestamp == -1) {
                    this.startTimestamp = timestamp;
                    i = 0;
                } else {
                    i = timestamp - this.startTimestamp;
                }
                data.setTimestamp(i);
                try {
                    try {
                        if (i < 0) {
                            log.warn("Skipping message with negative timestamp.");
                        } else if (!this.writer.writeTag(data)) {
                            log.warn("Tag was not written");
                        }
                        queuedData.dispose();
                    } catch (ClosedChannelException e) {
                        log.error("The writer is no longer able to write to the file: {} writable: {}", this.file.getName(), Boolean.valueOf(this.file.canWrite()));
                        queuedData.dispose();
                    } catch (IOException e2) {
                        log.warn("Error writing tag", e2);
                        if (e2.getCause() instanceof ClosedChannelException) {
                            log.error("The writer is no longer able to write to the file: {} writable: {}", this.file.getName(), Boolean.valueOf(this.file.canWrite()));
                        }
                        queuedData.dispose();
                    }
                } catch (Throwable th) {
                    queuedData.dispose();
                    throw th;
                }
            }
        }
    }

    public void setVideoDecoderConfiguration(IRTMPEvent iRTMPEvent) {
        if (iRTMPEvent instanceof IStreamData) {
            this.videoConfigurationTag = ImmutableTag.build(iRTMPEvent.getDataType(), 0, ((IStreamData) iRTMPEvent).getData().asReadOnlyBuffer(), 0);
        }
    }

    public void setAudioDecoderConfiguration(IRTMPEvent iRTMPEvent) {
        if (iRTMPEvent instanceof IStreamData) {
            this.audioConfigurationTag = ImmutableTag.build(iRTMPEvent.getDataType(), 0, ((IStreamData) iRTMPEvent).getData().asReadOnlyBuffer(), 0);
        }
    }

    public void setScope(IScope iScope) {
        this.scope = iScope;
    }

    public void setFile(File file) {
        this.file = file;
    }

    public File getFile() {
        return this.file;
    }

    public void setQueueThreshold(int i) {
        this.queueThreshold = i;
    }

    public int getQueueThreshold() {
        return this.queueThreshold;
    }

    public boolean isDelayWrite() {
        return this.delayWrite;
    }

    public void setDelayWrite(boolean z) {
        this.delayWrite = z;
    }

    public int getSchedulerThreadSize() {
        return this.schedulerThreadSize;
    }

    public void setSchedulerThreadSize(int i) {
        this.schedulerThreadSize = i;
    }

    public void setMode(String str) {
        this.mode = str;
    }

    public void destroy() throws Exception {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
    }
}
