/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.logging;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;

public class BeamFnLoggingClient
implements AutoCloseable {
    private static final String ROOT_LOGGER_NAME = "";
    private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP = ImmutableMap.builder().put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR).put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN).put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO).put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG).put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE).build();
    private static final ImmutableMap<SdkHarnessOptions.LogLevel, Level> LEVEL_CONFIGURATION = ImmutableMap.builder().put(SdkHarnessOptions.LogLevel.OFF, Level.OFF).put(SdkHarnessOptions.LogLevel.ERROR, Level.SEVERE).put(SdkHarnessOptions.LogLevel.WARN, Level.WARNING).put(SdkHarnessOptions.LogLevel.INFO, Level.INFO).put(SdkHarnessOptions.LogLevel.DEBUG, Level.FINE).put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST).build();
    private static final Formatter FORMATTER = new SimpleFormatter();
    private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10000;
    private static final Object COMPLETED = new Object();
    private final Collection<Logger> configuredLoggers;
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final ManagedChannel channel;
    private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
    private final LogControlObserver inboundObserver;
    private final LogRecordHandler logRecordHandler;
    private final CompletableFuture<Object> inboundObserverCompletion;
    private final Phaser phaser;

    public BeamFnLoggingClient(PipelineOptions options, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.inboundObserverCompletion = new CompletableFuture();
        this.configuredLoggers = new ArrayList<Logger>();
        this.phaser = new Phaser(1);
        this.channel = channelFactory.apply(apiServiceDescriptor);
        LogManager logManager = LogManager.getLogManager();
        logManager.reset();
        Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
        for (Handler handler : rootLogger.getHandlers()) {
            rootLogger.removeHandler(handler);
        }
        SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
        if (loggingOptions.getDefaultSdkHarnessLogLevel() != null) {
            rootLogger.setLevel(LEVEL_CONFIGURATION.get((Object)loggingOptions.getDefaultSdkHarnessLogLevel()));
        }
        if (loggingOptions.getSdkHarnessLogLevelOverrides() != null) {
            for (Map.Entry loggerOverride : loggingOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
                Logger logger = Logger.getLogger((String)loggerOverride.getKey());
                logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
                this.configuredLoggers.add(logger);
            }
        }
        BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(this.channel);
        this.inboundObserver = new LogControlObserver();
        this.logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
        this.logRecordHandler.setLevel(Level.ALL);
        this.outboundObserver = (CallStreamObserver)stub.logging(this.inboundObserver);
        rootLogger.addHandler(this.logRecordHandler);
    }

    @Override
    public void close() throws Exception {
        try {
            for (Logger logger : this.configuredLoggers) {
                logger.setLevel(null);
            }
            this.configuredLoggers.clear();
            LogManager.getLogManager().readConfiguration();
            this.logRecordHandler.close();
            this.inboundObserverCompletion.get();
        }
        finally {
            this.channel.shutdown();
            if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper(BeamFnLoggingClient.class).add("apiServiceDescriptor", this.apiServiceDescriptor).toString();
    }

    private class LogControlObserver
    implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
        private LogControlObserver() {
        }

        @Override
        public void beforeStart(ClientCallStreamObserver requestStream) {
            requestStream.setOnReadyHandler(BeamFnLoggingClient.this.phaser::arrive);
        }

        @Override
        public void onNext(BeamFnApi.LogControl value) {
        }

        @Override
        public void onError(Throwable t) {
            BeamFnLoggingClient.this.inboundObserverCompletion.completeExceptionally(t);
        }

        @Override
        public void onCompleted() {
            BeamFnLoggingClient.this.inboundObserverCompletion.complete(COMPLETED);
        }
    }

    private class LogRecordHandler
    extends Handler
    implements Runnable {
        private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries = new LinkedBlockingDeque<BeamFnApi.LogEntry>(10000);
        private final Future<?> bufferedLogWriter;
        private Thread logEntryHandlerThread;

        private LogRecordHandler(ExecutorService executorService) {
            this.bufferedLogWriter = executorService.submit(this);
        }

        @Override
        public void publish(LogRecord record) {
            BeamFnApi.LogEntry.Severity.Enum severity = (BeamFnApi.LogEntry.Severity.Enum)LOG_LEVEL_MAP.get(record.getLevel());
            if (severity == null) {
                return;
            }
            BeamFnApi.LogEntry.Builder builder = BeamFnApi.LogEntry.newBuilder().setSeverity(severity).setLogLocation(record.getLoggerName()).setMessage(FORMATTER.formatMessage(record)).setThread(Integer.toString(record.getThreadID())).setTimestamp(Timestamp.newBuilder().setSeconds(record.getMillis() / 1000L).setNanos((int)(record.getMillis() % 1000L) * 1000000));
            if (record.getThrown() != null) {
                builder.setTrace(Throwables.getStackTraceAsString(record.getThrown()));
            }
            if (Thread.currentThread() != this.logEntryHandlerThread) {
                try {
                    this.bufferedLogEntries.put(builder.build());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            } else {
                this.bufferedLogEntries.offer(builder.build());
            }
        }

        @Override
        public void run() {
            this.logEntryHandlerThread = Thread.currentThread();
            ArrayList additionalLogEntries = new ArrayList(10000);
            Throwable thrown = null;
            try {
                while (!BeamFnLoggingClient.this.phaser.isTerminated()) {
                    BeamFnApi.LogEntry logEntry = this.bufferedLogEntries.poll(1L, TimeUnit.SECONDS);
                    if (logEntry == null) continue;
                    int phase = BeamFnLoggingClient.this.phaser.getPhase();
                    if (!BeamFnLoggingClient.this.outboundObserver.isReady()) {
                        BeamFnLoggingClient.this.phaser.awaitAdvance(phase);
                    }
                    BeamFnApi.LogEntry.List.Builder builder = BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
                    this.bufferedLogEntries.drainTo(additionalLogEntries);
                    builder.addAllLogEntries(additionalLogEntries);
                    BeamFnLoggingClient.this.outboundObserver.onNext(builder.build());
                    additionalLogEntries.clear();
                }
                this.bufferedLogEntries.drainTo(additionalLogEntries);
                if (!additionalLogEntries.isEmpty()) {
                    BeamFnLoggingClient.this.outboundObserver.onNext(BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build());
                }
            }
            catch (Throwable t) {
                thrown = t;
            }
            if (thrown != null) {
                BeamFnLoggingClient.this.outboundObserver.onError(Status.INTERNAL.withDescription(Throwables.getStackTraceAsString(thrown)).asException());
                throw new IllegalStateException(thrown);
            }
            BeamFnLoggingClient.this.outboundObserver.onCompleted();
        }

        @Override
        public void flush() {
        }

        @Override
        public synchronized void close() {
            if (BeamFnLoggingClient.this.phaser.isTerminated()) {
                return;
            }
            BeamFnLoggingClient.this.phaser.forceTermination();
            try {
                this.bufferedLogWriter.get();
            }
            catch (CancellationException cancellationException) {
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

