package no.priv.garshol.duke;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;
import no.priv.garshol.duke.matchers.MatchListener;

/* loaded from: input_file:no/priv/garshol/duke/MultithreadProcessor2.class */
public class MultithreadProcessor2 extends Processor {
    private Queue<Task> queue;
    private int threads;
    private int finished;
    private boolean stopped;
    private static int DEFAULT_THREAD_COUNT = 1;
    private MatchListener filter;
    private int count;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:no/priv/garshol/duke/MultithreadProcessor2$MatchingThread.class */
    public class MatchingThread extends Thread {
        private MultithreadProcessor2 processor;

        public MatchingThread(MultithreadProcessor2 multithreadProcessor2, int i) {
            super("MatchingThread " + i);
            this.processor = multithreadProcessor2;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Task task;
            int i = 0;
            while (true) {
                if (MultithreadProcessor2.this.stopped && MultithreadProcessor2.this.queue.isEmpty()) {
                    synchronized (this.processor) {
                        MultithreadProcessor2.access$208(MultithreadProcessor2.this);
                        System.out.println("Thread finished: " + MultithreadProcessor2.this.finished + ", " + i + "tasks");
                    }
                    return;
                }
                synchronized (this.processor) {
                    task = (Task) MultithreadProcessor2.this.queue.poll();
                }
                while (task != null) {
                    i++;
                    this.processor.compareCandidates(task);
                    synchronized (this.processor) {
                        task = (Task) MultithreadProcessor2.this.queue.poll();
                    }
                }
                try {
                    sleep(10L);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:no/priv/garshol/duke/MultithreadProcessor2$Task.class */
    public static class Task {
        public Record record;
        public Collection<Record> candidates;

        public Task(Record record, Collection<Record> collection) {
            this.record = record;
            this.candidates = collection;
        }
    }

    public MultithreadProcessor2(Configuration configuration) throws IOException {
        this(configuration, false);
    }

    public MultithreadProcessor2(Configuration configuration, boolean z) throws IOException {
        super(configuration, z);
        this.threads = DEFAULT_THREAD_COUNT;
        this.queue = new ArrayDeque();
        this.stopped = false;
    }

    public void setThreadCount(int i) {
        this.threads = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // no.priv.garshol.duke.Processor
    public void compareCandidates(Record record, Collection<Record> collection, MatchListener matchListener) {
        this.filter = matchListener;
        synchronized (this) {
            this.queue.add(new Task(record, collection));
            this.count++;
            if (this.count % 100 == 0) {
                System.out.println("" + this.count + " records arrived, " + this.queue.size() + " in queue");
            }
        }
    }

    @Override // no.priv.garshol.duke.Processor
    public void deduplicate(Collection<DataSource> collection, int i) throws IOException {
        startThreads();
        super.deduplicate(collection, i);
    }

    protected void compareCandidates(Task task) {
        super.compareCandidates(task.record, task.candidates, this.filter);
    }

    @Override // no.priv.garshol.duke.Processor
    public void close() throws IOException {
        System.out.println("Closing processor");
        this.stopped = true;
        while (this.finished < this.threads) {
            try {
                System.out.println("Waiting, finished " + this.finished + ", queue" + this.queue.size());
                Thread.sleep(10L);
            } catch (InterruptedException e) {
            }
        }
        this.database.close();
    }

    private void startThreads() {
        for (int i = 0; i < this.threads; i++) {
            new MatchingThread(this, i).start();
        }
    }

    static /* synthetic */ int access$208(MultithreadProcessor2 multithreadProcessor2) {
        int i = multithreadProcessor2.finished;
        multithreadProcessor2.finished = i + 1;
        return i;
    }
}
