/*
 * Decompiled with CFR 0.152.
 */
package org.red5.server.stream.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.io.IStreamableFile;
import org.red5.io.ITag;
import org.red5.io.ITagWriter;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.service.IStreamableFileService;
import org.red5.server.api.stream.IStreamableFileFactory;
import org.red5.server.messaging.IMessage;
import org.red5.server.messaging.IMessageComponent;
import org.red5.server.messaging.IPipe;
import org.red5.server.messaging.IPipeConnectionListener;
import org.red5.server.messaging.IPushableConsumer;
import org.red5.server.messaging.OOBControlMessage;
import org.red5.server.messaging.PipeConnectionEvent;
import org.red5.server.net.rtmp.event.FlexStreamSend;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.net.rtmp.message.Constants;
import org.red5.server.stream.IStreamData;
import org.red5.server.stream.StreamableFileFactory;
import org.red5.server.stream.consumer.ImmutableTag;
import org.red5.server.stream.message.RTMPMessage;
import org.red5.server.stream.message.ResetMessage;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class FileConsumer
implements Constants,
IPushableConsumer,
IPipeConnectionListener,
InitializingBean,
DisposableBean {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(FileConsumer.class);
    /** Executor that runs queued slice writes; only created by afterPropertiesSet() when delayWrite is set. */
    private ScheduledExecutorService scheduledExecutorService;
    /** Pool size handed to the scheduled executor. */
    private int schedulerThreadSize = 1;
    /** Pending entries ordered by QueuedData.compareTo; created in init() when delayWrite is set. */
    private PriorityQueue<QueuedData> queue;
    /** Source of the read and write locks below; created in init() when delayWrite is set. */
    private ReentrantReadWriteLock reentrantLock;
    /** Held while the queue is modified. */
    private volatile Lock writeLock;
    /** Held while the queue size is read in pushMessage(). */
    private volatile Lock readLock;
    /** Scope used in init() to look up the streamable file factory. */
    private IScope scope;
    /** Output file; cleared by uninit(). */
    private File file;
    /** Tag writer obtained in init(); null until the first message arrives and after uninit(). */
    private ITagWriter writer;
    /** "record" or "append"; null is handled like "record" in init(). */
    private String mode;
    /** Timestamp of the first written message, subtracted from later ones; -1 means "not set yet". */
    private int startTimestamp = -1;
    /** Timestamp of the most recently pushed message that was not a FlexStreamSend. */
    private int lastTimestamp;
    /** Video configuration tag written once at the start of a "record" file, then cleared. */
    private ITag videoConfigurationTag;
    /** Audio configuration tag written once at the start of a "record" file, then cleared. */
    private ITag audioConfigurationTag;
    /** Queue size that triggers a fixed-length flush in pushMessage(); negative disables that trigger. */
    private int queueThreshold = -1;
    /** Sizes the fixed-length flush in pushMessage(): queueThreshold / (100 / percentage). */
    private int percentage = 25;
    /** When true, messages are queued and written in slices instead of immediately. */
    private boolean delayWrite = false;
    /** Timestamp of the last queued entry written by doWrites(); older entries are dropped. */
    private volatile int lastWrittenTs = -1;
    /** Future of the most recently submitted slice write, if any. */
    private volatile Future<?> writerFuture;
    /** Set once the first video keyframe has been pushed; video before that is skipped. */
    private AtomicBoolean gotVideoKeyFrame = new AtomicBoolean(false);

    /**
     * Creates a consumer with no scope or file; both are expected to be supplied
     * through setScope() and setFile() before the first message is pushed.
     */
    public FileConsumer() {
    }

    /**
     * Creates a consumer bound to a scope and an output file.
     *
     * @param scope scope used to resolve the streamable file factory
     * @param file  file the stream is written to
     */
    public FileConsumer(IScope scope, File file) {
        // The no-arg constructor has an empty body, so there is nothing to delegate to.
        this.file = file;
        this.scope = scope;
    }

    /**
     * Creates the executor used for delayed writes. Nothing is created when
     * {@code delayWrite} is off.
     *
     * @throws Exception declared by the InitializingBean contract
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (!delayWrite) {
            return;
        }
        ThreadFactory threadFactory = new CustomizableThreadFactory("FileConsumerExecutor-");
        scheduledExecutorService = Executors.newScheduledThreadPool(schedulerThreadSize, threadFactory);
    }

    /**
     * Receives a message from the pipe and records it.
     * <p>
     * The body of an {@link RTMPMessage} is either written immediately or, when
     * {@code delayWrite} is set, queued and flushed in slices: a video message
     * flushes every queued entry up to its own timestamp, any other message
     * flushes a fixed-length slice once the queue has reached
     * {@code queueThreshold}. Video messages are dropped until the first
     * keyframe has been seen. A {@link ResetMessage} resets the start
     * timestamp; every other message type is ignored.
     *
     * @param pipe    pipe the message arrived on (not used here)
     * @param message message to record
     * @throws IOException if the output file cannot be prepared on first use
     */
    @Override
    public void pushMessage(IPipe pipe, IMessage message) throws IOException {
        if (message instanceof RTMPMessage) {
            IRTMPEvent msg = ((RTMPMessage) message).getBody();
            byte dataType = msg.getDataType();
            int timestamp = msg.getTimestamp();
            log.debug("Data type: {} timestamp: {}", dataType, timestamp);
            if (!(msg instanceof FlexStreamSend)) {
                this.lastTimestamp = timestamp;
            }
            if (msg instanceof VideoData && !this.gotVideoKeyFrame.get()) {
                VideoData video = (VideoData) msg;
                if (video.getFrameType() == VideoData.FrameType.KEYFRAME) {
                    log.debug("Got our first keyframe");
                    this.gotVideoKeyFrame.set(true);
                } else {
                    log.debug("Skipping video data since keyframe has not been written yet");
                    return;
                }
            }
            if (this.writer == null) {
                this.init();
                if (this.writer == null) {
                    // init() only logs a warning when no file is configured; without a
                    // writer (and, in delayed mode, without queue and locks) there is
                    // nothing that can be done with this message.
                    return;
                }
            }
            if (!this.delayWrite) {
                this.write(timestamp, msg);
            } else {
                QueuedData queued;
                if (msg instanceof IStreamData) {
                    if (log.isTraceEnabled()) {
                        log.trace("Stream data, body saved. Data type: {} class type: {}", dataType, msg.getClass().getName());
                    }
                    queued = new QueuedData(timestamp, dataType, (IStreamData) ((Object) msg));
                } else {
                    if (log.isTraceEnabled()) {
                        log.trace("Non-stream data, body not saved. Data type: {} class type: {}", dataType, msg.getClass().getName());
                    }
                    queued = new QueuedData(timestamp, dataType);
                }
                // Add and read the size under the same lock so the size reflects this add.
                int queueSize;
                this.writeLock.lock();
                try {
                    this.queue.add(queued);
                    queueSize = this.queue.size();
                } finally {
                    this.writeLock.unlock();
                }
                if (msg instanceof VideoData) {
                    this.writeQueuedDataSlice(this.createTimestampLimitedSlice(msg.getTimestamp()));
                } else if (this.queueThreshold >= 0 && queueSize >= this.queueThreshold) {
                    this.writeQueuedDataSlice(this.createFixedLengthSlice(this.queueThreshold / (100 / this.percentage)));
                }
            }
        } else if (message instanceof ResetMessage) {
            this.startTimestamp = -1;
        } else if (log.isDebugEnabled()) {
            log.debug("Ignoring pushed message: {}", message);
        }
    }

    /**
     * Submits a slice to the executor for writing. When the write future cannot
     * be acquired, the entries of the slice that still carry data are put back
     * on the queue instead.
     *
     * @param slice entries previously removed from the queue
     */
    private void writeQueuedDataSlice(final QueuedData[] slice) {
        if (!this.acquireWriteFuture(slice.length)) {
            // Not writable right now: return the live entries to the queue.
            this.writeLock.lock();
            try {
                for (int i = 0; i < slice.length; i++) {
                    QueuedData pending = slice[i];
                    if (pending.hasData()) {
                        this.queue.add(pending);
                    }
                }
            } finally {
                this.writeLock.unlock();
            }
            return;
        }
        Runnable writeTask = new Runnable() {
            @Override
            public void run() {
                log.trace("Spawning queue writer thread");
                FileConsumer.this.doWrites(slice);
            }
        };
        this.writerFuture = this.scheduledExecutorService.submit(writeTask);
    }

    /**
     * Removes up to {@code sliceLength} entries from the head of the queue.
     * <p>
     * The request is clamped to the number of entries actually queued, so the
     * returned array never contains null slots and draining a short queue does
     * not throw.
     *
     * @param sliceLength maximum number of entries to remove
     * @return the removed entries in queue order; empty when nothing was available
     */
    private QueuedData[] createFixedLengthSlice(int sliceLength) {
        log.debug("Creating data slice to write of length {}.", sliceLength);
        QueuedData[] slice;
        this.writeLock.lock();
        try {
            int available = this.queue.size();
            // The queue may have been drained since the caller looked at its size.
            int count = Math.max(0, Math.min(sliceLength, available));
            slice = new QueuedData[count];
            log.trace("Slice length: {}", slice.length);
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", available);
            }
            for (int q = 0; q < count; ++q) {
                slice[q] = this.queue.remove();
            }
            if (log.isTraceEnabled()) {
                log.trace("Queue length (after removal): {}", this.queue.size());
            }
        } finally {
            this.writeLock.unlock();
        }
        return slice;
    }

    /**
     * Removes every queued entry whose timestamp is not later than the given one.
     *
     * @param timestamp inclusive upper bound for the entries to remove
     * @return the removed entries in queue order; empty when none qualified
     */
    private QueuedData[] createTimestampLimitedSlice(int timestamp) {
        log.debug("Creating data slice up until timestamp {}.", timestamp);
        List<QueuedData> drained = new ArrayList<QueuedData>();
        this.writeLock.lock();
        try {
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", this.queue.size());
            }
            // The "after removal" trace is only emitted when there was something queued.
            boolean hadEntries = !this.queue.isEmpty();
            for (QueuedData head = this.queue.peek();
                    head != null && head.getTimestamp() <= timestamp;
                    head = this.queue.peek()) {
                drained.add(this.queue.remove());
            }
            if (hadEntries && log.isTraceEnabled()) {
                log.trace("Queue length (after removal): {}", this.queue.size());
            }
        } finally {
            this.writeLock.unlock();
        }
        return drained.toArray(new QueuedData[0]);
    }

    /**
     * Decides whether a new slice write may be submitted.
     * <p>
     * Waits up to 500 ms per slice entry for the previously submitted write.
     * Returns false when the slice is empty, when the previous write is still
     * running after the timeout, or when the wait was interrupted (the
     * interrupt flag is restored). A previous write that has already ended
     * with a failure or cancellation does not block new writes.
     *
     * @param sliceLength number of entries in the slice about to be written
     * @return true if a new write may be submitted
     */
    private boolean acquireWriteFuture(int sliceLength) {
        if (sliceLength <= 0) {
            return false;
        }
        Object writeResult = null;
        long timeout = sliceLength * 500L;
        // Read the volatile field once so the check and the wait see the same future.
        Future<?> pending = this.writerFuture;
        if (pending != null) {
            try {
                writeResult = pending.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Exception waiting for write result. Timeout: {}ms", timeout, e);
                return false;
            } catch (Exception e) {
                log.warn("Exception waiting for write result. Timeout: {}ms", timeout, e);
                if (!pending.isDone()) {
                    // Timed out: the previous write is still in progress.
                    return false;
                }
                // The previous task failed or was cancelled; waiting on it again
                // would fail the same way forever, so let the next write through.
            }
        }
        log.debug("Write future result (expect null): {}", writeResult);
        return true;
    }

    /**
     * Out-of-band control messages are not handled by this consumer; the
     * method is intentionally empty.
     *
     * @param source     component that sent the message
     * @param pipe       pipe the message travelled on
     * @param oobCtrlMsg the control message
     */
    @Override
    public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControlMessage oobCtrlMsg) {
    }

    /**
     * Picks up the recording mode when this consumer is connected to a pipe.
     * Only the CONSUMER_CONNECT_PUSH event for this very instance is
     * considered, and only when the event carries a parameter map.
     *
     * @param event pipe connection event
     */
    @Override
    public void onPipeConnectionEvent(PipeConnectionEvent event) {
        switch (event.getType()) {
            case CONSUMER_CONNECT_PUSH:
                if (event.getConsumer() == this) {
                    Map<String, Object> params = event.getParamMap();
                    if (params != null) {
                        this.mode = (String) params.get("mode");
                    }
                }
                break;
            default:
                break;
        }
    }

    /**
     * Prepares the consumer for writing: creates the queue and locks when
     * {@code delayWrite} is set, makes sure the target file and its parent
     * folder exist, and opens the writer that matches the mode.
     * <p>
     * In "record" mode (also used when no mode is set) any stored video and
     * audio configuration tags are written first and then cleared. When no
     * file is configured only a warning is logged and the writer stays null.
     *
     * @throws IOException           if the parent folder cannot be created, the
     *                               file is read-only, or the file or a tag
     *                               cannot be written
     * @throws IllegalStateException if the mode is neither "record" nor "append"
     */
    private void init() throws IOException {
        log.debug("Init");
        if (this.file == null) {
            log.warn("Consumer is uninitialized");
            return;
        }
        if (this.delayWrite) {
            this.queue = new PriorityQueue<QueuedData>(this.queueThreshold <= 0 ? 11 : this.queueThreshold);
            this.reentrantLock = new ReentrantReadWriteLock();
            this.writeLock = this.reentrantLock.writeLock();
            this.readLock = this.reentrantLock.readLock();
        }
        IStreamableFileFactory factory = (IStreamableFileFactory) ScopeUtils.getScopeService(this.scope, IStreamableFileFactory.class, StreamableFileFactory.class);
        // getParentFile() returns null for a path without a parent component
        // (e.g. a bare relative file name); there is no folder to create then.
        File folder = this.file.getParentFile();
        if (folder != null && !folder.exists() && !folder.mkdirs()) {
            throw new IOException("Could not create parent folder");
        }
        if (!this.file.isFile()) {
            this.file.createNewFile();
        } else if (!this.file.canWrite()) {
            throw new IOException("The file is read-only");
        }
        IStreamableFileService service = factory.getService(this.file);
        IStreamableFile flv = service.getStreamableFile(this.file);
        if (this.mode == null || this.mode.equals("record")) {
            this.writer = flv.getWriter();
            if (this.videoConfigurationTag != null) {
                this.writer.writeTag(this.videoConfigurationTag);
                this.videoConfigurationTag = null;
            }
            if (this.audioConfigurationTag != null) {
                this.writer.writeTag(this.audioConfigurationTag);
                this.audioConfigurationTag = null;
            }
        } else if (this.mode.equals("append")) {
            this.writer = flv.getAppendWriter();
        } else {
            throw new IllegalStateException(String.format("Illegal mode type: %s", this.mode));
        }
    }

    /**
     * Finishes recording: waits for an in-flight slice write, flushes whatever
     * is still queued, closes the writer and forgets the file.
     * <p>
     * The writer is closed even if the final flush fails. If the wait for the
     * in-flight write is interrupted, the interrupt flag is restored only after
     * the flush and close have run, so that they are not performed on a thread
     * that is already flagged as interrupted.
     */
    public void uninit() {
        log.debug("Uninit");
        if (this.writer != null) {
            boolean interrupted = false;
            try {
                // Read the volatile field once; it is reset below.
                Future<?> pending = this.writerFuture;
                if (pending != null) {
                    try {
                        pending.get();
                    } catch (InterruptedException e) {
                        interrupted = true;
                        log.warn("Exception waiting for write result on uninit", e);
                    } catch (Exception e) {
                        log.warn("Exception waiting for write result on uninit", e);
                    }
                    if (pending.cancel(false)) {
                        log.debug("Future completed");
                    }
                }
                this.writerFuture = null;
                // The queue only exists if delayWrite was already set when init() ran.
                if (this.delayWrite && this.queue != null) {
                    this.doWrites();
                    this.queue.clear();
                    this.queue = null;
                }
            } finally {
                try {
                    this.writer.close();
                } finally {
                    this.writer = null;
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        this.file = null;
    }

    /**
     * Drains the whole queue and writes its entries in timestamp order.
     * Does nothing when the queue has not been created (delayed writing was
     * never initialised).
     */
    public final void doWrites() {
        if (this.queue == null || this.writeLock == null) {
            return;
        }
        QueuedData[] slice;
        this.writeLock.lock();
        try {
            slice = this.queue.toArray(new QueuedData[0]);
            if (slice.length > 0) {
                // The snapshot holds every element, so emptying the queue is a plain
                // clear(); removeAll() would rescan the queue for each element.
                this.queue.clear();
                log.debug("Queued writes transfered, count: {}", slice.length);
            }
        } finally {
            this.writeLock.unlock();
        }
        // toArray() gives no ordering guarantee for a PriorityQueue.
        Arrays.sort(slice);
        this.doWrites(slice);
    }

    /**
     * Writes the entries of a slice in array order.
     * <p>
     * An entry is written when its timestamp is not older than the last
     * written timestamp; older entries are discarded. Entries that are null or
     * have already been disposed are skipped. Every processed entry is disposed.
     *
     * @param slice entries to write
     */
    public final void doWrites(QueuedData[] slice) {
        for (QueuedData queued : slice) {
            // Check for data before touching the timestamp: a disposed entry has no
            // tag left to read it from.
            if (queued == null || !queued.hasData()) {
                if (log.isTraceEnabled()) {
                    log.trace("Queued data was not available");
                }
                continue;
            }
            int tmpTs = queued.getTimestamp();
            if (this.lastWrittenTs <= tmpTs) {
                this.write(queued);
                this.lastWrittenTs = tmpTs;
            }
            queued.dispose();
        }
    }

    /**
     * Writes a single event straight to the file.
     * <p>
     * The timestamp is made relative to the first written message. A tag is
     * only written when it has a body or its data type is 8 (presumably audio
     * — confirm against Constants), and never with a negative timestamp.
     * Events that carry no stream data are ignored. Write errors are logged,
     * not propagated.
     *
     * @param timestamp absolute timestamp of the event
     * @param msg       event to write
     */
    private final void write(int timestamp, IRTMPEvent msg) {
        byte dataType = msg.getDataType();
        log.debug("Write - timestamp: {} type: {}", timestamp, dataType);
        if (!(msg instanceof IStreamData)) {
            // Same distinction the delayed path makes: nothing to write without a body.
            if (log.isTraceEnabled()) {
                log.trace("Non-stream data, body not saved. Data type: {} class type: {}", dataType, msg.getClass().getName());
            }
            return;
        }
        IoBuffer data = ((IStreamData) ((Object) msg)).getData();
        if (data == null) {
            return;
        }
        if (this.startTimestamp == -1) {
            this.startTimestamp = timestamp;
            timestamp = 0;
        } else {
            timestamp -= this.startTimestamp;
        }
        ImmutableTag tag = ImmutableTag.build(dataType, timestamp, data, 0);
        if (tag.getBodySize() > 0 || dataType == 8) {
            try {
                if (timestamp >= 0) {
                    if (!this.writer.writeTag(tag)) {
                        log.warn("Tag was not written");
                    }
                } else {
                    log.warn("Skipping message with negative timestamp.");
                }
            } catch (IOException e) {
                log.error("Error writing tag", e);
            } finally {
                data.clear();
                data.free();
            }
        }
    }

    /**
     * Writes one queued entry to the file.
     * <p>
     * Nothing happens for an entry without a tag, or with an empty body unless
     * its data type is 8. Otherwise the tag's timestamp is rewritten relative
     * to the first written message, the tag is written unless that timestamp is
     * negative, and the entry is disposed. Write errors are logged, not
     * propagated.
     *
     * @param queued entry to write
     */
    private final void write(QueuedData queued) {
        final byte dataType = queued.getDataType();
        int timestamp = queued.getTimestamp();
        log.debug("Write - timestamp: {} type: {}", timestamp, dataType);
        final ITag tag = queued.getData();
        if (tag == null) {
            return;
        }
        if (tag.getBodySize() <= 0 && dataType != 8) {
            return;
        }
        if (this.startTimestamp != -1) {
            timestamp -= this.startTimestamp;
        } else {
            this.startTimestamp = timestamp;
            timestamp = 0;
        }
        tag.setTimestamp(timestamp);
        try {
            if (timestamp < 0) {
                log.warn("Skipping message with negative timestamp.");
            } else if (!this.writer.writeTag(tag)) {
                log.warn("Tag was not written");
            }
        } catch (ClosedChannelException cce) {
            log.error("The writer is no longer able to write to the file: {} writable: {}", this.file.getName(), this.file.canWrite());
        } catch (IOException e) {
            log.warn("Error writing tag", e);
            boolean channelClosed = e.getCause() instanceof ClosedChannelException;
            if (channelClosed) {
                log.error("The writer is no longer able to write to the file: {} writable: {}", this.file.getName(), this.file.canWrite());
            }
        } finally {
            queued.dispose();
        }
    }

    /**
     * Stores the video decoder configuration so that init() can write it at
     * the start of a recorded file. Events that are not stream data, or that
     * carry no buffer, leave the stored configuration unchanged.
     *
     * @param decoderConfig event holding the video decoder configuration
     */
    public void setVideoDecoderConfiguration(IRTMPEvent decoderConfig) {
        if (decoderConfig instanceof IStreamData) {
            IoBuffer source = ((IStreamData) ((Object) decoderConfig)).getData();
            if (source == null) {
                // write() treats a null buffer as "no data"; do the same here.
                log.debug("Video decoder configuration has no data, ignoring");
                return;
            }
            IoBuffer data = source.asReadOnlyBuffer();
            this.videoConfigurationTag = ImmutableTag.build(decoderConfig.getDataType(), 0, data, 0);
        }
    }

    /**
     * Stores the audio decoder configuration so that init() can write it at
     * the start of a recorded file. Events that are not stream data, or that
     * carry no buffer, leave the stored configuration unchanged.
     *
     * @param decoderConfig event holding the audio decoder configuration
     */
    public void setAudioDecoderConfiguration(IRTMPEvent decoderConfig) {
        if (decoderConfig instanceof IStreamData) {
            IoBuffer source = ((IStreamData) ((Object) decoderConfig)).getData();
            if (source == null) {
                // write() treats a null buffer as "no data"; do the same here.
                log.debug("Audio decoder configuration has no data, ignoring");
                return;
            }
            IoBuffer data = source.asReadOnlyBuffer();
            this.audioConfigurationTag = ImmutableTag.build(decoderConfig.getDataType(), 0, data, 0);
        }
    }

    /**
     * Sets the scope used in init() to look up the streamable file factory.
     *
     * @param scope scope to use
     */
    public void setScope(IScope scope) {
        this.scope = scope;
    }

    /**
     * Sets the file the stream is written to.
     *
     * @param file output file
     */
    public void setFile(File file) {
        this.file = file;
    }

    /**
     * Returns the output file.
     *
     * @return the output file, or null if none is set or uninit() has been called
     */
    public File getFile() {
        return this.file;
    }

    /**
     * Sets the queue size at which pushMessage() flushes a fixed-length slice.
     * A negative value disables that trigger.
     *
     * @param queueThreshold queue size threshold
     */
    public void setQueueThreshold(int queueThreshold) {
        this.queueThreshold = queueThreshold;
    }

    /**
     * Returns the queue size threshold.
     *
     * @return the threshold; -1 unless it has been set
     */
    public int getQueueThreshold() {
        return this.queueThreshold;
    }

    /**
     * Tells whether messages are queued and written in slices.
     *
     * @return true if delayed writing is enabled
     */
    public boolean isDelayWrite() {
        return this.delayWrite;
    }

    /**
     * Enables or disables delayed writing. The executor is only created by
     * afterPropertiesSet(), and the queue only by init(), if this flag is
     * already set when they run.
     *
     * @param delayWrite true to queue messages and write them in slices
     */
    public void setDelayWrite(boolean delayWrite) {
        this.delayWrite = delayWrite;
    }

    /**
     * Returns the pool size used for the write executor.
     *
     * @return the thread pool size; 1 unless it has been set
     */
    public int getSchedulerThreadSize() {
        return this.schedulerThreadSize;
    }

    /**
     * Sets the pool size used when afterPropertiesSet() creates the write executor.
     *
     * @param schedulerThreadSize thread pool size
     */
    public void setSchedulerThreadSize(int schedulerThreadSize) {
        this.schedulerThreadSize = schedulerThreadSize;
    }

    /**
     * Sets the recording mode. init() accepts "record" and "append", treats
     * null as "record", and rejects anything else.
     *
     * @param mode recording mode
     */
    public void setMode(String mode) {
        this.mode = mode;
    }

    /**
     * Shuts down the write executor, if one was created.
     *
     * @throws Exception declared by the DisposableBean contract
     */
    @Override
    public void destroy() throws Exception {
        ScheduledExecutorService executor = this.scheduledExecutorService;
        if (executor == null) {
            return;
        }
        executor.shutdown();
    }

    /**
     * Queue entry wrapping the tag to be written.
     * <p>
     * Timestamp and data type are captured from the tag when the entry is
     * created, so ordering, equality and the accessors keep working after
     * {@link #dispose()} and are unaffected by later changes to the tag's
     * timestamp. Ordering is by timestamp only, while equality also compares
     * the data type, so {@code compareTo} is not consistent with {@code equals}.
     */
    private static final class QueuedData
    implements Comparable<QueuedData> {
        /** Timestamp the entry was queued under. */
        private final int timestamp;
        /** Data type the entry was queued under. */
        private final byte dataType;
        /** Tag to write; null once the entry has been disposed. */
        ITag tag;

        /**
         * Creates an entry without a body.
         *
         * @param timestamp timestamp of the message
         * @param dataType  data type of the message
         */
        QueuedData(int timestamp, byte dataType) {
            this.tag = ImmutableTag.build(dataType, timestamp);
            this.timestamp = this.tag.getTimestamp();
            this.dataType = this.tag.getDataType();
        }

        /**
         * Creates an entry carrying the data of a stream message.
         *
         * @param timestamp  timestamp of the message
         * @param dataType   data type of the message
         * @param streamData message whose data becomes the tag body
         */
        QueuedData(int timestamp, byte dataType, IStreamData streamData) {
            this.tag = ImmutableTag.build(dataType, timestamp, streamData.getData());
            this.timestamp = this.tag.getTimestamp();
            this.dataType = this.tag.getDataType();
        }

        /** @return the timestamp the entry was queued under */
        public int getTimestamp() {
            return this.timestamp;
        }

        /** @return the data type the entry was queued under */
        public byte getDataType() {
            return this.dataType;
        }

        /** @return the tag, or null after dispose() */
        public ITag getData() {
            return this.tag;
        }

        /** @return true until the entry has been disposed */
        public boolean hasData() {
            return this.tag != null;
        }

        @Override
        public int hashCode() {
            int result = 1;
            result = 31 * result + this.dataType;
            result = 31 * result + this.timestamp;
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            QueuedData other = (QueuedData) obj;
            return this.dataType == other.dataType && this.timestamp == other.timestamp;
        }

        /** Orders entries by ascending timestamp. */
        @Override
        public int compareTo(QueuedData other) {
            if (this.timestamp > other.timestamp) {
                return 1;
            }
            if (this.timestamp < other.timestamp) {
                return -1;
            }
            return 0;
        }

        /** Releases the reference to the tag. */
        public void dispose() {
            this.tag = null;
        }
    }
}

