/*
 * Decompiled with CFR 0.152.
 */
package org.apache.stratos.cartridge.agent.data.publisher.log;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cartridge.agent.data.publisher.DataContext;
import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
import org.apache.stratos.cartridge.agent.data.publisher.log.Constants;
import org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisher;
import org.wso2.carbon.databridge.commons.StreamDefinition;

public class FileBasedLogPublisher
extends LogPublisher
implements Runnable {
    private static final Log log = LogFactory.getLog(FileBasedLogPublisher.class);
    private ExecutorService executorService;
    private Process process;
    private Scanner scanner;

    public FileBasedLogPublisher(DataPublisherConfiguration dataPublisherConfig, StreamDefinition streamDefinition, String filePath, String memberId, String tenantId, String alias, Long datetime) {
        super(dataPublisherConfig, streamDefinition, filePath, memberId, tenantId, alias, datetime);
        this.executorService = Executors.newSingleThreadExecutor(new FileBasedLogPublisherTaskThreadFactory(filePath));
    }

    @Override
    public void start() {
        this.executorService.submit(this);
    }

    @Override
    public void stop() {
        try {
            this.process.getInputStream().close();
        }
        catch (IOException e) {
            log.error((Object)"Error in closing [tail -F] input stream", (Throwable)e);
        }
        this.scanner.close();
        this.process.destroy();
        this.executorService.shutdownNow();
        this.terminate();
        log.info((Object)("Terminated log publisher for file: " + this.filePath));
    }

    @Override
    public void run() {
        Runtime r = Runtime.getRuntime();
        try {
            this.process = r.exec(Constants.TAIL_COMMAND + this.filePath);
        }
        catch (IOException e) {
            log.error((Object)"Error tailing file ", (Throwable)e);
            throw new RuntimeException(e);
        }
        log.info((Object)("Starting log publisher for file: " + this.filePath + ", thread: " + Thread.currentThread().getName()));
        this.scanner = new Scanner(this.process.getInputStream());
        while (this.scanner.hasNextLine()) {
            DataContext dataContext = new DataContext();
            dataContext.setCorrelationData(null);
            dataContext.setMetaData(new Object[]{this.memberId});
            dataContext.setPayloadData(new Object[]{this.tenantId, this.alias, "", this.datetime, "", this.scanner.nextLine(), "", "", this.memberId, ""});
            this.publish(dataContext);
        }
    }

    class FileBasedLogPublisherTaskThreadFactory
    implements ThreadFactory {
        private String filePath;

        public FileBasedLogPublisherTaskThreadFactory(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "File based log publisher thread  - " + this.filePath);
        }
    }
}

