diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 29a6159d..fc36a6bc 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import unicode_literals import sys +from collections import namedtuple from itertools import cycle from threading import Thread @@ -15,9 +16,6 @@ from compose.cli.signals import ShutdownException from compose.utils import split_buffer -STOP = object() - - class LogPresenter(object): def __init__(self, prefix_width, color_func): @@ -79,51 +77,74 @@ class LogPrinter(object): queue = Queue() thread_args = queue, self.log_args thread_map = build_thread_map(self.containers, self.presenters, thread_args) - start_producer_thread( + start_producer_thread(( thread_map, self.event_stream, self.presenters, - thread_args) + thread_args)) for line in consume_queue(queue, self.cascade_stop): + remove_stopped_threads(thread_map) + + if not line: + if not thread_map: + return + continue + self.output.write(line) self.output.flush() - # TODO: this needs more logic - # TODO: does consume_queue need to yield Nones to get to this point? - if not thread_map: - return + +def remove_stopped_threads(thread_map): + for container_id, tailer_thread in list(thread_map.items()): + if not tailer_thread.is_alive(): + thread_map.pop(container_id, None) + + +def build_thread(container, presenter, queue, log_args): + tailer = Thread( + target=tail_container_logs, + args=(container, presenter, queue, log_args)) + tailer.daemon = True + tailer.start() + return tailer def build_thread_map(initial_containers, presenters, thread_args): - def build_thread(container): - tailer = Thread( - target=tail_container_logs, - args=(container, presenters.next()) + thread_args) - tailer.daemon = True - tailer.start() - return tailer - return { - container.id: build_thread(container) + container.id: build_thread(container, presenters.next(), *thread_args) for container in initial_containers } +class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')): + + @classmethod + def new(cls, item): + return cls(item, None, None) + + @classmethod + def exception(cls, exc): + return cls(None, None, exc) + + @classmethod + def stop(cls): + return cls(None, True, None) + + def tail_container_logs(container, presenter, queue, log_args): generator = get_log_generator(container) try: for item in generator(container, log_args): - queue.put((item, None)) - - if log_args.get('follow'): - yield presenter.color_func(wait_on_exit(container)) - - queue.put((STOP, None)) - + queue.put(QueueItem.new(presenter.present(container, item))) except Exception as e: - queue.put((None, e)) + queue.put(QueueItem.exception(e)) + return + + if log_args.get('follow'): + queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container)))) + queue.put(QueueItem.stop()) def get_log_generator(container): @@ -156,37 +177,48 @@ def wait_on_exit(container): return "%s exited with code %s\n" % (container.name, exit_code) -def start_producer_thread(thread_map, event_stream, presenters, thread_args): - queue, log_args = thread_args - - def watch_events(): - for event in event_stream: - # TODO: handle start and stop events - pass - - producer = Thread(target=watch_events) +def start_producer_thread(thread_args): + producer = Thread(target=watch_events, args=thread_args) producer.daemon = True producer.start() +def watch_events(thread_map, event_stream, presenters, thread_args): + for event in event_stream: + if event['action'] != 'start': + continue + + if event['id'] in thread_map: + if thread_map[event['id']].is_alive(): + continue + # Container was stopped and started, we need a new thread + thread_map.pop(event['id'], None) + + thread_map[event['id']] = build_thread( + event['container'], + presenters.next(), + *thread_args) + + def consume_queue(queue, cascade_stop): """Consume the queue by reading lines off of it and yielding them.""" while True: try: - item, exception = queue.get(timeout=0.1) + item = queue.get(timeout=0.1) except Empty: - pass + yield None + continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() - if exception: - raise exception + if item.exc: + raise item.exc - if item is STOP: + if item.is_stop: if cascade_stop: raise StopIteration else: continue - yield item + yield item.item diff --git a/compose/cli/main.py b/compose/cli/main.py index 66362168..da622bc1 100644 --- a/compose/cli/main.py +++ b/compose/cli/main.py @@ -35,6 +35,7 @@ from .docopt_command import NoSuchCommand from .errors import UserError from .formatter import ConsoleWarningFormatter from .formatter import Formatter +from .log_printer import build_log_presenters from .log_printer import LogPrinter from .utils import get_version_info from .utils import yesno @@ -277,6 +278,7 @@ class TopLevelCommand(object): def json_format_event(event): event['time'] = event['time'].isoformat() + event.pop('container') return json.dumps(event) for event in self.project.events(): @@ -374,7 +376,6 @@ class TopLevelCommand(object): """ containers = self.project.containers(service_names=options['SERVICE'], stopped=True) - monochrome = options['--no-color'] tail = options['--tail'] if tail is not None: if tail.isdigit(): @@ -387,7 +388,11 @@ class TopLevelCommand(object): 'timestamps': options['--timestamps'] } print("Attaching to", list_containers(containers)) - LogPrinter(containers, monochrome=monochrome, log_args=log_args).run() + log_printer_from_project( + project, + containers, + options['--no-color'], + log_args).run() def pause(self, options): """ @@ -693,7 +698,6 @@ class TopLevelCommand(object): when attached or when containers are already running. (default: 10) """ - monochrome = options['--no-color'] start_deps = not options['--no-deps'] cascade_stop = options['--abort-on-container-exit'] service_names = options['SERVICE'] @@ -704,7 +708,10 @@ class TopLevelCommand(object): raise UserError("--abort-on-container-exit and -d cannot be combined.") with up_shutdown_context(self.project, service_names, timeout, detached): - to_attach = self.project.up( + # start the event stream first so we don't lose any events + event_stream = project.events() + + to_attach = project.up( service_names=service_names, start_deps=start_deps, strategy=convergence_strategy_from_opts(options), @@ -714,8 +721,14 @@ class TopLevelCommand(object): if detached: return - log_args = {'follow': True} - log_printer = build_log_printer(to_attach, service_names, monochrome, cascade_stop, log_args) + + log_printer = log_printer_from_project( + project, + filter_containers_to_service_names(to_attach, service_names), + options['--no-color'], + {'follow': True}, + cascade_stop, + event_stream=event_stream) print("Attaching to", list_containers(log_printer.containers)) log_printer.run() @@ -827,13 +840,30 @@ def run_one_off_container(container_options, project, service, options): sys.exit(exit_code) -def build_log_printer(containers, service_names, monochrome, cascade_stop, log_args): - if service_names: - containers = [ - container - for container in containers if container.service in service_names - ] - return LogPrinter(containers, monochrome=monochrome, cascade_stop=cascade_stop, log_args=log_args) +def log_printer_from_project( + project, + containers, + monochrome, + log_args, + cascade_stop=False, + event_stream=None, +): + return LogPrinter( + containers, + build_log_presenters(project.service_names, monochrome), + event_stream or project.events(), + cascade_stop=cascade_stop, + log_args=log_args) + + +def filter_containers_to_service_names(containers, service_names): + if not service_names: + return containers + + return [ + container + for container in containers if container.service in service_names + ] @contextlib.contextmanager diff --git a/compose/project.py b/compose/project.py index 1169f7db..b40a9c38 100644 --- a/compose/project.py +++ b/compose/project.py @@ -324,6 +324,7 @@ class Project(object): continue # TODO: get labels from the API v1.22 , see github issue 2618 + # TODO: this can fail if the conatiner is removed, wrap in try/except container = Container.from_id(self.client, event['id']) if container.service not in service_names: continue diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index 825b97be..c2116553 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -1188,7 +1188,7 @@ class CLITestCase(DockerClientTestCase): def test_logs_follow(self): self.base_dir = 'tests/fixtures/echo-services' - self.dispatch(['up', '-d'], None) + self.dispatch(['up', '-d']) result = self.dispatch(['logs', '-f']) @@ -1197,29 +1197,43 @@ class CLITestCase(DockerClientTestCase): assert 'another' in result.stdout assert 'exited with code 0' in result.stdout - def test_logs_unfollow(self): + def test_logs_follow_logs_from_new_containers(self): self.base_dir = 'tests/fixtures/logs-composefile' - self.dispatch(['up', '-d'], None) + self.dispatch(['up', '-d', 'simple']) + + proc = start_process(self.base_dir, ['logs', '-f']) + + self.dispatch(['up', '-d', 'another']) + wait_on_condition(ContainerStateCondition( + self.project.client, + 'logscomposefile_another_1', + running=False)) + + os.kill(proc.pid, signal.SIGINT) + result = wait_on_process(proc, returncode=1) + assert 'test' in result.stdout + + def test_logs_default(self): + self.base_dir = 'tests/fixtures/logs-composefile' + self.dispatch(['up', '-d']) result = self.dispatch(['logs']) - - assert result.stdout.count('\n') >= 1 - assert 'exited with code 0' not in result.stdout + assert 'hello' in result.stdout + assert 'test' in result.stdout + assert 'exited with' not in result.stdout def test_logs_timestamps(self): self.base_dir = 'tests/fixtures/echo-services' - self.dispatch(['up', '-d'], None) - - result = self.dispatch(['logs', '-f', '-t'], None) + self.dispatch(['up', '-d']) + result = self.dispatch(['logs', '-f', '-t']) self.assertRegexpMatches(result.stdout, '(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})') def test_logs_tail(self): self.base_dir = 'tests/fixtures/logs-tail-composefile' - self.dispatch(['up'], None) - - result = self.dispatch(['logs', '--tail', '2'], None) + self.dispatch(['up']) + result = self.dispatch(['logs', '--tail', '2']) assert result.stdout.count('\n') == 3 def test_kill(self): diff --git a/tests/unit/cli/log_printer_test.py b/tests/unit/cli/log_printer_test.py index 81c69412..7be1d303 100644 --- a/tests/unit/cli/log_printer_test.py +++ b/tests/unit/cli/log_printer_test.py @@ -5,9 +5,11 @@ import pytest import six from six.moves.queue import Queue +from compose.cli.log_printer import build_log_generator +from compose.cli.log_printer import build_log_presenters +from compose.cli.log_printer import build_no_log_generator from compose.cli.log_printer import consume_queue -from compose.cli.log_printer import LogPrinter -from compose.cli.log_printer import STOP +from compose.cli.log_printer import QueueItem from compose.cli.log_printer import wait_on_exit from compose.container import Container from tests import mock @@ -34,72 +36,73 @@ def output_stream(): @pytest.fixture def mock_container(): - def reader(*args, **kwargs): - yield b"hello\nworld" - return build_mock_container(reader) + return mock.Mock(spec=Container, name_without_project='web_1') -@pytest.mark.skipif(True, reason="wip") -class TestLogPrinter(object): +class TestLogPresenter(object): - def test_single_container(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream, log_args={'follow': True}).run() + def test_monochrome(self, mock_container): + presenters = build_log_presenters(['foo', 'bar'], True) + presenter = presenters.next() + actual = presenter.present(mock_container, "this line") + assert actual == "web_1 | this line" - output = output_stream.getvalue() - assert 'hello' in output - assert 'world' in output - # Call count is 2 lines + "container exited line" - assert output_stream.flush.call_count == 3 + def test_polychrome(self, mock_container): + presenters = build_log_presenters(['foo', 'bar'], False) + presenter = presenters.next() + actual = presenter.present(mock_container, "this line") + assert '\033[' in actual - def test_single_container_without_stream(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream).run() - output = output_stream.getvalue() - assert 'hello' in output - assert 'world' in output - # Call count is 2 lines - assert output_stream.flush.call_count == 2 +def test_wait_on_exit(): + exit_status = 3 + mock_container = mock.Mock( + spec=Container, + name='cname', + wait=mock.Mock(return_value=exit_status)) - def test_monochrome(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream, monochrome=True).run() - assert '\033[' not in output_stream.getvalue() + expected = '{} exited with code {}\n'.format(mock_container.name, exit_status) + assert expected == wait_on_exit(mock_container) - def test_polychrome(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream).run() - assert '\033[' in output_stream.getvalue() + +def test_build_no_log_generator(mock_container): + mock_container.has_api_logs = False + mock_container.log_driver = 'none' + output, = build_no_log_generator(mock_container, None) + assert "WARNING: no logs are available with the 'none' log driver\n" in output + assert "exited with code" not in output + + +class TestBuildLogGenerator(object): + + def test_no_log_stream(self, mock_container): + mock_container.log_stream = None + mock_container.logs.return_value = iter([b"hello\nworld"]) + log_args = {'follow': True} + + generator = build_log_generator(mock_container, log_args) + assert generator.next() == "hello\n" + assert generator.next() == "world" + mock_container.logs.assert_called_once_with( + stdout=True, + stderr=True, + stream=True, + **log_args) + + def test_with_log_stream(self, mock_container): + mock_container.log_stream = iter([b"hello\nworld"]) + log_args = {'follow': True} + + generator = build_log_generator(mock_container, log_args) + assert generator.next() == "hello\n" + assert generator.next() == "world" def test_unicode(self, output_stream): - glyph = u'\u2022' + glyph = u'\u2022\n' + mock_container.log_stream = iter([glyph.encode('utf-8')]) - def reader(*args, **kwargs): - yield glyph.encode('utf-8') + b'\n' - - container = build_mock_container(reader) - LogPrinter([container], output=output_stream).run() - output = output_stream.getvalue() - if six.PY2: - output = output.decode('utf-8') - - assert glyph in output - - def test_wait_on_exit(self): - exit_status = 3 - mock_container = mock.Mock( - spec=Container, - name='cname', - wait=mock.Mock(return_value=exit_status)) - - expected = '{} exited with code {}\n'.format(mock_container.name, exit_status) - assert expected == wait_on_exit(mock_container) - - def test_generator_with_no_logs(self, mock_container, output_stream): - mock_container.has_api_logs = False - mock_container.log_driver = 'none' - LogPrinter([mock_container], output=output_stream).run() - - output = output_stream.getvalue() - assert "WARNING: no logs are available with the 'none' log driver\n" in output - assert "exited with code" not in output + generator = build_log_generator(mock_container, {}) + assert generator.next() == glyph class TestConsumeQueue(object): @@ -111,7 +114,7 @@ class TestConsumeQueue(object): queue = Queue() error = Problem('oops') - for item in ('a', None), ('b', None), (None, error): + for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error): queue.put(item) generator = consume_queue(queue, False) @@ -122,7 +125,7 @@ class TestConsumeQueue(object): def test_item_is_stop_without_cascade_stop(self): queue = Queue() - for item in (STOP, None), ('a', None), ('b', None): + for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'): queue.put(item) generator = consume_queue(queue, False) @@ -131,7 +134,12 @@ class TestConsumeQueue(object): def test_item_is_stop_with_cascade_stop(self): queue = Queue() - for item in (STOP, None), ('a', None), ('b', None): + for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'): queue.put(item) assert list(consume_queue(queue, True)) == [] + + def test_item_is_none_when_timeout_is_hit(self): + queue = Queue() + generator = consume_queue(queue, False) + assert generator.next() is None diff --git a/tests/unit/cli/main_test.py b/tests/unit/cli/main_test.py index 9b24776f..dc527880 100644 --- a/tests/unit/cli/main_test.py +++ b/tests/unit/cli/main_test.py @@ -8,8 +8,8 @@ import pytest from compose import container from compose.cli.errors import UserError from compose.cli.formatter import ConsoleWarningFormatter -from compose.cli.main import build_log_printer from compose.cli.main import convergence_strategy_from_opts +from compose.cli.main import filter_containers_to_service_names from compose.cli.main import setup_console_handler from compose.service import ConvergenceStrategy from tests import mock @@ -32,7 +32,7 @@ def logging_handler(): class TestCLIMainTestCase(object): - def test_build_log_printer(self): + def test_filter_containers_to_service_names(self): containers = [ mock_container('web', 1), mock_container('web', 2), @@ -41,18 +41,18 @@ class TestCLIMainTestCase(object): mock_container('another', 1), ] service_names = ['web', 'db'] - log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) - assert log_printer.containers == containers[:3] + actual = filter_containers_to_service_names(containers, service_names) + assert actual == containers[:3] - def test_build_log_printer_all_services(self): + def test_filter_containers_to_service_names_all(self): containers = [ mock_container('web', 1), mock_container('db', 1), mock_container('other', 1), ] service_names = [] - log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) - assert log_printer.containers == containers + actual = filter_containers_to_service_names(containers, service_names) + assert actual == containers class TestSetupConsoleHandlerTestCase(object):