From 03c3d4c768198bea7eedcde79c01177441e8a0c1 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Wed, 22 Jul 2015 15:09:24 +0100 Subject: [PATCH 1/4] generator -> iterator Signed-off-by: Aanand Prasad --- compose/cli/multiplexer.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py index 849dbd66..02e39aa1 100644 --- a/compose/cli/multiplexer.py +++ b/compose/cli/multiplexer.py @@ -13,8 +13,13 @@ STOP = object() class Multiplexer(object): - def __init__(self, generators): - self.generators = generators + """ + Create a single iterator from several iterators by running all of them in + parallel and yielding results as they come in. + """ + + def __init__(self, iterators): + self.iterators = iterators self.queue = Queue() def loop(self): @@ -31,12 +36,12 @@ class Multiplexer(object): pass def _init_readers(self): - for generator in self.generators: - t = Thread(target=_enqueue_output, args=(generator, self.queue)) + for iterator in self.iterators: + t = Thread(target=_enqueue_output, args=(iterator, self.queue)) t.daemon = True t.start() -def _enqueue_output(generator, queue): - for item in generator: +def _enqueue_output(iterator, queue): + for item in iterator: queue.put(item) From 27378704df946bd4f3bd994f750916c04e0dc139 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Wed, 22 Jul 2015 14:09:12 +0100 Subject: [PATCH 2/4] Isolate STOP logic in multiplexer module Signed-off-by: Aanand Prasad --- compose/cli/log_printer.py | 3 +-- compose/cli/multiplexer.py | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index ce7e1065..9c5d35e1 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -4,7 +4,7 @@ import sys from itertools import cycle -from .multiplexer import Multiplexer, STOP +from .multiplexer import Multiplexer from . import colors from .utils import split_buffer @@ -61,7 +61,6 @@ class LogPrinter(object): exit_code = container.wait() yield color_fn("%s exited with code %s\n" % (container.name, exit_code)) - yield STOP def _generate_prefix(self, container): """ diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py index 02e39aa1..ab7482e1 100644 --- a/compose/cli/multiplexer.py +++ b/compose/cli/multiplexer.py @@ -45,3 +45,5 @@ class Multiplexer(object): def _enqueue_output(iterator, queue): for item in iterator: queue.put(item) + + queue.put(STOP) From a9942b512a4fc6b04c334c821804a955a6c45ec0 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Wed, 22 Jul 2015 14:08:46 +0100 Subject: [PATCH 3/4] Wait for all containers to exit when running 'up' interactively Signed-off-by: Aanand Prasad --- compose/cli/multiplexer.py | 7 +++---- tests/unit/multiplexer_test.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 tests/unit/multiplexer_test.py diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py index ab7482e1..34b55133 100644 --- a/compose/cli/multiplexer.py +++ b/compose/cli/multiplexer.py @@ -7,8 +7,6 @@ except ImportError: from queue import Queue, Empty # Python 3.x -# Yield STOP from an input generator to stop the -# top-level loop without processing any more input. STOP = object() @@ -20,16 +18,17 @@ class Multiplexer(object): def __init__(self, iterators): self.iterators = iterators + self._num_running = len(iterators) self.queue = Queue() def loop(self): self._init_readers() - while True: + while self._num_running > 0: try: item = self.queue.get(timeout=0.1) if item is STOP: - break + self._num_running -= 1 else: yield item except Empty: diff --git a/tests/unit/multiplexer_test.py b/tests/unit/multiplexer_test.py new file mode 100644 index 00000000..100b8f0c --- /dev/null +++ b/tests/unit/multiplexer_test.py @@ -0,0 +1,28 @@ +import unittest + +from compose.cli.multiplexer import Multiplexer + + +class MultiplexerTest(unittest.TestCase): + def test_no_iterators(self): + mux = Multiplexer([]) + self.assertEqual([], list(mux.loop())) + + def test_empty_iterators(self): + mux = Multiplexer([ + (x for x in []), + (x for x in []), + ]) + + self.assertEqual([], list(mux.loop())) + + def test_aggregates_output(self): + mux = Multiplexer([ + (x for x in [0, 2, 4]), + (x for x in [1, 3, 5]), + ]) + + self.assertEqual( + [0, 1, 2, 3, 4, 5], + sorted(list(mux.loop())), + ) From 80d90a745ad9816817a14f4aa35c3d9a1a2136b4 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Wed, 22 Jul 2015 14:47:59 +0100 Subject: [PATCH 4/4] Make sure an exception in any iterator gets raised in the main thread Signed-off-by: Aanand Prasad Conflicts: compose/cli/multiplexer.py --- compose/cli/multiplexer.py | 16 +++++++++++----- tests/unit/multiplexer_test.py | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py index 34b55133..955af632 100644 --- a/compose/cli/multiplexer.py +++ b/compose/cli/multiplexer.py @@ -26,7 +26,11 @@ class Multiplexer(object): while self._num_running > 0: try: - item = self.queue.get(timeout=0.1) + item, exception = self.queue.get(timeout=0.1) + + if exception: + raise exception + if item is STOP: self._num_running -= 1 else: @@ -42,7 +46,9 @@ class Multiplexer(object): def _enqueue_output(iterator, queue): - for item in iterator: - queue.put(item) - - queue.put(STOP) + try: + for item in iterator: + queue.put((item, None)) + queue.put((STOP, None)) + except Exception as e: + queue.put((None, e)) diff --git a/tests/unit/multiplexer_test.py b/tests/unit/multiplexer_test.py index 100b8f0c..d565d39d 100644 --- a/tests/unit/multiplexer_test.py +++ b/tests/unit/multiplexer_test.py @@ -26,3 +26,20 @@ class MultiplexerTest(unittest.TestCase): [0, 1, 2, 3, 4, 5], sorted(list(mux.loop())), ) + + def test_exception(self): + class Problem(Exception): + pass + + def problematic_iterator(): + yield 0 + yield 2 + raise Problem(":(") + + mux = Multiplexer([ + problematic_iterator(), + (x for x in [1, 3, 5]), + ]) + + with self.assertRaises(Problem): + list(mux.loop())