/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.Retriable;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.apache.gobblin.writer.WatermarkAwareWriterWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataWriter} decorator that retries {@link #writeEnvelope(RecordEnvelope)} and
 * {@link #commit()} on the wrapped writer. All other operations are delegated without retry.
 *
 * <p>The retry policy comes from the wrapped writer when it implements {@link Retriable};
 * otherwise it is built from the {@code gobblin.writer.retry.*} properties by
 * {@link #createRetryBuilder(State)}.
 *
 * <p>Every attempt that ends in an exception is logged and counted. The count is published under
 * {@link #FAILED_WRITES_KEY} by {@link #getFinalState()}. When Gobblin metrics are enabled, the
 * {@link #FAILED_RETRY_WRITES_METER} meter is marked as well.
 *
 * <p>The failure counter is a plain {@code long} field with no synchronization in this class.
 *
 * @param <D> type of the records accepted by the wrapped writer
 */
public class RetryWriter<D>
        extends WatermarkAwareWriterWrapper<D>
        implements DataWriter<D>, FinalState, SpeculativeAttemptAwareConstruct {
    private static final Logger LOG = LoggerFactory.getLogger(RetryWriter.class);

    /** Common prefix of all retry-related configuration keys. */
    public static final String RETRY_CONF_PREFIX = "gobblin.writer.retry.";
    /** Configuration key for enabling the retry writer (not read by this class itself). */
    public static final String RETRY_WRITER_ENABLED = "gobblin.writer.retry.enabled";
    /** Name of the meter marked once per failed attempt when metrics are enabled. */
    public static final String FAILED_RETRY_WRITES_METER = "gobblin.writer.retry.failed_writes";
    /** Configuration key for the exponential wait multiplier (default 500). */
    public static final String RETRY_MULTIPLIER = "gobblin.writer.retry.multiplier";
    /** Configuration key for the upper bound of a single wait, in milliseconds (default 10000). */
    public static final String RETRY_MAX_WAIT_MS_PER_INTERVAL = "gobblin.writer.retry.max_wait_ms_per_interval";
    /** Configuration key for the maximum number of attempts (default 5). */
    public static final String RETRY_MAX_ATTEMPTS = "gobblin.writer.retry.max_attempts";
    /** Key under which the number of failed attempts is reported in the final state. */
    public static final String FAILED_WRITES_KEY = "FailedWrites";

    private final DataWriter<D> writer;
    private final Retryer<Void> retryer;
    /** Number of attempts (write or commit) that ended in an exception. */
    private long failedWrites;

    /**
     * Wraps {@code writer} with retry behavior.
     *
     * @param writer the writer whose write and commit calls are retried
     * @param state  job/task state used to read retry settings and to decide whether metrics are enabled
     */
    public RetryWriter(DataWriter<D> writer, State state) {
        this.writer = writer;
        this.retryer = this.buildRetryer(state);
        if (this.writer instanceof WatermarkAwareWriter) {
            this.setWatermarkAwareWriter((WatermarkAwareWriter<D>) this.writer);
        }
    }

    /**
     * Builds the retryer used by {@link #writeEnvelope(RecordEnvelope)} and {@link #commit()}.
     *
     * <p>The failure listener is always registered, so failed attempts are logged and counted
     * regardless of the metrics setting; only the meter itself depends on metrics being enabled.
     *
     * @param state state used for retry settings and metric context lookup
     * @return the configured retryer
     */
    private Retryer<Void> buildRetryer(State state) {
        RetryerBuilder<Void> builder;
        if (this.writer instanceof Retriable) {
            builder = ((Retriable) this.writer).getRetryerBuilder();
        } else {
            builder = RetryWriter.createRetryBuilder(state);
        }

        // Kept as a raw Optional: naming the meter's element type would need an import this file
        // does not have. The value is only ever handed back to Instrumented.markMeter.
        final Optional retryMeter;
        if (GobblinMetrics.isEnabled(state)) {
            retryMeter = Optional.of(
                    Instrumented.getMetricContext(state, this.getClass()).meter(FAILED_RETRY_WRITES_METER));
        } else {
            retryMeter = Optional.absent();
        }

        builder.withRetryListener(new RetryListener() {

            public <V> void onRetry(Attempt<V> attempt) {
                if (!attempt.hasException()) {
                    return;
                }
                LOG.warn("Caught exception. This may be retried.", attempt.getExceptionCause());
                if (retryMeter.isPresent()) {
                    Instrumented.markMeter(retryMeter);
                }
                RetryWriter.this.failedWrites++;
            }
        });
        return builder.build();
    }

    /**
     * Closes the wrapped writer. Not retried.
     *
     * @throws IOException if the wrapped writer fails to close
     */
    public void close() throws IOException {
        this.writer.close();
    }

    /**
     * Writes the envelope through the wrapped writer, retrying according to the configured policy.
     *
     * @param recordEnvelope the record envelope to write
     * @throws IOException if the retryer gives up or the call fails with a non-retried exception
     */
    public void writeEnvelope(final RecordEnvelope<D> recordEnvelope) throws IOException {
        this.callWithRetry(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                RetryWriter.this.writer.writeEnvelope(recordEnvelope);
                return null;
            }
        });
    }

    /**
     * Commits the wrapped writer, retrying according to the configured policy.
     *
     * @throws IOException if the retryer gives up or the call fails with a non-retried exception
     */
    public void commit() throws IOException {
        this.callWithRetry(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                RetryWriter.this.writer.commit();
                return null;
            }
        });
    }

    /**
     * Runs {@code callable} through the retryer and converts retry failures to {@link IOException}.
     *
     * @param callable the operation to execute
     * @throws IOException wrapping the {@link RetryException} or {@link ExecutionException} raised by the retryer
     */
    private void callWithRetry(Callable<Void> callable) throws IOException {
        try {
            this.retryer.wrap(callable).call();
        } catch (RetryException | ExecutionException e) {
            // The original failure stays reachable through the cause chain.
            throw new IOException(e);
        }
    }

    /**
     * Delegates cleanup to the wrapped writer. Not retried.
     *
     * @throws IOException if the wrapped writer's cleanup fails
     */
    public void cleanup() throws IOException {
        this.writer.cleanup();
    }

    /**
     * @return the record count reported by the wrapped writer
     */
    public long recordsWritten() {
        return this.writer.recordsWritten();
    }

    /**
     * @return the byte count reported by the wrapped writer
     * @throws IOException if the wrapped writer cannot determine the count
     */
    public long bytesWritten() throws IOException {
        return this.writer.bytesWritten();
    }

    /**
     * Creates the default retry policy: retry on any exception that is not a
     * {@link NonTransientException}, wait exponentially between attempts, and stop after a fixed
     * number of attempts.
     *
     * @param state source of {@link #RETRY_MULTIPLIER} (default 500),
     *              {@link #RETRY_MAX_WAIT_MS_PER_INTERVAL} (default 10000 ms) and
     *              {@link #RETRY_MAX_ATTEMPTS} (default 5)
     * @return a builder that callers may customize further before building
     */
    public static RetryerBuilder<Void> createRetryBuilder(State state) {
        Predicate<Throwable> transients = new Predicate<Throwable>() {

            public boolean apply(Throwable t) {
                return !(t instanceof NonTransientException);
            }
        };
        long multiplier = state.getPropAsLong(RETRY_MULTIPLIER, 500L);
        long maxWaitMsPerInterval = state.getPropAsLong(RETRY_MAX_WAIT_MS_PER_INTERVAL, 10000L);
        int maxAttempts = state.getPropAsInt(RETRY_MAX_ATTEMPTS, 5);
        return RetryerBuilder.<Void>newBuilder()
                .retryIfException(transients)
                .withWaitStrategy(
                        WaitStrategies.exponentialWait(multiplier, maxWaitMsPerInterval, TimeUnit.MILLISECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts));
    }

    /**
     * Reports whether speculative attempts are safe, as answered by the wrapped writer.
     *
     * @return the wrapped writer's answer when it implements {@link SpeculativeAttemptAwareConstruct};
     *         {@code false} otherwise
     */
    @Override
    public boolean isSpeculativeAttemptSafe() {
        if (this.writer instanceof SpeculativeAttemptAwareConstruct) {
            return ((SpeculativeAttemptAwareConstruct) this.writer).isSpeculativeAttemptSafe();
        }
        return false;
    }

    /**
     * Returns the wrapped writer's final state (when it provides one) plus the number of failed
     * attempts under {@link #FAILED_WRITES_KEY}.
     *
     * @return a new state object; never the wrapped writer's own instance
     */
    public State getFinalState() {
        State state = new State();
        if (this.writer instanceof FinalState) {
            state.addAll(((FinalState) this.writer).getFinalState());
        } else {
            LOG.warn("Wrapped writer does not implement FinalState: {}", this.writer.getClass());
        }
        state.setProp(FAILED_WRITES_KEY, this.failedWrites);
        return state;
    }

    /**
     * @return the control message handler of the wrapped writer
     */
    public ControlMessageHandler getMessageHandler() {
        return this.writer.getMessageHandler();
    }

    /**
     * Flushes the wrapped writer. Not retried.
     *
     * @throws IOException if the wrapped writer fails to flush
     */
    public void flush() throws IOException {
        this.writer.flush();
    }
}

