/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.curator.framework.recipes.queue;

import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.ACLBackgroundPathAndBytesable;
import com.netflix.curator.framework.api.BackgroundPathable;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.api.CuratorEventType;
import com.netflix.curator.framework.api.CuratorListener;
import com.netflix.curator.framework.api.PathAndBytesable;
import com.netflix.curator.framework.api.WatchPathable;
import com.netflix.curator.framework.recipes.queue.ErrorMode;
import com.netflix.curator.framework.recipes.queue.ItemSerializer;
import com.netflix.curator.framework.recipes.queue.MultiItem;
import com.netflix.curator.framework.recipes.queue.QueueConsumer;
import com.netflix.curator.framework.recipes.queue.QueueSerializer;
import com.netflix.curator.utils.ZKPaths;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedQueue<T>
implements Closeable {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final CuratorFramework client;
    private final QueueSerializer<T> serializer;
    private final String queuePath;
    private final Executor executor;
    private final ExecutorService service;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final QueueConsumer<T> consumer;
    private final int minItemsBeforeRefresh;
    private final boolean refreshOnWatch;
    private final boolean isProducerOnly;
    private final AtomicBoolean refreshOnWatchSignaled = new AtomicBoolean(false);
    private final String lockPath;
    private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE);
    private final AtomicInteger putCount = new AtomicInteger(0);
    private final CuratorListener listener = new CuratorListener(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
            if (event.getType() == CuratorEventType.WATCHED) {
                if (event.getWatchedEvent().getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    DistributedQueue.this.internalNotify();
                }
            } else if (event.getType() == CuratorEventType.CREATE) {
                AtomicInteger atomicInteger = DistributedQueue.this.putCount;
                synchronized (atomicInteger) {
                    DistributedQueue.this.putCount.decrementAndGet();
                    DistributedQueue.this.putCount.notifyAll();
                }
            }
        }
    };
    private static final String QUEUE_ITEM_NAME = "queue-";

    DistributedQueue(CuratorFramework client, QueueConsumer<T> consumer, QueueSerializer<T> serializer, String queuePath, ThreadFactory threadFactory, Executor executor, int minItemsBeforeRefresh, boolean refreshOnWatch, String lockPath) {
        Preconditions.checkNotNull((Object)client);
        Preconditions.checkNotNull(serializer);
        Preconditions.checkNotNull((Object)queuePath);
        Preconditions.checkNotNull((Object)threadFactory);
        Preconditions.checkNotNull((Object)executor);
        this.isProducerOnly = consumer == null;
        this.lockPath = lockPath;
        this.consumer = consumer;
        this.minItemsBeforeRefresh = minItemsBeforeRefresh;
        this.refreshOnWatch = refreshOnWatch;
        this.client = client;
        this.serializer = serializer;
        this.queuePath = queuePath;
        this.executor = executor;
        this.service = Executors.newSingleThreadExecutor(threadFactory);
    }

    public void start() throws Exception {
        if (!this.state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException();
        }
        try {
            this.client.create().creatingParentsIfNeeded().forPath(this.queuePath);
        }
        catch (KeeperException.NodeExistsException ignore) {
            // empty catch block
        }
        if (this.lockPath != null) {
            try {
                this.client.create().creatingParentsIfNeeded().forPath(this.lockPath);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        this.client.getCuratorListenable().addListener((Object)this.listener, this.executor);
        if (!this.isProducerOnly) {
            this.service.submit(new Callable<Object>(){

                @Override
                public Object call() {
                    DistributedQueue.this.runLoop();
                    return null;
                }
            });
        }
    }

    public void setErrorMode(ErrorMode newErrorMode) {
        Preconditions.checkNotNull((Object)this.lockPath);
        this.errorMode.set(newErrorMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean flushPuts(long waitTime, TimeUnit timeUnit) throws InterruptedException {
        long msWaitRemaining = TimeUnit.MILLISECONDS.convert(waitTime, timeUnit);
        AtomicInteger atomicInteger = this.putCount;
        synchronized (atomicInteger) {
            while (this.putCount.get() > 0) {
                if (msWaitRemaining <= 0L) {
                    return false;
                }
                long startMs = System.currentTimeMillis();
                this.putCount.wait(msWaitRemaining);
                long elapsedMs = System.currentTimeMillis() - startMs;
                msWaitRemaining -= elapsedMs;
            }
        }
        return true;
    }

    @Override
    public void close() throws IOException {
        if (!this.state.compareAndSet(State.STARTED, State.STOPPED)) {
            throw new IllegalStateException();
        }
        this.client.getCuratorListenable().removeListener((Object)this.listener);
        this.service.shutdownNow();
    }

    public void put(T item) throws Exception {
        this.checkState();
        String path = this.makeItemPath();
        this.internalPut(item, null, path);
    }

    public void putMulti(MultiItem<T> items) throws Exception {
        this.checkState();
        String path = this.makeItemPath();
        this.internalPut(null, items, path);
    }

    void internalPut(T item, MultiItem<T> multiItem, String path) throws Exception {
        if (item != null) {
            final AtomicReference<T> ref = new AtomicReference<T>(item);
            multiItem = new MultiItem<T>(){

                @Override
                public T nextItem() throws Exception {
                    return ref.getAndSet(null);
                }
            };
        }
        this.putCount.incrementAndGet();
        byte[] bytes = ItemSerializer.serialize(multiItem, this.serializer);
        ((PathAndBytesable)((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).inBackground()).forPath(path, bytes);
    }

    void checkState() throws Exception {
        if (this.state.get() != State.STARTED) {
            throw new IllegalStateException();
        }
    }

    String makeItemPath() {
        return ZKPaths.makePath((String)this.queuePath, (String)QUEUE_ITEM_NAME);
    }

    private synchronized void internalNotify() {
        if (this.refreshOnWatch) {
            this.refreshOnWatchSignaled.set(true);
        }
        this.notifyAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                List children;
                DistributedQueue distributedQueue = this;
                synchronized (distributedQueue) {
                    do {
                        if ((children = (List)((BackgroundPathable)this.client.getChildren().watched()).forPath(this.queuePath)).size() != 0) continue;
                        this.wait();
                    } while (children.size() == 0);
                    this.refreshOnWatchSignaled.set(false);
                }
                if (children.size() <= 0) continue;
                this.processChildren(children);
            }
        }
        catch (Exception e) {
            this.log.error("Exception caught in background handler", (Throwable)e);
        }
    }

    private void processChildren(List<String> children) throws Exception {
        Collections.sort(children);
        boolean isUsingLockSafety = this.lockPath != null;
        int min = this.minItemsBeforeRefresh;
        for (String itemNode : children) {
            if (Thread.currentThread().isInterrupted()) break;
            if (!itemNode.startsWith(QUEUE_ITEM_NAME)) {
                this.log.warn("Foreign node in queue path: " + itemNode);
                continue;
            }
            if (min-- <= 0 && this.refreshOnWatchSignaled.compareAndSet(true, false)) break;
            if (isUsingLockSafety) {
                this.processWithLockSafety(itemNode);
                continue;
            }
            this.processNormally(itemNode);
        }
    }

    private boolean processMessageBytes(String itemNode, byte[] bytes) throws Exception {
        T item;
        MultiItem<T> items;
        try {
            items = ItemSerializer.deserialize(bytes, this.serializer);
        }
        catch (Exception e) {
            this.log.error("Corrupted queue item: " + itemNode, (Throwable)e);
            return false;
        }
        boolean removeItem = true;
        while ((item = items.nextItem()) != null) {
            try {
                this.consumer.consumeMessage(item);
            }
            catch (Exception e) {
                this.log.error("Exception processing queue item: " + itemNode, (Throwable)e);
                if (this.errorMode.get() != ErrorMode.REQUEUE) continue;
                removeItem = false;
                break;
            }
        }
        return removeItem;
    }

    private void processNormally(String itemNode) throws Exception {
        try {
            String itemPath = ZKPaths.makePath((String)this.queuePath, (String)itemNode);
            Stat stat = new Stat();
            byte[] bytes = (byte[])((WatchPathable)this.client.getData().storingStatIn(stat)).forPath(itemPath);
            ((BackgroundPathable)this.client.delete().withVersion(stat.getVersion())).forPath(itemPath);
            this.processMessageBytes(itemNode, bytes);
        }
        catch (KeeperException.NodeExistsException ignore) {
        }
        catch (KeeperException.NoNodeException ignore) {
        }
        catch (KeeperException.BadVersionException badVersionException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processWithLockSafety(String itemNode) throws Exception {
        String lockNodePath = ZKPaths.makePath((String)this.lockPath, (String)itemNode);
        boolean lockCreated = false;
        try {
            ((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(lockNodePath);
            lockCreated = true;
            String itemPath = ZKPaths.makePath((String)this.queuePath, (String)itemNode);
            byte[] bytes = (byte[])this.client.getData().forPath(itemPath);
            if (this.processMessageBytes(itemNode, bytes)) {
                this.client.delete().forPath(itemPath);
            }
        }
        catch (KeeperException.NodeExistsException ignore) {
        }
        catch (KeeperException.NoNodeException ignore) {
        }
        catch (KeeperException.BadVersionException ignore) {
        }
        finally {
            if (lockCreated) {
                this.client.delete().forPath(lockNodePath);
            }
        }
    }

    private static enum State {
        LATENT,
        STARTED,
        STOPPED;

    }
}

