Karajan:Lightweight threading
From Java CoG Kit
Motivation
Lightweight threading was one of the initial design goals of Karajan and has been present in the interpreter since the very first version. It was motivated by the realization that (Java) native threads do not scale well enough for the problems Karajan was meant to address, namely managing well over one thousand concurrent jobs. The problem with Java threads is not so much the default thread stack size itself, but the fact that the stack size is fixed when a thread is created and does not adapt to actual need. A small stack size may cause deep call chains to fail, while a large stack size wastes memory when a large number of threads are created.
As an example, assume a machine with 512MB of free RAM. With a default stack size of 128KB, the stacks of 4096 concurrent threads alone would exhaust the entire memory (ignoring all other memory requirements).
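The arithmetic can be checked in a few lines of Java. The sketch also illustrates why the stack is called static: the standard `Thread(ThreadGroup, Runnable, String, long)` constructor accepts a stack size only at creation time, and the Java documentation allows the JVM to treat that value as a hint or ignore it. The class and method names are illustrative, and 128KB is the example value from the text rather than any particular JVM's default.

```java
class StackBudget {
    static final long KB = 1024L;
    static final long MB = 1024L * KB;

    // Number of threads whose fixed-size stacks alone fill the given memory.
    static long maxThreads(long freeBytes, long stackBytes) {
        return freeBytes / stackBytes;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxThreads(512 * MB, 128 * KB)); // 4096

        // The requested stack size is fixed when the thread is constructed;
        // it cannot grow later if the thread turns out to need a deeper stack.
        Thread worker = new Thread(null, () -> System.out.println("running"), "worker", 128 * KB);
        worker.start();
        worker.join();
    }
}
```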
Lightweight threading allows a small number of Java threads to simulate a potentially large number of Karajan threads. Karajan threads also require their own stack space, but the Karajan stack is allocated dynamically from the heap, as needed. This allows Karajan to scale easily to a number of threads roughly two orders of magnitude larger.
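The following minimal sketch (hypothetical names, not Karajan's actual classes) illustrates the idea of a heap-allocated stack: each lightweight thread owns an ordinary growable list, so 100,000 of them cost only a few small objects each, and only a thread that actually goes deep pays for a deep stack.

```java
import java.util.ArrayList;
import java.util.List;

class HeapStacks {
    // Hypothetical lightweight thread whose stack is an ordinary heap object
    // that grows only as deep as the computation actually goes.
    static final class LightweightThread {
        final List<Object> stack = new ArrayList<>(4); // small initial capacity

        void push(Object frame) { stack.add(frame); }
        Object pop() { return stack.remove(stack.size() - 1); }
        int depth() { return stack.size(); }
    }

    static List<LightweightThread> spawn(int count) {
        List<LightweightThread> threads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            LightweightThread t = new LightweightThread();
            t.push("frame-0"); // every thread starts with a single frame
            threads.add(t);
        }
        return threads;
    }

    static String demo() {
        List<LightweightThread> threads = spawn(100_000);
        // Only the one thread that needs a deep stack pays for it.
        LightweightThread deep = threads.get(0);
        for (int i = 1; i < 10_000; i++) deep.push("frame-" + i);
        return deep.depth() + " " + threads.get(1).depth();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 10000 1
    }
}
```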
Implementation
It is difficult to pinpoint a single, monolithic entity in Karajan that could be called "the interpreter". Instead, a number of separate components work together to produce the result.
Elements
The basic unit of execution in Karajan is the "element". An element responds to certain events and can emit events of its own. The main execution pattern of an element consists of receiving a "Start" event and emitting either a "Completed" or a "Failed" event. This pattern is used to model the execution of the entire workflow. Concurrent execution of an element is achieved by sending it more than one "Start" event. The other patterns follow accordingly. For example, sequential works as follows:
- receive Start event
- send Start to first sub-element
- receive Completed (from first sub-element)
- send Start to second sub-element
- receive Completed (from second sub-element)
- ...
- receive Completed from last sub-element
- send Completed event (to element that sent the initial Start)
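A minimal Java sketch of the sequential pattern, assuming a hypothetical `Element` interface whose `start` and `completed` methods stand in for the Start and Completed events. Events are delivered here by direct method calls rather than through an event queue, and failure handling is omitted. The element keeps its progress in instance fields, which works for a single execution but would be overwritten by a second, concurrent Start event.

```java
import java.util.ArrayList;
import java.util.List;

class SequentialSketch {
    // Hypothetical event interface: start() delivers a Start event and
    // completed() delivers a Completed event from a sub-element.
    interface Element {
        void start(Element parent);
        void completed(Element child);
    }

    // A leaf that records its name and reports Completed immediately.
    static final class Leaf implements Element {
        final String name;
        final List<String> log;
        Leaf(String name, List<String> log) { this.name = name; this.log = log; }
        public void start(Element parent) { log.add(name); parent.completed(this); }
        public void completed(Element child) { }
    }

    static final class Sequential implements Element {
        final List<Element> children;
        Element parent; // state kept in fields: a concurrent Start would overwrite it
        int index;
        Sequential(List<Element> children) { this.children = children; }

        public void start(Element parent) {
            this.parent = parent;
            this.index = 0;
            startCurrentOrFinish();
        }

        public void completed(Element child) {
            index++;
            startCurrentOrFinish();
        }

        private void startCurrentOrFinish() {
            if (index < children.size()) children.get(index).start(this); // next sub-element
            else parent.completed(this); // last one done: notify whoever started us
        }
    }

    static List<String> run(String... names) {
        List<String> log = new ArrayList<>();
        List<Element> children = new ArrayList<>();
        for (String n : names) children.add(new Leaf(n, log));
        Element root = new Element() {
            public void start(Element parent) { }
            public void completed(Element child) { log.add("done"); }
        };
        new Sequential(children).start(root);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b", "c")); // [a, b, c, done]
    }
}
```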
By contrast, parallel works as follows:
- receive Start event
- send a Start event to each sub-element
- receive Completed from all sub-elements
- send Completed
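The parallel pattern can be sketched in the same hypothetical style. Here each leaf completes on its own Java thread to simulate an asynchronous job, so Completed events can arrive concurrently; an `AtomicInteger` counts the outstanding sub-elements so that only the last completion is reported upward. As before, the names are illustrative and failure handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelSketch {
    interface Element {
        void start(Element parent);
        void completed(Element child);
    }

    // A leaf that completes on a separate Java thread, standing in for an
    // asynchronous job whose completion arrives later.
    static final class Leaf implements Element {
        public void start(Element parent) {
            new Thread(() -> parent.completed(this)).start();
        }
        public void completed(Element child) { }
    }

    static final class Parallel implements Element {
        final List<Element> children;
        final AtomicInteger remaining = new AtomicInteger();
        volatile Element parent;
        Parallel(List<Element> children) { this.children = children; }

        public void start(Element parent) {
            this.parent = parent;
            remaining.set(children.size());
            if (children.isEmpty()) { parent.completed(this); return; }
            for (Element c : children) c.start(this); // Start all sub-elements at once
        }

        public void completed(Element child) {
            // Completions may arrive concurrently; only the last one reports upward.
            if (remaining.decrementAndGet() == 0) parent.completed(this);
        }
    }

    static boolean run(int n) {
        CountDownLatch done = new CountDownLatch(1);
        Element root = new Element() {
            public void start(Element parent) { }
            public void completed(Element child) { done.countDown(); }
        };
        List<Element> children = new ArrayList<>();
        for (int i = 0; i < n; i++) children.add(new Leaf());
        new Parallel(children).start(root);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8)); // true
    }
}
```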
Event Bus (Queue)
The appearance of concurrency is achieved through an event queue that invokes event-handling methods on elements. By default it uses a number of native threads equal to the number of processors, but it can increase the number of threads if badly behaved elements compromise latency by spending a long time processing events. This strikes a balance between cooperative and preemptive multi-threading. It also introduces a fairly significant speed overhead. Fortunately, the approach is very effective with elements that wait for external events to occur (such as notifications from a server). Because the libraries use an asynchronous model at all levels, the end result is a scalable system whose speed overhead is generally negligible for I/O-bound problems (such as remote processing). However, the system performs very poorly on CPU-bound processing done exclusively within Karajan itself.
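A minimal sketch of such an event queue, assuming events are plain tasks submitted to a `java.util.concurrent` fixed thread pool sized by `Runtime.availableProcessors()`. The `EventBus`, `Job`, and `Type` names are hypothetical, and the adaptive growth of the thread count for slow handlers is deliberately left out. Each job is a two-step "logical thread" that never blocks while waiting, so thousands of them share a handful of native threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EventBusSketch {
    enum Type { START, COMPLETED } // hypothetical event types

    interface Element { void handle(Type type, EventBus bus); }

    // Minimal event bus: every event becomes a task on a fixed pool with one
    // native thread per processor. Adaptive growth of the pool is not modeled.
    static final class EventBus {
        final ExecutorService workers =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        void send(Element target, Type type) {
            workers.execute(() -> target.handle(type, this));
        }
    }

    // A two-step "logical thread": on Start it sends itself Completed,
    // so it never blocks a native thread while waiting.
    static final class Job implements Element {
        final CountDownLatch finished;
        final Set<String> nativeThreads;
        Job(CountDownLatch finished, Set<String> nativeThreads) {
            this.finished = finished;
            this.nativeThreads = nativeThreads;
        }
        public void handle(Type type, EventBus bus) {
            nativeThreads.add(Thread.currentThread().getName());
            if (type == Type.START) bus.send(this, Type.COMPLETED);
            else finished.countDown();
        }
    }

    // Returns how many distinct native threads handled all the events.
    static int run(int jobs) {
        EventBus bus = new EventBus();
        CountDownLatch finished = new CountDownLatch(jobs);
        Set<String> nativeThreads = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < jobs; i++) bus.send(new Job(finished, nativeThreads), Type.START);
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            bus.workers.shutdown(); // let in-flight handlers finish, accept no new events
        }
        return nativeThreads.size();
    }

    public static void main(String[] args) {
        System.out.println("10000 jobs handled by " + run(10_000) + " native threads");
    }
}
```

Because no handler blocks, the pool never needs more threads than processors. A handler that performed a long computation or a blocking call would occupy one of those few threads, which is the CPU-bound weakness noted above.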
Stack
The event model alone is insufficient as a complete execution model. It is also necessary to maintain the internal state of elements. In the above examples, sequential needs to keep track of the sub-element currently being executed so that, when it receives a "Completed" event from it, it knows which sub-element to start next. This state is maintained on a stack. Each execution of an element is associated with exactly one frame on the stack. The following list illustrates in more detail how sequential executes:
- receive a Start event that carries a reference to a stack
- add a new frame on the stack
- set index to 1 in the top frame
- send (Start, stack) to the sub-element at position index
- receive (Completed, stack)
- increment index in the top stack frame
- if index <= numberOfSubElements, go back to step 4 (send Start)
- discard the top frame from the stack
- look up the caller on the (new) top stack frame and send (Completed, stack) to it
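The steps above can be sketched in Java with hypothetical `Frame`, `Stack`, and `Element` types; this is an illustration, not Karajan's implementation. Each frame records the element that owns it, so after discarding its own frame an element finds its caller as the owner of the new top frame. Leaves park their stack in a queue to simulate waiting for an external event, which lets one `Sequential` instance run interleaved on two stacks without interference, since each execution keeps its `index` in its own frame. Indices are 0-based here, whereas the list counts from 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class StackSequentialSketch {
    // Hypothetical frame: the element that owns it plus that element's state.
    static final class Frame {
        final Element owner;
        int index;
        Frame(Element owner) { this.owner = owner; }
    }

    // A Karajan-style stack: a heap list whose last entry is the top frame.
    static final class Stack {
        final String name;
        final List<Frame> frames = new ArrayList<>();
        Stack(String name) { this.name = name; }
        Frame top() { return frames.get(frames.size() - 1); }
        void push(Frame f) { frames.add(f); }
        void pop() { frames.remove(frames.size() - 1); }
    }

    interface Element {
        void start(Stack stack);
        void completed(Stack stack);
    }

    static final Deque<Stack> pending = new ArrayDeque<>(); // threads waiting on "I/O"
    static final List<String> log = new ArrayList<>();

    // A leaf that waits for an external event: it parks its stack until resumed.
    static final class Leaf implements Element {
        final String name;
        Leaf(String name) { this.name = name; }
        public void start(Stack stack) {
            stack.push(new Frame(this));
            log.add(stack.name + ":" + name);
            pending.add(stack);
        }
        public void completed(Stack stack) { }
    }

    static final class Sequential implements Element {
        final List<Element> children;
        Sequential(List<Element> children) { this.children = children; }

        public void start(Stack stack) {
            stack.push(new Frame(this)); // index starts at 0 in the new frame
            next(stack);
        }

        public void completed(Stack stack) {
            stack.top().index++;         // state lives in this execution's frame
            next(stack);
        }

        private void next(Stack stack) {
            int i = stack.top().index;
            if (i < children.size()) {
                children.get(i).start(stack);
            } else {
                stack.pop();                        // discard our frame
                stack.top().owner.completed(stack); // the frame below belongs to the caller
            }
        }
    }

    static final class Root implements Element {
        public void start(Stack stack) { }
        public void completed(Stack stack) { log.add(stack.name + ":done"); }
    }

    static List<String> run() {
        pending.clear();
        log.clear();
        List<Element> children = List.of(new Leaf("a"), new Leaf("b"));
        Sequential seq = new Sequential(children); // ONE instance, two concurrent executions
        Root root = new Root();
        for (String name : List.of("T1", "T2")) {
            Stack stack = new Stack(name);
            stack.push(new Frame(root));
            seq.start(stack);
        }
        while (!pending.isEmpty()) {              // external events arrive in FIFO order
            Stack stack = pending.poll();
            stack.pop();                          // the leaf discards its frame...
            stack.top().owner.completed(stack);   // ...and reports Completed to its caller
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [T1:a, T2:a, T1:b, T2:b, T1:done, T2:done]
    }
}
```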
Parallel behaves slightly differently:
- receive (Start, stack)
- add a new frame
- set count to numberOfSubElements in the top stack frame
- for each sub-element:
  - make a shallow copy of the stack (one per new Karajan thread)
  - send (Start, copyOfStack) to the sub-element
- receive (Completed, someCopyOfStack), once from each sub-element
- decrement count in the top frame
- if count is 0:
  - discard the top frame
  - look up the caller on the top frame and send (Completed, someCopyOfStack) to it
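The parallel steps can be sketched with the same kind of hypothetical types, adding a `copy` method that makes a shallow copy of the stack: a new list containing the same frame objects. Because parallel's frame is shared by all copies, decrementing `count` through any copy updates the same counter. Events are delivered on a single thread in this sketch; with truly concurrent delivery the counter would need to be atomic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class StackParallelSketch {
    // Hypothetical frame: owning element plus a counter used by parallel.
    static final class Frame {
        final Element owner;
        int count;
        Frame(Element owner) { this.owner = owner; }
    }

    static final class Stack {
        final String name;
        final List<Frame> frames;
        Stack(String name, List<Frame> frames) { this.name = name; this.frames = frames; }
        Frame top() { return frames.get(frames.size() - 1); }
        void push(Frame f) { frames.add(f); }
        void pop() { frames.remove(frames.size() - 1); }
        // Shallow copy: a new list that holds the SAME frame objects.
        Stack copy(String newName) { return new Stack(newName, new ArrayList<>(frames)); }
    }

    interface Element {
        void start(Stack stack);
        void completed(Stack stack);
    }

    static final Deque<Stack> pending = new ArrayDeque<>(); // branches waiting on "I/O"
    static final List<String> log = new ArrayList<>();

    // A leaf that parks its stack until an external event resumes it.
    static final class Leaf implements Element {
        public void start(Stack stack) { stack.push(new Frame(this)); pending.add(stack); }
        public void completed(Stack stack) { }
    }

    static final class Parallel implements Element {
        final List<Element> children;
        Parallel(List<Element> children) { this.children = children; }

        public void start(Stack stack) {
            Frame frame = new Frame(this);
            frame.count = children.size();
            stack.push(frame);
            if (children.isEmpty()) { finish(stack); return; }
            for (int i = 0; i < children.size(); i++) {
                children.get(i).start(stack.copy(stack.name + "." + i)); // one copy per branch
            }
        }

        public void completed(Stack someCopy) {
            // Every copy shares this frame object, so the count is shared too.
            if (--someCopy.top().count == 0) finish(someCopy);
        }

        private void finish(Stack stack) {
            stack.pop();                        // discard our frame
            stack.top().owner.completed(stack); // notify the caller
        }
    }

    static final class Root implements Element {
        public void start(Stack stack) { }
        public void completed(Stack stack) {
            log.add("done via " + stack.name + " at depth " + stack.frames.size());
        }
    }

    static List<String> run(int branches) {
        pending.clear();
        log.clear();
        List<Element> children = new ArrayList<>();
        for (int i = 0; i < branches; i++) children.add(new Leaf());
        Stack stack = new Stack("T", new ArrayList<>());
        stack.push(new Frame(new Root()));
        new Parallel(children).start(stack);
        while (!pending.isEmpty()) {      // external events arrive in FIFO order
            Stack s = pending.poll();
            s.pop();                      // the leaf discards its frame...
            s.top().owner.completed(s);   // ...and reports Completed to parallel
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [done via T.2 at depth 1]
    }
}
```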
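Because the copies are shallow, they all share the frames that were on the stack when parallel started, including parallel's own frame. This guarantees that whichever copy arrives with the final Completed event is equivalent to the original stack from parallel's point of view. It also guarantees that any new frames created by sub-elements in different "threads" are completely independent of each other.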
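Both guarantees follow from plain shallow-copy semantics, which this small illustrative example (using the `java.util.ArrayList` copy constructor, with a hypothetical `Frame` class) demonstrates: frames pushed onto one copy are invisible to the other, while frames below the copy point are the same objects in every copy.

```java
import java.util.ArrayList;
import java.util.List;

class ShallowCopyDemo {
    static final class Frame {
        final String owner;
        int count;
        Frame(String owner) { this.owner = owner; }
    }

    static String demo() {
        List<Frame> original = new ArrayList<>();
        original.add(new Frame("root"));
        original.add(new Frame("parallel"));

        // Shallow copies: new lists, but the same Frame objects inside.
        List<Frame> a = new ArrayList<>(original);
        List<Frame> b = new ArrayList<>(original);

        // Frames pushed by one branch are invisible to the other branch...
        a.add(new Frame("branch-a"));
        b.add(new Frame("branch-b"));

        // ...while the frames below the copy point remain shared.
        a.get(1).count++;

        return a.size() + " " + b.size() + " " + original.size() + " "
                + b.get(1).count + " " + original.get(1).count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 3 3 2 1 1
    }
}
```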
