Prioritize tasks in a Thread Pool Executor

In this post, we'll explore how to prioritize tasks submitted to a ThreadPoolExecutor.

While java.util.concurrent.Executors provides many factory and utility methods for creating ThreadPoolExecutor instances with various combinations of underlying resources (threadFactory, poolSize, etc.), the default configurations might not be sufficient for every use case.

Consider a use case where submitted tasks should be executed in order of their priority rather than in order of their submission. The default configuration uses a LinkedBlockingQueue (as shown below), which hands out tasks strictly in FIFO order, so it will not serve the purpose.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                // the queue to use for holding tasks before they are executed. 
                                new LinkedBlockingQueue<Runnable>()));
}
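
As a quick check of that FIFO behavior, the following minimal sketch submits five tasks to the default single-thread executor and records the order in which they run. The names FifoOrderDemo and runInDefaultExecutor are illustrative and not part of the original sample code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FifoOrderDemo {

    // Submits T5..T1 to the default single-thread executor and returns
    // the task names in the order the tasks actually ran.
    static List<String> runInDefaultExecutor() {
        List<String> executed = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int priority = 5; priority >= 1; priority--) {
            String name = "T" + priority;
            // Block lambda, so this is submitted as a Runnable
            executor.submit(() -> { executed.add(name); });
        }
        executor.shutdown(); // tasks that are already queued still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runInDefaultExecutor());
    }
}
```

Running main prints [T5, T4, T3, T2, T1]: the tasks run in submission order, whatever priority the names suggest.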

But what if we simply replace the LinkedBlockingQueue with a PriorityBlockingQueue? That should be sufficient, right? Let’s try that:

  1. Create a runnable task to be submitted, which is comparable based on the priority field:

    class Task implements Comparable<Task>, Runnable{
     private final String name;
     private final int priority;
    
     Task(String name, int priority) {
         this.name = name;
         this.priority = priority;
     }
    
     @Override
     public void run() {
         try {
             System.out.println(this.name + " triggered successfully");
             Thread.sleep(2000);
             System.out.println(this.name + " completed successfully");
         }catch (InterruptedException interruptedException){
             Thread.currentThread().interrupt();
             throw new RuntimeException(interruptedException);
         }
     }
    
     @Override
     public int compareTo(Task that) {
         return Integer.compare(this.priority, that.priority);
     }
    }
    
  2. Create a ThreadPoolExecutor instance with PriorityBlockingQueue and submit the tasks to it:

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, // core pool size
                 1, // maximum pool size
                 1000,  // keep alive time
                 TimeUnit.SECONDS,  // time unit
                 new PriorityBlockingQueue<Runnable>() // work queue
    );
    
    tpe.submit(new Task("T5", 5));
    tpe.submit(new Task("T4", 4));
    tpe.submit(new Task("T3", 3));
    tpe.submit(new Task("T2", 2));
    tpe.submit(new Task("T1", 1));
    

You might expect the tasks to be executed in order of priority, but it is not that straightforward. As soon as a task has to be queued (here, on the second submit call), the execution fails with the following exception:

java.lang.ClassCastException: class java.util.concurrent.FutureTask cannot be cast to class java.lang.Comparable (java.util.concurrent.FutureTask and java.lang.Comparable are in module java.base of loader 'bootstrap')

This happens because the Runnable submitted for execution (an instance of Task in this case) is first wrapped in a FutureTask by the submit method:

// java.util.concurrent.AbstractExecutorService
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
// java.util.concurrent.AbstractExecutorService
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value); // <-- the Task is wrapped in a FutureTask, which is not Comparable
}

When the only worker thread is busy with an earlier task, this FutureTask instance is offered to the PriorityBlockingQueue. Since no Comparator was supplied, the queue expects its elements to be Comparable, and FutureTask is not, which causes the exception above.
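
The failure can be reproduced without an executor at all. The following minimal sketch offers a plain FutureTask directly to a PriorityBlockingQueue that uses natural ordering; the class name QueueRejectsFutureTask and the method offerFails are illustrative only.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;

class QueueRejectsFutureTask {

    // Returns true if offering a plain FutureTask to a PriorityBlockingQueue
    // created without a Comparator fails with a ClassCastException.
    static boolean offerFails() {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        Runnable wrapped = new FutureTask<Void>(() -> { }, null);
        try {
            queue.offer(wrapped); // the queue casts the element to Comparable
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer failed: " + offerFails());
    }
}
```

Running main prints "offer failed: true", even though the queue is empty at that point: the cast to Comparable happens on insertion, not only when two elements are compared.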

So how do we fix this? It looks like we need some more customization. First, we need to make sure that the RunnableFuture returned by the newTaskFor method implements the Comparable interface. This way, the resulting tasks can be added to a PriorityBlockingQueue.

Next, we can drop the Comparable implementation from Task: Task instances are no longer placed in the queue directly, but wrapped inside the object returned by newTaskFor.

Here is the complete code:


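import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Task updated to remove Comparable implementation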
class Task implements Runnable{
    private final String name;
    private final int priority;

    Task(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public void run(){
        try {
            System.out.println(this.name + " triggered successfully");
            Thread.sleep(2000);
            System.out.println(this.name + " completed successfully");
        }catch (InterruptedException interruptedException){
            Thread.currentThread().interrupt();
            throw new RuntimeException(interruptedException);
        }
    }

    public int getPriority() {
        return priority;
    }
}

// A custom single-thread ThreadPoolExecutor that overrides the newTaskFor method
// to return a Comparable FutureTask instance.
class CustomSingleThreadPoolExecutor extends ThreadPoolExecutor{
    public CustomSingleThreadPoolExecutor(BlockingQueue<Runnable> workQueue) {
        super(1, 1, 1000, TimeUnit.SECONDS, workQueue);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        // Pass the result value through instead of silently dropping it
        return new CustomFutureTask<>(runnable, value);
    }
}

// A comparable FutureTask; it only supports Task instances
class CustomFutureTask<T> extends FutureTask<T> implements Comparable<CustomFutureTask<?>>{

    private final Task task;

    public CustomFutureTask(Runnable task, T result) {
        super(task, result);
        // Throws ClassCastException if anything other than a Task is submitted
        this.task = (Task) task;
    }

    @Override
    public int compareTo(CustomFutureTask<?> that) {
        // Lower priority value runs first
        return Integer.compare(this.task.getPriority(), that.task.getPriority());
    }
}
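
One gap worth noting: the executor above only overrides newTaskFor(Runnable, T), so a task submitted as a Callable would still be wrapped in a plain FutureTask and would hit the same ClassCastException once it is queued. The following is a minimal sketch of one way to cover both overloads. It assumes a hypothetical Prioritized interface that every submitted task implements; Prioritized, PriorityFutureTask, PriorityAwareExecutor and Job are illustrative names, not part of the original sample code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical interface: every submitted task must expose a priority.
interface Prioritized {
    int getPriority();
}

// Comparable FutureTask that can wrap either a Runnable or a Callable.
class PriorityFutureTask<T> extends FutureTask<T>
        implements Comparable<PriorityFutureTask<?>> {

    private final int priority;

    PriorityFutureTask(Runnable task, T result, int priority) {
        super(task, result);
        this.priority = priority;
    }

    PriorityFutureTask(Callable<T> task, int priority) {
        super(task);
        this.priority = priority;
    }

    @Override
    public int compareTo(PriorityFutureTask<?> that) {
        return Integer.compare(this.priority, that.priority);
    }
}

class PriorityAwareExecutor extends ThreadPoolExecutor {

    PriorityAwareExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
    }

    // Rejects unsupported tasks at submission time with a clear message.
    private static int priorityOf(Object task) {
        if (task instanceof Prioritized) {
            return ((Prioritized) task).getPriority();
        }
        throw new IllegalArgumentException("Task must implement Prioritized: " + task);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PriorityFutureTask<>(runnable, value, priorityOf(runnable));
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PriorityFutureTask<>(callable, priorityOf(callable));
    }
}

class CallableDemo {

    // Hypothetical task that is both a Callable and Prioritized.
    static final class Job implements Callable<String>, Prioritized {
        private final String name;
        private final int priority;

        Job(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public String call() {
            return name + " done";
        }

        @Override
        public int getPriority() {
            return priority;
        }
    }

    // Submits one Callable and returns its result.
    static String runOne() {
        PriorityAwareExecutor executor = new PriorityAwareExecutor();
        try {
            return executor.submit(new Job("T1", 1)).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOne());
    }
}
```

Storing the priority in the wrapper, rather than casting to a concrete task class, keeps the comparison independent of the task type. Whether that trade-off suits a given code base is a design choice.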

Now, when tasks are submitted to a CustomSingleThreadPoolExecutor and more than one task is waiting in the priority queue for the worker thread, the waiting tasks are handed to the thread in order of their priority:

ThreadPoolExecutor tpe = new CustomSingleThreadPoolExecutor(new PriorityBlockingQueue<Runnable>());
tpe.submit(new Task("T5", 5));
tpe.submit(new Task("T4", 4));
tpe.submit(new Task("T3", 3));
tpe.submit(new Task("T2", 2));
tpe.submit(new Task("T1", 1));

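And the output is shown below. T5 runs first because it was submitted while the worker thread was idle, so it never entered the queue; the remaining tasks were queued and ran in order of priority: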

T5 triggered successfully
T5 completed successfully
T1 triggered successfully
T1 completed successfully
T2 triggered successfully
T2 completed successfully
T3 triggered successfully
T3 completed successfully
T4 triggered successfully
T4 completed successfully
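
The output above relies on the first task still running while the others are submitted. The following minimal sketch makes that explicit by holding the first task on a CountDownLatch until the remaining tasks are queued, then recording the execution order. RecordingTask and ComparableFutureTask are simplified, illustrative stand-ins for the Task and CustomFutureTask classes above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueOrderDemo {

    // Stand-in for Task: optionally waits on a gate, then records its name.
    static final class RecordingTask implements Runnable {
        final String name;
        final int priority;
        final List<String> log;
        final CountDownLatch gate; // null means "do not wait"

        RecordingTask(String name, int priority, List<String> log, CountDownLatch gate) {
            this.name = name;
            this.priority = priority;
            this.log = log;
            this.gate = gate;
        }

        @Override
        public void run() {
            try {
                if (gate != null) {
                    gate.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add(name);
        }
    }

    // Stand-in for CustomFutureTask: compares by the wrapped task's priority.
    static final class ComparableFutureTask<T> extends FutureTask<T>
            implements Comparable<ComparableFutureTask<?>> {
        final int priority;

        ComparableFutureTask(RecordingTask task, T value) {
            super(task, value);
            this.priority = task.priority;
        }

        @Override
        public int compareTo(ComparableFutureTask<?> that) {
            return Integer.compare(this.priority, that.priority);
        }
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                return new ComparableFutureTask<>((RecordingTask) runnable, value);
            }
        };
        // T5 goes straight to the idle worker thread and waits on the gate.
        executor.submit(new RecordingTask("T5", 5, log, gate));
        // T4..T1 find the worker busy, so they are placed in the priority queue.
        for (int p = 4; p >= 1; p--) {
            executor.submit(new RecordingTask("T" + p, p, log, null));
        }
        gate.countDown(); // release T5; the queue then drains by priority
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [T5, T1, T2, T3, T4], matching the order of the output above.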

The sample code and Spock test cases are available in the GitHub repo.