diff --git a/llvm_cbuilder/builder.py b/llvm_cbuilder/builder.py index ee308d6..eaa159e 100644 --- a/llvm_cbuilder/builder.py +++ b/llvm_cbuilder/builder.py @@ -5,6 +5,7 @@ import contextlib import llvm.core as lc import llvm.ee as le +from llvm import LLVMException def _is_int(ty): return isinstance(ty, lc.IntegerType) @@ -245,6 +246,44 @@ class CBuilder(object): yield cb cb.close() + @contextlib.contextmanager + def forever(self): + with self.loop() as loop: + with loop.condition() as setcond: + NULL = self.constant_null(lc.Type.int()) + setcond( NULL == NULL ) + with loop.body(): + yield loop + + @contextlib.contextmanager + def for_range(self, *args): + def check_arg(x): + if isinstance(x, int): + return self.constant(lc.Type.int(), x) + if not x.is_int: + raise TypeError(x, "All args must be of integer type.") + return x + + if len(args) == 3: + start, stop, step = map(check_arg, args) + elif len(args) == 2: + start, stop = map(check_arg, args) + step = self.constant(start.type, 1) + elif len(args) == 1: + stop = check_arg(args[0]) + start = self.constant(stop.type, 0) + step = self.constant(stop.type, 1) + else: + raise TypeError("Invalid # of arguments: 1, 2 or 3") + + idx = self.var_copy(start) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( idx < stop ) + with loop.body(): + yield loop, idx + idx += step + def position_at_end(self, bb): self.basic_block = bb self.builder.position_at_end(bb) @@ -270,13 +309,18 @@ class CBuilder(object): def constant_string(self, string): mod = self.function.module - name = '.conststr.%x' % hash(string) + name = '.conststr.%x_%x' % (hash(string), len(string)) content = lc.Constant.stringz(string) - globalstr = mod.add_global_variable(content.type, name=name) - globalstr.initializer = content - ptr = mod.add_global_variable(lc.Type.pointer(content.type.element), - name=name+".ptr") - return CTemp(self, globalstr.bitcast(lc.Type.pointer(content.type.element))) + try: + globalstr = mod.get_global_variable_named(name) + except LLVMException: + globalstr = mod.add_global_variable(content.type, name=name) + globalstr.initializer = content +# ptr = mod.add_global_variable(lc.Type.pointer(content.type.element), +# name=name+".ptr") + return CTemp(self, globalstr.bitcast( + lc.Type.pointer(content.type.element))) + def get_intrinsic(self, intrinsic_id, tys): lfunc = lc.Function.intrinsic(self.function.module, intrinsic_id, tys) diff --git a/parallel_vectorize.py b/parallel_vectorize.py index 1f83ad5..25176b8 100644 --- a/parallel_vectorize.py +++ b/parallel_vectorize.py @@ -1,3 +1,15 @@ +''' +This file implements the code-generator for parallel-vectorize. + +ParallelUFunc is the platform independent base class for generating +the thread dispatcher. This thread dispatcher launches threads +that execute the generated function of UFuncCore. +UFuncCore is subclassed to specialize for the input/output types. +The actual workload is invoked inside the function generated by UFuncCore. +UFuncCore also defines a work-stealing mechanism that allows idle threads +to steal works from other threads. +''' + from llvm.core import * from llvm.passes import * @@ -5,14 +17,21 @@ from llvm_cbuilder import * import llvm_cbuilder.shortnames as C class WorkQueue(CStruct): + ''' + Structure for workqueue for parallel-ufunc. + ''' + _fields_ = [ - ('next', C.intp), - ('last', C.intp), - ('lock', C.int), + ('next', C.intp), # next index of work item + ('last', C.intp), # last index of work item (exlusive) + ('lock', C.int), # for locking the workqueue ] def Lock(self): + ''' + Inline the lock procedure. + ''' with self.parent.loop() as loop: with loop.condition() as setcond: unlocked = self.parent.constant(self.lock.type, 0) @@ -26,6 +45,9 @@ class WorkQueue(CStruct): pass def Unlock(self): + ''' + Inline the unlock procedure. + ''' unlocked = self.parent.constant(self.lock.type, 0) locked = self.parent.constant(self.lock.type, 1) @@ -39,6 +61,9 @@ class WorkQueue(CStruct): class ContextCommon(CStruct): + ''' + Structure for thread-shared context information in parallel-ufunc. + ''' _fields_ = [ # loop ufunc args ('args', C.pointer(C.char_p)), @@ -52,6 +77,9 @@ class ContextCommon(CStruct): ] class Context(CStruct): + ''' + Structure for thread-specific context information in parallel-ufunc. + ''' _fields_ = [ ('common', C.pointer(ContextCommon.llvm_type())), ('id', C.int), @@ -71,14 +99,14 @@ class ParallelUFunc(CDefinition): ] def body(self, func, worker, args, dimensions, steps, data, ThreadCount=1): - - + # Setup variables common = self.var(ContextCommon, name='common') workqueues = self.array(WorkQueue, ThreadCount, name='workqueues') contexts = self.array(Context, ThreadCount, name='contexts') num_thread = self.var(C.int, ThreadCount, name='num_thread') + # Initialize ContextCommon common.args.assign(args) common.dimensions.assign(dimensions) common.steps.assign(steps) @@ -87,6 +115,11 @@ class ParallelUFunc(CDefinition): common.num_thread.assign(num_thread.cast(C.int)) common.workqueues.assign(workqueues.reference()) + # Determine chunksize, initial count of work-items per thread. + # If total_work >= num_thread, equally divide the works. + # If total_work % num_thread != 0, the last thread does all remaining works. + # If total_work < num_thread, each thread does one work, + # and set num_thread to total_work N = dimensions[0] ChunkSize = self.var_copy(N / num_thread.cast(N.type)) ChunkSize_NULL = self.constant_null(ChunkSize.type) @@ -95,60 +128,70 @@ class ParallelUFunc(CDefinition): ChunkSize.assign(self.constant(ChunkSize.type, 1)) num_thread.assign(N.cast(num_thread.type)) + # Populate workqueue for all threads self._populate_workqueues(workqueues, N, ChunkSize, num_thread) + + # Populate contexts for all threads self._populate_context(contexts, common, num_thread) + # Dispatch worker threads self._dispatch_worker(worker, contexts, num_thread) + ## DEBUG ONLY ## # Check for race condition - total_completed = self.var(C.intp, 0, name='total_completed') - for t in range(ThreadCount): - cur_ctxt = contexts[t].as_struct(Context) - total_completed += cur_ctxt.completed - self.debug(cur_ctxt.id, 'completed', cur_ctxt.completed) + if True: + total_completed = self.var(C.intp, 0, name='total_completed') + for t in range(ThreadCount): + cur_ctxt = contexts[t].as_struct(Context) + total_completed += cur_ctxt.completed + self.debug(cur_ctxt.id, 'completed', cur_ctxt.completed) - with self.ifelse( total_completed == N ) as ifelse: - with ifelse.then(): - self.debug("All is well!") - with ifelse.otherwise(): - self.debug("ERROR: race occurred!") + with self.ifelse( total_completed == N ) as ifelse: + with ifelse.then(): + self.debug("All is well!") + with ifelse.otherwise(): + self.debug("ERROR: race occurred! Trigger segfault") + self.unreachable() + # Return self.ret() def _populate_workqueues(self, workqueues, N, ChunkSize, num_thread): - i = self.var(num_thread.type, 0, name='i') - ONE = self.constant(i.type, 1) - with self.loop() as loop: - with loop.condition() as setcond: - setcond( i < num_thread ) - with loop.body(): - cur_wq = workqueues[i].as_struct(WorkQueue) - cur_wq.next.assign(i.cast(ChunkSize.type) * ChunkSize) - cur_wq.last.assign((i + ONE).cast(ChunkSize.type) * ChunkSize) - cur_wq.lock.assign(self.constant(C.int, 0)) - # increment - i += ONE + ''' + Loop over all threads and populate the workqueue for each of them. + ''' + ONE = self.constant(num_thread.type, 1) + with self.for_range(num_thread) as (loop, i): + cur_wq = workqueues[i].as_struct(WorkQueue) + cur_wq.next.assign(i.cast(ChunkSize.type) * ChunkSize) + cur_wq.last.assign((i + ONE).cast(ChunkSize.type) * ChunkSize) + cur_wq.lock.assign(self.constant(C.int, 0)) # end loop last_wq = workqueues[num_thread - ONE].as_struct(WorkQueue) last_wq.last.assign(N) def _populate_context(self, contexts, common, num_thread): - i = self.var(num_thread.type, 0, name='i') - ONE = self.constant(i.type, 1) - with self.loop() as loop: - with loop.condition() as setcond: - setcond( i < num_thread ) - with loop.body(): - cur_ctxt = contexts[i].as_struct(Context) - cur_ctxt.common.assign(common.reference()) - cur_ctxt.id.assign(i) - cur_ctxt.completed.assign(self.constant_null( - cur_ctxt.completed.type)) - # increment - i += ONE + ''' + Loop over all threads and populate contexts for each of them. + ''' + ONE = self.constant(num_thread.type, 1) + with self.for_range(num_thread) as (loop, i): + cur_ctxt = contexts[i].as_struct(Context) + cur_ctxt.common.assign(common.reference()) + cur_ctxt.id.assign(i) + cur_ctxt.completed.assign( + self.constant_null(cur_ctxt.completed.type)) + def _dispatch_worker(self, worker, contexts, num_thread): + ''' + Dispatch worker threads. + ''' + raise NotImplementedError class ParallelUFuncPosix(ParallelUFunc): + ''' + Implements _dispatch_worker to use pthread. + ''' def _dispatch_worker(self, worker, contexts, num_thread): api = PThreadAPI(self) NULL = self.constant_null(C.void_p) @@ -158,27 +201,19 @@ class ParallelUFuncPosix(ParallelUFunc): # self.debug("launch threads") # TODO error handling - i = self.var(num_thread.type, 0, name='i') - ONE = self.constant(i.type, 1) - with self.loop() as loop: - with loop.condition() as setcond: - setcond( i < num_thread ) - with loop.body(): - api.pthread_create(threads[i].reference(), NULL, worker, - contexts[i].reference().cast(C.void_p)) - i += ONE + ONE = self.constant(num_thread.type, 1) + with self.for_range(num_thread) as (loop, i): + api.pthread_create(threads[i].reference(), NULL, worker, + contexts[i].reference().cast(C.void_p)) - # self.debug("join threads") - i.assign(self.constant_null(i.type)) - with self.loop() as loop: - with loop.condition() as setcond: - setcond( i < num_thread ) - with loop.body(): - api.pthread_join(threads[i], NULL) - i += ONE - # self.debug("closing") + with self.for_range(num_thread) as (loop, i): + api.pthread_join(threads[i], NULL) class UFuncCore(CDefinition): + ''' + Generates the workqueue handling and work stealing and invoke + the work function for each work item. + ''' _name_ = 'ufunc_worker' _argtys_ = [ ('context', C.pointer(Context.llvm_type())), @@ -203,64 +238,61 @@ class UFuncCore(CDefinition): ''' ZERO = self.constant_null(C.int) - with self.loop() as loop: - with loop.condition() as setcond: - setcond(ZERO == ZERO) # loop forever + with self.forever() as loop: + workqueue.Lock() + # Critical section + item = self.var_copy(workqueue.next, name='item') + workqueue.next += self.constant(item.type, 1) + last = self.var_copy(workqueue.last, name='last') + # Release + workqueue.Unlock() - with loop.body(): - workqueue.Lock() - # Critical section - item = self.var_copy(workqueue.next, name='item') - workqueue.next += self.constant(item.type, 1) - last = self.var_copy(workqueue.last, name='last') - # Release - workqueue.Unlock() + with self.ifelse( item >= last ) as ifelse: + with ifelse.then(): + loop.break_loop() - with self.ifelse( item >= last ) as ifelse: - with ifelse.then(): - loop.break_loop() - - self._do_work(common, item, tid, completed) + self._do_work(common, item, tid) + completed += self.constant(completed.type, 1) def _do_work_stealing(self, common, tid, completed): ''' - Steal work from other workqueues + Steal work from other workqueues. ''' # self.debug("start work stealing", tid) - incomplete_thread_ct = self.var(C.int, 1) - ITC_NULL = self.constant_null(incomplete_thread_ct.type) + steal_continue = self.var(C.int, 1) + STEAL_STOP = self.constant_null(steal_continue.type) + + # Loop until all workqueues are done. with self.loop() as loop: with loop.condition() as setcond: - setcond( incomplete_thread_ct != ITC_NULL ) + setcond( steal_continue != STEAL_STOP ) with loop.body(): - incomplete_thread_ct.assign(ITC_NULL) - self._do_work_stealing_innerloop(common, incomplete_thread_ct, - tid, completed) + steal_continue.assign(STEAL_STOP) + self._do_work_stealing_innerloop(common, steal_continue, tid, + completed) - def _do_work_stealing_innerloop(self, common, incomplete_thread_ct, tid, + def _do_work_stealing_innerloop(self, common, steal_continue, tid, completed): - i = self.var(C.int, 0) - with self.loop() as loop: - with loop.condition() as setcond: - setcond( i < common.num_thread ) + ''' + Loop over all other threads and try to steal work. + ''' + with self.for_range(common.num_thread) as (loop, i): + with self.ifelse( i != tid ) as ifelse: + with ifelse.then(): + otherqueue = common.workqueues[i].as_struct(WorkQueue) + self._do_work_stealing_check(common, otherqueue, + steal_continue, tid, + completed) - with loop.body(): - with self.ifelse( i != tid ) as ifelse: - with ifelse.then(): - otherqueue = common.workqueues[i].as_struct(WorkQueue) - self._do_work_stealing_check(common, otherqueue, - incomplete_thread_ct, tid, - completed) - - # increment - i += self.constant(i.type, 1) - - def _do_work_stealing_check(self, common, otherqueue, - incomplete_thread_ct, tid, completed): + def _do_work_stealing_check(self, common, otherqueue, steal_continue, tid, + completed): + ''' + Check the workqueue for any remaining work and steal it. + ''' otherqueue.Lock() ONE = self.constant(otherqueue.last.type, 1) - ITC_ONE = self.constant(incomplete_thread_ct.type, 1) + STEAL_CONTINUE = self.constant(steal_continue.type, 1) with self.ifelse(otherqueue.next < otherqueue.last) as ifelse: with ifelse.then(): otherqueue.last -= ONE @@ -269,17 +301,20 @@ class UFuncCore(CDefinition): otherqueue.Unlock() # Released - self._do_work(common, item, tid, completed) + self._do_work(common, item, tid) + completed += self.constant(completed.type, 1) # Mark incomplete thread - incomplete_thread_ct.assign(ITC_ONE) + steal_continue.assign(STEAL_CONTINUE) with ifelse.otherwise(): otherqueue.Unlock() - def _do_work(self, common, item, tid, completed): - # self.debug(" tid", tid, "item", item) - completed += self.constant(completed.type, 1) + def _do_work(self, common, item, tid): + ''' + Prepare to call the actual work function. + ''' + raise NotImplementedError class PThreadAPI(CExternal): @@ -294,25 +329,3 @@ class PThreadAPI(CExternal): pthread_join = Type.function(C.int, [C.void_p, C.void_p]) -def _main(): - NUM_THREAD = 2 - module = Module.new(__name__) - - fpm = FunctionPassManager.new(module) - PassManagerBuilder.new().populate(fpm) - - f1 = ParallelUFuncPosix.define(module, ThreadCount=NUM_THREAD) - f2 = UFuncCore.define(module) - #print(module) -# print(f1) - print(f2) - module.verify() - - fpm.run(f1) - fpm.run(f2) - print('optimized'.center(80,'-')) - print(f2) - -if __name__ == '__main__': - _main() - diff --git a/test_parallel_vectorize.py b/test_parallel_vectorize.py index b3ffb5c..fbcb133 100644 --- a/test_parallel_vectorize.py +++ b/test_parallel_vectorize.py @@ -1,11 +1,40 @@ from parallel_vectorize import * -''' -void parallel_ufunc(void * func, void * worker, - char **args, npy_intp *dimensions, - npy_intp *steps, void *data) -''' + +class Work_D_D(CDefinition): + _name_ = 'copy_d_d' + _retty_ = C.double + _argtys_ = [ + ('inval', C.double), + ] + def body(self, inval): + self.ret(inval / self.constant(inval.type, 2.345)) + +class UFuncCore_D_D(UFuncCore): + ''' + Specialize UFuncCore for double input, double output. + ''' + def _do_work(self, common, item, tid): + ufunc_type = Type.function(C.double, [C.double]) + ufunc_ptr = CFunc(self, common.func.cast(C.pointer(ufunc_type)).value) + + inbase = common.args[0] + outbase = common.args[1] + + instep = common.steps[0] + outstep = common.steps[0] + + indata = inbase[item * instep].reference().cast(C.pointer(C.double)) + outdata = outbase[item * outstep].reference().cast(C.pointer(C.double)) + + res = ufunc_ptr(indata.load()) + outdata.store(res) + + class Tester(CDefinition): + ''' + Generate test. + ''' _name_ = 'tester' def body(self): @@ -14,20 +43,23 @@ class Tester(CDefinition): ThreadCount = 2 ArgCount = 2 - WorkCount = 1000000 + WorkCount = 10000 parallel_ufunc = CFunc(self, ParallelUFuncPosix.define(module, ThreadCount=ThreadCount)) - worker = CFunc(self, UFuncCore.define(module)) + ufunc_core = CFunc(self, UFuncCore_D_D.define(module)) + worker = CFunc(self, Work_D_D.define(module)) # real work NULL = self.constant_null(C.void_p) args = self.array(C.char_p, 2, name='args') + args_double = [] for t in range(ThreadCount): - args_for_thread = self.array(C.double, 4) + args_for_thread = self.array(C.double, WorkCount) args[t].assign(args_for_thread.cast(C.char_p)) + args_double.append(args_for_thread) dims = self.array(C.intp, 1, name='dims') dims[0].assign(self.constant(C.intp, WorkCount)) @@ -37,7 +69,28 @@ class Tester(CDefinition): for c in range(ArgCount): steps[c].assign(self.constant(C.intp, 8)) - parallel_ufunc(NULL, worker.cast(C.void_p), args, dims, steps, NULL) + # populate data + inbase = args_double[0] + + i = self.var(C.intp, 0) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < self.constant(C.intp, WorkCount) ) + with loop.body(): + inbase[i].assign(i.cast(C.double)) + i += self.constant(C.intp, 1) + + # call parallel ufunc + parallel_ufunc(worker.cast(C.void_p), ufunc_core.cast(C.void_p), args, + dims, steps, NULL) + + # check error + outbase = args_double[-1] + with self.for_range(self.constant(C.intp, WorkCount)) as (loop, i): + test = outbase[i] != (inbase[i] / self.constant(C.double, 2.345)) + with self.ifelse( test ) as ifelse: + with ifelse.then(): + self.debug("Invalid data at i =", i, outbase[i], inbase[i]) self.ret() @@ -61,6 +114,7 @@ def main(): print(module) # run + print('run') exe = CExecutor(module) exe.engine.get_pointer_to_function(fntester) func = exe.get_ctype_function(fntester, 'void')