On multi-core systems, multithreaded code can increase overall application throughput significantly, as long as tasks and threads are managed effectively. Once they are not (for example, too many threads are created or threads contend heavily for shared resources), the associated overhead can outweigh the benefits.
In this article, we'll explore some multithreading concepts by writing our very own custom thread pool to manage several tasks and threads.
The following classes and their attributes play a significant role in the overall implementation:
- CustomThreadPoolImpl: the thread pool implementation class.
- CustomThreadPoolBuilder: the builder class used to instantiate a thread pool. The following options are available for customization:
  - corePoolSize: the minimum number of threads that are always kept alive, waiting for tasks to become available.
  - maxPoolSize: the maximum number of threads the pool can grow to.
  - keepAlive: the time in milliseconds before surplus threads (those beyond corePoolSize) cease to exist.
  - threadList: an array list that holds the threads. Its initial size should match corePoolSize.
  - taskList: the list of tasks to be executed, initialized as an unbounded data structure.
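The sketch below makes these options concrete. It shows a minimal builder and the state it hands over to CustomThreadPoolImpl. The original builder API isn't shown, so a few choices here are our own assumptions:
- the fluent setter names and the build() method;
- the validation rules (corePoolSize at least 1, maxPoolSize not below corePoolSize);
- using the LinkedBlockingQueue that taskList is migrated to later in the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fluent builder; the original method names are not shown in the article.
class CustomThreadPoolBuilder {
    private int corePoolSize = 1;
    private int maxPoolSize = 1;
    private long keepAlive = 0L; // milliseconds

    CustomThreadPoolBuilder corePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
        return this;
    }

    CustomThreadPoolBuilder maxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        return this;
    }

    CustomThreadPoolBuilder keepAlive(long keepAliveMillis) {
        this.keepAlive = keepAliveMillis;
        return this;
    }

    CustomThreadPoolImpl build() {
        // Reject configurations that can never work (validation rules are an assumption).
        if (corePoolSize < 1 || maxPoolSize < corePoolSize || keepAlive < 0) {
            throw new IllegalArgumentException("invalid pool configuration");
        }
        return new CustomThreadPoolImpl(corePoolSize, maxPoolSize, keepAlive);
    }
}

class CustomThreadPoolImpl {
    final int corePoolSize;
    final int maxPoolSize;
    final long keepAlive;
    // threadList starts with capacity for corePoolSize threads.
    final List<Thread> threadList;
    // Unbounded: LinkedBlockingQueue's default capacity is Integer.MAX_VALUE.
    final LinkedBlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();

    CustomThreadPoolImpl(int corePoolSize, int maxPoolSize, long keepAlive) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAlive = keepAlive;
        this.threadList = new ArrayList<>(corePoolSize);
    }
}
```

Usage looks like new CustomThreadPoolBuilder().corePoolSize(2).maxPoolSize(4).keepAlive(500).build().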
Once the thread pool is instantiated via the CustomThreadPoolBuilder, the first step is to create the threads that will accept tasks. This is done in the init method, which creates corePoolSize threads and adds them to the threadList data structure.
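Here is a minimal sketch of init. It assumes a simplified CustomThreadPoolImpl whose constructor takes only corePoolSize. The workerLoop method is a hypothetical stand-in for the CustomThread run loop developed in the following sections. Marking the threads as daemons is also our assumption, so that the sketch never keeps the JVM alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CustomThreadPoolImpl {
    private final int corePoolSize;
    final List<Thread> threadList;
    final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();

    CustomThreadPoolImpl(int corePoolSize) {
        this.corePoolSize = corePoolSize;
        this.threadList = new ArrayList<>(corePoolSize);
        init();
    }

    // Creates and starts corePoolSize threads, each waiting for tasks.
    private void init() {
        for (int i = 0; i < corePoolSize; i++) {
            Thread worker = new Thread(this::workerLoop, "pool-worker-" + i);
            worker.setDaemon(true); // assumption: do not keep the JVM alive in this sketch
            threadList.add(worker);
            worker.start();
        }
    }

    // Hypothetical stand-in for the CustomThread run loop discussed later.
    private void workerLoop() {
        try {
            while (true) {
                Runnable task = taskList.take(); // blocks until a task is available
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly when interrupted
        }
    }
}
```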
After this step, we have the threads available for accepting the tasks, but we need a mechanism to notify these threads about the availability of a new task. For this purpose, we have the following options:
- Every call to submit, in addition to adding the task to taskList, should also notify the threads in threadList about the new task.
- taskList should have a callback that notifies the threads in threadList about new tasks. Every time taskList receives a new task, it triggers the callback to notify threadList.
- The threads in threadList should themselves monitor taskList for new tasks.
Out of these three options, option 3 seems the most suitable, as the first two have a few issues:
- Additional overhead to identify a free thread that can accept the newly submitted task.
- Option 2 requires us to customize taskList for the callback mechanism.
- Thread safety must be handled explicitly when tasks are submitted concurrently by multiple threads.
Option 3 imposes no such restrictions. Moreover, it keeps the responsibility of selecting a task with the thread that will execute it.
In addition to this, we should also consider the following:
- How do we ensure the execution order of the tasks? The tasks in taskList should be executed in first-in-first-out order.
- How do we check which thread in threadList is free?
- If all the threads are already in use, how do we buffer additionally submitted tasks?
- Optionally, how could we retry a failed task?
As discussed previously, a suitable option is to let all the threads in threadList monitor taskList. As the native java.lang.Thread has no such functionality, we'll enrich it by subclassing it with the required features.
The first version of our CustomThread, enhanced with the capability to monitor taskList, is structured as follows:
- The CustomThread instance receives the list of runnable tasks, i.e. taskList, as a constructor argument.
- When started, the thread monitors the list indefinitely. It takes and removes new tasks from the list and triggers their execution.
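Here is a minimal sketch of this first version, assuming the tasks live in a plain java.util.List. Unlike a fully naive version, it synchronizes on the list so that checking for and removing a task happens atomically. That already addresses the first of the issues listed below. The busy while(true) loop is left as-is to show the wasted CPU cycles.

```java
import java.util.List;

// First version: a Thread subclass that polls a shared task list.
class CustomThread extends Thread {
    private final List<Runnable> taskList;

    CustomThread(List<Runnable> taskList) {
        this.taskList = taskList;
    }

    @Override
    public void run() {
        while (true) {
            Runnable task = null;
            // Check-and-remove must be atomic, otherwise two threads can race for one task.
            synchronized (taskList) {
                if (!taskList.isEmpty()) {
                    task = taskList.remove(0);
                }
            }
            if (task != null) {
                task.run();
            }
            // When the list is empty, the loop immediately spins again: wasted CPU cycles.
        }
    }
}
```

Producers must add to the list under the same lock. One way to do that is to wrap the list with Collections.synchronizedList, which by default synchronizes on the wrapper itself.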
However, this version has a few issues:
- taskList, being shared by multiple threads, has to be thread-safe.
- When no tasks are available, the while(true) loop simply wastes CPU cycles.
- Now is also a good time to think about ordering the tasks.
Considering these points, let's migrate taskList to a LinkedBlockingQueue, which is a suitable data structure because it is:
- Ordered - being a queue, it natively supports FIFO ordering.
- Thread-safe and Blocking data structure - this will help avoid CPU cycle waste if no task is available.
- Dynamically sized to allow buffering of tasks.
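Here is a sketch of CustomThread after the migration, assuming taskList is typed as a BlockingQueue<Runnable>. Because take() blocks while the queue is empty, an idle thread no longer burns CPU. The queue also hands out tasks in FIFO order. The loop can still only be left through interruption.

```java
import java.util.concurrent.BlockingQueue;

// CustomThread after migrating taskList to a LinkedBlockingQueue.
class CustomThread extends Thread {
    private final BlockingQueue<Runnable> taskList;

    CustomThread(BlockingQueue<Runnable> taskList) {
        this.taskList = taskList;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty, so an idle thread uses no CPU.
                Runnable task = taskList.take();
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Keep the worker alive if a single task fails.
                }
            }
        } catch (InterruptedException e) {
            // Interruption is currently the only way out of this loop.
            Thread.currentThread().interrupt();
        }
    }
}
```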
After this migration, the CustomThread class serves its purpose, but an important aspect is still missing, which we'll discuss in the next section.
When does the while(true) loop terminate?
The run method of the CustomThread class monitors taskList indefinitely, which might not be desirable. There has to be an external trigger indicating that the thread pool should shut down, with one of the following options:
- Graceful: wait for the currently running tasks to complete. Tasks still waiting in the queue are also executed as threads become available.
- Immediate: terminate right away, halting all running tasks and not triggering any pending tasks.
In both cases, the pool should not accept any new task once shutdown is triggered.
To support this, the CustomThread class is updated to include an AtomicBoolean run flag, passed during its instantiation. The flag is updated from the shutdown and shutdownNow methods of the CustomThreadPoolImpl class, allowing API users to control the behavior.
The shutdown method ensures that taskList does not accept any new task. Also, setting the run flag to false causes the individual threads in threadList to stop monitoring taskList.
In addition, the shutdownNow method tries to interrupt any thread that is currently executing a task. Any task still waiting for a thread is returned, allowing the API user to keep track of tasks that were never executed.
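Here is a minimal sketch of the run flag wired through shutdown and shutdownNow. Two details are our own assumptions:
- The worker uses poll with a 100 ms timeout instead of take, so it re-checks the flag periodically.
- The loop keeps draining the queue after shutdown(), so pending tasks still run.

Declaring submit and the shutdown methods synchronized makes the check-then-add in submit atomic with respect to the flag change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CustomThread extends Thread {
    private final BlockingQueue<Runnable> taskList;
    private final AtomicBoolean run;

    CustomThread(BlockingQueue<Runnable> taskList, AtomicBoolean run) {
        this.taskList = taskList;
        this.run = run;
    }

    @Override
    public void run() {
        // Stop only when the flag is off AND nothing is left to drain.
        while (run.get() || !taskList.isEmpty()) {
            try {
                // Assumption: poll with a timeout so the flag is re-checked regularly.
                Runnable task = taskList.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                return; // interrupted by shutdownNow()
            } catch (RuntimeException e) {
                // A failing task must not kill the worker.
            }
        }
    }
}

class CustomThreadPoolImpl {
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    final List<CustomThread> threadList = new ArrayList<>();

    CustomThreadPoolImpl(int corePoolSize) {
        for (int i = 0; i < corePoolSize; i++) {
            CustomThread thread = new CustomThread(taskList, run);
            threadList.add(thread);
            thread.start();
        }
    }

    synchronized void submit(Runnable task) {
        if (!run.get()) {
            throw new RejectedExecutionException("shutdown already triggered");
        }
        taskList.add(task);
    }

    // Graceful: refuse new tasks; queued and running tasks still complete.
    synchronized void shutdown() {
        run.set(false);
    }

    // Immediate: refuse new tasks, return the ones never started, interrupt running ones.
    synchronized List<Runnable> shutdownNow() {
        run.set(false);
        List<Runnable> pending = new ArrayList<>();
        taskList.drainTo(pending);
        for (CustomThread thread : threadList) {
            thread.interrupt();
        }
        return pending;
    }
}
```

BlockingQueue.drainTo moves the queued tasks into the returned list. That is how shutdownNow hands back the tasks that never ran.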
With the CustomThread implementation done, we can turn our attention to the primary thread pool feature: accepting a task for execution. As the task is run in a separate thread, it has to be either a Runnable or a Callable.
To accommodate both types, and to return a result (as a Future) for Callable tasks, we wrap the input task in a FutureTask and submit it to taskList. As the threads in threadList are already monitoring taskList, a thread picks the task up and executes it as soon as it is available.
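Here is a sketch of two submit overloads, assuming the same BlockingQueue-backed taskList. FutureTask implements both Runnable and Future. Worker threads can therefore run it like any other task, while the caller keeps it as a Future. Worker threads are omitted here for brevity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class CustomThreadPoolImpl {
    final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();

    // Callable: the Future completes with the callable's return value.
    <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        taskList.add(futureTask); // a worker thread will pick it up and call run()
        return futureTask;
    }

    // Runnable: there is no return value, so the Future completes with null.
    Future<?> submit(Runnable task) {
        FutureTask<Void> futureTask = new FutureTask<>(task, null);
        taskList.add(futureTask);
        return futureTask;
    }
}
```

As with ExecutorService.submit, a lambda that returns a value resolves to the Callable overload. A block lambda with no return value resolves to the Runnable one.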
This approach looks fine at a high level, but we still have to take corePoolSize, maxPoolSize, and the run flag into account.
To include these three constraints in the logic, we need to consider the following points:
- In case the run flag is set to false, we discard the newly submitted task, as this indicates that a shutdown has already been triggered. An exception is thrown in this case.
- In case the taskList size is greater than or equal to corePoolSize but less than maxPoolSize, meaning all the threads are already busy processing tasks, we can create a new thread and add it to threadList. In addition, we add the task to taskList. Theoretically, the newly created thread should pick up this latest addition to taskList.
- In case the taskList size has already reached the maximum allowed capacity, i.e. maxPoolSize, the maximum number of allowed threads has already been spawned. In this situation, we simply add the task to taskList, where it waits to be picked up by some thread.
These cases are handled in the handle method.
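Here is a sketch of handle under one explicit interpretation:
- The backlog is measured by taskList.size().
- The cap is enforced on the number of threads (threadList.size()), since maxPoolSize limits threads rather than tasks.

The addWorker helper and the simple daemon worker loop are hypothetical. They are kept minimal so that the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class CustomThreadPoolImpl {
    private final int corePoolSize;
    private final int maxPoolSize;
    private final AtomicBoolean run = new AtomicBoolean(true);
    final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    final List<Thread> threadList = new ArrayList<>();

    CustomThreadPoolImpl(int corePoolSize, int maxPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        for (int i = 0; i < corePoolSize; i++) {
            addWorker();
        }
    }

    // Decides whether the pool should grow before queueing the task.
    synchronized void handle(Runnable task) {
        if (!run.get()) {
            throw new RejectedExecutionException("shutdown already triggered");
        }
        // A backlog of at least corePoolSize pending tasks means the threads look busy:
        // grow the pool, but never beyond maxPoolSize threads.
        if (taskList.size() >= corePoolSize && threadList.size() < maxPoolSize) {
            addWorker();
        }
        // In every case the task is queued; an idle (possibly brand-new) thread picks it up.
        taskList.add(task);
    }

    synchronized void shutdown() {
        run.set(false);
    }

    // Hypothetical helper: starts a daemon thread that takes tasks in FIFO order.
    private void addWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    taskList.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        threadList.add(worker);
        worker.start();
    }
}
```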
ToDo: How do we downsize the thread pool when the number of pending tasks is less than corePoolSize? To be continued in the next post.
With this, we have covered the minimum set of features required for a working thread pool implementation, except one: an option that allows API users to wait for the completion of all tasks.
The awaitTermination method gives API users a mechanism to wait for all the tasks to complete. To avoid waiting too long on long-running tasks, it also accepts a timeout, after which the method returns even if tasks are still pending.
Note that awaitTermination can only be called after the shutdown method has been triggered; otherwise, it throws an exception.
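Here is a minimal sketch of awaitTermination. It assumes the method joins every worker against one shared deadline and returns false if any worker is still alive when the timeout expires. The boolean return mirrors ExecutorService.awaitTermination, since the original signature isn't shown. Throwing IllegalStateException when the method is called before shutdown is also our assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CustomThreadPoolImpl {
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final BlockingQueue<Runnable> taskList = new LinkedBlockingQueue<>();
    private final List<Thread> threadList = new ArrayList<>();

    CustomThreadPoolImpl(int corePoolSize) {
        for (int i = 0; i < corePoolSize; i++) {
            Thread worker = new Thread(this::workerLoop);
            threadList.add(worker);
            worker.start();
        }
    }

    // Shutdown-aware loop: keep draining the queue after shutdown().
    private void workerLoop() {
        while (run.get() || !taskList.isEmpty()) {
            try {
                Runnable task = taskList.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    synchronized void submit(Runnable task) {
        if (!run.get()) {
            throw new RejectedExecutionException("pool is shut down");
        }
        taskList.add(task);
    }

    synchronized void shutdown() {
        run.set(false);
    }

    // Returns true if every worker finished within the timeout, false otherwise.
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (run.get()) {
            throw new IllegalStateException("call shutdown() before awaitTermination()");
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (Thread worker : threadList) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            // Only join with a positive timeout: join(0) means "wait forever".
            if (remainingMillis > 0) {
                worker.join(remainingMillis);
            }
            if (worker.isAlive()) {
                return false;
            }
        }
        return true;
    }
}
```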
With this, we conclude our custom thread pool implementation. It only illustrates the core concepts of thread pools and multithreading and is nowhere near production-ready, so use discretion before relying on it. The code is available in the GitHub repository.
That is all for this post. If you want to share any feedback, please drop me an email, or contact me on any social platforms. I’ll try to respond at the earliest. Also, please consider subscribing for regular updates.