/*
 * Decompiled with CFR 0.152.
 */
package org.apache.helix.messaging.handling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.messaging.handling.GroupMessageHandler;
import org.apache.helix.messaging.handling.HelixTask;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.ParticipantMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;

/**
 * Message listener that turns incoming {@link Message}s into {@link HelixTask}s and runs them on
 * thread pools.
 *
 * <p>One fixed-size pool is created per registered message type. State-transition messages may
 * additionally get a dedicated per-resource pool when the resource config defines
 * {@link #MAX_THREADS}. Every submitted task is tracked in {@link #_taskMap} under the id
 * {@code msgId/partitionName} so it can be cancelled or reported as completed later.
 *
 * <p>Scheduling, cancellation, completion reporting and shutdown are serialized on an internal
 * lock; the FINALIZE branch of {@link #onMessage} is not.
 */
public class HelixTaskExecutor implements MessageListener {
    /** Pool size used when a handler factory is registered without an explicit size. */
    public static final int DEFAULT_PARALLEL_TASKS = 40;
    /** Resource-config key holding the per-resource state-transition pool size. */
    public static final String MAX_THREADS = "maxThreads";

    private static final Logger LOG = Logger.getLogger(HelixTaskExecutor.class);

    /** In-flight tasks keyed by {@code msgId/partitionName}. */
    protected final Map<String, Future<HelixTaskResult>> _taskMap;
    private final Object _lock;
    private final StatusUpdateUtil _statusUpdateUtil;
    private final ParticipantMonitor _monitor;
    /** Handler factories keyed by message type. */
    final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
        new ConcurrentHashMap<String, MessageHandlerFactory>();
    /** Pools keyed by message type, or by {@code STATE_TRANSITION.<resource>} for per-resource pools. */
    final ConcurrentHashMap<String, ExecutorService> _threadpoolMap =
        new ConcurrentHashMap<String, ExecutorService>();
    /** Resources whose config was already inspected, mapped to the size found (-1 when none/invalid). */
    Map<String, Integer> _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
    final GroupMessageHandler _groupMsgHandler;

    public HelixTaskExecutor() {
        this._taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
        this._groupMsgHandler = new GroupMessageHandler();
        this._lock = new Object();
        this._statusUpdateUtil = new StatusUpdateUtil();
        this._monitor = new ParticipantMonitor();
        this.startMonitorThread();
    }

