package org.apache.storm.hive.trident;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hive/trident/HiveState.class */
/**
 * Trident {@link State} that streams tuples into Hive.
 *
 * <p>One {@link HiveWriter} is cached per {@link HiveEndPoint}. Tuples are written through the
 * cached writers, which are flushed every {@code options.getBatchSize()} tuples. A
 * {@link Timer} periodically heartbeats all cached writers while {@code sendHeartBeat} is set.
 *
 * <p>The writer cache is a {@link ConcurrentHashMap} because it is iterated by the heartbeat
 * timer thread while the thread calling {@link #updateState} adds and removes entries.
 */
public class HiveState implements State {
    private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
    private HiveOptions options;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    private Map<HiveEndPoint, HiveWriter> allWriters;
    // Not read or written anywhere else in this class.
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    // Written by the thread driving this state and read by the heartbeat timer thread, hence volatile.
    private volatile boolean sendHeartBeat = true;
    private UserGroupInformation ugi = null;
    private boolean kerberosEnabled = false;
    // Number of tuples written since the last flush of all writers.
    private int currentBatchSize = 0;

    /**
     * Creates a state bound to the given options; no resources are acquired until
     * {@link #prepare} is called.
     *
     * @param hiveOptions configuration read by every other method of this class
     */
    public HiveState(HiveOptions hiveOptions) {
        this.options = hiveOptions;
    }

    /** No-op: this state does nothing at the start of a commit. */
    public void beginCommit(Long l) {
    }

    /** No-op: this state does nothing at the end of a commit. */
    public void commit(Long l) {
    }

