Got test_parallel_vectorize working.

This commit is contained in:
Siu Kwan Lam 2012-08-06 11:35:03 -07:00
commit 1594e5456a
3 changed files with 253 additions and 142 deletions

View file

@ -1,3 +1,15 @@
'''
This file implements the code-generator for parallel-vectorize.
ParallelUFunc is the platform independent base class for generating
the thread dispatcher. This thread dispatcher launches threads
that execute the generated function of UFuncCore.
UFuncCore is subclassed to specialize for the input/output types.
The actual workload is invoked inside the function generated by UFuncCore.
UFuncCore also defines a work-stealing mechanism that allows idle threads
to steal works from other threads.
'''
from llvm.core import *
from llvm.passes import *
@ -5,14 +17,21 @@ from llvm_cbuilder import *
import llvm_cbuilder.shortnames as C
class WorkQueue(CStruct):
'''
Structure for workqueue for parallel-ufunc.
'''
_fields_ = [
('next', C.intp),
('last', C.intp),
('lock', C.int),
('next', C.intp), # next index of work item
('last', C.intp), # last index of work item (exlusive)
('lock', C.int), # for locking the workqueue
]
def Lock(self):
'''
Inline the lock procedure.
'''
with self.parent.loop() as loop:
with loop.condition() as setcond:
unlocked = self.parent.constant(self.lock.type, 0)
@ -26,6 +45,9 @@ class WorkQueue(CStruct):
pass
def Unlock(self):
'''
Inline the unlock procedure.
'''
unlocked = self.parent.constant(self.lock.type, 0)
locked = self.parent.constant(self.lock.type, 1)
@ -39,6 +61,9 @@ class WorkQueue(CStruct):
class ContextCommon(CStruct):
'''
Structure for thread-shared context information in parallel-ufunc.
'''
_fields_ = [
# loop ufunc args
('args', C.pointer(C.char_p)),
@ -52,6 +77,9 @@ class ContextCommon(CStruct):
]
class Context(CStruct):
'''
Structure for thread-specific context information in parallel-ufunc.
'''
_fields_ = [
('common', C.pointer(ContextCommon.llvm_type())),
('id', C.int),
@ -71,14 +99,14 @@ class ParallelUFunc(CDefinition):
]
def body(self, func, worker, args, dimensions, steps, data, ThreadCount=1):
# Setup variables
common = self.var(ContextCommon, name='common')
workqueues = self.array(WorkQueue, ThreadCount, name='workqueues')
contexts = self.array(Context, ThreadCount, name='contexts')
num_thread = self.var(C.int, ThreadCount, name='num_thread')
# Initialize ContextCommon
common.args.assign(args)
common.dimensions.assign(dimensions)
common.steps.assign(steps)
@ -87,6 +115,11 @@ class ParallelUFunc(CDefinition):
common.num_thread.assign(num_thread.cast(C.int))
common.workqueues.assign(workqueues.reference())
# Determine chunksize, initial count of work-items per thread.
# If total_work >= num_thread, equally divide the works.
# If total_work % num_thread != 0, the last thread does all remaining works.
# If total_work < num_thread, each thread does one work,
# and set num_thread to total_work
N = dimensions[0]
ChunkSize = self.var_copy(N / num_thread.cast(N.type))
ChunkSize_NULL = self.constant_null(ChunkSize.type)
@ -95,60 +128,70 @@ class ParallelUFunc(CDefinition):
ChunkSize.assign(self.constant(ChunkSize.type, 1))
num_thread.assign(N.cast(num_thread.type))
# Populate workqueue for all threads
self._populate_workqueues(workqueues, N, ChunkSize, num_thread)
# Populate contexts for all threads
self._populate_context(contexts, common, num_thread)
# Dispatch worker threads
self._dispatch_worker(worker, contexts, num_thread)
## DEBUG ONLY ##
# Check for race condition
total_completed = self.var(C.intp, 0, name='total_completed')
for t in range(ThreadCount):
cur_ctxt = contexts[t].as_struct(Context)
total_completed += cur_ctxt.completed
self.debug(cur_ctxt.id, 'completed', cur_ctxt.completed)
if True:
total_completed = self.var(C.intp, 0, name='total_completed')
for t in range(ThreadCount):
cur_ctxt = contexts[t].as_struct(Context)
total_completed += cur_ctxt.completed
self.debug(cur_ctxt.id, 'completed', cur_ctxt.completed)
with self.ifelse( total_completed == N ) as ifelse:
with ifelse.then():
self.debug("All is well!")
with ifelse.otherwise():
self.debug("ERROR: race occurred!")
with self.ifelse( total_completed == N ) as ifelse:
with ifelse.then():
self.debug("All is well!")
with ifelse.otherwise():
self.debug("ERROR: race occurred! Trigger segfault")
self.unreachable()
# Return
self.ret()
def _populate_workqueues(self, workqueues, N, ChunkSize, num_thread):
i = self.var(num_thread.type, 0, name='i')
ONE = self.constant(i.type, 1)
with self.loop() as loop:
with loop.condition() as setcond:
setcond( i < num_thread )
with loop.body():
cur_wq = workqueues[i].as_struct(WorkQueue)
cur_wq.next.assign(i.cast(ChunkSize.type) * ChunkSize)
cur_wq.last.assign((i + ONE).cast(ChunkSize.type) * ChunkSize)
cur_wq.lock.assign(self.constant(C.int, 0))
# increment
i += ONE
'''
Loop over all threads and populate the workqueue for each of them.
'''
ONE = self.constant(num_thread.type, 1)
with self.for_range(num_thread) as (loop, i):
cur_wq = workqueues[i].as_struct(WorkQueue)
cur_wq.next.assign(i.cast(ChunkSize.type) * ChunkSize)
cur_wq.last.assign((i + ONE).cast(ChunkSize.type) * ChunkSize)
cur_wq.lock.assign(self.constant(C.int, 0))
# end loop
last_wq = workqueues[num_thread - ONE].as_struct(WorkQueue)
last_wq.last.assign(N)
def _populate_context(self, contexts, common, num_thread):
i = self.var(num_thread.type, 0, name='i')
ONE = self.constant(i.type, 1)
with self.loop() as loop:
with loop.condition() as setcond:
setcond( i < num_thread )
with loop.body():
cur_ctxt = contexts[i].as_struct(Context)
cur_ctxt.common.assign(common.reference())
cur_ctxt.id.assign(i)
cur_ctxt.completed.assign(self.constant_null(
cur_ctxt.completed.type))
# increment
i += ONE
'''
Loop over all threads and populate contexts for each of them.
'''
ONE = self.constant(num_thread.type, 1)
with self.for_range(num_thread) as (loop, i):
cur_ctxt = contexts[i].as_struct(Context)
cur_ctxt.common.assign(common.reference())
cur_ctxt.id.assign(i)
cur_ctxt.completed.assign(
self.constant_null(cur_ctxt.completed.type))
def _dispatch_worker(self, worker, contexts, num_thread):
'''
Dispatch worker threads.
'''
raise NotImplementedError
class ParallelUFuncPosix(ParallelUFunc):
'''
Implements _dispatch_worker to use pthread.
'''
def _dispatch_worker(self, worker, contexts, num_thread):
api = PThreadAPI(self)
NULL = self.constant_null(C.void_p)
@ -158,27 +201,19 @@ class ParallelUFuncPosix(ParallelUFunc):
# self.debug("launch threads")
# TODO error handling
i = self.var(num_thread.type, 0, name='i')
ONE = self.constant(i.type, 1)
with self.loop() as loop:
with loop.condition() as setcond:
setcond( i < num_thread )
with loop.body():
api.pthread_create(threads[i].reference(), NULL, worker,
contexts[i].reference().cast(C.void_p))
i += ONE
ONE = self.constant(num_thread.type, 1)
with self.for_range(num_thread) as (loop, i):
api.pthread_create(threads[i].reference(), NULL, worker,
contexts[i].reference().cast(C.void_p))
# self.debug("join threads")
i.assign(self.constant_null(i.type))
with self.loop() as loop:
with loop.condition() as setcond:
setcond( i < num_thread )
with loop.body():
api.pthread_join(threads[i], NULL)
i += ONE
# self.debug("closing")
with self.for_range(num_thread) as (loop, i):
api.pthread_join(threads[i], NULL)
class UFuncCore(CDefinition):
'''
Generates the workqueue handling and work stealing and invoke
the work function for each work item.
'''
_name_ = 'ufunc_worker'
_argtys_ = [
('context', C.pointer(Context.llvm_type())),
@ -203,64 +238,61 @@ class UFuncCore(CDefinition):
'''
ZERO = self.constant_null(C.int)
with self.loop() as loop:
with loop.condition() as setcond:
setcond(ZERO == ZERO) # loop forever
with self.forever() as loop:
workqueue.Lock()
# Critical section
item = self.var_copy(workqueue.next, name='item')
workqueue.next += self.constant(item.type, 1)
last = self.var_copy(workqueue.last, name='last')
# Release
workqueue.Unlock()
with loop.body():
workqueue.Lock()
# Critical section
item = self.var_copy(workqueue.next, name='item')
workqueue.next += self.constant(item.type, 1)
last = self.var_copy(workqueue.last, name='last')
# Release
workqueue.Unlock()
with self.ifelse( item >= last ) as ifelse:
with ifelse.then():
loop.break_loop()
with self.ifelse( item >= last ) as ifelse:
with ifelse.then():
loop.break_loop()
self._do_work(common, item, tid, completed)
self._do_work(common, item, tid)
completed += self.constant(completed.type, 1)
def _do_work_stealing(self, common, tid, completed):
'''
Steal work from other workqueues
Steal work from other workqueues.
'''
# self.debug("start work stealing", tid)
incomplete_thread_ct = self.var(C.int, 1)
ITC_NULL = self.constant_null(incomplete_thread_ct.type)
steal_continue = self.var(C.int, 1)
STEAL_STOP = self.constant_null(steal_continue.type)
# Loop until all workqueues are done.
with self.loop() as loop:
with loop.condition() as setcond:
setcond( incomplete_thread_ct != ITC_NULL )
setcond( steal_continue != STEAL_STOP )
with loop.body():
incomplete_thread_ct.assign(ITC_NULL)
self._do_work_stealing_innerloop(common, incomplete_thread_ct,
tid, completed)
steal_continue.assign(STEAL_STOP)
self._do_work_stealing_innerloop(common, steal_continue, tid,
completed)
def _do_work_stealing_innerloop(self, common, incomplete_thread_ct, tid,
def _do_work_stealing_innerloop(self, common, steal_continue, tid,
completed):
i = self.var(C.int, 0)
with self.loop() as loop:
with loop.condition() as setcond:
setcond( i < common.num_thread )
'''
Loop over all other threads and try to steal work.
'''
with self.for_range(common.num_thread) as (loop, i):
with self.ifelse( i != tid ) as ifelse:
with ifelse.then():
otherqueue = common.workqueues[i].as_struct(WorkQueue)
self._do_work_stealing_check(common, otherqueue,
steal_continue, tid,
completed)
with loop.body():
with self.ifelse( i != tid ) as ifelse:
with ifelse.then():
otherqueue = common.workqueues[i].as_struct(WorkQueue)
self._do_work_stealing_check(common, otherqueue,
incomplete_thread_ct, tid,
completed)
# increment
i += self.constant(i.type, 1)
def _do_work_stealing_check(self, common, otherqueue,
incomplete_thread_ct, tid, completed):
def _do_work_stealing_check(self, common, otherqueue, steal_continue, tid,
completed):
'''
Check the workqueue for any remaining work and steal it.
'''
otherqueue.Lock()
ONE = self.constant(otherqueue.last.type, 1)
ITC_ONE = self.constant(incomplete_thread_ct.type, 1)
STEAL_CONTINUE = self.constant(steal_continue.type, 1)
with self.ifelse(otherqueue.next < otherqueue.last) as ifelse:
with ifelse.then():
otherqueue.last -= ONE
@ -269,17 +301,20 @@ class UFuncCore(CDefinition):
otherqueue.Unlock()
# Released
self._do_work(common, item, tid, completed)
self._do_work(common, item, tid)
completed += self.constant(completed.type, 1)
# Mark incomplete thread
incomplete_thread_ct.assign(ITC_ONE)
steal_continue.assign(STEAL_CONTINUE)
with ifelse.otherwise():
otherqueue.Unlock()
def _do_work(self, common, item, tid, completed):
# self.debug(" tid", tid, "item", item)
completed += self.constant(completed.type, 1)
def _do_work(self, common, item, tid):
'''
Prepare to call the actual work function.
'''
raise NotImplementedError
class PThreadAPI(CExternal):
@ -294,25 +329,3 @@ class PThreadAPI(CExternal):
pthread_join = Type.function(C.int, [C.void_p, C.void_p])
def _main():
NUM_THREAD = 2
module = Module.new(__name__)
fpm = FunctionPassManager.new(module)
PassManagerBuilder.new().populate(fpm)
f1 = ParallelUFuncPosix.define(module, ThreadCount=NUM_THREAD)
f2 = UFuncCore.define(module)
#print(module)
# print(f1)
print(f2)
module.verify()
fpm.run(f1)
fpm.run(f2)
print('optimized'.center(80,'-'))
print(f2)
if __name__ == '__main__':
_main()