/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.qualitychecker.row;

import com.google.common.base.Strings;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.qualitychecker.row.RowLevelErrFileWriter;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicy;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a list of {@link RowLevelPolicy} instances against every record and decides whether the
 * record may continue downstream.
 *
 * <p>A record that fails a policy of type {@code FAIL} aborts processing with a
 * {@link RuntimeException}. A record that fails a policy of type {@code ERR_FILE} is dropped and,
 * if the {@link FrontLoadedSampler} accepts it, written to an error file through a lazily opened
 * {@link RowLevelErrFileWriter}. A record that fails a policy of any other type is dropped.
 *
 * @param <S> schema type of the processed stream
 * @param <D> record type of the processed stream
 */
public class RowLevelPolicyChecker<S, D>
implements Closeable,
FinalState,
RecordStreamProcessor<S, S, D, D>,
SpeculativeAttemptAwareConstruct {
    private static final Logger log = LoggerFactory.getLogger(RowLevelPolicyChecker.class);

    /** Property read in the constructor to size the error-record sampler. */
    private static final String ERR_FILE_RECORDS_PER_TASK_KEY = "qualitychecker.row.errFile.recordsPerTask";
    /** Value used when {@link #ERR_FILE_RECORDS_PER_TASK_KEY} is not set. */
    private static final long DEFAULT_ERR_FILE_RECORDS_PER_TASK = 1000000L;
    /** Decay factor handed to the sampler. */
    private static final double SAMPLER_DECAY_FACTOR = 1.5;

    private final List<RowLevelPolicy> list;
    private final String stateId;
    private final FileSystem fs;
    /** True while {@link #writer} has been opened and not yet closed by {@link #close()}. */
    private boolean errFileOpen;
    private final FrontLoadedSampler sampler;
    private RowLevelErrFileWriter writer;
    private final RowLevelPolicyCheckResults results;
    private final boolean allowSpeculativeExecWhenWriteErrFile;
    static final String ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY = "allowSpeculativeExecutionWithErrFilePolicy";

    /**
     * Reports whether this checker is safe to run in a speculative attempt.
     *
     * @return {@code true} when no configured policy is of type {@code ERR_FILE}, or when
     *         speculative execution with error files has been allowed through the state
     */
    @Override
    public boolean isSpeculativeAttemptSafe() {
        boolean noErrFilePolicy = this.list.stream()
            .noneMatch(policy -> policy.getType().equals(RowLevelPolicy.Type.ERR_FILE));
        return noErrFilePolicy || this.allowSpeculativeExecWhenWriteErrFile;
    }

    /**
     * Creates a checker configured from an empty {@link State}.
     *
     * @param list policies to execute, in order
     * @param stateId identifier appended to error file names when it is not null or empty
     * @param fs file system handed to the error file writer
     */
    public RowLevelPolicyChecker(List<RowLevelPolicy> list, String stateId, FileSystem fs) {
        this(list, stateId, fs, new State());
    }

    /**
     * Creates a checker.
     *
     * @param list policies to execute, in order
     * @param stateId identifier appended to error file names when it is not null or empty
     * @param fs file system handed to the error file writer
     * @param state source of the sampler size and of the speculative-execution flag
     */
    public RowLevelPolicyChecker(List<RowLevelPolicy> list, String stateId, FileSystem fs, State state) {
        this.list = list;
        this.stateId = stateId;
        this.fs = fs;
        this.errFileOpen = false;
        this.results = new RowLevelPolicyCheckResults();
        this.sampler = new FrontLoadedSampler(
            state.getPropAsLong(ERR_FILE_RECORDS_PER_TASK_KEY, DEFAULT_ERR_FILE_RECORDS_PER_TASK),
            SAMPLER_DECAY_FACTOR);
        this.allowSpeculativeExecWhenWriteErrFile =
            state.getPropAsBoolean(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, true);
    }

    /**
     * Executes the policies in order against one record, stopping at the first failure.
     *
     * @param record record to check
     * @param results accumulator that receives the result of every policy that was executed
     * @return {@code true} if every policy passed, {@code false} as soon as one fails
     * @throws IOException if writing the record to the error file fails
     * @throws RuntimeException if a policy of type {@code FAIL} fails on the record
     */
    public boolean executePolicies(Object record, RowLevelPolicyCheckResults results) throws IOException {
        for (RowLevelPolicy policy : this.list) {
            RowLevelPolicy.Result outcome = policy.executePolicy(record);
            results.put(policy, outcome);
            if (!this.checkResult(outcome, policy, record)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Handles the outcome of one policy on one record.
     *
     * @param checkResult result returned by the policy
     * @param p policy that produced the result
     * @param record record that was checked
     * @return {@code true} unless {@code checkResult} is {@code FAILED}
     * @throws IOException if opening the error file or writing to it fails
     * @throws RuntimeException if the policy failed and is of type {@code FAIL}
     */
    protected boolean checkResult(RowLevelPolicy.Result checkResult, RowLevelPolicy p, Object record) throws IOException {
        if (!checkResult.equals(RowLevelPolicy.Result.FAILED)) {
            return true;
        }
        if (p.getType().equals(RowLevelPolicy.Type.FAIL)) {
            throw new RuntimeException("RowLevelPolicy " + p + " failed on record " + record);
        }
        if (p.getType().equals(RowLevelPolicy.Type.ERR_FILE) && this.sampler.acceptNext()) {
            if (!this.errFileOpen) {
                RowLevelErrFileWriter opened = new RowLevelErrFileWriter(this.fs);
                opened.open(this.getErrFilePath(p));
                this.writer = opened;
                // Mark the file open before the first write: if that write throws, close()
                // must still release the writer instead of skipping it.
                this.errFileOpen = true;
            }
            this.writer.write(record);
        }
        return false;
    }

    /**
     * Builds the error file path for a policy: the sanitized policy string, then the state id if
     * one is set, then the current time in milliseconds when speculative execution with error
     * files is allowed, followed by the {@code .err} extension.
     *
     * @param policy policy whose error file location and string form are used
     * @return path of the error file under {@code policy.getErrFileLocation()}
     */
    Path getErrFilePath(RowLevelPolicy policy) {
        StringBuilder errFileName = new StringBuilder(HadoopUtils.sanitizePath(policy.toString(), "-"));
        if (!Strings.isNullOrEmpty(this.stateId)) {
            errFileName.append("-").append(this.stateId);
        }
        if (this.allowSpeculativeExecWhenWriteErrFile) {
            errFileName.append("-").append(System.currentTimeMillis());
        }
        errFileName.append(".err");
        return new Path(policy.getErrFileLocation(), errFileName.toString());
    }

    /**
     * Closes the error file writer if it is open; does nothing otherwise.
     *
     * @throws IOException if closing the writer fails
     */
    @Override
    public void close() throws IOException {
        if (!this.errFileOpen) {
            return;
        }
        this.writer.close();
        this.errFileOpen = false;
    }

    /**
     * Merges the final state of every policy into one {@link State}.
     *
     * @return a new state containing the properties of all policies' final states
     */
    public State getFinalState() {
        State merged = new State();
        for (RowLevelPolicy policy : this.list) {
            merged.addAll(policy.getFinalState());
        }
        return merged;
    }

    /**
     * Filters the record stream through the policies. Control messages are passed to the message
     * handler and kept. Record envelopes are kept only if they pass every policy; rejected
     * envelopes are acked. Anything else is kept. {@link #close()} is attached to the stream with
     * {@code doFinally}.
     *
     * @param inputStream stream to filter
     * @param state work unit state (not used by this implementation)
     * @return the input stream's metadata paired with the filtered record stream
     */
    public RecordStreamWithMetadata<D, S> processStream(RecordStreamWithMetadata<D, S> inputStream, WorkUnitState state) {
        Flowable filteredStream = inputStream.getRecordStream().filter(r -> {
            if (r instanceof ControlMessage) {
                this.getMessageHandler().handleMessage((ControlMessage)r);
                return true;
            }
            if (r instanceof RecordEnvelope) {
                boolean accept = this.executePolicies(((RecordEnvelope)r).getRecord(), this.results);
                if (!accept) {
                    // The record is dropped here, so ack it at this point.
                    r.ack();
                }
                return accept;
            }
            return true;
        });
        filteredStream = filteredStream.doFinally(this::close);
        return inputStream.withRecordStream(filteredStream);
    }

    /**
     * Creates the handler for control messages seen in the stream.
     *
     * @return a handler that closes the error file on a {@link FlushControlMessage}, logging
     *         (not rethrowing) any {@link IOException} from the close
     */
    protected ControlMessageHandler getMessageHandler() {
        return new ControlMessageHandler(){

            public void handleMessage(ControlMessage message) {
                if (message instanceof FlushControlMessage) {
                    try {
                        RowLevelPolicyChecker.this.close();
                    }
                    catch (IOException ioe) {
                        log.error("Failed to close errFile", ioe);
                    }
                }
            }
        };
    }

    /** @return the policy list passed to the constructor (not copied) */
    public List<RowLevelPolicy> getList() {
        return this.list;
    }

    /** @return the results accumulator used by {@code processStream} */
    public RowLevelPolicyCheckResults getResults() {
        return this.results;
    }

    /**
     * Decides which failed records get written to the error file.
     *
     * <p>Every call to {@link #acceptNext()} takes the next record number. A record is accepted
     * when its number has reached the current threshold and it wins the compare-and-set that
     * advances the threshold. The threshold grows by one while it is below
     * {@code targetRecordsAccepted}, and by a factor of {@code decayFactor} (plus one) after that.
     */
    @ThreadSafe
    static class FrontLoadedSampler {
        private final long targetRecordsAccepted;
        private final double decayFactor;
        /** Number of records offered to the sampler so far. */
        private final AtomicLong errorRecords = new AtomicLong();
        /** Record number at which the next record will be accepted. */
        private final AtomicLong nextErrorRecordWritten = new AtomicLong();

        /**
         * @param targetRecordsAccepted threshold below which the sampler advances one record at a
         *        time; a value of zero or less starts the threshold at {@link Long#MAX_VALUE}
         * @param decayFactor growth factor of the threshold once it has reached the target;
         *        values below 1.0 are raised to 1.0
         */
        public FrontLoadedSampler(long targetRecordsAccepted, double decayFactor) {
            this.targetRecordsAccepted = targetRecordsAccepted;
            this.decayFactor = Math.max(1.0, decayFactor);
            if (this.targetRecordsAccepted <= 0L) {
                this.nextErrorRecordWritten.set(Long.MAX_VALUE);
            }
        }

        /**
         * Offers the next record to the sampler.
         *
         * @return {@code true} if the record should be written, {@code false} otherwise
         */
        public boolean acceptNext() {
            long recordNum = this.errorRecords.getAndIncrement();
            while (recordNum >= this.nextErrorRecordWritten.get()) {
                // Retry until this record either advances the threshold itself or falls below a
                // threshold advanced by another caller.
                if (this.nextErrorRecordWritten.compareAndSet(recordNum, this.computeNextErrorRecordWritten())) {
                    return true;
                }
            }
            return false;
        }

        /** Computes the threshold that follows the current one. */
        private long computeNextErrorRecordWritten() {
            long current = this.nextErrorRecordWritten.get();
            if (current < this.targetRecordsAccepted) {
                return current + 1L;
            }
            return (long)(this.decayFactor * (double)current) + 1L;
        }
    }
}