    /**
     * Registers a handler factory with a pool of {@link #DEFAULT_PARALLEL_TASKS} threads.
     *
     * @param type message type the factory handles
     * @param factory factory to register
     * @throws HelixException if {@code type} does not match {@code factory.getMessageType()}
     */
    public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
        this.registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
    }

    /**
     * Registers a handler factory for a message type and creates its fixed-size thread pool.
     * A second registration for an already-registered type is ignored with an error log.
     *
     * @param type message type the factory handles
     * @param factory factory to register
     * @param threadpoolSize number of threads in the pool created for this type
     * @throws HelixException if {@code type} does not match {@code factory.getMessageType()}
     *     (compared case-insensitively)
     */
    public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) {
        if (this._handlerFactoryMap.containsKey(type)) {
            LOG.error("Ignoring duplicate msg handler factory for type " + type);
            return;
        }
        if (!type.equalsIgnoreCase(factory.getMessageType())) {
            throw new HelixException("Message factory type mismatch. Type: " + type + " factory : " + factory.getMessageType());
        }
        // putIfAbsent closes the check-then-act window: only the winning registration creates a
        // pool, so a concurrent duplicate can no longer overwrite (and leak) an existing pool.
        if (this._handlerFactoryMap.putIfAbsent(type, factory) != null) {
            LOG.error("Ignoring duplicate msg handler factory for type " + type);
            return;
        }
        this._threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
        LOG.info("Adding msg factory for type " + type + " threadpool size " + threadpoolSize);
    }

    public ParticipantMonitor getParticipantMonitor() {
        return this._monitor;
    }

    /** Currently a no-op; invoked once from the constructor. */
    private void startMonitorThread() {
    }

    /**
     * Looks up the {@link #MAX_THREADS} setting of a resource the first time the resource is seen
     * and, when it is a positive integer, creates a dedicated state-transition pool for it.
     * The outcome is cached so the config is read at most once per resource.
     *
     * @param resourceName resource whose config should be inspected
     * @param manager manager providing the config accessor and cluster name
     */
    void checkResourceConfig(String resourceName, HelixManager manager) {
        if (this._resourceThreadpoolSizeMap.containsKey(resourceName)) {
            return;
        }
        int threadpoolSize = -1;
        ConfigAccessor configAccessor = manager.getConfigAccessor();
        if (configAccessor != null) {
            ConfigScope scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource(resourceName).build();
            String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
            if (threadpoolSizeStr != null) {
                try {
                    threadpoolSize = Integer.parseInt(threadpoolSizeStr);
                } catch (NumberFormatException e) {
                    // Fall back to "no per-resource pool" but say what was wrong.
                    LOG.error("Invalid " + MAX_THREADS + " value '" + threadpoolSizeStr + "' for resource " + resourceName, e);
                }
            }
        }
        if (threadpoolSize > 0) {
            String key = Message.MessageType.STATE_TRANSITION.toString() + "." + resourceName;
            this._threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
            LOG.info("Adding per resource threadpool for resource " + resourceName + " with size " + threadpoolSize);
        }
        this._resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
    }

    /**
     * Picks the pool a message should run on: the per-resource pool for state-transition messages
     * when one exists, otherwise the pool registered for the message type.
     *
     * @param message message to be executed
     * @return the selected pool, or {@code null} when no pool is registered for the message type
     */
    ExecutorService findExecutorServiceForMsg(Message message) {
        String msgType = message.getMsgType();
        ExecutorService executorService = this._threadpoolMap.get(msgType);
        if (msgType.equals(Message.MessageType.STATE_TRANSITION.toString())) {
            String resourceName = message.getResourceName();
            if (resourceName != null) {
                String key = msgType + "." + resourceName;
                ExecutorService perResource = this._threadpoolMap.get(key);
                if (perResource != null) {
                    LOG.info("Find per-resource thread pool with key " + key);
                    executorService = perResource;
                }
            }
        }
        return executorService;
    }

    /**
     * Wraps the handler in a {@link HelixTask} and submits it to the matching pool unless a task
     * with the same id is already tracked. Any exception raised while scheduling is logged and
     * recorded through the status-update utility rather than propagated.
     *
     * @param message message to execute
     * @param handler handler created for the message; must not be {@code null}
     * @param notificationContext context supplying the manager and data accessor
     */
    public void scheduleTask(Message message, MessageHandler handler, NotificationContext notificationContext) {
        assert (handler != null);
        synchronized (this._lock) {
            try {
                String taskId = taskIdOf(message);
                if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
                    this.checkResourceConfig(message.getResourceName(), notificationContext.getManager());
                }
                LOG.info("Scheduling message: " + taskId);
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", notificationContext.getManager().getHelixDataAccessor());
                HelixTask task = new HelixTask(message, notificationContext, handler, this);
                if (!this._taskMap.containsKey(taskId)) {
                    LOG.info("Message:" + taskId + " handling task scheduled");
                    Future<HelixTaskResult> future = this.findExecutorServiceForMsg(message).submit(task);
                    this._taskMap.put(taskId, future);
                } else {
                    this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "Message handling task already sheduled for " + taskId, notificationContext.getManager().getHelixDataAccessor());
                }
            } catch (Exception e) {
                LOG.error("Error while executing task." + message, e);
                this._statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e, notificationContext.getManager().getHelixDataAccessor());
            }
        }
    }

    /**
     * Attempts to cancel the tracked task for a message, interrupting it if it is running.
     * The task is removed from the task map only when cancellation succeeds.
     *
     * @param message message whose task should be cancelled
     * @param notificationContext context supplying the manager and data accessor
     */
    public void cancelTask(Message message, NotificationContext notificationContext) {
        synchronized (this._lock) {
            String taskId = taskIdOf(message);
            HelixDataAccessor accessor = notificationContext.getManager().getHelixDataAccessor();
            // Single lookup: the FINALIZE path clears the map without this lock, so a separate
            // containsKey()/get() pair could observe the entry vanish in between.
            Future<HelixTaskResult> future = this._taskMap.get(taskId);
            if (future == null) {
                this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "Future not found when trying to cancel " + taskId, accessor);
                return;
            }
            this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Trying to cancel the future for " + taskId, accessor);
            if (future.cancel(true)) {
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled " + taskId, accessor);
                this._taskMap.remove(taskId);
            } else {
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "false when trying to cancel the message " + taskId, accessor);
            }
        }
    }

    /**
     * Removes a finished message's task from the task map and logs how long it took, measured
     * from the message's execute-start timestamp.
     *
     * @param message message whose task has finished
     */
    protected void reportCompletion(Message message) {
        synchronized (this._lock) {
            String taskId = taskIdOf(message);
            LOG.info("message finished: " + taskId + ", took " + (System.currentTimeMillis() - message.getExecuteStartTimeStamp()));
            if (this._taskMap.remove(taskId) == null) {
                LOG.warn("message " + taskId + "not found in task map");
            }
        }
    }

    /** Writes the given (already mutated) messages back through the accessor in one call. */
    private void updateMessageState(List<Message> readMsgs, HelixDataAccessor accessor, String instanceName) {
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>(readMsgs.size());
        for (Message msg : readMsgs) {
            readMsgKeys.add(msg.getKey(keyBuilder, instanceName));
        }
        accessor.setChildren(readMsgKeys, readMsgs);
    }

    /**
     * Processes a batch of messages for this instance.
     *
     * <p>On a FINALIZE notification every handler factory is reset, all tracked tasks are
     * cancelled and the task map is cleared. Otherwise the messages are sorted by creation time
     * and, for each one: NO_OP messages and messages addressed to another session are removed;
     * messages not in state NEW are skipped; a handler is created (a failure marks the message
     * UNPROCESSABLE); the message is marked READ. For state-transition messages on a resource
     * that has no current-state node yet, a placeholder current state is created. Finally the
     * READ messages are written back and all created handlers are scheduled.
     *
     * @param instanceName name of the instance the messages belong to
     * @param messages messages to process; may be {@code null} or empty
     * @param changeContext notification context of this callback
     */
    @Override
    public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) {
        if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
            LOG.info("Get FINALIZE notification");
            for (MessageHandlerFactory factory : this._handlerFactoryMap.values()) {
                factory.reset();
            }
            for (Future<HelixTaskResult> future : this._taskMap.values()) {
                future.cancel(true);
            }
            this._taskMap.clear();
            return;
        }

        HelixManager manager = changeContext.getManager();
        HelixDataAccessor accessor = manager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        if (messages == null || messages.isEmpty()) {
            LOG.info("No Messages to process");
            return;
        }

        Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
        List<MessageHandler> handlers = new ArrayList<MessageHandler>();
        List<Message> readMsgs = new ArrayList<Message>();
        String sessionId = manager.getSessionId();
        List<String> curResourceNames = accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
        List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
        List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
        HashSet<String> createCurStateNames = new HashSet<String>();
        changeContext.add("TASK_EXECUTOR", this);

        for (Message message : messages) {
            if (message.getMsgType().equalsIgnoreCase(Message.MessageType.NO_OP.toString())) {
                LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
                accessor.removeProperty(message.getKey(keyBuilder, instanceName));
                continue;
            }

            String tgtSessionId = message.getTgtSessionId();
            // Constant-first comparison: a message without a target session id is treated as a
            // mismatch and dropped instead of aborting the whole batch with a NullPointerException.
            if (!sessionId.equals(tgtSessionId) && !"*".equals(tgtSessionId)) {
                String warningMessage = "SessionId does NOT match. expected sessionId: " + sessionId + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
                LOG.warn(warningMessage);
                accessor.removeProperty(message.getKey(keyBuilder, instanceName));
                this._statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, accessor);
                continue;
            }

            if (Message.MessageState.NEW != message.getMsgState()) {
                LOG.trace("Message already read. mid: " + message.getMsgId());
                continue;
            }

            List<MessageHandler> createdHandlers;
            try {
                createdHandlers = this.createMessageHandlers(message, changeContext);
            } catch (Exception e) {
                LOG.error("Failed to create message handler for " + message.getMsgId(), e);
                String error = "Failed to create message handler for " + message.getMsgId() + ", exception: " + e;
                this._statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, accessor);
                message.setMsgState(Message.MessageState.UNPROCESSABLE);
                accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
                continue;
            }
            if (createdHandlers.isEmpty()) {
                // No factory produced a handler; the message is left untouched.
                continue;
            }
            handlers.addAll(createdHandlers);

            message.setMsgState(Message.MessageState.READ);
            message.setReadTimeStamp(System.currentTimeMillis());
            message.setExecuteSessionId(changeContext.getManager().getSessionId());
            this._statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor);
            readMsgs.add(message);

            boolean isParticipantStateTransition = !message.isControlerMsg()
                && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString());
            if (!isParticipantStateTransition) {
                continue;
            }
            String resourceName = message.getResourceName();
            if (curResourceNames.contains(resourceName) || createCurStateNames.contains(resourceName)) {
                continue;
            }
            createCurStateNames.add(resourceName);
            createCurStateKeys.add(keyBuilder.currentState(instanceName, sessionId, resourceName));
            metaCurStates.add(newMetaCurrentState(message, resourceName, sessionId));
        }

        if (!createCurStateKeys.isEmpty()) {
            try {
                accessor.createChildren(createCurStateKeys, metaCurStates);
            } catch (Exception e) {
                // Pass the throwable separately so the stack trace is kept in the log.
                LOG.error("Failed to create current states for resources " + createCurStateNames, e);
            }
        }

        if (!readMsgs.isEmpty()) {
            this.updateMessageState(readMsgs, accessor, instanceName);
            for (MessageHandler handler : handlers) {
                this.scheduleTask(handler._message, handler, changeContext);
            }
        }
    }

    /** Builds the task-map key for a message: {@code msgId/partitionName}. */
    private static String taskIdOf(Message message) {
        return message.getMsgId() + "/" + message.getPartitionName();
    }

    /** Builds the placeholder current state for a resource from the message that first references it. */
    private static CurrentState newMetaCurrentState(Message message, String resourceName, String sessionId) {
        CurrentState metaCurState = new CurrentState(resourceName);
        metaCurState.setBucketSize(message.getBucketSize());
        metaCurState.setStateModelDefRef(message.getStateModelDef());
        metaCurState.setSessionId(sessionId);
        metaCurState.setGroupMessageMode(message.getGroupMessageMode());
        String factoryName = message.getStateModelFactoryName();
        metaCurState.setStateModelFactoryName(factoryName != null ? factoryName : "DEFAULT");
        return metaCurState;
    }

    /**
     * Creates a handler through the factory registered for the message's type.
     *
     * @return the handler, or {@code null} when no factory is registered for the type
     */
    private MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
        String msgType = message.getMsgType();
        MessageHandlerFactory handlerFactory = this._handlerFactoryMap.get(msgType);
        if (handlerFactory == null) {
            LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:" + message.getMsgId());
            return null;
        }
        return handlerFactory.createHandler(message, changeContext);
    }

    /**
     * Creates the handlers for a message. A normal message yields at most one handler. A group
     * message is recorded in the group-message handler and expanded into one sub-message (and
     * handler) per partition name, each tagged with the parent message id.
     *
     * @return the created handlers; empty when no factory produced one
     */
    private List<MessageHandler> createMessageHandlers(Message message, NotificationContext changeContext) {
        List<MessageHandler> handlers = new ArrayList<MessageHandler>();
        if (!message.getGroupMessageMode()) {
            LOG.info("Creating handler for message " + message.getMsgId() + "/" + message.getPartitionName());
            MessageHandler handler = this.createMessageHandler(message, changeContext);
            if (handler != null) {
                handlers.add(handler);
            }
            return handlers;
        }

        this._groupMsgHandler.put(message);
        for (String partitionName : message.getPartitionNames()) {
            Message subMsg = new Message(message.getRecord());
            subMsg.setPartitionName(partitionName);
            subMsg.setAttribute(Message.Attributes.PARENT_MSG_ID, message.getId());
            LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/" + partitionName);
            MessageHandler handler = this.createMessageHandler(subMsg, changeContext);
            if (handler != null) {
                handlers.add(handler);
            }
        }
        return handlers;
    }

    /**
     * Stops every thread pool with {@code shutdownNow()}, waits up to 200 ms for each to
     * terminate, then shuts down the participant monitor. If the calling thread is interrupted
     * while waiting, the interrupt flag is restored and the remaining waits are skipped.
     */
    public void shutDown() {
        LOG.info("shutting down TaskExecutor");
        synchronized (this._lock) {
            for (Map.Entry<String, ExecutorService> entry : this._threadpoolMap.entrySet()) {
                List<Runnable> tasksLeft = entry.getValue().shutdownNow();
                LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType " + entry.getKey());
            }
            for (Map.Entry<String, ExecutorService> entry : this._threadpoolMap.entrySet()) {
                try {
                    if (!entry.getValue().awaitTermination(200L, TimeUnit.MILLISECONDS)) {
                        LOG.warn(entry.getKey() + " is not fully termimated in 200 MS");
                    }
                } catch (InterruptedException e) {
                    LOG.error("Interrupted", e);
                    // Preserve the interrupt for the caller; further awaitTermination calls would
                    // only throw again immediately, so stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        this._monitor.shutDown();
        LOG.info("shutdown finished");
    }

    /**
     * Ad-hoc manual harness: submits a printing callable and a {@link HelixTask} built from null
     * arguments to a fresh pool, then joins the current thread on itself. That join only returns
     * if the thread is interrupted (by throwing), so the final print is not reached normally.
     */
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
        Future<HelixTaskResult> future = pool.submit(new Callable<HelixTaskResult>(){

            @Override
            public HelixTaskResult call() throws Exception {
                System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
                return null;
            }
        });
        future = pool.submit(new HelixTask(null, null, null, null));
        Thread.currentThread().join();
        System.out.println(future.isDone());
    }
}

