package org.apache.stratos.cartridge.agent.data.publisher.log;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cartridge.agent.data.publisher.DataContext;
import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
import org.wso2.carbon.databridge.commons.StreamDefinition;

/* loaded from: input_file:org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher.class */
public class FileBasedLogPublisher extends LogPublisher implements Runnable {
    private static final Log log = LogFactory.getLog(FileBasedLogPublisher.class);
    private ExecutorService executorService;
    private Process process;
    private Scanner scanner;

    /* loaded from: input_file:org/apache/stratos/cartridge/agent/data/publisher/log/FileBasedLogPublisher$FileBasedLogPublisherTaskThreadFactory.class */
    class FileBasedLogPublisherTaskThreadFactory implements ThreadFactory {
        private String filePath;

        public FileBasedLogPublisherTaskThreadFactory(String str) {
            this.filePath = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "File based log publisher thread  - " + this.filePath);
        }
    }

    public FileBasedLogPublisher(DataPublisherConfiguration dataPublisherConfiguration, StreamDefinition streamDefinition, String str, String str2) {
        super(dataPublisherConfiguration, streamDefinition, str, str2);
        this.executorService = Executors.newSingleThreadExecutor(new FileBasedLogPublisherTaskThreadFactory(str));
    }

    @Override // org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisher
    public void start() {
        this.executorService.submit(this);
    }

    @Override // org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisher
    public void stop() {
        try {
            this.process.getInputStream().close();
        } catch (IOException e) {
            log.error("Error in closing [tail -F] input stream", e);
        }
        this.scanner.close();
        this.process.destroy();
        this.executorService.shutdownNow();
        terminate();
        log.info("Terminated log publisher for file: " + this.filePath);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.process = Runtime.getRuntime().exec(Constants.TAIL_COMMAND + this.filePath);
            log.info("Starting log publisher for file: " + this.filePath + ", thread: " + Thread.currentThread().getName());
            this.scanner = new Scanner(this.process.getInputStream());
            while (this.scanner.hasNextLine()) {
                DataContext dataContext = new DataContext();
                dataContext.setCorrelationData(null);
                dataContext.setMetaData(new Object[]{this.memberId});
                dataContext.setPayloadData(new Object[]{this.scanner.nextLine()});
                publish(dataContext);
            }
        } catch (IOException e) {
            log.error("Error tailing file ", e);
            throw new RuntimeException(e);
        }
    }
}
