From bcdf541c8c6ccc0070ab011a909f244f501676d6 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 12:58:19 +0100 Subject: [PATCH 1/8] Refactor setup_queue() - Stop sharing set objects across threads - Use a second queue to signal when producer threads are done - Use a single consumer thread to check dependencies and kick off new producers Signed-off-by: Aanand Prasad --- compose/parallel.py | 64 ++++++++++++++++++++++++++++++--------------- compose/service.py | 3 +++ 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index c629a1ab..79699236 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -1,6 +1,7 @@ from __future__ import absolute_import from __future__ import unicode_literals +import logging import operator import sys from threading import Thread @@ -14,6 +15,9 @@ from compose.cli.signals import ShutdownException from compose.utils import get_output_stream +log = logging.getLogger(__name__) + + def parallel_execute(objects, func, get_name, msg, get_deps=None): """Runs func on objects in parallel while ensuring that func is ran on object only after it is ran on all its dependencies. @@ -73,35 +77,53 @@ def setup_queue(objects, func, get_deps, get_name): get_deps = _no_deps results = Queue() - started = set() # objects being processed - finished = set() # objects which have been processed + output = Queue() - def do_op(obj): + def consumer(): + started = set() # objects being processed + finished = set() # objects which have been processed + + def ready(obj): + """ + Returns true if obj is ready to be processed: + - all dependencies have been processed + - obj is not already being processed + """ + return obj not in started and all( + dep not in objects or dep in finished + for dep in get_deps(obj) + ) + + while len(finished) < len(objects): + for obj in filter(ready, objects): + log.debug('Starting producer thread for {}'.format(obj)) + t = Thread(target=producer, args=(obj,)) + t.daemon = True + t.start() + started.add(obj) + + try: + event = results.get(timeout=1) + except Empty: + continue + + obj = event[0] + log.debug('Finished processing: {}'.format(obj)) + finished.add(obj) + output.put(event) + + def producer(obj): try: result = func(obj) results.put((obj, result, None)) except Exception as e: results.put((obj, None, e)) - finished.add(obj) - feed() + t = Thread(target=consumer) + t.daemon = True + t.start() - def ready(obj): - # Is object ready for performing operation - return obj not in started and all( - dep not in objects or dep in finished - for dep in get_deps(obj) - ) - - def feed(): - for obj in filter(ready, objects): - started.add(obj) - t = Thread(target=do_op, args=(obj,)) - t.daemon = True - t.start() - - feed() - return results + return output class ParallelStreamWriter(object): diff --git a/compose/service.py b/compose/service.py index ed45f078..05cfc7c6 100644 --- a/compose/service.py +++ b/compose/service.py @@ -135,6 +135,9 @@ class Service(object): self.networks = networks or {} self.options = options + def __repr__(self): + return ''.format(self.name) + def containers(self, stopped=False, one_off=False, filters={}): filters.update({'label': self.labels(one_off=one_off)}) From 141b96bb312d85753de2189227941512bd42f33e Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 17:46:13 +0100 Subject: [PATCH 2/8] Abort operations if their dependencies fail Signed-off-by: Aanand Prasad --- compose/parallel.py | 102 +++++++++++++++++++++--------------- tests/unit/parallel_test.py | 73 ++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 43 deletions(-) create mode 100644 tests/unit/parallel_test.py diff --git a/compose/parallel.py b/compose/parallel.py index 79699236..745d4635 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -32,7 +32,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): for obj in objects: writer.initialize(get_name(obj)) - q = setup_queue(objects, func, get_deps, get_name) + q = setup_queue(objects, func, get_deps) done = 0 errors = {} @@ -54,6 +54,8 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): elif isinstance(exception, APIError): errors[get_name(obj)] = exception.explanation writer.write(get_name(obj), 'error') + elif isinstance(exception, UpstreamError): + writer.write(get_name(obj), 'error') else: errors[get_name(obj)] = exception error_to_reraise = exception @@ -72,60 +74,74 @@ def _no_deps(x): return [] -def setup_queue(objects, func, get_deps, get_name): +def setup_queue(objects, func, get_deps): if get_deps is None: get_deps = _no_deps results = Queue() output = Queue() - def consumer(): - started = set() # objects being processed - finished = set() # objects which have been processed - - def ready(obj): - """ - Returns true if obj is ready to be processed: - - all dependencies have been processed - - obj is not already being processed - """ - return obj not in started and all( - dep not in objects or dep in finished - for dep in get_deps(obj) - ) - - while len(finished) < len(objects): - for obj in filter(ready, objects): - log.debug('Starting producer thread for {}'.format(obj)) - t = Thread(target=producer, args=(obj,)) - t.daemon = True - t.start() - started.add(obj) - - try: - event = results.get(timeout=1) - except Empty: - continue - - obj = event[0] - log.debug('Finished processing: {}'.format(obj)) - finished.add(obj) - output.put(event) - - def producer(obj): - try: - result = func(obj) - results.put((obj, result, None)) - except Exception as e: - results.put((obj, None, e)) - - t = Thread(target=consumer) + t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output)) t.daemon = True t.start() return output +def queue_producer(obj, func, results): + try: + result = func(obj) + results.put((obj, result, None)) + except Exception as e: + results.put((obj, None, e)) + + +def queue_consumer(objects, func, get_deps, results, output): + started = set() # objects being processed + finished = set() # objects which have been processed + failed = set() # objects which either failed or whose dependencies failed + + while len(finished) + len(failed) < len(objects): + pending = set(objects) - started - finished - failed + log.debug('Pending: {}'.format(pending)) + + for obj in pending: + deps = get_deps(obj) + + if any(dep in failed for dep in deps): + log.debug('{} has upstream errors - not processing'.format(obj)) + output.put((obj, None, UpstreamError())) + failed.add(obj) + elif all( + dep not in objects or dep in finished + for dep in deps + ): + log.debug('Starting producer thread for {}'.format(obj)) + t = Thread(target=queue_producer, args=(obj, func, results)) + t.daemon = True + t.start() + started.add(obj) + + try: + event = results.get(timeout=1) + except Empty: + continue + + obj, _, exception = event + if exception is None: + log.debug('Finished processing: {}'.format(obj)) + finished.add(obj) + else: + log.debug('Failed: {}'.format(obj)) + failed.add(obj) + + output.put(event) + + +class UpstreamError(Exception): + pass + + class ParallelStreamWriter(object): """Write out messages for operations happening in parallel. diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py new file mode 100644 index 00000000..6be56015 --- /dev/null +++ b/tests/unit/parallel_test.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import +from __future__ import unicode_literals + +import six +from docker.errors import APIError + +from compose.parallel import parallel_execute + + +web = 'web' +db = 'db' +data_volume = 'data_volume' +cache = 'cache' + +objects = [web, db, data_volume, cache] + +deps = { + web: [db, cache], + db: [data_volume], + data_volume: [], + cache: [], +} + + +def test_parallel_execute(): + results = parallel_execute( + objects=[1, 2, 3, 4, 5], + func=lambda x: x * 2, + get_name=six.text_type, + msg="Doubling", + ) + + assert sorted(results) == [2, 4, 6, 8, 10] + + +def test_parallel_execute_with_deps(): + log = [] + + def process(x): + log.append(x) + + parallel_execute( + objects=objects, + func=process, + get_name=lambda obj: obj, + msg="Processing", + get_deps=lambda obj: deps[obj], + ) + + assert sorted(log) == sorted(objects) + + assert log.index(data_volume) < log.index(db) + assert log.index(db) < log.index(web) + assert log.index(cache) < log.index(web) + + +def test_parallel_execute_with_upstream_errors(): + log = [] + + def process(x): + if x is data_volume: + raise APIError(None, None, "Something went wrong") + log.append(x) + + parallel_execute( + objects=objects, + func=process, + get_name=lambda obj: obj, + msg="Processing", + get_deps=lambda obj: deps[obj], + ) + + assert log == [cache] From af9526fb820f40a8b7eafb16d29f990b1696f4fe Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 18:30:28 +0100 Subject: [PATCH 3/8] Move queue logic out of parallel_execute() Signed-off-by: Aanand Prasad --- compose/parallel.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index 745d4635..8172d8ea 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -32,22 +32,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): for obj in objects: writer.initialize(get_name(obj)) - q = setup_queue(objects, func, get_deps) + events = parallel_execute_stream(objects, func, get_deps) - done = 0 errors = {} results = [] error_to_reraise = None - while done < len(objects): - try: - obj, result, exception = q.get(timeout=1) - except Empty: - continue - # See https://github.com/docker/compose/issues/189 - except thread.error: - raise ShutdownException() - + for obj, result, exception in events: if exception is None: writer.write(get_name(obj), 'done') results.append(result) @@ -59,7 +50,6 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): else: errors[get_name(obj)] = exception error_to_reraise = exception - done += 1 for obj_name, error in errors.items(): stream.write("\nERROR: for {} {}\n".format(obj_name, error)) @@ -74,7 +64,7 @@ def _no_deps(x): return [] -def setup_queue(objects, func, get_deps): +def parallel_execute_stream(objects, func, get_deps): if get_deps is None: get_deps = _no_deps @@ -85,7 +75,17 @@ def setup_queue(objects, func, get_deps): t.daemon = True t.start() - return output + done = 0 + + while done < len(objects): + try: + yield output.get(timeout=1) + done += 1 + except Empty: + continue + # See https://github.com/docker/compose/issues/189 + except thread.error: + raise ShutdownException() def queue_producer(obj, func, results): From 3720b50c3b8c5534c0b139962f7f6d95dd32a066 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 18:48:07 +0100 Subject: [PATCH 4/8] Extract get_deps test helper Signed-off-by: Aanand Prasad --- tests/unit/parallel_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py index 6be56015..889af4e2 100644 --- a/tests/unit/parallel_test.py +++ b/tests/unit/parallel_test.py @@ -22,6 +22,10 @@ deps = { } +def get_deps(obj): + return deps[obj] + + def test_parallel_execute(): results = parallel_execute( objects=[1, 2, 3, 4, 5], @@ -44,7 +48,7 @@ def test_parallel_execute_with_deps(): func=process, get_name=lambda obj: obj, msg="Processing", - get_deps=lambda obj: deps[obj], + get_deps=get_deps, ) assert sorted(log) == sorted(objects) @@ -67,7 +71,7 @@ def test_parallel_execute_with_upstream_errors(): func=process, get_name=lambda obj: obj, msg="Processing", - get_deps=lambda obj: deps[obj], + get_deps=get_deps, ) assert log == [cache] From ffab27c0496769fade7f2aa32bd86f66a3c9c0e5 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 18:53:16 +0100 Subject: [PATCH 5/8] Test events coming out of parallel_execute_stream in error case Signed-off-by: Aanand Prasad --- tests/unit/parallel_test.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py index 889af4e2..9ed1b362 100644 --- a/tests/unit/parallel_test.py +++ b/tests/unit/parallel_test.py @@ -5,6 +5,8 @@ import six from docker.errors import APIError from compose.parallel import parallel_execute +from compose.parallel import parallel_execute_stream +from compose.parallel import UpstreamError web = 'web' @@ -75,3 +77,14 @@ def test_parallel_execute_with_upstream_errors(): ) assert log == [cache] + + events = [ + (obj, result, type(exception)) + for obj, result, exception + in parallel_execute_stream(objects, process, get_deps) + ] + + assert (cache, None, type(None)) in events + assert (data_volume, None, APIError) in events + assert (db, None, UpstreamError) in events + assert (web, None, UpstreamError) in events From 54b6fc42195da8f7ca1b45828e49ce5e378baee0 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 18:54:02 +0100 Subject: [PATCH 6/8] Refactor so there's only one queue Signed-off-by: Aanand Prasad --- compose/parallel.py | 79 +++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index 8172d8ea..b3ca0153 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -69,24 +69,33 @@ def parallel_execute_stream(objects, func, get_deps): get_deps = _no_deps results = Queue() - output = Queue() - t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output)) - t.daemon = True - t.start() + started = set() # objects being processed + finished = set() # objects which have been processed + failed = set() # objects which either failed or whose dependencies failed - done = 0 + while len(finished) + len(failed) < len(objects): + for event in feed_queue(objects, func, get_deps, results, started, finished, failed): + yield event - while done < len(objects): try: - yield output.get(timeout=1) - done += 1 + event = results.get(timeout=1) except Empty: continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() + obj, _, exception = event + if exception is None: + log.debug('Finished processing: {}'.format(obj)) + finished.add(obj) + else: + log.debug('Failed: {}'.format(obj)) + failed.add(obj) + + yield event + def queue_producer(obj, func, results): try: @@ -96,46 +105,26 @@ def queue_producer(obj, func, results): results.put((obj, None, e)) -def queue_consumer(objects, func, get_deps, results, output): - started = set() # objects being processed - finished = set() # objects which have been processed - failed = set() # objects which either failed or whose dependencies failed +def feed_queue(objects, func, get_deps, results, started, finished, failed): + pending = set(objects) - started - finished - failed + log.debug('Pending: {}'.format(pending)) - while len(finished) + len(failed) < len(objects): - pending = set(objects) - started - finished - failed - log.debug('Pending: {}'.format(pending)) + for obj in pending: + deps = get_deps(obj) - for obj in pending: - deps = get_deps(obj) - - if any(dep in failed for dep in deps): - log.debug('{} has upstream errors - not processing'.format(obj)) - output.put((obj, None, UpstreamError())) - failed.add(obj) - elif all( - dep not in objects or dep in finished - for dep in deps - ): - log.debug('Starting producer thread for {}'.format(obj)) - t = Thread(target=queue_producer, args=(obj, func, results)) - t.daemon = True - t.start() - started.add(obj) - - try: - event = results.get(timeout=1) - except Empty: - continue - - obj, _, exception = event - if exception is None: - log.debug('Finished processing: {}'.format(obj)) - finished.add(obj) - else: - log.debug('Failed: {}'.format(obj)) + if any(dep in failed for dep in deps): + log.debug('{} has upstream errors - not processing'.format(obj)) + yield (obj, None, UpstreamError()) failed.add(obj) - - output.put(event) + elif all( + dep not in objects or dep in finished + for dep in deps + ): + log.debug('Starting producer thread for {}'.format(obj)) + t = Thread(target=queue_producer, args=(obj, func, results)) + t.daemon = True + t.start() + started.add(obj) class UpstreamError(Exception): From 5450a67c2d75192b962c3c36cf73a417af4386b3 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 19:06:07 +0100 Subject: [PATCH 7/8] Hold state in an object Signed-off-by: Aanand Prasad --- compose/parallel.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index b3ca0153..f400b223 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -64,18 +64,30 @@ def _no_deps(x): return [] +class State(object): + def __init__(self, objects): + self.objects = objects + + self.started = set() # objects being processed + self.finished = set() # objects which have been processed + self.failed = set() # objects which either failed or whose dependencies failed + + def is_done(self): + return len(self.finished) + len(self.failed) >= len(self.objects) + + def pending(self): + return set(self.objects) - self.started - self.finished - self.failed + + def parallel_execute_stream(objects, func, get_deps): if get_deps is None: get_deps = _no_deps results = Queue() + state = State(objects) - started = set() # objects being processed - finished = set() # objects which have been processed - failed = set() # objects which either failed or whose dependencies failed - - while len(finished) + len(failed) < len(objects): - for event in feed_queue(objects, func, get_deps, results, started, finished, failed): + while not state.is_done(): + for event in feed_queue(objects, func, get_deps, results, state): yield event try: @@ -89,10 +101,10 @@ def parallel_execute_stream(objects, func, get_deps): obj, _, exception = event if exception is None: log.debug('Finished processing: {}'.format(obj)) - finished.add(obj) + state.finished.add(obj) else: log.debug('Failed: {}'.format(obj)) - failed.add(obj) + state.failed.add(obj) yield event @@ -105,26 +117,26 @@ def queue_producer(obj, func, results): results.put((obj, None, e)) -def feed_queue(objects, func, get_deps, results, started, finished, failed): - pending = set(objects) - started - finished - failed +def feed_queue(objects, func, get_deps, results, state): + pending = state.pending() log.debug('Pending: {}'.format(pending)) for obj in pending: deps = get_deps(obj) - if any(dep in failed for dep in deps): + if any(dep in state.failed for dep in deps): log.debug('{} has upstream errors - not processing'.format(obj)) yield (obj, None, UpstreamError()) - failed.add(obj) + state.failed.add(obj) elif all( - dep not in objects or dep in finished + dep not in objects or dep in state.finished for dep in deps ): log.debug('Starting producer thread for {}'.format(obj)) t = Thread(target=queue_producer, args=(obj, func, results)) t.daemon = True t.start() - started.add(obj) + state.started.add(obj) class UpstreamError(Exception): From be27e266da9bbb252e74d71ab22044628c6839d2 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 19:07:40 +0100 Subject: [PATCH 8/8] Reduce queue timeout Signed-off-by: Aanand Prasad --- compose/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compose/parallel.py b/compose/parallel.py index f400b223..e360ca35 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -91,7 +91,7 @@ def parallel_execute_stream(objects, func, get_deps): yield event try: - event = results.get(timeout=1) + event = results.get(timeout=0.1) except Empty: continue # See https://github.com/docker/compose/issues/189