From 3009d8468f3ca85650852d3777ca1c547c57c06d Mon Sep 17 00:00:00 2001 From: Siu Kwan Lam Date: Wed, 8 Aug 2012 15:29:16 -0700 Subject: [PATCH] more cleanup and doc --- README.md | 16 +++++++++++ parallel_vectorize.py | 63 +++++++++++++++++++++++-------------------- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index cab0f28..a63039a 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,23 @@ +llvm_cbuilder +------------- + A llvm-py Builder wrapper for writing in slightly higher-level constructs. This is aiming for two usecases: + 1. Emit LLVM code in a more human-readable way; + 2. Writing low-level code that you can't do it properly/portably with C, e.g template (generic), atomic operations, memory ordering... + +Parallel Vectorize +------------------ + +`parallel_vectorize.py` implements a set of code generators based on +llvm_cbuilder to create specialized parallel ufuncs. + +See `test_parallel_vectorize_numpy*.py` for testing and demo of the code +with the new `numpy.fromfunc()`. + + diff --git a/parallel_vectorize.py b/parallel_vectorize.py index 4c8daab..f26ad46 100644 --- a/parallel_vectorize.py +++ b/parallel_vectorize.py @@ -17,8 +17,7 @@ from llvm_cbuilder import * import llvm_cbuilder.shortnames as C class WorkQueue(CStruct): - ''' - Structure for workqueue for parallel-ufunc. + '''structure for workqueue for parallel-ufunc. ''' _fields_ = [ @@ -29,8 +28,7 @@ class WorkQueue(CStruct): def Lock(self): - ''' - Inline the lock procedure. + '''inline the lock procedure. ''' with self.parent.loop() as loop: with loop.condition() as setcond: @@ -45,8 +43,7 @@ class WorkQueue(CStruct): pass def Unlock(self): - ''' - Inline the unlock procedure. + '''inline the unlock procedure. 
''' unlocked = self.parent.constant(self.lock.type, 0) locked = self.parent.constant(self.lock.type, 1) @@ -61,8 +58,7 @@ class WorkQueue(CStruct): class ContextCommon(CStruct): - ''' - Structure for thread-shared context information in parallel-ufunc. + '''structure for thread-shared context information in parallel-ufunc. ''' _fields_ = [ # loop ufunc args @@ -77,8 +73,7 @@ class ContextCommon(CStruct): ] class Context(CStruct): - ''' - Structure for thread-specific context information in parallel-ufunc. + '''structure for thread-specific context information in parallel-ufunc. ''' _fields_ = [ ('common', C.pointer(ContextCommon.llvm_type())), @@ -87,7 +82,11 @@ class Context(CStruct): ] class ParallelUFunc(CDefinition): - ''' + '''the generic parallel vectorize mechanism + + Can be specialized to the maximum number of threads on the platform. + + Platform dependent threading function is implemented in def _dispatch_worker(self, worker, contexts, num_thread): @@ -107,7 +106,8 @@ class ParallelUFunc(CDefinition): @classmethod def specialize(cls, num_thread): - print num_thread + '''specialize to the maximum # of thread + ''' cls._name_ = 'parallel_ufunc_%d' % num_thread cls.ThreadCount = num_thread @@ -172,8 +172,7 @@ class ParallelUFunc(CDefinition): self.ret() def _populate_workqueues(self, workqueues, N, ChunkSize, num_thread): - ''' - Loop over all threads and populate the workqueue for each of them. + '''loop over all threads and populate the workqueue for each of them. ''' ONE = self.constant(num_thread.type, 1) with self.for_range(num_thread) as (loop, i): @@ -186,8 +185,7 @@ class ParallelUFunc(CDefinition): last_wq.last.assign(N) def _populate_context(self, contexts, common, num_thread): - ''' - Loop over all threads and populate contexts for each of them. + '''loop over all threads and populate contexts for each of them. 
''' ONE = self.constant(num_thread.type, 1) with self.for_range(num_thread) as (loop, i): @@ -198,8 +196,7 @@ class ParallelUFunc(CDefinition): self.constant_null(cur_ctxt.completed.type)) class ParallelUFuncPosixMixin(object): - ''' - Implements _dispatch_worker to use pthread. + '''ParallelUFunc mixin that implements _dispatch_worker to use pthread. ''' def _dispatch_worker(self, worker, contexts, num_thread): api = PThreadAPI(self) @@ -219,7 +216,10 @@ class ParallelUFuncPosixMixin(object): api.pthread_join(threads[i], NULL) class UFuncCore(CDefinition): - ''' + '''core work of a ufunc worker thread + + Subclass to implement UFuncCore._do_work + Generates the workqueue handling and work stealing and invoke the work function for each work item. ''' @@ -242,8 +242,7 @@ class UFuncCore(CDefinition): self.ret() def _do_workqueue(self, common, workqueue, tid, completed): - ''' - Process local workqueue. + '''process local workqueue. ''' ZERO = self.constant_null(C.int) @@ -264,8 +263,7 @@ class UFuncCore(CDefinition): completed += self.constant(completed.type, 1) def _do_work_stealing(self, common, tid, completed): - ''' - Steal work from other workqueues. + '''steal work from other workqueues. ''' # self.debug("start work stealing", tid) steal_continue = self.var(C.int, 1) @@ -283,8 +281,7 @@ class UFuncCore(CDefinition): def _do_work_stealing_innerloop(self, common, steal_continue, tid, completed): - ''' - Loop over all other threads and try to steal work. + '''loop over all other threads and try to steal work. ''' with self.for_range(common.num_thread) as (loop, i): with self.ifelse( i != tid ) as ifelse: @@ -296,10 +293,10 @@ class UFuncCore(CDefinition): def _do_work_stealing_check(self, common, otherqueue, steal_continue, tid, completed): - ''' - Check the workqueue for any remaining work and steal it. + '''check the workqueue for any remaining work and steal it. 
''' otherqueue.Lock() + # Acquired ONE = self.constant(otherqueue.last.type, 1) STEAL_CONTINUE = self.constant(steal_continue.type, 1) with self.ifelse(otherqueue.next < otherqueue.last) as ifelse: @@ -318,14 +315,18 @@ class UFuncCore(CDefinition): with ifelse.otherwise(): otherqueue.Unlock() + # Released def _do_work(self, common, item, tid): - ''' - Prepare to call the actual work function. + '''prepare to call the actual work function + + Implementation depends on number and type of arguments. ''' raise NotImplementedError class SpecializedParallelUFunc(CDefinition): + '''a generic ufunc that wraps ParallelUFunc, UFuncCore and the workload + ''' _argtys_ = [ ('args', C.pointer(C.char_p)), ('dimensions', C.pointer(C.intp)), @@ -343,12 +344,16 @@ class SpecializedParallelUFunc(CDefinition): @classmethod def specialize(cls, pufunc_def, core_def, func_def): + '''specialize to a combination of ParallelUFunc, UFuncCore and workload + ''' cls._name_ = 'specialized_%s_%s_%s'% (pufunc_def, core_def, func_def) cls.PUFuncDef = pufunc_def cls.CoreDef = core_def cls.FuncDef = func_def class PThreadAPI(CExternal): + '''external declaration of pthread API + ''' pthread_t = C.void_p pthread_create = Type.function(C.int,