Implementing a Custom Thread Pool

In this article, we'll explore some multithreading concepts by writing our very own custom thread pool to manage several tasks and threads.

On multi-core systems, multithreaded code can significantly increase overall application throughput, as long as the tasks and the available threads are managed effectively. When they are not, the associated overhead outweighs the benefits.

You can visit this link for more details about the thread pool data structure.

Class Structure

In the following section, we list various classes and their corresponding attributes that play a significant role in the overall implementation:

  1. CustomThreadPoolImpl: ThreadPool implementation class

  2. CustomThreadPoolBuilder: The builder class to instantiate a thread pool. The following options are available for customization:

    1. corePoolSize: The minimum number of threads that are always kept alive, waiting for tasks to become available.
    2. maxPoolSize: The maximum size to which the thread pool can grow in terms of the number of threads.
    3. keepAlive: The time in milliseconds before surplus threads cease to exist.
  3. threadList: An array list to hold the threads. The initial size should match the corePoolSize.

  4. taskList: A list of tasks to be executed. The taskList is initialized as an unbounded data structure.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class CustomThreadPoolImpl implements CustomThreadPool {

    private final int corePoolSize;
    private final int maxPoolSize;
    private final int keepAlive;

    private final List<Runnable> taskList;
    private final List<Thread> threadList;
    private final Supplier<Thread> threadSupplier = Thread::new;

    private CustomThreadPoolImpl(int corePoolSize, int maxPoolSize, int keepAlive) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAlive = keepAlive;
        // should we use a thread-safe data structure?
        taskList = new ArrayList<>();
        threadList = new ArrayList<>(corePoolSize);
        init();
    }

    // create and start corePoolSize threads up front
    private void init() {
        Stream.generate(threadSupplier).limit(corePoolSize).forEach(thread -> {
            threadList.add(thread);
            thread.start();
        });
    }

    // builder and other boilerplate code
}
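The builder itself is omitted above. As a rough idea of what it might look like, here is a minimal sketch. The class name PoolSettings, the method names, the default values, and the validation rules are all assumptions made for illustration. To keep the sketch self-contained, build() returns a plain settings object rather than the thread pool itself.

```java
// Hypothetical builder sketch: names, defaults, and validation rules are assumptions.
final class PoolSettings {
    final int corePoolSize;
    final int maxPoolSize;
    final int keepAlive; // milliseconds

    private PoolSettings(int corePoolSize, int maxPoolSize, int keepAlive) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAlive = keepAlive;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private int corePoolSize = 1;  // assumed default
        private int maxPoolSize = 1;   // assumed default
        private int keepAlive = 1000;  // assumed default, in milliseconds

        Builder corePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
            return this;
        }

        Builder maxPoolSize(int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
            return this;
        }

        Builder keepAlive(int keepAlive) {
            this.keepAlive = keepAlive;
            return this;
        }

        PoolSettings build() {
            // reject combinations that a pool could not honor
            if (corePoolSize < 1 || maxPoolSize < corePoolSize || keepAlive < 0) {
                throw new IllegalArgumentException("invalid thread pool settings");
            }
            return new PoolSettings(corePoolSize, maxPoolSize, keepAlive);
        }
    }
}
```

A call such as PoolSettings.builder().corePoolSize(2).maxPoolSize(4).keepAlive(500).build() would then yield the three values that the private CustomThreadPoolImpl constructor expects.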

Implementation Approach

Once the thread pool is instantiated via the CustomThreadPoolBuilder, the first step is to create all the threads that will accept tasks. This is done in the init method, which creates corePoolSize threads and adds them to the threadList data structure.

After this step, we have the threads available for accepting the tasks, but we need a mechanism to notify these threads about the availability of a new task. For this purpose, we have the following options:

  1. Every call to submit, in addition to adding the task to taskList, should also notify the threads in threadList that a new task is available.
  2. A callback registered with taskList should notify the threads in threadList. Every time taskList receives a new task, it triggers the callback.
  3. The threads in threadList should themselves monitor taskList for new tasks.

Out of these three options, option 3 seems to be the most suitable, as the first two have a few issues:

  1. Identifying a free thread that can accept the newly submitted task adds overhead.
  2. Option 2 requires us to customize taskList and threadList for the callback mechanism.
  3. Thread safety has to be handled explicitly when tasks are submitted concurrently by multiple threads.

Option 3 does not impose any such restriction. Moreover, it keeps the responsibility for picking a task with the thread that will execute it.

In addition to this, we should also consider the following:

  1. How do we ensure the execution order of the tasks? The tasks in the taskList should be executed in first-in-first-out order.
  2. How do we check which thread is free in the threadList?
  3. If all the threads are already in use, how do we ensure we can buffer the additionally submitted tasks?
  4. As an optional feature, should we retry failed tasks?

Custom Thread

As discussed previously, a suitable option is to let all the threads in threadList monitor the taskList. As the native java.lang.Thread does not have such functionality, we’ll have to enrich it by subclassing it with the required features.

private static final class CustomThread extends Thread {
    private final List<Runnable> taskList;

    private CustomThread(List<Runnable> taskList) {
        this.taskList = taskList;
    }

    @Override
    public void run() {
        // first attempt: keep scanning the shared list and run whatever is found
        Iterator<Runnable> taskIterator = taskList.iterator();
        while (true) {
            while (taskIterator.hasNext()) {
                Runnable task = taskIterator.next();
                taskIterator.remove();
                task.run();
            }
        }
    }
}

The above-mentioned implementation gives us a good starting point to understand the structure of our CustomThread, enhanced with the capability to monitor the taskList:

  1. Every CustomThread instance is passed the list of runnable tasks i.e. taskList as a constructor argument.
  2. The thread, when started, will monitor the list indefinitely. It will get and remove new tasks from the list and will trigger the corresponding execution.

However, this implementation has a few issues:

  1. The taskList is shared by multiple threads, so it has to be thread-safe.
  2. When no tasks are available, the while(true) loop merely wastes CPU cycles.
  3. The ordering of the taskList still needs to be addressed.

Considering these points, let’s change the taskList to a LinkedBlockingQueue, which is a good fit because it is:

  1. Ordered - being a queue, it natively supports FIFO ordering.
  2. Thread-safe and Blocking data structure - this will help avoid CPU cycle waste if no task is available.
  3. Dynamically sized to allow buffering of tasks.
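To illustrate these properties, here is a minimal sketch, assuming a single worker thread and a hypothetical QueueDemo class that is not part of the original implementation. The worker blocks in take() until tasks arrive and then runs them in submission order.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original thread pool code.
final class QueueDemo {

    // Submits three tasks to a single worker and returns the order in which they ran.
    static List<String> runInOrder() {
        BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
        List<String> executed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // take() parks the thread while the queue is empty
                    taskList.take().run();
                }
            } catch (InterruptedException e) {
                // interrupted while waiting: restore the flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        for (String name : List.of("first", "second", "third")) {
            taskList.add(() -> {
                executed.add(name);
                done.countDown();
            });
        }

        try {
            done.await();       // wait until all three tasks have run
            worker.interrupt(); // unblock the worker so that it can terminate
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }
}
```

With a single worker, the tasks run strictly in the order they were added. With several workers, tasks are still taken from the queue in FIFO order, but they may finish in a different order.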

Here is the updated CustomThread class:

private static final class CustomThread extends Thread {
   private final BlockingQueue<Runnable> taskList;

   private CustomThread(BlockingQueue<Runnable> taskList) {
      this.taskList = taskList;
   }

   @Override
   public void run() {
      try {
         while (true) {
            // this will wait until a task is available
            taskList.take().run();
         }
      } catch (InterruptedException interruptedException) {
         // restore the interrupt flag and let the thread exit
         Thread.currentThread().interrupt();
      }
   }
}

While the CustomThread class now serves its purpose, there is still an important missing aspect that we’ll discuss in the next section.

When does the while(true) loop terminate?

The run method of the CustomThread class monitors taskList for new tasks indefinitely, which is not a desirable state to be in. There has to be an external trigger to indicate that the thread pool should shut down, with the following options:

  1. Graceful shutdown: wait for the currently running tasks to complete. Tasks still waiting for execution are also run as threads become available.
  2. Immediate shutdown: terminate immediately, attempting to halt all running tasks and not starting any pending ones.

In both cases, it should not accept any new task when shutdown is triggered.

To support this, the CustomThread class is updated to include an AtomicBoolean run flag that is passed in during instantiation. The flag is updated from the shutdown and shutdownNow methods of the CustomThreadPoolImpl class, allowing API users to control the behavior.

The shutdown method ensures that taskList does not accept any new task. Setting the run flag to false also causes the individual threads in threadList to stop monitoring taskList.

In addition, the shutdownNow method tries to interrupt any thread that is currently executing a task. Any task still waiting for a thread is returned, so that API users can keep track of the tasks that were never executed.
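The run flag described above could be wired up roughly as follows. This is a minimal sketch under stated assumptions, not the original implementation: the class name ShutdownSketch is hypothetical, workers are plain threads instead of CustomThread, and take() is swapped for poll() with an assumed 50 ms timeout so that an idle worker re-checks the flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified pool that only illustrates the run flag.
final class ShutdownSketch {
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    private final List<Thread> threadList = new ArrayList<>();

    ShutdownSketch(int threads) {
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(this::workerLoop);
            threadList.add(worker);
            worker.start();
        }
    }

    private void workerLoop() {
        try {
            // keep going while the pool is running or queued tasks remain
            while (run.get() || !taskList.isEmpty()) {
                // assumed 50 ms timeout so that the flag is re-checked regularly
                Runnable task = taskList.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by shutdownNow: just exit
        }
    }

    void submit(Runnable task) {
        if (!run.get()) {
            throw new IllegalStateException("thread pool is shut down");
        }
        taskList.add(task);
    }

    // Graceful: reject new tasks, but let the workers drain the queue.
    void shutdown() {
        run.set(false);
    }

    // Immediate: reject new tasks, interrupt the workers, return tasks that never started.
    List<Runnable> shutdownNow() {
        run.set(false);
        List<Runnable> pending = new ArrayList<>();
        taskList.drainTo(pending);
        threadList.forEach(Thread::interrupt);
        return pending;
    }

    // Demo: one worker is kept busy while three more tasks queue up behind it.
    static int pendingAfterShutdownNow() {
        ShutdownSketch pool = new ShutdownSketch(1);
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cut short by shutdownNow
            }
        });
        try {
            started.await(); // make sure the first task is running, not queued
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> { });
        }
        return pool.shutdownNow().size();
    }
}
```

Note that the check in submit and the subsequent add are not atomic, so a task submitted at the exact moment of shutdown could still slip into the queue. A fuller implementation would need to guard against that.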

Submitting tasks

Now that we are done with the CustomThread implementation, we can turn our attention to the primary thread pool feature, which is to accept a task for execution. As the task is to be run in a separate thread, it has to be either a Runnable or a Callable.

To accommodate both types, and to return a result (of type Future) for Callable tasks, we wrap the input task in a FutureTask and add it to the taskList. As the threads in the threadList are already monitoring the taskList, a thread should pick the task up and execute it as soon as it becomes available.

@Override
public <T> Future<T> submit(Callable<T> task) {
    RunnableFuture<T> future = new FutureTask<>(task);
    taskList.add(future);
    return future;
}

@Override
public Future<?> submit(Runnable task) {
    RunnableFuture<?> future = new FutureTask<>(task, null);
    taskList.add(future);
    return future;
}
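To see why wrapping in a FutureTask is enough, consider the following minimal sketch. It assumes a hypothetical FutureTaskDemo class with a single worker thread. The worker only ever sees a Runnable, yet running it completes the Future held by the submitter.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

// Hypothetical demo class, not part of the original implementation.
final class FutureTaskDemo {

    // Queues a Callable wrapped in a FutureTask and lets one worker thread run it.
    static int computeOnWorker() {
        BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                // the worker only sees a Runnable; running it completes the future
                taskList.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        RunnableFuture<Integer> future = new FutureTask<>(() -> 6 * 7);
        taskList.add(future);

        try {
            return future.get(); // blocks until the worker has run the task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A useful side effect of this design is that an exception thrown inside the task is captured by the FutureTask and rethrown from get() as an ExecutionException, so a failing task does not kill the worker thread.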

This approach looks fine at a high level, but we still have to consider the corePoolSize, maxPoolSize, and the run flag here.

To include these three aspects in the logic, we need to consider the following points:

  1. If the run flag is set to false, a shutdown has already been triggered, so we reject the newly submitted task by throwing an exception.
  2. Otherwise:
    1. If the taskList size is greater than or equal to corePoolSize but less than maxPoolSize, which we take to mean that all the threads are already busy processing tasks, we create a new thread and add it to the threadList. In addition, we add the task to the taskList. In theory, this newly created thread should pick up this latest addition to the taskList.
    2. If the taskList size has already reached maxPoolSize, we treat the maximum number of allowed threads as already spawned. In this situation, we simply add the task to the taskList, where it waits to be picked up by a thread.

These cases are handled in the handle method shown below:

private void handle(RunnableFuture<?> futureTask) {
    if (taskList.size() < corePoolSize) {
        taskList.add(futureTask);
    } else if (taskList.size() < maxPoolSize) {
        // create a new thread for the threadList
        CustomThread customThread = new CustomThread(taskList, run);
        threadList.add(customThread);
        customThread.start();
        taskList.add(futureTask);
    } else {
        // maxPoolSize already reached
        // simply submit the task
        taskList.add(futureTask);
    }
}
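Note that the handle method above uses the size of taskList as a proxy for the number of threads, so nothing in it directly caps the size of threadList. One possible variant, sketched below, checks threadList.size() against maxPoolSize before growing. This is a swapped-in condition and an assumption on my part, not the original implementation. The HandleSketch class, the daemon workers, and the synchronized methods are likewise hypothetical choices made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical variant of handle(); not the original implementation.
final class HandleSketch {
    private final int corePoolSize;
    private final int maxPoolSize;
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    private final List<Thread> threadList = new ArrayList<>();

    HandleSketch(int corePoolSize, int maxPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        for (int i = 0; i < corePoolSize; i++) {
            addWorker();
        }
    }

    private void addWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    taskList.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // assumption: keeps the demo from blocking JVM exit
        threadList.add(worker);
        worker.start();
    }

    // synchronized so that concurrent submitters cannot overshoot maxPoolSize
    synchronized void handle(RunnableFuture<?> futureTask) {
        if (!run.get()) {
            throw new IllegalStateException("thread pool is shut down");
        }
        // grow only while the backlog has reached corePoolSize and threads are below the cap
        if (taskList.size() >= corePoolSize && threadList.size() < maxPoolSize) {
            addWorker();
        }
        taskList.add(futureTask);
    }

    synchronized int threadCount() {
        return threadList.size();
    }

    // Demo: ten blocking tasks on a pool with corePoolSize 1 and maxPoolSize 3.
    static int threadsAfterBurst() {
        HandleSketch pool = new HandleSketch(1, 3);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            pool.handle(new FutureTask<Void>(() -> {
                release.await(); // keep the worker busy until released
                return null;
            }));
        }
        int threads = pool.threadCount();
        release.countDown(); // let the queued tasks finish
        return threads;
    }
}
```

In the demo, every task blocks until released, so the backlog builds up and the pool grows from one thread to three and then stops, no matter how many more tasks arrive.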

To do: how do we downsize the thread pool when the number of tasks pending execution is less than corePoolSize? To be continued in the next post.

With this, we have covered almost all of the minimum features required for a working thread pool implementation. One is still missing: an option that allows API users to wait for the completion of all tasks.

Await Termination

The awaitTermination method gives API users a way to wait for all the tasks to complete. To avoid waiting too long on long-running tasks, it also accepts a timeout value, after which the method returns even if tasks are still pending.

One thing to note here is that the awaitTermination method can only be called after the shutdown method is triggered; otherwise, it will throw an exception.

public boolean awaitTermination(long timeoutInMilli) throws InterruptedException {
    if (this.run.get()) {
        throw new IllegalStateException("thread pool needs to be shut down before awaitTermination is called");
    }

    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime <= timeoutInMilli) {
        // done as soon as no thread in the pool is alive anymore
        Optional<Thread> aliveThread = threadList.stream().filter(Thread::isAlive).findAny();
        if (aliveThread.isEmpty()) {
            return true;
        }
        Thread.onSpinWait();
    }
    // timeout reached but tasks are still not complete
    return false;
}
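The loop above spins until either all threads have finished or the timeout has elapsed. An alternative that parks the caller instead of spinning is to join each thread with the remaining time budget. The following is a minimal sketch of that idea. The AwaitSketch class and its demo method are hypothetical, and the thread list is passed in as a parameter to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original implementation.
final class AwaitSketch {

    // Returns true if every thread finished before the timeout, false otherwise.
    static boolean awaitTermination(List<Thread> threadList, long timeoutInMilli)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutInMilli);
        for (Thread thread : threadList) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis > 0) {
                // join parks the caller instead of spinning; join(0) would wait forever
                thread.join(remainingMillis);
            }
            if (thread.isAlive()) {
                return false; // timeout reached while this thread was still running
            }
        }
        return true;
    }

    // Demo: a single thread that sleeps for workMillis, awaited for timeoutMillis.
    static boolean demo(long workMillis, long timeoutMillis) {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sleeper.setDaemon(true);
        sleeper.start();
        try {
            return awaitTermination(List.of(sleeper), timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sleeper.interrupt(); // stop the sleeper if it is still running
        }
    }
}
```

Because the deadline is computed once and shared across all joins, the total wait never exceeds the requested timeout by more than scheduling jitter, however many threads the pool holds.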

With this, we can conclude our custom implementation of the thread pool. The implementation only illustrates the core concepts related to thread pools and multithreading and is nowhere near production-ready, so user discretion is advised before using it. The code is available in the GitHub repository.

That is all for this post. If you want to share any feedback, please drop me an email, or contact me on any social platforms. I’ll try to respond at the earliest. Also, please consider subscribing for regular updates.
