/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.drpc;

import backtype.storm.generated.DRPCRequest;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.drpc.Drpc;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Callback that scans the DRPC service's outstanding requests and clears
 * every request whose age exceeds the configured timeout.
 *
 * <p>For each timed-out request id, one pass removes the pending request from
 * the queue of its function, releases the semaphore registered for the id (if
 * any) and asks the {@link Drpc} service to clean up its bookkeeping for
 * that id.
 *
 * <p>NOTE(review): the value returned by {@link #getResult()} is presumably
 * interpreted by the code driving this callback as the number of seconds to
 * wait before the next pass -- confirm against the runner.
 */
public class ClearThread
extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(ClearThread.class);

    /** Value reported by {@link #getResult()} after every pass. */
    private static final int TIMEOUT_CHECK_SECS = 5;

    /** Age, in seconds, above which a request is treated as timed out. */
    private final int requestTimeoutSecs;

    /** Service whose request bookkeeping is inspected and cleared. */
    private final Drpc drpcService;

    /**
     * Creates the callback and reads the request timeout from the service
     * configuration key {@code drpc.request.timeout.secs}, falling back to
     * 60 when the key cannot be parsed.
     *
     * @param drpc the DRPC service to watch
     */
    public ClearThread(Drpc drpc) {
        this.drpcService = drpc;
        this.requestTimeoutSecs = JStormUtils.parseInt(drpc.getConf().get("drpc.request.timeout.secs"), 60);
        LOG.info("Drpc timeout seconds is {}", this.requestTimeoutSecs);
    }

    /**
     * Performs one pass over the start-time map and clears every request
     * that has been outstanding for longer than the timeout.
     */
    @Override
    public void run() {
        for (Map.Entry<String, Integer> e : this.drpcService.getIdtoStart().entrySet()) {
            Integer startTime = e.getValue();
            if (TimeUtils.time_delta(startTime) <= this.requestTimeoutSecs) {
                continue;
            }
            String id = e.getKey();
            LOG.warn("Timeout DRPC request id: {} start at {}", id, startTime);

            // NOTE(review): the per-id lookups below can presumably come back
            // null if another thread cleans up the id while this pass is
            // running -- confirm against Drpc. Guarding keeps one stale id
            // from aborting the pass before the remaining ids are cleared.
            String function = this.drpcService.getIdtoFunction().get(id);
            if (function != null) {
                ConcurrentLinkedQueue<DRPCRequest> queue = this.drpcService.acquireQueue(function);
                DRPCRequest request = this.drpcService.getIdtoRequest().get(id);
                if (queue != null && request != null) {
                    // Drop the request so it is no longer handed out for processing.
                    queue.remove(request);
                }
            }

            Semaphore s = this.drpcService.getIdtoSem().get(id);
            if (s != null) {
                // Release the permit registered for this id.
                s.release();
            }
            this.drpcService.cleanup(id);
            LOG.info("Clear request {}", id);
        }
        JStormUtils.sleepMs(10L);
    }

    /**
     * Returns the check interval constant.
     *
     * @return {@link #TIMEOUT_CHECK_SECS}, boxed as an {@code Integer}
     */
    @Override
    public Object getResult() {
        return TIMEOUT_CHECK_SECS;
    }
}

