{ "metadata": { "name": "parallel_vectorize" }, "nbformat": 2, "worksheets": [ { "cells": [ { "cell_type": "markdown", "source": [ "Parallel Vectorize", "------------------", "", "The `parallel_vectorize.py` module contains a set of llvmpy code generators", "for creating multithreaded _ufunc_ objects.", "It depends on the new `numpy.fromfunc` for turning arbitrary function pointers into _ufunc_ objects.", "", "From LLVM Function", "------------------", "", "The `parallel_vectorize_from_func` function generates a multithreaded _ufunc_ from an LLVM function.", "", "First, we will implement a workload function:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from llvm_cbuilder import *", "from llvm_cbuilder import shortnames as C", "from llvm.core import *", "", "# Implement a workload", "class Square(CDefinition):", "    _name_ = 'square'", "    _retty_ = C.double            # 1 output: double", "    _argtys_ = [('x', C.double)]  # 1 input: double", "", "    def body(self, x):", "        self.ret(x * x)", "", "m = Module.new('my_module')", "llvm_square = Square()(m)  # Generate an LLVM function", "print(llvm_square)" ], "language": "python", "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "", "define double @square(double %x) {", "decl:", " %x1 = alloca double", " br label %body", "", "body: ; preds = %decl", " store double %x, double* %x1", " %0 = load double* %x1", " %1 = load double* %x1", " %2 = fmul double %0, %1", " ret double %2", "}", "" ] } ], "prompt_number": 1 }, { "cell_type": "markdown", "source": [ "Then, we will generate a _ufunc_ from `llvm_square`:" ] }, { "cell_type": "code", "collapsed": true, "input": [ "from llvm.ee import *", "engine = EngineBuilder.new(m).create()  # Generate JIT engine", "", "from parallel_vectorize import parallel_vectorize_from_func", "ufunc_square = parallel_vectorize_from_func(llvm_square, engine)  # Generate UFunc" ], "language": "python", "outputs": [], "prompt_number": 2 }, { "cell_type": "markdown", "source": [ "We are ready to use 
`ufunc_square` as a regular _ufunc_." ] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy as np", "A = np.arange(10., dtype=np.double)", "ufunc_square(A)" ], "language": "python", "outputs": [ { "output_type": "pyout", "prompt_number": 3, "text": [ "array([ 0., 1., 4., 9., 16., 25., 36., 49., 64., 81.])" ] } ], "prompt_number": 3 }, { "cell_type": "markdown", "source": [ "Here's another example that uses three inputs:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "class SumOfThree(CDefinition):", "    _name_ = 'sum.of.three'", "    _retty_ = C.int", "    _argtys_ = [('x', C.int),", "                ('y', C.int),", "                ('z', C.int)]", "", "    def body(self, x, y, z):", "        self.ret(x + y + z)", "", "llvm_sum3 = SumOfThree()(m)", "ufunc_sum3 = parallel_vectorize_from_func(llvm_sum3, engine)", "A = np.arange(10, dtype=np.int32)", "B = A * 10", "D = A * 100  # not named C, which would shadow the `shortnames as C` alias", "ufunc_sum3(A, B, D)" ], "language": "python", "outputs": [ { "output_type": "pyout", "prompt_number": 4, "text": [ "array([ 0, 111, 222, 333, 444, 555, 666, 777, 888, 999], dtype=int32)" ] } ], "prompt_number": 4 }, { "cell_type": "markdown", "source": [ "* * *", "", "Internals", "---------", "", "There are four functions behind each multithreaded _ufunc_.", "", "1. the workload function (user-defined);", "2. the thread worker function (`UFuncCoreGeneric`);", "3. the thread manager function (`ParallelUFuncPlatform`);", "4. 
the ufunc entry point function (`SpecializedParallelUFunc`).", "", "**UFuncCoreGeneric** is specialized for an LLVM function type.", "**It currently supports only simple built-in scalar types (integers, float, double) as the argument and return types of the workload function.**", "It feeds work items to the workload function and, once it has finished its own work queue, steals work from the other threads' queues.", "Work-stealing uses an atomic compare-and-swap (CAS) instruction to acquire ownership of a work queue.", "Work-stealing is implemented in `UFuncCore._do_work_stealing`.", "It can be disabled on platforms that do not support atomic operations.", "", "**ParallelUFuncPlatform** is specialized for the maximum number of threads.", "It divides the work equally among all threads.", "Each thread executes the function generated by `UFuncCoreGeneric` once.", "", "**SpecializedParallelUFunc** is the specialized _ufunc_ entry point for a specific combination of", "workload, `UFuncCoreGeneric`, and `ParallelUFuncPlatform`.", "", "Here's an example that uses `SpecializedParallelUFunc` directly for the `SumOfThree` workload." ] }, { "cell_type": "code", "collapsed": false, "input": [ "import parallel_vectorize as pv", "# specialize", "def_spuf = pv.SpecializedParallelUFunc(pv.ParallelUFuncPlatform(num_thread=2),", "                                       pv.UFuncCoreGeneric(llvm_sum3.type.pointee),", "                                       CFuncRef(llvm_sum3))", "# define", "spuf = def_spuf(m)", "print(spuf.name)" ], "language": "python", "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "specialized_parallel_ufunc_2_ufunc_worker.i32.i32.i32.i32_sum.of.three" ] } ], "prompt_number": 5 }, { "cell_type": "markdown", "source": [ "`CFuncRef` also accepts an arbitrary function pointer, provided that its function type is given."
 ] }, { "cell_type": "code", "collapsed": false, "input": [ "# specialize", "fnty = llvm_sum3.type.pointee", "sum3ptr = engine.get_pointer_to_function(llvm_sum3)", "print(\"as function pointer: %x\" % sum3ptr)", "def_spuf = pv.SpecializedParallelUFunc(pv.ParallelUFuncPlatform(num_thread=2),", "                                       pv.UFuncCoreGeneric(fnty),", "                                       CFuncRef('sum3.as.ptr', fnty, sum3ptr))  # name, type, ptr", "# define", "spuf = def_spuf(m)", "print(spuf.name)" ], "language": "python", "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "as function pointer: 7f0bfc090740", "specialized_parallel_ufunc_2_ufunc_worker.i32.i32.i32.i32_sum3.as.ptr" ] } ], "prompt_number": 6 } ] } ] }