    /**
     * Authenticates (when Kerberos is configured) and creates the writer cache, the call-timeout
     * pool and the heartbeat timer.
     *
     * <p>Any failure during setup, including an incomplete Kerberos configuration or a failed
     * authentication, is logged at WARN level and not propagated. In that case the fields
     * initialised after the failure point stay {@code null}.
     * NOTE(review): swallowing setup failures is preserved from the original code; confirm
     * whether callers rely on {@code prepare} never throwing.
     *
     * @param map             not used by this implementation
     * @param iMetricsContext not used by this implementation
     * @param i               not used by this implementation
     * @param i2              not used by this implementation
     */
    public void prepare(Map map, IMetricsContext iMetricsContext, int i, int i2) {
        try {
            boolean hasPrincipal = this.options.getKerberosPrincipal() != null;
            boolean hasKeytab = this.options.getKerberosKeytab() != null;
            if (hasPrincipal != hasKeytab) {
                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal  & KerberosKeytab");
            }
            this.kerberosEnabled = hasPrincipal;
            if (this.kerberosEnabled) {
                try {
                    this.ugi = HiveUtils.authenticate(this.options.getKerberosKeytab(), this.options.getKerberosPrincipal());
                } catch (HiveUtils.AuthenticationFailed e) {
                    LOG.error("Hive kerberos authentication failed " + e.getMessage(), e);
                    throw new IllegalArgumentException(e);
                }
            }
            this.allWriters = new ConcurrentHashMap<>();
            this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("hive-bolt-%d").build());
            this.heartBeatTimer = new Timer();
            setupHeartBeatTimer();
        } catch (Exception e2) {
            LOG.warn("unable to make connection to hive ", e2);
        }
    }

    /**
     * Writes the given tuples to Hive.
     *
     * @param list             tuples to write
     * @param tridentCollector not used by this implementation
     * @throws FailedException wrapping any exception raised while writing; before it is thrown,
     *                         all cached writers are aborted and closed
     */
    public void updateState(List<TridentTuple> list, TridentCollector tridentCollector) {
        try {
            writeTuples(list);
        } catch (Exception e) {
            abortAndCloseWriters();
            LOG.warn("hive streaming failed.", e);
            throw new FailedException(e);
        }
    }

    /**
     * Writes each tuple to the writer of its end point and flushes all writers whenever the
     * number of tuples written since the last flush reaches the configured batch size.
     */
    private void writeTuples(List<TridentTuple> list) throws Exception {
        for (TridentTuple tridentTuple : list) {
            HiveEndPoint endPoint = HiveUtils.makeEndPoint(this.options.getMapper().mapPartitions(tridentTuple), this.options);
            HiveWriter writer = getOrCreateWriter(endPoint);
            writer.write(this.options.getMapper().mapRecord(tridentTuple));
            this.currentBatchSize++;
            if (this.currentBatchSize >= this.options.getBatchSize().intValue()) {
                flushAllWriters();
                this.currentBatchSize = 0;
            }
        }
    }

    /**
     * Stops heartbeating, aborts every cached writer and then closes and evicts them all.
     * Never throws: failures are logged. Closing is attempted even when aborting fails, so a
     * failed abort does not leave open writers in the cache.
     *
     * <p>NOTE(review): {@code sendHeartBeat} is not set back to {@code true} anywhere in this
     * class, so heartbeats stay off for writers created after this call; confirm this is intended.
     */
    private void abortAndCloseWriters() {
        this.sendHeartBeat = false;
        boolean interrupted = false;
        try {
            abortAllWriters();
        } catch (Exception e) {
            LOG.warn("unable to abort hive transactions. ", e);
            interrupted = e instanceof InterruptedException;
        }
        try {
            closeAllWriters();
        } catch (Exception e) {
            // Reached e.g. when prepare() failed and the writer cache was never created.
            LOG.warn("unable to close hive connections. ", e);
        }
        if (interrupted) {
            // Restored only after the close attempt so that closing is not cut short by it.
            Thread.currentThread().interrupt();
        }
    }

    /** Calls {@code abort()} on every cached writer; stops at the first failure. */
    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.abort();
        }
    }

    /**
     * Closes every cached writer and empties the cache. A failure to close one writer is logged
     * and does not prevent the remaining writers from being closed or the cache from being cleared.
     */
    private void closeAllWriters() throws InterruptedException, IOException {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            closeWriter(entry.getKey(), entry.getValue());
        }
        this.allWriters.clear();
    }

    /**
     * Closes a single writer, logging (not propagating) failures. An interruption re-asserts the
     * calling thread's interrupt flag. A {@code null} writer is ignored.
     */
    private void closeWriter(HiveEndPoint endPoint, HiveWriter writer) {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + endPoint, e);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + endPoint, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules a one-shot heartbeat task after {@code options.getHeartBeatInterval()} seconds,
     * provided that interval is positive. When it runs with {@code sendHeartBeat} set, the task
     * heartbeats all writers and schedules the next run.
     *
     * <p>NOTE(review): if heartbeating throws, the task logs the failure and does not schedule
     * another run; confirm whether heartbeats should continue after a failure.
     */
    public void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval().intValue() > 0) {
            // Long arithmetic: an int multiplication would overflow for very large intervals.
            long delayMillis = this.options.getHeartBeatInterval().intValue() * 1000L;
            this.heartBeatTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if (HiveState.this.sendHeartBeat) {
                            HiveState.LOG.debug("Start sending heartbeat on all writers");
                            HiveState.this.sendHeartBeatOnAllWriters();
                            HiveState.this.setupHeartBeatTimer();
                        }
                    } catch (Exception e) {
                        HiveState.LOG.warn("Failed to heartbeat on HiveWriter ", e);
                    }
                }
            }, delayMillis);
        }
    }

    /** Calls {@code flush(true)} on every cached writer; stops at the first failure. */
    private void flushAllWriters() throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.flush(true);
        }
    }

    /** Calls {@code heartBeat()} on every cached writer. */
    public void sendHeartBeatOnAllWriters() throws InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.heartBeat();
        }
    }

    /**
     * Returns the cached writer for the end point, creating and caching one if absent.
     *
     * <p>When adding a writer would exceed {@code options.getMaxOpenConnections()}, idle writers
     * are retired first; if none were idle, the least recently used writer is retired instead.
     *
     * @throws HiveWriter.ConnectFailure if a new writer cannot be created (logged, then rethrown)
     */
    private HiveWriter getOrCreateWriter(HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter writer = this.allWriters.get(hiveEndPoint);
            if (writer != null) {
                return writer;
            }
            LOG.info("Creating Writer to Hive end point : " + hiveEndPoint);
            writer = HiveUtils.makeHiveWriter(hiveEndPoint, this.callTimeoutPool, this.ugi, this.options);
            boolean cacheFull = this.allWriters.size() > this.options.getMaxOpenConnections().intValue() - 1;
            if (cacheFull && retireIdleWriters() == 0) {
                retireEldestWriter();
            }
            this.allWriters.put(hiveEndPoint, writer);
            return writer;
        } catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + hiveEndPoint, e);
            throw e;
        }
    }

    /**
     * Closes and evicts the cached writer with the smallest {@code getLastUsed()} value.
     * Does nothing when the cache is empty.
     */
    private void retireEldestWriter() {
        // Start from the maximum so that a writer is always selected when the cache is non-empty,
        // even if every writer was last used at (or after) the current clock reading.
        long oldestLastUsed = Long.MAX_VALUE;
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            long lastUsed = entry.getValue().getLastUsed();
            if (eldest == null || lastUsed < oldestLastUsed) {
                eldest = entry.getKey();
                oldestLastUsed = lastUsed;
            }
        }
        if (eldest == null) {
            // ConcurrentHashMap rejects null keys, so there is nothing to remove.
            return;
        }
        LOG.info("Closing least used Writer to Hive end point : " + eldest);
        closeWriter(eldest, this.allWriters.remove(eldest));
    }

    /**
     * Closes and evicts every cached writer whose time since {@code getLastUsed()} exceeds
     * {@code options.getIdleTimeout()}.
     *
     * @return the number of writers selected as idle
     */
    private int retireIdleWriters() {
        long now = System.currentTimeMillis();
        List<HiveEndPoint> idleEndPoints = new ArrayList<>();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() > this.options.getIdleTimeout().intValue()) {
                idleEndPoints.add(entry.getKey());
            }
        }
        for (HiveEndPoint endPoint : idleEndPoints) {
            LOG.info("Closing idle Writer to Hive end point : {}", endPoint);
            closeWriter(endPoint, this.allWriters.remove(endPoint));
        }
        return idleEndPoints.size();
    }

    /**
     * Releases everything acquired by {@link #prepare}: stops and cancels the heartbeat timer,
     * flushes and closes every cached writer, and shuts down the call-timeout pool, waiting until
     * it has terminated. Safe to call when {@code prepare} failed part-way.
     *
     * <p>Failures on individual writers are logged and do not stop the cleanup. If the calling
     * thread is interrupted along the way, its interrupt flag is set again before returning.
     */
    public void cleanup() {
        // Stop heartbeats up front, regardless of whether any writers are cached.
        this.sendHeartBeat = false;
        if (this.heartBeatTimer != null) {
            // Without this the timer's thread would outlive the state.
            this.heartBeatTimer.cancel();
        }
        boolean interrupted = false;
        if (this.allWriters != null) {
            for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
                try {
                    HiveWriter writer = entry.getValue();
                    LOG.info("Flushing writer to {}", writer);
                    writer.flush(false);
                    LOG.info("Closing writer to {}", writer);
                    writer.close();
                } catch (Exception e) {
                    LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", e);
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                }
            }
        }
        ExecutorService pool = this.callTimeoutPool;
        if (pool != null) {
            pool.shutdown();
            while (!pool.isTerminated()) {
                try {
                    pool.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    LOG.warn("shutdown interrupted on " + pool, e2);
                    interrupted = true;
                }
            }
        }
        this.callTimeoutPool = null;
        if (interrupted) {
            // Deferred to the end so the interrupt does not abort the remaining cleanup steps.
            Thread.currentThread().interrupt();
        }
    }
}
