/*
 * Decompiled with CFR 0.152.
 */
package io.antmedia.streamsource;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.AppSettings;
import io.antmedia.datastore.db.DataStore;
import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.BroadcastUpdate;
import io.antmedia.licence.ILicenceService;
import io.antmedia.rest.model.Result;
import io.antmedia.shutdown.AMSShutdownManager;
import io.antmedia.streamsource.StreamFetcher;
import io.vertx.core.Vertx;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.red5.server.api.scope.IScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamFetcherManager {
    /** Logger shared by this class. */
    protected static Logger logger = LoggerFactory.getLogger(StreamFetcherManager.class);
    /** Number of periodic checks executed so far; incremented by the periodic job while fetchers are registered. */
    private int streamCheckerCount = 0;
    /** Registered stream fetchers, keyed by stream id. */
    private Map<String, StreamFetcher> streamFetcherList = new ConcurrentHashMap<String, StreamFetcher>();
    /** Period of the periodic checker job, in milliseconds. */
    private int streamCheckerIntervalMs = 10000;
    /** Data store used to look up broadcasts and to persist playlist state. */
    private DataStore datastore;
    /** Scope handed to every {@link StreamFetcher} created by this manager. */
    private IScope scope;
    /** Timer id returned by {@code vertx.setPeriodic}; {@code -1} means no checker job is scheduled. */
    private long streamFetcherScheduleJobName = -1L;
    // NOTE(review): not read or written anywhere else in this class as visible here; may be used by subclasses.
    protected AtomicBoolean isJobRunning = new AtomicBoolean(false);
    /** Value passed to {@code StreamFetcher.setRestartStream} for fetchers created by {@code startStreaming}. */
    private boolean restartStreamAutomatically = true;
    /** Vert.x instance used for the periodic checker timer and handed to created fetchers. */
    private Vertx vertx;
    /** Restart counter value at the time of the most recent periodic restart. */
    private int lastRestartCount;
    /** Application settings bean ("app.settings") resolved from the scope context in the constructor. */
    private AppSettings appSettings;
    /** Licence service bean resolved from the scope context in the constructor. */
    private ILicenceService licenseService;
    /** Set to {@code true} by {@code shuttingDown()}; checked by {@code playItemInList} before starting a new item. */
    boolean serverShuttingDown = false;

    /**
     * Creates a manager bound to the given Vert.x instance, data store and scope.
     *
     * <p>The application settings and the licence service are resolved from the scope's
     * context, and the new instance subscribes itself to the {@link AMSShutdownManager}
     * so that {@link #shuttingDown()} is invoked on shutdown.
     *
     * @param vertx     Vert.x instance used for timers and passed on to created fetchers
     * @param datastore data store used for broadcast lookups and updates
     * @param scope     scope whose context provides the required beans
     */
    public StreamFetcherManager(Vertx vertx, DataStore datastore, IScope scope) {
        this.vertx = vertx;
        this.datastore = datastore;
        this.scope = scope;

        // Beans are looked up by name from the application context of the scope.
        this.appSettings = (AppSettings) scope.getContext().getBean("app.settings");
        this.licenseService = (ILicenceService) scope.getContext()
                .getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString());

        AMSShutdownManager.getInstance().subscribe(this::shuttingDown);
    }

    /**
     * Marks the server as shutting down. Once set, {@code playItemInList} refuses to start
     * another playlist item.
     */
    public void shuttingDown() {
        serverShuttingDown = true;
    }

    /**
     * Builds a {@link StreamFetcher} from the URL, stream id, type and seek time of the
     * given broadcast.
     *
     * @param stream broadcast describing the source to fetch
     * @param scope  scope handed to the fetcher
     * @param vertx  Vert.x instance handed to the fetcher
     * @return a new, not yet started fetcher
     */
    public StreamFetcher make(Broadcast stream, IScope scope, Vertx vertx) {
        return new StreamFetcher(
                stream.getStreamUrl(),
                stream.getStreamId(),
                stream.getType(),
                scope,
                vertx,
                stream.getSeekTimeInMs());
    }

    /**
     * @return the period of the periodic checker job, in milliseconds
     */
    public int getStreamCheckerInterval() {
        return streamCheckerIntervalMs;
    }

    /**
     * Overrides the period of the periodic checker job. The new value is only picked up
     * the next time the job is scheduled.
     *
     * @param streamCheckerInterval new period in milliseconds
     */
    public void testSetStreamCheckerInterval(int streamCheckerInterval) {
        streamCheckerIntervalMs = streamCheckerInterval;
    }

    /**
     * Tells whether the given broadcast is considered active.
     *
     * <p>A broadcast is active when a fetcher is registered for its stream id, or otherwise
     * when {@link AntMediaApplicationAdapter#isStreaming(Broadcast)} reports it as streaming.
     *
     * @param broadcast broadcast to check
     * @return {@code true} if the stream is active
     */
    public boolean isStreamRunning(Broadcast broadcast) {
        if (streamFetcherList.containsKey(broadcast.getStreamId())) {
            logger.info("Stream is still on FetcherManagerList so it's active for streamId:{}", broadcast.getStreamId());
            return true;
        }
        // Not tracked by this manager: fall back to the application-level check.
        return AntMediaApplicationAdapter.isStreaming(broadcast);
    }

    /**
     * Starts the given fetcher and registers it under its stream id, unless the licence is
     * suspended.
     *
     * <p>An existing registration for the same stream id is replaced (a warning is logged).
     * The periodic checker job is scheduled if it is not already.
     *
     * @param streamScheduler fetcher to start
     * @return a result carrying the stream id as data id; successful when the fetcher was
     *         started, otherwise failed with the message "License is suspended"
     */
    public Result startStreamScheduler(StreamFetcher streamScheduler) {
        Result result = new Result(false);
        result.setDataId(streamScheduler.getStreamId());

        if (licenseService.isLicenceSuspended()) {
            logger.error("License is suspend and new stream scheduler is not started {}", streamScheduler.getStreamUrl());
            result.setMessage("License is suspended");
            return result;
        }

        logger.info("Starting stream fetcher for streamId:{}", streamScheduler.getStreamId());
        streamScheduler.startStream();

        boolean alreadyRegistered = streamFetcherList.containsKey(streamScheduler.getStreamId());
        if (alreadyRegistered) {
            logger.warn("There is already a stream schedule exists for streamId:{} ", streamScheduler.getStreamId());
        }
        streamFetcherList.put(streamScheduler.getStreamId(), streamScheduler);

        // Lazily start the periodic checker the first time a fetcher is registered.
        if (streamFetcherScheduleJobName == -1L) {
            scheduleStreamFetcherJob();
        }

        result.setSuccess(true);
        return result;
    }

    /**
     * Creates and starts a fetcher for the given broadcast unless the stream is already
     * active.
     *
     * @param broadcast broadcast to start fetching
     * @return the result of {@link #startStreamScheduler(StreamFetcher)} on the normal path;
     *         a failed result with an explanatory message when the stream is already active
     *         or an exception occurred while starting it
     */
    public Result startStreaming(@Nonnull Broadcast broadcast) {
        boolean alreadyFetching = isStreamRunning(broadcast);
        Result result = new Result(false);

        if (alreadyFetching) {
            logger.info("Stream is already active for streamId:{}", broadcast.getStreamId());
            result.setMessage("Stream is already active. It's already streaming or trying to connect");
            return result;
        }

        try {
            StreamFetcher fetcher = make(broadcast, scope, vertx);
            fetcher.setRestartStream(restartStreamAutomatically);
            fetcher.setDataStore(getDatastore());
            result = startStreamScheduler(fetcher);
        }
        catch (Exception e) {
            logger.error(ExceptionUtils.getStackTrace(e));
            result.setMessage("Problem occured while fetching the stream");
        }
        return result;
    }

    /**
     * Removes the fetcher registered for the given stream id, if any, and stops it.
     *
     * @param streamId id of the stream to stop
     * @return a result carrying {@code streamId} as data id; successful with the message
     *         "Stream stopped" when a fetcher was found, otherwise failed with a message
     *         stating that no matching stream source exists on this server
     */
    public Result stopStreaming(String streamId) {
        logger.warn("inside of stopStreaming for {}", streamId);
        Result result = new Result(false);

        if (StringUtils.isNotBlank(streamId)) {
            StreamFetcher removed = streamFetcherList.remove(streamId);
            if (removed != null) {
                removed.stopStream();
                result.setSuccess(true);
            }
        }

        if (result.isSuccess()) {
            result.setMessage("Stream stopped");
        } else {
            result.setMessage("No matching stream source in this server:" + streamId);
        }
        result.setDataId(streamId);
        return result;
    }

    /**
     * Cancels the periodic checker timer, if one is scheduled, and resets the stored timer
     * id to {@code -1}.
     */
    public void stopCheckerJob() {
        if (streamFetcherScheduleJobName == -1L) {
            // Nothing scheduled.
            return;
        }
        vertx.cancelTimer(streamFetcherScheduleJobName);
        streamFetcherScheduleJobName = -1L;
    }

    /**
     * Probes the given URL over HTTP(S) and reports whether it answers with an acceptable
     * status code.
     *
     * <p>A response code in the range {@code [200, 301)} counts as success. Any other
     * response code, a URL that cannot be parsed, a URL whose scheme does not yield an
     * {@link HttpURLConnection}, and any I/O failure all produce a failed result with an
     * explanatory message. The connection is always released before returning.
     *
     * <p>NOTE(review): no connect/read timeout is configured, so an unresponsive host can
     * block the calling thread; consider adding timeouts once acceptable values are agreed.
     *
     * @param url address to probe
     * @return a result whose success flag tells whether the URL responded acceptably
     */
    public static Result checkStreamUrlWithHTTP(String url) {
        Result result = new Result(false);
        HttpURLConnection huc = null;
        try {
            java.net.URLConnection connection = new URL(url).openConnection();
            // Schemes such as file: or jar: do not produce an HttpURLConnection; a blind cast
            // would throw ClassCastException instead of reporting a failed check.
            if (!(connection instanceof HttpURLConnection)) {
                result.setMessage("URL " + url + " is not an HTTP(S) address");
                return result;
            }
            huc = (HttpURLConnection) connection;
            int responseCode = huc.getResponseCode();
            if (responseCode >= HttpURLConnection.HTTP_OK && responseCode < HttpURLConnection.HTTP_MOVED_PERM) {
                result.setSuccess(true);
            } else {
                result.setMessage("URL " + url + " responded:" + responseCode);
            }
        }
        catch (IOException e) {
            // Covers malformed URLs as well as connection failures.
            logger.warn("URL {} could not be checked: {}", url, e.getMessage());
            result.setMessage("URL " + url + " could not be checked: " + e.getMessage());
        }
        finally {
            if (huc != null) {
                // The response body is never read, so release the underlying socket explicitly.
                huc.disconnect();
            }
        }
        return result;
    }

    /**
     * Loads the playlist broadcast with the given id from the data store and, when it
     * exists, advances it to the item after the current one.
     *
     * @param streamId id of the playlist broadcast
     * @param listener listener to install on the fetcher of the next item
     */
    public void playNextItemInList(String streamId, StreamFetcher.IStreamFetcherListener listener) {
        Broadcast playlist = datastore.get(streamId);
        if (playlist == null) {
            return;
        }
        // A negative index asks for "current index + 1".
        playItemInList(playlist, listener, -1);
    }

    /**
     * Stops whatever is currently fetched for the playlist and starts the item at the
     * requested position.
     *
     * <p>The current fetcher for the playlist's stream id is always stopped first. Nothing
     * further is started when the server is shutting down, when the playlist status is
     * "finished", or when {@link #skipNextPlaylistQueue(Broadcast, int)} reports that there is
     * no item left. If the selected item's URL does not pass
     * {@link #checkStreamUrlWithHTTP(String)}, the playlist is advanced by one more item and
     * handed to {@link #startPlaylist(Broadcast)}.
     *
     * @param playlist playlist broadcast
     * @param listener listener installed on the fetcher created for the selected item
     * @param index    index of the item to play; a negative value selects the item after the
     *                 current one
     * @return the result of starting the item, or a failed result with an explanatory message
     */
    public Result playItemInList(Broadcast playlist, StreamFetcher.IStreamFetcherListener listener, int index) {
        final String nothingToPlayMessage = "Playlist is either stopped or there is no item to play";

        this.stopStreaming(playlist.getStreamId());
        Result result = new Result(false);
        if (this.serverShuttingDown) {
            logger.info("Playlist will not try to play the next item because server is shutting down");
            result.setMessage("Playlist will not try to play the next item because server is shutting down");
            return result;
        }

        if ("finished".equals(playlist.getPlayListStatus()) || this.skipNextPlaylistQueue(playlist, index) == null) {
            result.setMessage(nothingToPlayMessage);
            return result;
        }

        int currentStreamIndex = playlist.getCurrentPlayIndex();
        List<Broadcast.PlayListItem> items = playlist.getPlayListItemList();
        if (currentStreamIndex < 0 || currentStreamIndex >= items.size()) {
            // A looping playlist with no items yields index 0; there is still nothing to play.
            result.setMessage(nothingToPlayMessage);
            return result;
        }

        Broadcast.PlayListItem fetchedBroadcast = items.get(currentStreamIndex);
        if (StreamFetcherManager.checkStreamUrlWithHTTP(fetchedBroadcast.getStreamUrl()).isSuccess()) {
            // Persist the position before the new fetcher starts.
            BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
            broadcastUpdate.setPlayListStatus(playlist.getPlayListStatus());
            broadcastUpdate.setCurrentPlayIndex(playlist.getCurrentPlayIndex());
            this.datastore.updateBroadcastFields(playlist.getStreamId(), broadcastUpdate);

            StreamFetcher newStreamScheduler = new StreamFetcher(fetchedBroadcast.getStreamUrl(), playlist.getStreamId(), fetchedBroadcast.getType(), this.scope, this.vertx, fetchedBroadcast.getSeekTimeInMs());
            newStreamScheduler.setStreamFetcherListener(listener);
            newStreamScheduler.setRestartStream(false);
            return this.startStreamScheduler(newStreamScheduler);
        }

        logger.info("Current Playlist Stream URL -> {} is invalid", fetchedBroadcast.getStreamUrl());
        // skipNextPlaylistQueue returns null when the end of a non-looping playlist is reached;
        // passing that on to startPlaylist would throw a NullPointerException.
        Broadcast next = this.skipNextPlaylistQueue(playlist, -1);
        if (next == null) {
            result.setMessage(nothingToPlayMessage);
            return result;
        }
        return this.startPlaylist(next);
    }

    /**
     * Starts fetching the current item of the given playlist.
     *
     * <p>Nothing is started when the playlist is already running or has no items. An
     * out-of-range current index is reset to 0. If the current item's URL does not pass
     * {@link #checkStreamUrlWithHTTP(String)}, the playlist is advanced by one item: when that
     * item's URL is reachable the playlist is started from there, otherwise the broadcast is
     * marked as finished. When advancing reaches the end of a non-looping playlist, a failed
     * result is returned.
     *
     * @param playlist playlist broadcast to start
     * @return the result of {@link #startStreamScheduler(StreamFetcher)} when an item was
     *         started, otherwise a failed result (with a message where one applies)
     */
    public Result startPlaylist(Broadcast playlist) {
        Result result = new Result(false);
        List<Broadcast.PlayListItem> playListItemList = playlist.getPlayListItemList();

        if (this.isStreamRunning(playlist)) {
            String msg = "Playlist is already running for stream:" + playlist.getStreamId();
            logger.warn(msg);
            result.setMessage(msg);
            return result;
        }
        if (playListItemList == null || playListItemList.isEmpty()) {
            String msg = "There is no playlist  for stream id:" + playlist.getStreamId();
            logger.warn(msg);
            result.setMessage(msg);
            return result;
        }

        if (playlist.getCurrentPlayIndex() >= playListItemList.size() || playlist.getCurrentPlayIndex() < 0) {
            logger.warn("Resetting current play index to 0 because it's not in correct range for id: {}", playlist.getStreamId());
            playlist.setCurrentPlayIndex(0);
        }

        Broadcast.PlayListItem playlistBroadcastItem = playListItemList.get(playlist.getCurrentPlayIndex());
        if (StreamFetcherManager.checkStreamUrlWithHTTP(playlistBroadcastItem.getStreamUrl()).isSuccess()) {
            logger.info("Starting playlist item:{} for streamId:{}", playlistBroadcastItem.getStreamUrl(), playlist.getStreamId());
            StreamFetcher streamScheduler = new StreamFetcher(playlistBroadcastItem.getStreamUrl(), playlist.getStreamId(), playlistBroadcastItem.getType(), this.scope, this.vertx, playlistBroadcastItem.getSeekTimeInMs());

            playlist.setPlayListStatus("broadcasting");
            BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
            broadcastUpdate.setPlayListStatus(playlist.getPlayListStatus());
            broadcastUpdate.setCurrentPlayIndex(playlist.getCurrentPlayIndex());
            this.datastore.updateBroadcastFields(playlist.getStreamId(), broadcastUpdate);

            String streamId = playlist.getStreamId();
            // When this item ends, move on to the next one in the playlist.
            streamScheduler.setStreamFetcherListener(listener -> this.playNextItemInList(streamId, listener));
            streamScheduler.setRestartStream(false);

            // Report what actually happened: the scheduler refuses to start when the licence
            // is suspended, and that must not be reported as success.
            return this.startStreamScheduler(streamScheduler);
        }

        logger.warn("Current Playlist Stream URL -> {} is invalid", playlistBroadcastItem.getStreamUrl());
        // skipNextPlaylistQueue returns null at the end of a non-looping playlist (it has
        // already persisted the finished state in that case), so it must not be dereferenced.
        Broadcast next = this.skipNextPlaylistQueue(playlist, -1);
        if (next == null) {
            result.setMessage("There is no further playlist item to play for stream id:" + playlist.getStreamId());
            return result;
        }

        Broadcast.PlayListItem nextItem = next.getPlayListItemList().get(next.getCurrentPlayIndex());
        if (StreamFetcherManager.checkStreamUrlWithHTTP(nextItem.getStreamUrl()).isSuccess()) {
            return this.startPlaylist(next);
        }

        // NOTE(review): this writes the broadcast status, whereas the other playlist paths
        // write the playlist status — confirm that this is intended.
        next.setStatus("finished");
        BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
        broadcastUpdate.setStatus(next.getStatus());
        broadcastUpdate.setCurrentPlayIndex(next.getCurrentPlayIndex());
        this.datastore.updateBroadcastFields(next.getStreamId(), broadcastUpdate);
        result.setSuccess(false);
        return result;
    }

    /**
     * Moves the playlist's current play index to the requested position.
     *
     * <p>A negative {@code index} selects the item after the current one. When the target
     * position is past the last item the index wraps to 0; if looping is disabled the
     * playlist status is set to "finished", the change is persisted, and {@code null} is
     * returned.
     *
     * @param playlist playlist broadcast to update
     * @param index    target index, or a negative value for "current index + 1"
     * @return the same playlist instance with its index updated, or {@code null} when the
     *         playlist has ended and looping is disabled
     */
    public Broadcast skipNextPlaylistQueue(Broadcast playlist, int index) {
        int targetIndex = index < 0 ? playlist.getCurrentPlayIndex() + 1 : index;

        if (targetIndex < playlist.getPlayListItemList().size()) {
            playlist.setCurrentPlayIndex(targetIndex);
        } else {
            // Ran past the end: wrap around, and stop here unless looping is enabled.
            playlist.setCurrentPlayIndex(0);
            if (!playlist.isPlaylistLoopEnabled()) {
                logger.info("Play list looping is not enabled. It will be stopped for stream: {}", playlist.getStreamId());
                playlist.setPlayListStatus("finished");

                BroadcastUpdate update = new BroadcastUpdate();
                update.setPlayListStatus(playlist.getPlayListStatus());
                update.setCurrentPlayIndex(playlist.getCurrentPlayIndex());
                datastore.updateBroadcastFields(playlist.getStreamId(), update);
                return null;
            }
            logger.info("Playlist has finished and playlist loop is enabled so setting index to 0 for playlist:{}", playlist.getStreamId());
        }

        logger.info("Next index to play in play list is {} for stream: {}", playlist.getCurrentPlayIndex(), playlist.getStreamId());
        return playlist;
    }

    /**
     * (Re)schedules the periodic checker job on Vert.x.
     *
     * <p>Any previously scheduled timer is cancelled first. On every tick, while at least one
     * fetcher is registered, the check counter is incremented and
     * {@link #controlStreamFetchers(boolean)} is invoked. The {@code restart} flag is set each
     * time the elapsed check time crosses another multiple of the restart period taken from
     * the application settings; a non-positive restart period disables periodic restarts.
     */
    private void scheduleStreamFetcherJob() {
        if (this.streamFetcherScheduleJobName != -1L) {
            this.vertx.cancelTimer(this.streamFetcherScheduleJobName);
        }
        this.streamFetcherScheduleJobName = this.vertx.setPeriodic((long) this.streamCheckerIntervalMs, timerId -> {
            if (this.streamFetcherList.isEmpty()) {
                return;
            }
            ++this.streamCheckerCount;
            logger.debug("StreamFetcher Check Count:{}", this.streamCheckerCount);

            int countToRestart = 0;
            int restartStreamFetcherPeriodSeconds = this.appSettings.getRestartStreamFetcherPeriod();
            if (restartStreamFetcherPeriodSeconds > 0) {
                // Work in milliseconds with long arithmetic: truncating the interval to whole
                // seconds would turn sub-second intervals into 0 (never restarting), and the
                // int product could overflow.
                long elapsedMs = (long) this.streamCheckerCount * this.streamCheckerIntervalMs;
                countToRestart = (int) (elapsedMs / (restartStreamFetcherPeriodSeconds * 1000L));
            }

            boolean restart = countToRestart > this.lastRestartCount;
            if (restart) {
                this.lastRestartCount = countToRestart;
                logger.info("This is {} times that restarting streams", this.lastRestartCount);
            }
            this.controlStreamFetchers(restart);
        });
        logger.info("StreamFetcherSchedule job name {}", this.streamFetcherScheduleJobName);
    }

    /**
     * Decides whether a broadcast should be stopped because nobody is watching it.
     *
     * <p>That is the case when auto start/stop is enabled, nobody is watching, the start time
     * is set (non-zero), and more than one checker interval has elapsed since the start time.
     *
     * @param broadcast broadcast to evaluate
     * @return {@code true} if the broadcast qualifies for an automatic stop
     */
    public boolean isToBeStoppedAutomatically(Broadcast broadcast) {
        logger.info("broadcast is autoStartStopEnabled:{} isAnyonewatching:{} startTime:{} streamCheckerIntervalMs:{}",
                broadcast.isAutoStartStopEnabled(), broadcast.isAnyoneWatching(), broadcast.getStartTime(), streamCheckerIntervalMs);

        if (!broadcast.isAutoStartStopEnabled() || broadcast.isAnyoneWatching()) {
            return false;
        }
        if (broadcast.getStartTime() == 0L) {
            // Start time has not been set.
            return false;
        }
        return System.currentTimeMillis() > broadcast.getStartTime() + (long) streamCheckerIntervalMs;
    }

    /**
     * Walks over all registered fetchers and stops and/or restarts them as required.
     *
     * <p>Fetchers whose broadcast is of type "playlist" are skipped. A fetcher is stopped when
     * {@code restart} is set, when its broadcast no longer exists in the data store, or when
     * {@link #isToBeStoppedAutomatically(Broadcast)} holds. On a restart with an existing
     * broadcast, the stream is started again: immediately if it is no longer running,
     * otherwise through a listener installed on the stopped fetcher.
     *
     * @param restart whether every non-playlist fetcher should be restarted
     */
    public void controlStreamFetchers(boolean restart) {
        for (StreamFetcher fetcher : streamFetcherList.values()) {
            Broadcast broadcast = datastore.get(fetcher.getStreamId());
            boolean broadcastMissing = broadcast == null;

            if (!broadcastMissing && "playlist".equals(broadcast.getType())) {
                continue;
            }

            // Only evaluated when neither of the cheaper stop reasons applies.
            boolean autoStop = !restart && !broadcastMissing && isToBeStoppedAutomatically(broadcast);

            if (restart || broadcastMissing || autoStop) {
                logger.info("Calling stop stream {} due to restart -> {}, broadcast is null -> {}, auto stop because no viewer -> {}",
                        fetcher.getStreamId(), restart, broadcastMissing, autoStop);
                stopStreaming(fetcher.getStreamId());
            } else {
                logger.info("Stream:{} is alive -> {}, is it blocked -> {}",
                        fetcher.getStreamId(), fetcher.isStreamAlive(), fetcher.isStreamBlocked());
            }

            if (restart && !broadcastMissing) {
                if (isStreamRunning(broadcast)) {
                    // Still reported as running: start again once the fetcher signals it finished.
                    logger.info("Setting stream fetcher listener to restart when it's finished for streamId:{}", broadcast.getStreamId());
                    fetcher.setStreamFetcherListener(finished -> {
                        Broadcast reloaded = datastore.get(fetcher.getStreamId());
                        if (reloaded != null) {
                            startStreaming(reloaded);
                        }
                    });
                } else {
                    startStreaming(broadcast);
                }
            }
        }
    }

    /**
     * @return the data store used by this manager
     */
    public DataStore getDatastore() {
        return datastore;
    }

    /**
     * Replaces the data store used by this manager.
     *
     * @param datastore data store to use from now on
     */
    public void setDatastore(DataStore datastore) {
        this.datastore = datastore;
    }

    /**
     * @return the live map of registered fetchers keyed by stream id (not a copy)
     */
    public Map<String, StreamFetcher> getStreamFetcherList() {
        return streamFetcherList;
    }

    /**
     * Looks up the fetcher registered for a stream.
     *
     * @param streamId id of the stream
     * @return the registered fetcher, or {@code null} if none is registered under that id
     */
    public StreamFetcher getStreamFetcher(String streamId) {
        return streamFetcherList.get(streamId);
    }

    /**
     * Replaces the map of registered fetchers. The given map is used as-is, not copied.
     *
     * @param streamFetcherList map of fetchers keyed by stream id
     */
    public void setStreamFetcherList(Map<String, StreamFetcher> streamFetcherList) {
        this.streamFetcherList = streamFetcherList;
    }

    /**
     * @return the restart flag handed to fetchers created by {@code startStreaming}
     */
    public boolean isRestartStreamAutomatically() {
        return restartStreamAutomatically;
    }

    /**
     * Sets the restart flag handed to fetchers created by {@code startStreaming}.
     *
     * @param restartStreamAutomatically new value of the flag
     */
    public void setRestartStreamAutomatically(boolean restartStreamAutomatically) {
        this.restartStreamAutomatically = restartStreamAutomatically;
    }

    /**
     * @return the number of periodic checks counted so far
     */
    public int getStreamCheckerCount() {
        return streamCheckerCount;
    }

    /**
     * Overrides the periodic check counter, which the periodic job uses to decide when
     * fetchers are restarted.
     *
     * @param streamCheckerCount new counter value
     */
    public void setStreamCheckerCount(int streamCheckerCount) {
        this.streamCheckerCount = streamCheckerCount;
    }

    /**
     * Stops the fetcher of a playlist and marks the playlist as finished in the data store.
     *
     * <p>If no fetcher could be stopped, the result of {@link #stopStreaming(String)} is
     * returned unchanged. Otherwise the broadcast is loaded; when it exists and is of type
     * "playlist" its playlist status is set to "finished" and persisted, and the outcome of
     * that update becomes the success flag.
     *
     * @param streamId id of the playlist broadcast
     * @return the outcome as described above
     */
    public Result stopPlayList(String streamId) {
        logger.info("Stopping playlist for stream: {}", streamId);

        Result stopResult = stopStreaming(streamId);
        if (!stopResult.isSuccess()) {
            logger.warn("Stop streamming returned false for stream:{} message:{}", streamId, stopResult.getMessage());
            return stopResult;
        }

        Result result = new Result(false);
        Broadcast broadcast = datastore.get(streamId);
        boolean isPlaylist = broadcast != null && "playlist".equals(broadcast.getType());
        if (!isPlaylist) {
            String msg = "Broadcast's type is not play list for stream:" + streamId;
            result.setMessage(msg);
            result.setDataId(streamId);
            logger.error(msg);
            return result;
        }

        broadcast.setPlayListStatus("finished");
        BroadcastUpdate update = new BroadcastUpdate();
        update.setPlayListStatus(broadcast.getPlayListStatus());
        result.setSuccess(datastore.updateBroadcastFields(streamId, update));
        return result;
    }
}

