diff --git a/llvm_cbuilder/builder.py b/llvm_cbuilder/builder.py index 16f6daf..ee308d6 100644 --- a/llvm_cbuilder/builder.py +++ b/llvm_cbuilder/builder.py @@ -40,6 +40,10 @@ def _change_block_temporarily(builder, bb): yield builder.position_at_end(origbb) +@contextlib.contextmanager +def _change_block_temporarily_dummy(*args): + yield + class _IfElse(object): def __init__(self, parent, cond): self.parent = parent @@ -58,14 +62,17 @@ class _IfElse(object): yield - self._to_close.extend([self._bbif, self._bbelse]) + self._to_close.extend([self._bbif, self._bbelse, builder.basic_block]) @contextlib.contextmanager def otherwise(self): - self.parent.builder.position_at_end(self._bbelse) + builder = self.parent.builder + builder.position_at_end(self._bbelse) yield + self._to_close.append(builder.basic_block) def close(self): + self._to_close.append(self.parent.builder.basic_block) bbend = self.parent.function.append_basic_block('if.end') builder = self.parent.builder for bb in self._to_close: @@ -111,7 +118,11 @@ class _Loop(object): self.parent.builder.branch(self._bbcond) def close(self): - self.parent.builder.position_at_end(self._bbend) + builder = self.parent.builder +# if not _is_block_terminated(builder.basic_block): +# with _change_block_temporarily(builder, builder.basic_block): +# builder.branch(self._bbend) + builder.position_at_end(self._bbend) class CBuilder(object): ''' @@ -148,7 +159,10 @@ class CBuilder(object): def debug(self, *args): type_mapper = { + 'i8' : '%c', + 'i16': '%hd', 'i32': '%d', + 'i64': '%ld', 'double': '%e', } itemsfmt = [] @@ -185,9 +199,23 @@ class CBuilder(object): else: return CVar(self, ptr) + def var_copy(self, val, name=''): + return self.var(val.type, val, name=name) + def array(self, ty, count, name=''): - with _change_block_temporarily(self.builder, self.declare_block): - if not isinstance(count, lc.Value): + if isinstance(count, int) or isinstance(count, lc.Constant): + contexthelper = _change_block_temporarily + else: + 
contexthelper = _change_block_temporarily_dummy + + with contexthelper(self.builder, self.declare_block): + is_cstruct = _is_cstruct(ty) + if is_cstruct: + cstruct = ty + ty = ty.llvm_type() + if isinstance(count, CValue): + count = count.value + elif not isinstance(count, lc.Value): count = self.constant(lc.Type.int(), count).value ptr = self.builder.alloca_array(ty, count, name=name) return CArray(self, ptr) @@ -341,6 +369,40 @@ class CBuilder(object): def alignment(self, ty): return self.target_data.abi_alignment(ty) + def unreachable(self): + self.builder.unreachable() + +class CDefinition(CBuilder): + ''' + Inherit from this class to define functions + ''' + _name_ = 'unamed' # name of the function; should override in subclass + _retty_ = lc.Type.void() # return type; can override in subclass + _argtys_ = [] # a list of tuple(name, type); can override in subclass + + @classmethod + def define(cls, module, **kws): + functype = lc.Type.function(cls._retty_, [v for k, v in cls._argtys_]) + func = module.get_or_insert_function(functype, name=cls._name_) + if not func.is_declaration: # already defined? + raise NameError('Function %s has already been defined' % cls._name_) + + # Name all arguments + for i, (name, _) in enumerate(cls._argtys_): + func.args[i].name = name + + # Create builder and populate body + cbuilder = cls(func) + cbuilder.body(*cbuilder.args, **kws) + cbuilder.close() + return func + + def body(self): + ''' + Override this function to define the body. 
+ ''' + raise NotImplementedError + class CValue(object): ''' = Signess = @@ -475,13 +537,13 @@ class CValue(object): return make(builder.bitcast(self.value, ty)) elif self.is_int: if _is_int(ty): - if self.type.width > ty.width: + if self.type.width < ty.width: if not unsigned: return make(self.parent.builder.sext(self.value, ty)) else: return make(self.parent.builder.zext(self.value, ty)) else: - return make(self.parent.trunc(self.value, ty)) + return make(self.parent.builder.trunc(self.value, ty)) elif _is_real(ty): if not unsigned: return make(self.parent.builder.sitofp(self.value, ty)) @@ -543,7 +605,46 @@ class CValue(object): def _ensure_is_pointer(self): if not self.is_pointer: - raise TypeError("Must be a pointer") + raise TypeError("Must be a pointer; got %s" % self.type) + + def __getitem__(self, idx): + self._ensure_is_pointer() + if not isinstance(idx, CValue): + idx = self.parent.constant(lc.Type.int(), idx) + bldr = self.parent.builder + ptr = bldr.gep(self.value, [idx.value]) + return CVar(self.parent, ptr) + + def load(self, volatile=False): + self._ensure_is_pointer() + loaded = self.parent.builder.load(self.value, volatile=volatile) + return CTemp(self.parent, loaded) + + def store(self, val, volatile=False): + self._ensure_is_pointer() + self.parent.builder.store(val.value, self.value, volatile=volatile) + + def atomic_load(self, ordering, align=None, crossthread=True): + self._ensure_is_pointer() + if align is None: + align = self.parent.alignment(self.type.pointee) + inst = self.parent.builder.atomic_load(self.value, ordering, align, + crossthread=crossthread) + return CTemp(self.parent, inst) + + def atomic_store(self, value, ordering, align=None, crossthread=True): + self._ensure_is_pointer() + if align is None: + align = self.parent.alignment(self.type.pointee) + self.parent.builder.atomic_store(value.value, self.value, ordering, + align=align, crossthread=crossthread) + + def atomic_cmpxchg(self, old, new, ordering, crossthread=True): + 
self._ensure_is_pointer() + inst = self.parent.builder.atomic_cmpxchg(self.value, old.value, + new.value, ordering, + crossthread=crossthread) + return CTemp(self.parent, inst) class CFunc(CValue): @@ -552,8 +653,14 @@ class CFunc(CValue): self.function = func def __call__(self, *args): - arg_value = _list_values(args) - res = self.parent.builder.call(self.function, arg_value) + arg_values = _list_values(args) + ftype = self.function.type.pointee + for i, (exp, got) in enumerate(zip(ftype.args, arg_values)): + if exp != got.type: + raise TypeError("At call to %s, " + "argument %d mismatch: %s != %s" + % (self.function.name, i, exp, got.type)) + res = self.parent.builder.call(self.function, arg_values) return CTemp(self.parent, res) @property @@ -601,7 +708,6 @@ class CVar(CValue): def __imod__(self, rhs): return self._inplace_binop('mod')(rhs) - def _inplace_bitwise(self, op): def wrapped(rhs): res = self._use_bitwise(op)(rhs) @@ -629,46 +735,23 @@ class CVar(CValue): return self.parent.builder.load(self.ptr) def assign(self, val): + self._ensure_same_type(val) self.parent.builder.store(val.value, self.ptr) @property def type(self): return self.ptr.type.pointee - def load(self, volatile=False): - self._ensure_is_pointer() - loaded = self.parent.builder.load(self.value, volatile=volatile) - return CTemp(self.parent, loaded) - - def store(self, val, volatile=False): - self._ensure_is_pointer() - self.parent.builder.store(val.value, self.value, volatile=volatile) - - def atomic_load(self, ordering, align=None, crossthread=True): - self._ensure_is_pointer() - if align is None: - align = self.parent.alignment(self.type.pointee) - inst = self.parent.builder.atomic_load(self.value, ordering, align, - crossthread=crossthread) - return CTemp(self.parent, inst) - - def atomic_store(self, value, ordering, align=None, crossthread=True): - self._ensure_is_pointer() - if align is None: - align = self.parent.alignment(self.type.pointee) - 
self.parent.builder.atomic_store(value.value, self.value, ordering, - align=align, crossthread=crossthread) - - def atomic_cmpxchg(self, old, new, ordering, crossthread=True): - self._ensure_is_pointer() - inst = self.parent.builder.atomic_cmpxchg(self.value, old.value, - new.value, ordering, - crossthread=crossthread) - return CTemp(self.parent, inst) - def reference(self): return CTemp(self.parent, self.ptr) + def as_struct(self, cstruct_class, volatile=False): + if _is_pointer(self.type): + ptr = self.parent.builder.load(self.ptr, volatile=volatile) + return cstruct_class(self.parent, ptr) + else: + return cstruct_class(self.parent, self.ptr) + class CArray(CValue): def __init__(self, parent, base): super(CArray, self).__init__(parent) @@ -678,19 +761,23 @@ class CArray(CValue): def value(self): return self.base_ptr + def reference(self): + return CTemp(self.parent, self.base_ptr) + @property def type(self): return self.base_ptr.type - def __getitem__(self, idx): - self._ensure_is_pointer() - builder = self.parent.builder - if isinstance(idx, CValue): - idx = idx.value - elif not isinstance(idx, lc.Value): - idx = self.parent.constant(lc.Type.int(), idx).value - ptr = builder.gep(self.value, [idx]) - return CVar(self.parent, ptr) +## Moved to CValue +# def __getitem__(self, idx): +# self._ensure_is_pointer() +# builder = self.parent.builder +# if isinstance(idx, CValue): +# idx = idx.value +# elif not isinstance(idx, lc.Value): +# idx = self.parent.constant(lc.Type.int(), idx).value +# ptr = builder.gep(self.value, [idx]) +# return CVar(self.parent, ptr) class CStruct(CValue): @classmethod @@ -700,8 +787,32 @@ class CStruct(CValue): def __init__(self, parent, ptr): super(CStruct, self).__init__(parent) makeind = lambda x: self.parent.constant(lc.Type.int(), x).value + self.ptr = ptr for i, (fd, _) in enumerate(self._fields_): gep = self.parent.builder.gep(ptr, [makeind(0), makeind(i)]) + gep.name = "%s.%s" % (type(self).__name__, fd) setattr(self, fd, 
CVar(self.parent, gep)) + def reference(self): + return CTemp(self.parent, self.ptr) + +class CExternal(object): + def __init__(self, cbuilder): + is_func = lambda x: isinstance(x, lc.FunctionType) + non_magic = lambda s: not ( s.startswith('__') and s.endswith('__') ) + + to_declare = [] + for fname in filter(non_magic, vars(type(self))): + ftype = getattr(self, fname) + if is_func(ftype): + to_declare.append((fname, ftype)) + + mod = cbuilder.function.module + for fname, ftype in to_declare: + func = mod.get_or_insert_function(ftype, name=fname) + if func.type.pointee != ftype: + raise NameError("Function has already been declared " + "with a different type: %s != %s" + % (func.type, ftype) ) + setattr(self, fname, CFunc(cbuilder, func)) diff --git a/llvm_cbuilder/executor.py b/llvm_cbuilder/executor.py index a2f558e..bd7d78c 100644 --- a/llvm_cbuilder/executor.py +++ b/llvm_cbuilder/executor.py @@ -31,7 +31,7 @@ MAP_CTYPES = { class CExecutor(object): def __init__(self, mod_or_engine): if isinstance(mod_or_engine, Module): - self.engine = le.EngineBuilder.new(mod_or_engine).create() + self.engine = le.EngineBuilder.new(mod_or_engine).opt(3).create() else: self.engine = mod_or_engine diff --git a/llvm_cbuilder/shortnames.py b/llvm_cbuilder/shortnames.py index d3fc5af..beaaad3 100644 --- a/llvm_cbuilder/shortnames.py +++ b/llvm_cbuilder/shortnames.py @@ -16,6 +16,7 @@ double = Type.double() pointer = Type.pointer void_p = pointer(char) +char_p = pointer(char) # platform dependent diff --git a/parallel_vectorize.py b/parallel_vectorize.py new file mode 100644 index 0000000..1f83ad5 --- /dev/null +++ b/parallel_vectorize.py @@ -0,0 +1,318 @@ +from llvm.core import * +from llvm.passes import * + +from llvm_cbuilder import * +import llvm_cbuilder.shortnames as C + +class WorkQueue(CStruct): + _fields_ = [ + ('next', C.intp), + ('last', C.intp), + ('lock', C.int), + ] + + + def Lock(self): + with self.parent.loop() as loop: + with loop.condition() as setcond: + 
unlocked = self.parent.constant(self.lock.type, 0) + locked = self.parent.constant(self.lock.type, 1) + + res = self.lock.reference().atomic_cmpxchg(unlocked, locked, + ordering='acquire') + setcond( res != unlocked ) + + with loop.body(): + pass + + def Unlock(self): + unlocked = self.parent.constant(self.lock.type, 0) + locked = self.parent.constant(self.lock.type, 1) + + res = self.lock.reference().atomic_cmpxchg(locked, unlocked, + ordering='release') + + with self.parent.ifelse( res != locked ) as ifelse: + with ifelse.then(): + # This shall kill the program + self.parent.unreachable() + + +class ContextCommon(CStruct): + _fields_ = [ + # loop ufunc args + ('args', C.pointer(C.char_p)), + ('dimensions', C.pointer(C.intp)), + ('steps', C.pointer(C.intp)), + ('data', C.void_p), + # specifics for work queues + ('func', C.void_p), + ('num_thread', C.int), + ('workqueues', C.pointer(WorkQueue.llvm_type())), + ] + +class Context(CStruct): + _fields_ = [ + ('common', C.pointer(ContextCommon.llvm_type())), + ('id', C.int), + ('completed', C.intp), + ] + +class ParallelUFunc(CDefinition): + _name_ = 'parallel_ufunc' + _retty_ = C.void + _argtys_ = [ + ('func', C.void_p), + ('worker', C.void_p), + ('args', C.pointer(C.char_p)), + ('dimensions', C.pointer(C.intp)), + ('steps', C.pointer(C.intp)), + ('data', C.void_p), + ] + + def body(self, func, worker, args, dimensions, steps, data, ThreadCount=1): + + + common = self.var(ContextCommon, name='common') + workqueues = self.array(WorkQueue, ThreadCount, name='workqueues') + contexts = self.array(Context, ThreadCount, name='contexts') + + num_thread = self.var(C.int, ThreadCount, name='num_thread') + + common.args.assign(args) + common.dimensions.assign(dimensions) + common.steps.assign(steps) + common.data.assign(data) + common.func.assign(func) + common.num_thread.assign(num_thread.cast(C.int)) + common.workqueues.assign(workqueues.reference()) + + N = dimensions[0] + ChunkSize = self.var_copy(N / 
num_thread.cast(N.type)) + ChunkSize_NULL = self.constant_null(ChunkSize.type) + with self.ifelse(ChunkSize == ChunkSize_NULL) as ifelse: + with ifelse.then(): + ChunkSize.assign(self.constant(ChunkSize.type, 1)) + num_thread.assign(N.cast(num_thread.type)) + + self._populate_workqueues(workqueues, N, ChunkSize, num_thread) + self._populate_context(contexts, common, num_thread) + + self._dispatch_worker(worker, contexts, num_thread) + + # Check for race condition + total_completed = self.var(C.intp, 0, name='total_completed') + for t in range(ThreadCount): + cur_ctxt = contexts[t].as_struct(Context) + total_completed += cur_ctxt.completed + self.debug(cur_ctxt.id, 'completed', cur_ctxt.completed) + + with self.ifelse( total_completed == N ) as ifelse: + with ifelse.then(): + self.debug("All is well!") + with ifelse.otherwise(): + self.debug("ERROR: race occurred!") + + self.ret() + + def _populate_workqueues(self, workqueues, N, ChunkSize, num_thread): + i = self.var(num_thread.type, 0, name='i') + ONE = self.constant(i.type, 1) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < num_thread ) + with loop.body(): + cur_wq = workqueues[i].as_struct(WorkQueue) + cur_wq.next.assign(i.cast(ChunkSize.type) * ChunkSize) + cur_wq.last.assign((i + ONE).cast(ChunkSize.type) * ChunkSize) + cur_wq.lock.assign(self.constant(C.int, 0)) + # increment + i += ONE + # end loop + last_wq = workqueues[num_thread - ONE].as_struct(WorkQueue) + last_wq.last.assign(N) + + def _populate_context(self, contexts, common, num_thread): + i = self.var(num_thread.type, 0, name='i') + ONE = self.constant(i.type, 1) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < num_thread ) + with loop.body(): + cur_ctxt = contexts[i].as_struct(Context) + cur_ctxt.common.assign(common.reference()) + cur_ctxt.id.assign(i) + cur_ctxt.completed.assign(self.constant_null( + cur_ctxt.completed.type)) + # increment + i += ONE + + +class 
ParallelUFuncPosix(ParallelUFunc): + def _dispatch_worker(self, worker, contexts, num_thread): + api = PThreadAPI(self) + NULL = self.constant_null(C.void_p) + + threads = self.array(api.pthread_t, num_thread, name='threads') + + # self.debug("launch threads") + # TODO error handling + + i = self.var(num_thread.type, 0, name='i') + ONE = self.constant(i.type, 1) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < num_thread ) + with loop.body(): + api.pthread_create(threads[i].reference(), NULL, worker, + contexts[i].reference().cast(C.void_p)) + i += ONE + + # self.debug("join threads") + i.assign(self.constant_null(i.type)) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < num_thread ) + with loop.body(): + api.pthread_join(threads[i], NULL) + i += ONE + # self.debug("closing") + +class UFuncCore(CDefinition): + _name_ = 'ufunc_worker' + _argtys_ = [ + ('context', C.pointer(Context.llvm_type())), + ] + + def body(self, context): + context = context.as_struct(Context) + common = context.common.as_struct(ContextCommon) + tid = context.id + + # self.debug("start thread", tid, "/", common.num_thread) + workqueue = common.workqueues[tid].as_struct(WorkQueue) + + self._do_workqueue(common, workqueue, tid, context.completed) + self._do_work_stealing(common, tid, context.completed) # optional + + self.ret() + + def _do_workqueue(self, common, workqueue, tid, completed): + ''' + Process local workqueue. 
+ ''' + ZERO = self.constant_null(C.int) + + with self.loop() as loop: + with loop.condition() as setcond: + setcond(ZERO == ZERO) # loop forever + + with loop.body(): + workqueue.Lock() + # Critical section + item = self.var_copy(workqueue.next, name='item') + workqueue.next += self.constant(item.type, 1) + last = self.var_copy(workqueue.last, name='last') + # Release + workqueue.Unlock() + + with self.ifelse( item >= last ) as ifelse: + with ifelse.then(): + loop.break_loop() + + self._do_work(common, item, tid, completed) + + def _do_work_stealing(self, common, tid, completed): + ''' + Steal work from other workqueues + ''' + # self.debug("start work stealing", tid) + incomplete_thread_ct = self.var(C.int, 1) + ITC_NULL = self.constant_null(incomplete_thread_ct.type) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( incomplete_thread_ct != ITC_NULL ) + + with loop.body(): + incomplete_thread_ct.assign(ITC_NULL) + self._do_work_stealing_innerloop(common, incomplete_thread_ct, + tid, completed) + + def _do_work_stealing_innerloop(self, common, incomplete_thread_ct, tid, + completed): + i = self.var(C.int, 0) + with self.loop() as loop: + with loop.condition() as setcond: + setcond( i < common.num_thread ) + + with loop.body(): + with self.ifelse( i != tid ) as ifelse: + with ifelse.then(): + otherqueue = common.workqueues[i].as_struct(WorkQueue) + self._do_work_stealing_check(common, otherqueue, + incomplete_thread_ct, tid, + completed) + + # increment + i += self.constant(i.type, 1) + + def _do_work_stealing_check(self, common, otherqueue, + incomplete_thread_ct, tid, completed): + otherqueue.Lock() + ONE = self.constant(otherqueue.last.type, 1) + ITC_ONE = self.constant(incomplete_thread_ct.type, 1) + with self.ifelse(otherqueue.next < otherqueue.last) as ifelse: + with ifelse.then(): + otherqueue.last -= ONE + item = self.var_copy(otherqueue.last) + + otherqueue.Unlock() + # Released + + self._do_work(common, item, tid, completed) + + # 
Mark incomplete thread + incomplete_thread_ct.assign(ITC_ONE) + + with ifelse.otherwise(): + otherqueue.Unlock() + + def _do_work(self, common, item, tid, completed): + # self.debug(" tid", tid, "item", item) + completed += self.constant(completed.type, 1) + + +class PThreadAPI(CExternal): + pthread_t = C.void_p + + pthread_create = Type.function(C.int, + [C.pointer(pthread_t), # thread_t + C.void_p, # thread attr + C.void_p, # function + C.void_p]) # arg + + pthread_join = Type.function(C.int, [C.void_p, C.void_p]) + + +def _main(): + NUM_THREAD = 2 + module = Module.new(__name__) + + fpm = FunctionPassManager.new(module) + PassManagerBuilder.new().populate(fpm) + + f1 = ParallelUFuncPosix.define(module, ThreadCount=NUM_THREAD) + f2 = UFuncCore.define(module) + #print(module) +# print(f1) + print(f2) + module.verify() + + fpm.run(f1) + fpm.run(f2) + print('optimized'.center(80,'-')) + print(f2) + +if __name__ == '__main__': + _main() + diff --git a/test_parallel_vectorize.py b/test_parallel_vectorize.py new file mode 100644 index 0000000..b3ffb5c --- /dev/null +++ b/test_parallel_vectorize.py @@ -0,0 +1,73 @@ +from parallel_vectorize import * + +''' +void parallel_ufunc(void * func, void * worker, + char **args, npy_intp *dimensions, + npy_intp *steps, void *data) +''' +class Tester(CDefinition): + _name_ = 'tester' + + def body(self): + # depends + module = self.function.module + + ThreadCount = 2 + ArgCount = 2 + WorkCount = 1000000 + + parallel_ufunc = CFunc(self, ParallelUFuncPosix.define(module, + ThreadCount=ThreadCount)) + worker = CFunc(self, UFuncCore.define(module)) + + # real work + NULL = self.constant_null(C.void_p) + + args = self.array(C.char_p, 2, name='args') + + for t in range(ThreadCount): + args_for_thread = self.array(C.double, 4) + args[t].assign(args_for_thread.cast(C.char_p)) + + dims = self.array(C.intp, 1, name='dims') + dims[0].assign(self.constant(C.intp, WorkCount)) + + steps = self.array(C.intp, ArgCount, name='steps') + + for c in 
range(ArgCount): + steps[c].assign(self.constant(C.intp, 8)) + + parallel_ufunc(NULL, worker.cast(C.void_p), args, dims, steps, NULL) + + self.ret() + +def main(): + NUM_THREAD = 2 + module = Module.new(__name__) + + mpm = PassManager.new() + pmbuilder = PassManagerBuilder.new() + pmbuilder.opt_level = 3 + pmbuilder.populate(mpm) + + fntester = Tester.define(module) + +# print(module) + module.verify() + + mpm.run(module) + + print('optimized'.center(80,'-')) + print(module) + + # run + exe = CExecutor(module) + exe.engine.get_pointer_to_function(fntester) + func = exe.get_ctype_function(fntester, 'void') + + func() + + +if __name__ == '__main__': + main() +