When working with multithreaded code, multi-core systems can increase overall application throughput significantly, as long as the tasks and the available threads are managed effectively. When they are not, the overhead of creating and coordinating threads can outweigh the benefits.
In this article, we’ll explore some multithreading concepts by writing our very own custom thread pool to manage several tasks and threads. You can visit this link for more details about the thread pool pattern.
The following classes and attributes play a significant role in the overall implementation:
CustomThreadPoolImpl: The thread pool implementation class.
CustomThreadPoolBuilder: The builder class used to instantiate a thread pool. The following options are available for customization:
corePoolSize: The minimum number of threads that are always alive, waiting for tasks to become available.
maxPoolSize: The maximum number of threads the thread pool can grow to.
keepAlive: The time in milliseconds before surplus threads cease to exist.
The thread pool implementation also holds the following data structures:
threadList: An array list holding the threads. Its initial capacity matches corePoolSize.
taskList: The list of tasks to be executed. It is initialized as an unbounded data structure.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;
public class CustomThreadPoolImpl implements CustomThreadPool {
    private final int corePoolSize;
    private final int maxPoolSize;
    private final int keepAlive;
    private final List<Runnable> taskList;
    private final List<Thread> threadList;
    // a bare Thread has no Runnable to execute, so it terminates right after start();
    // this is why Thread gets subclassed later in the article
    private final Supplier<Thread> threadSupplier = Thread::new;
    private CustomThreadPoolImpl(int corePoolSize, int maxPoolSize, int keepAlive) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAlive = keepAlive;
        // should we use a thread-safe data structure?
        taskList = new ArrayList<>();
        threadList = new ArrayList<>(corePoolSize);
        init();
    }
    // creates and starts corePoolSize threads up front
    private void init() {
        Stream.generate(threadSupplier).limit(corePoolSize).forEach(thread -> {
            threadList.add(thread);
            thread.start();
        });
    }
    // builder and other boilerplate code
}
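The builder itself is left out above as boilerplate. As a rough idea of what it might look like, here is a minimal sketch; the class names PoolConfig and Builder are hypothetical, and the validation rules are an assumption that mirrors the constraints java.util.concurrent.ThreadPoolExecutor places on its own corePoolSize, maximumPoolSize and keepAliveTime arguments.

```java
// Hypothetical builder sketch: PoolConfig, Builder and the validation rules are assumptions.
final class PoolConfig {
    final int corePoolSize;
    final int maxPoolSize;
    final int keepAlive; // milliseconds

    private PoolConfig(Builder builder) {
        this.corePoolSize = builder.corePoolSize;
        this.maxPoolSize = builder.maxPoolSize;
        this.keepAlive = builder.keepAlive;
    }

    static final class Builder {
        private int corePoolSize;
        private int maxPoolSize;
        private int keepAlive;

        Builder corePoolSize(int value) {
            this.corePoolSize = value;
            return this;
        }

        Builder maxPoolSize(int value) {
            this.maxPoolSize = value;
            return this;
        }

        Builder keepAlive(int millis) {
            this.keepAlive = millis;
            return this;
        }

        // Rejects combinations that would leave the pool in an inconsistent state.
        PoolConfig build() {
            if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAlive < 0) {
                throw new IllegalArgumentException("invalid pool configuration: core=" + corePoolSize
                        + ", max=" + maxPoolSize + ", keepAlive=" + keepAlive);
            }
            return new PoolConfig(this);
        }
    }

    public static void main(String[] args) {
        PoolConfig config = new Builder().corePoolSize(2).maxPoolSize(4).keepAlive(500).build();
        System.out.println(config.corePoolSize + "/" + config.maxPoolSize + "/" + config.keepAlive); // 2/4/500
    }
}
```

In the article's design, build() would presumably hand these values to the private CustomThreadPoolImpl constructor instead of returning a plain configuration object.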
Once the thread pool is instantiated via the CustomThreadPoolBuilder, the first step is to create all the threads that will accept the tasks. This is done in the init method, which creates corePoolSize threads, starts them, and adds them to the threadList data structure.
After this step, we have threads available for accepting tasks, but we need a mechanism to notify them when a new task becomes available. For this purpose, we have the following options:
1. submit, in addition to adding the task to taskList, also notifies the threads in threadList about the availability of new tasks.
2. taskList notifies the threads in threadList about the availability of new tasks: every time taskList receives a new task, it triggers a callback to notify threadList.
3. The threads in threadList monitor taskList for the availability of new tasks.
Out of these three options, option 3 seems to be the most suitable, as the first two options tie taskList and threadList together for the callback mechanism.
Option 3 does not impose any such restriction. Moreover, it leaves the responsibility of picking a task with the thread that will execute it.
In addition to this, we should also consider the following:
Tasks in taskList should be executed in first-in-first-out order.
How do the threads in threadList get the ability to monitor taskList?
As discussed previously, a suitable option is to let all the threads in threadList monitor the taskList. As java.lang.Thread does not provide such functionality, we’ll have to enrich it by subclassing it with the required features.
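// first attempt: each thread scans the shared taskList itself
private static final class CustomThread extends Thread {
    private final List<Runnable> taskList;
    private CustomThread(List<Runnable> taskList) {
        this.taskList = taskList;
    }
    @Override
    public void run() {
        // the iterator is created once; if submit modifies the ArrayList from another thread,
        // a later next() can throw ConcurrentModificationException, or the change may never be seen
        Iterator<Runnable> taskIterator = taskList.iterator();
        while (true) { // keeps spinning even when there is nothing to execute
            while (taskIterator.hasNext()) {
                Runnable task = taskIterator.next();
                taskIterator.remove();
                task.run();
            }
        }
    }
}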
The above implementation gives us a good starting point to understand the structure of our CustomThread, enhanced with the capability to monitor the taskList: each CustomThread instance receives the list of runnable tasks, i.e. taskList, as a constructor argument.
However, there are a few issues with it:
taskList, being shared by multiple threads, has to be thread-safe.
The while(true) loop wastes CPU cycles whenever there is nothing to execute.
So, considering these points, let’s migrate taskList to a LinkedBlockingQueue, which is an apt data structure as it is:
thread-safe, so it can be shared by all the threads in threadList;
blocking, since take() waits until a task is available instead of spinning;
first-in-first-out, so tasks are picked up in the order they were added;
unbounded by default, like the original taskList.
Here is the updated CustomThread class:
private static final class CustomThread extends Thread {
    private final BlockingQueue<Runnable> taskList;
    private CustomThread(BlockingQueue<Runnable> taskList) {
        this.taskList = taskList;
    }
    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks, without burning CPU, until a task is available
                taskList.take().run();
            }
        } catch (InterruptedException interruptedException) {
            // restore the interrupt status and let the thread terminate
            Thread.currentThread().interrupt();
        }
    }
}
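To see why take() removes the busy loop, here is a small self-contained sketch (the class and method names are hypothetical): a single worker parks in take(), runs the tasks in the order they were added, and ends once it is interrupted.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: one worker draining a LinkedBlockingQueue, in the style of CustomThread.
final class BlockingQueueDemo {

    // Queues taskCount tasks that record their id, waits for them, then stops the worker.
    static List<Integer> runInOrder(int taskCount) throws InterruptedException {
        BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
        List<Integer> executed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(taskCount);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    taskList.take().run(); // parks the thread while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // take() throws once interrupted: leave the loop
            }
        });
        worker.start();

        for (int i = 1; i <= taskCount; i++) {
            int id = i;
            taskList.put(() -> {
                executed.add(id);
                done.countDown();
            });
        }

        done.await();       // every task has run
        worker.interrupt(); // the only way out of the while (true) loop
        worker.join();
        return executed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // [1, 2, 3, 4, 5]
    }
}
```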
The CustomThread class now serves its purpose, but an important aspect is still missing, which we’ll discuss in the next section.
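The run method of the CustomThread class monitors taskList indefinitely for new tasks, which is not always desirable. There has to be some external trigger to indicate that the thread pool should shut down, with the following options:
shutdown: stop accepting new tasks and let the threads stop monitoring taskList, without interrupting any task in progress.
shutdownNow: additionally interrupt the threads that are currently executing tasks, and return the tasks still waiting in taskList.
In both cases, the pool should not accept any new task once shutdown is triggered.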
To support this, the CustomThread class is updated to take an AtomicBoolean run flag as a constructor argument. The flag is updated by the shutdown and shutdownNow methods of the CustomThreadPoolImpl class, allowing API users to control the behavior.
The shutdown method ensures that taskList does not accept any new task. Also, setting the run flag to false causes the individual threads in threadList to stop monitoring taskList.
In addition, the shutdownNow method tries to interrupt any thread that is currently executing a task. Any task still waiting for a thread is returned, so that API users can keep track of the tasks that were never executed.
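The updated CustomThread is not listed here, so the following is only a sketch of how the run flag, shutdown and shutdownNow could fit together. The class name ShutdownSketch, the 100 ms poll timeout, and the choice to let workers finish already-queued tasks on shutdown (as ExecutorService.shutdown does) are all assumptions. One detail matters: a thread blocked in take() does not notice the flag changing until it is interrupted or receives a task, so this sketch polls with a timeout, while shutdownNow relies on interrupts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: names, the poll timeout and drain-on-shutdown behaviour are assumptions.
final class ShutdownSketch {
    private final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final List<Thread> threadList = new ArrayList<>();

    ShutdownSketch(int corePoolSize) {
        for (int i = 0; i < corePoolSize; i++) {
            Thread thread = new CustomThread(taskList, run);
            threadList.add(thread);
            thread.start();
        }
    }

    void execute(Runnable task) {
        // Check-then-add is not atomic; a production pool would have to close this race.
        if (!run.get()) {
            throw new RejectedExecutionException("thread pool has been shut down");
        }
        taskList.add(task);
    }

    // Graceful: reject new tasks; workers finish what is queued, then exit.
    void shutdown() {
        run.set(false);
    }

    // Immediate: also interrupt running tasks and hand back the ones never started.
    List<Runnable> shutdownNow() {
        run.set(false);
        threadList.forEach(Thread::interrupt);
        List<Runnable> pending = new ArrayList<>();
        taskList.drainTo(pending);
        return pending;
    }

    // Joins each worker for up to timeoutMillis; true if all of them have exited.
    boolean stoppedWithin(long timeoutMillis) {
        try {
            for (Thread thread : threadList) {
                thread.join(timeoutMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadList.stream().noneMatch(Thread::isAlive);
    }

    private static final class CustomThread extends Thread {
        private final BlockingQueue<Runnable> taskList;
        private final AtomicBoolean run;

        CustomThread(BlockingQueue<Runnable> taskList, AtomicBoolean run) {
            this.taskList = taskList;
            this.run = run;
        }

        @Override
        public void run() {
            try {
                // Keep working while the pool runs or queued tasks remain.
                while (run.get() || !taskList.isEmpty()) {
                    // Unlike take(), poll() with a timeout lets an idle thread re-check the flag.
                    Runnable task = taskList.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        task.run();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow: exit
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownSketch pool = new ShutdownSketch(2);
        CountDownLatch busy = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            pool.execute(() -> {
                busy.countDown();
                try {
                    Thread.sleep(60_000); // long task that only ends early when interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        busy.await(); // both workers are now occupied
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        System.out.println(pool.shutdownNow().size()); // 3
        System.out.println(pool.stoppedWithin(1_000)); // true
    }
}
```

Whether queued tasks should still run after shutdown is a design choice; dropping the isEmpty() check would make idle workers exit as soon as the flag flips.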
With the CustomThread implementation done, we can turn our attention to the primary thread pool feature: accepting a task for execution. As the task runs in a separate thread, it has to be either a Runnable or a Callable.
To accommodate both types, and to return a result (a Future) for Callable tasks, we wrap the input task in a FutureTask and add it to taskList. As the threads in threadList are already monitoring taskList, one of them picks the task up and executes it as soon as it is available.
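@Override
public <T> Future<T> submit(Callable<T> task) {
    // FutureTask exposes the Callable's result (or exception) through the returned Future
    RunnableFuture<T> future = new FutureTask<>(task);
    taskList.add(future);
    return future;
}
@Override
public Future<?> submit(Runnable task) {
    // a Runnable has no result, so the future completes with null
    RunnableFuture<Void> future = new FutureTask<>(task, null);
    taskList.add(future);
    return future;
}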
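Both submit overloads rely on standard FutureTask behavior, which this small sketch exercises on a stand-in worker thread (the class and method names are hypothetical): the Callable's result comes back through get(), the Runnable variant yields the null passed to its constructor, and an exception thrown by a task is captured by the FutureTask instead of escaping into the worker's run loop.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Sketch: the kind of object both submit overloads put on taskList.
final class FutureTaskDemo {

    // Runs the task on a separate thread, as a pool worker would, and waits for the outcome.
    static <T> T runOnWorker(RunnableFuture<T> task) throws InterruptedException, ExecutionException {
        new Thread(task).start(); // a RunnableFuture is also a Runnable
        return task.get();        // blocks until the task has completed
    }

    public static void main(String[] args) throws Exception {
        // submit(Callable): the result travels back through the Future
        FutureTask<Integer> callableTask = new FutureTask<>(() -> 6 * 7);
        Integer answer = runOnWorker(callableTask);
        System.out.println(answer); // 42

        // submit(Runnable): there is no result, so get() returns the null given to the constructor
        FutureTask<Void> runnableTask = new FutureTask<>(() -> { }, null);
        Void nothing = runOnWorker(runnableTask);
        System.out.println(nothing); // null

        // A failing task does not propagate out of run(): FutureTask records the exception
        // and get() rethrows it wrapped in an ExecutionException.
        FutureTask<Integer> failingTask = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            runOnWorker(failingTask);
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage()); // boom
        }
    }
}
```

That last property is what keeps a worker thread alive when a submitted task throws.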
This approach looks OK at a high level, but we still have to take corePoolSize, maxPoolSize, and the run flag into account.
To include these three invariants in the logic, we need to consider the following points:
If the run flag is set to false, a shutdown has already been triggered, so we discard the newly submitted task and throw an exception.
If the taskList size is less than corePoolSize, we simply add the task to taskList, where one of the always-alive core threads picks it up.
If the taskList size is greater than or equal to corePoolSize but less than maxPoolSize, meaning all the threads are already busy processing tasks, we create a new thread and add it to threadList. In addition, we add the task to taskList. In theory, the newly created thread should pick up this latest addition to taskList.
If the taskList size has already reached maxPoolSize, the maximum number of allowed threads has already been spawned. In this situation, we simply add the task to taskList, where it waits to be picked up by some thread.
These cases are handled in the handle method shown below:
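// synchronized so that concurrent submitters do not race on threadList
private synchronized void handle(RunnableFuture<?> futureTask) {
    if (!run.get()) {
        // a shutdown has already been triggered: reject the task
        throw new IllegalStateException("threadPool has been shut down");
    }
    if (taskList.size() < corePoolSize) {
        // the core threads can absorb the task
        taskList.add(futureTask);
    } else if (taskList.size() < maxPoolSize && threadList.size() < maxPoolSize) {
        // all threads look busy: create a new thread for the threadList;
        // threadList is checked too, since taskList size alone does not bound the thread count
        CustomThread customThread = new CustomThread(taskList, run);
        threadList.add(customThread);
        customThread.start();
        taskList.add(futureTask);
    } else {
        // maxPoolSize already reached: simply submit the task
        taskList.add(futureTask);
    }
}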
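For comparison, the JDK's java.util.concurrent.ThreadPoolExecutor exposes the same three knobs (corePoolSize, maximumPoolSize, keepAliveTime) but follows a different growth rule: beyond corePoolSize it only adds a thread when the work queue refuses a task. With an unbounded LinkedBlockingQueue, like the taskList used here, the queue never refuses, so the JDK pool never grows past corePoolSize and maximumPoolSize has no effect. Its keepAliveTime retires threads above corePoolSize once they have been idle that long, which is one possible answer to the downsizing question in the ToDo. The sketch (class and method names hypothetical) checks the first point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: how many threads a JDK pool with an unbounded queue actually creates.
final class JdkPoolComparison {

    static int largestPoolSizeAfter(int taskCount) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                            // corePoolSize
                4,                            // maximumPoolSize
                1_000, TimeUnit.MILLISECONDS, // keepAliveTime for threads above the core size
                new LinkedBlockingQueue<>()); // unbounded work queue
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                int id = i;
                futures.add(executor.submit(() -> {
                    Thread.sleep(10); // keep the threads busy so that tasks queue up
                    return id;
                }));
            }
            for (Future<Integer> future : futures) {
                future.get(); // wait for every task
            }
            return executor.getLargestPoolSize(); // peak number of threads in the pool
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(largestPoolSizeAfter(20)); // 2
    }
}
```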
ToDo: How do we downsize the thread pool when the number of tasks pending execution is less than corePoolSize? To be continued in the next post.
With this, we have covered almost all the minimum features required for a working thread pool implementation, except one: an option that lets API users wait for all the tasks to complete.
The awaitTermination method gives API users a way to wait for all the tasks to complete. To avoid waiting too long in case of long-running tasks, it also accepts a timeout, after which the method returns even if tasks are still pending.
One thing to note here is that the awaitTermination method can only be called after the shutdown method has been triggered; otherwise, it throws an exception.
public boolean awaitTermination(long timeoutInMilli) throws InterruptedException {
    if (this.run.get()) {
        throw new IllegalStateException("threadPool must be shut down before awaitTermination is called");
    }
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime <= timeoutInMilli) {
        // all worker threads have terminated
        if (threadList.stream().noneMatch(Thread::isAlive)) {
            return true;
        }
        // hints the JVM that this is a busy-wait loop; the loop still keeps a core busy
        Thread.onSpinWait();
    }
    // timeout reached but tasks are still not complete
    return false;
}
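The loop above checks isAlive in a spin. A possible alternative, written here as a standalone helper (the class name is hypothetical and the run-flag check is left out), waits on each thread with TimeUnit.timedJoin against one shared deadline, so the calling thread sleeps instead of spinning. Either version only returns true if the worker threads actually terminate after shutdown, for instance because their loop re-checks the run flag instead of blocking in take() forever.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Sketch: a join-based take on awaitTermination for a list of worker threads.
final class AwaitTerminationSketch {

    // Returns true if every thread has terminated before the shared deadline.
    static boolean awaitTermination(List<Thread> threadList, long timeoutInMilli) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutInMilli);
        for (Thread thread : threadList) {
            long remaining = deadline - System.nanoTime();
            // Blocks until the thread dies or the time runs out; does not wait at all if remaining <= 0.
            TimeUnit.NANOSECONDS.timedJoin(thread, remaining);
        }
        return threadList.stream().noneMatch(Thread::isAlive);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread quick = new Thread(() -> { });
        Thread slow = new Thread(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        quick.start();
        slow.start();

        System.out.println(awaitTermination(List.of(quick), 1_000));     // true
        System.out.println(awaitTermination(List.of(quick, slow), 100)); // false
        slow.interrupt(); // let the sleeping thread finish
        slow.join();
    }
}
```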
With this, we can conclude our custom implementation of the thread pool. The implementation only describes the core concepts related to thread pools and multithreading and is nowhere near production-ready, so use your own discretion before relying on it. The code is available in the GitHub repository.
That is all for this post. If you want to share any feedback, please drop me an email or contact me on any social platform. I’ll try to respond as soon as possible. Also, please consider subscribing for regular updates.