more cleanup and doc

This commit is contained in:
Siu Kwan Lam 2012-08-08 15:29:16 -07:00
commit 3009d8468f
2 changed files with 50 additions and 29 deletions

View file

@ -17,8 +17,7 @@ from llvm_cbuilder import *
import llvm_cbuilder.shortnames as C
class WorkQueue(CStruct):
'''
Structure for workqueue for parallel-ufunc.
'''structure for workqueue for parallel-ufunc.
'''
_fields_ = [
@ -29,8 +28,7 @@ class WorkQueue(CStruct):
def Lock(self):
'''
Inline the lock procedure.
'''inline the lock procedure.
'''
with self.parent.loop() as loop:
with loop.condition() as setcond:
@ -45,8 +43,7 @@ class WorkQueue(CStruct):
pass
def Unlock(self):
'''
Inline the unlock procedure.
'''inline the unlock procedure.
'''
unlocked = self.parent.constant(self.lock.type, 0)
locked = self.parent.constant(self.lock.type, 1)
@ -61,8 +58,7 @@ class WorkQueue(CStruct):
class ContextCommon(CStruct):
'''
Structure for thread-shared context information in parallel-ufunc.
'''structure for thread-shared context information in parallel-ufunc.
'''
_fields_ = [
# loop ufunc args
@ -77,8 +73,7 @@ class ContextCommon(CStruct):
]
class Context(CStruct):
'''
Structure for thread-specific context information in parallel-ufunc.
'''structure for thread-specific context information in parallel-ufunc.
'''
_fields_ = [
('common', C.pointer(ContextCommon.llvm_type())),
@ -87,7 +82,11 @@ class Context(CStruct):
]
class ParallelUFunc(CDefinition):
'''
'''the generic parallel vectorize mechanism
Can be specialized to the maximum number of threads on the platform.
Platform dependent threading function is implemented in
def _dispatch_worker(self, worker, contexts, num_thread):
@ -107,7 +106,8 @@ class ParallelUFunc(CDefinition):
@classmethod
def specialize(cls, num_thread):
print num_thread
'''specialize to the maximum # of thread
'''
cls._name_ = 'parallel_ufunc_%d' % num_thread
cls.ThreadCount = num_thread
@ -172,8 +172,7 @@ class ParallelUFunc(CDefinition):
self.ret()
def _populate_workqueues(self, workqueues, N, ChunkSize, num_thread):
'''
Loop over all threads and populate the workqueue for each of them.
'''loop over all threads and populate the workqueue for each of them.
'''
ONE = self.constant(num_thread.type, 1)
with self.for_range(num_thread) as (loop, i):
@ -186,8 +185,7 @@ class ParallelUFunc(CDefinition):
last_wq.last.assign(N)
def _populate_context(self, contexts, common, num_thread):
'''
Loop over all threads and populate contexts for each of them.
'''loop over all threads and populate contexts for each of them.
'''
ONE = self.constant(num_thread.type, 1)
with self.for_range(num_thread) as (loop, i):
@ -198,8 +196,7 @@ class ParallelUFunc(CDefinition):
self.constant_null(cur_ctxt.completed.type))
class ParallelUFuncPosixMixin(object):
'''
Implements _dispatch_worker to use pthread.
'''ParallelUFunc mixin that implements _dispatch_worker to use pthread.
'''
def _dispatch_worker(self, worker, contexts, num_thread):
api = PThreadAPI(self)
@ -219,7 +216,10 @@ class ParallelUFuncPosixMixin(object):
api.pthread_join(threads[i], NULL)
class UFuncCore(CDefinition):
'''
'''core work of a ufunc worker thread
Subclass to implement UFuncCore._do_work
Generates the workqueue handling and work stealing and invoke
the work function for each work item.
'''
@ -242,8 +242,7 @@ class UFuncCore(CDefinition):
self.ret()
def _do_workqueue(self, common, workqueue, tid, completed):
'''
Process local workqueue.
'''process local workqueue.
'''
ZERO = self.constant_null(C.int)
@ -264,8 +263,7 @@ class UFuncCore(CDefinition):
completed += self.constant(completed.type, 1)
def _do_work_stealing(self, common, tid, completed):
'''
Steal work from other workqueues.
'''steal work from other workqueues.
'''
# self.debug("start work stealing", tid)
steal_continue = self.var(C.int, 1)
@ -283,8 +281,7 @@ class UFuncCore(CDefinition):
def _do_work_stealing_innerloop(self, common, steal_continue, tid,
completed):
'''
Loop over all other threads and try to steal work.
'''loop over all other threads and try to steal work.
'''
with self.for_range(common.num_thread) as (loop, i):
with self.ifelse( i != tid ) as ifelse:
@ -296,10 +293,10 @@ class UFuncCore(CDefinition):
def _do_work_stealing_check(self, common, otherqueue, steal_continue, tid,
completed):
'''
Check the workqueue for any remaining work and steal it.
'''check the workqueue for any remaining work and steal it.
'''
otherqueue.Lock()
# Acquired
ONE = self.constant(otherqueue.last.type, 1)
STEAL_CONTINUE = self.constant(steal_continue.type, 1)
with self.ifelse(otherqueue.next < otherqueue.last) as ifelse:
@ -318,14 +315,18 @@ class UFuncCore(CDefinition):
with ifelse.otherwise():
otherqueue.Unlock()
# Released
def _do_work(self, common, item, tid):
'''
Prepare to call the actual work function.
'''prepare to call the actual work function
Implementation depends on number and type of arguments.
'''
raise NotImplementedError
class SpecializedParallelUFunc(CDefinition):
'''a generic ufunc that wraps ParallelUFunc, UFuncCore and the workload
'''
_argtys_ = [
('args', C.pointer(C.char_p)),
('dimensions', C.pointer(C.intp)),
@ -343,12 +344,16 @@ class SpecializedParallelUFunc(CDefinition):
@classmethod
def specialize(cls, pufunc_def, core_def, func_def):
'''specialize to a combination of ParallelUFunc, UFuncCore and workload
'''
cls._name_ = 'specialized_%s_%s_%s'% (pufunc_def, core_def, func_def)
cls.PUFuncDef = pufunc_def
cls.CoreDef = core_def
cls.FuncDef = func_def
class PThreadAPI(CExternal):
'''external declaration of pthread API
'''
pthread_t = C.void_p
pthread_create = Type.function(C.int,