From b7d136ce03957bccde98cae2abdafdc752ed89b8 Mon Sep 17 00:00:00 2001
From: Jakub Zaborowski
Date: Wed, 17 Jun 2026 13:22:46 +0200
Subject: [PATCH] Fix docker log line splitting

Docker delivers exec output in chunks that align with neither line nor
UTF-8 character boundaries. Reassemble complete lines before logging
them, and decode the raw bytes incrementally so that a multi-byte
character split across two chunks no longer raises UnicodeDecodeError.

Fix https://github.com/eclipse-score/itf/issues/111
---
 score/itf/plugins/docker.py             |  78 +++++++++++++++----
 test/unit/BUILD                         |   7 ++
 test/unit/test_docker_line_assembler.py | 104 ++++++++++++++++++++++++
 3 files changed, 178 insertions(+), 11 deletions(-)
 create mode 100644 test/unit/test_docker_line_assembler.py

diff --git a/score/itf/plugins/docker.py b/score/itf/plugins/docker.py
index ca1cd8f..52abe67 100644
--- a/score/itf/plugins/docker.py
+++ b/score/itf/plugins/docker.py
@@ -133,6 +133,51 @@ def get_output(self) -> str:
         return "\n".join(self._output_lines) + ("\n" if self._output_lines else "")
 
 
+class _LineAssembler:
+    """Buffer chunked stream data and emit only complete lines.
+
+    Docker streams exec output in byte chunks that align with neither
+    newlines nor UTF-8 character boundaries, so a single line (or a single
+    multi-byte character) can arrive split across two chunks. ``bytes``
+    chunks are decoded incrementally and a partial line is held until its
+    terminating newline arrives; call :meth:`flush` at end of stream to
+    emit any unterminated remainder.
+
+    Blank lines are dropped and trailing carriage returns are stripped.
+    """
+
+    def __init__(self, emit):
+        """Create an assembler that calls *emit* with each complete line."""
+        # Function-scope import keeps this helper self-contained.
+        import codecs
+
+        self._emit = emit
+        self._pending = ""
+        # errors="replace": undecodable bytes in process output must not
+        # abort log capture.
+        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
+
+    def feed(self, text: "str | bytes") -> None:
+        """Buffer *text* and emit each line completed by a newline.
+
+        ``bytes`` are decoded as UTF-8, carrying an incomplete trailing
+        character over to the next call; ``str`` is buffered unchanged.
+        """
+        if isinstance(text, (bytes, bytearray)):
+            text = self._decoder.decode(text)
+        *lines, self._pending = (self._pending + text).split("\n")
+        for line in lines:
+            line = line.rstrip("\r")
+            if line:
+                self._emit(line)
+
+    def flush(self) -> None:
+        """Emit any buffered remainder that was not newline-terminated."""
+        # final=True drains a dangling partial character from the decoder.
+        tail = self._decoder.decode(b"", final=True)
+        self.feed(tail + "\n")
+
+
 class DockerTarget(Target):
     def __init__(self, container, network=None):
         super().__init__()
@@ -181,21 +226,29 @@ def execute_async(self, binary_path, args=None, cwd="/", **kwargs) -> DockerAsyn
         cmd_logger = logging.getLogger(os.path.basename(command.split()[0]))
         output_lines = []
 
-        def _process_text(text):
-            for line in text.strip().split("\n"):
-                if line:
-                    cmd_logger.info(line)
-                    output_lines.append(line)
+        def _emit(line):
+            cmd_logger.info(line)
+            output_lines.append(line)
+
+        # Docker delivers stdout/stderr in chunks that do not align with line
+        # or UTF-8 character boundaries, so a single output line can arrive
+        # split across two chunks. Buffer each stream separately and feed it
+        # the raw bytes, so a partial line is only emitted once its
+        # terminating newline has been received, instead of being logged as
+        # two separate records.
+        stdout_assembler = _LineAssembler(_emit)
+        stderr_assembler = _LineAssembler(_emit)
 
         pid = None
         for stdout_chunk, stderr_chunk in stream:
             if stderr_chunk:
-                _process_text(stderr_chunk.decode())
+                stderr_assembler.feed(stderr_chunk)
             if stdout_chunk:
-                pid_line, _, remainder = stdout_chunk.decode().partition("\n")
+                # Split the raw bytes: the remainder may end mid-character.
+                pid_line, _, remainder = stdout_chunk.partition(b"\n")
                 pid = int(pid_line.strip())
-                if remainder.strip():
-                    _process_text(remainder)
+                if remainder:
+                    stdout_assembler.feed(remainder)
                 break
 
         if pid is None:
@@ -204,9 +257,12 @@ def _process_text(text):
         def _async_log(log_stream):
             for stdout_chunk, stderr_chunk in log_stream:
                 if stdout_chunk:
-                    _process_text(stdout_chunk.decode())
+                    stdout_assembler.feed(stdout_chunk)
                 if stderr_chunk:
-                    _process_text(stderr_chunk.decode())
+                    stderr_assembler.feed(stderr_chunk)
+            # Emit any trailing output that was not newline-terminated.
+            stdout_assembler.flush()
+            stderr_assembler.flush()
 
         output_thread = threading.Thread(target=_async_log, args=(stream,), daemon=True)
         output_thread.start()
diff --git a/test/unit/BUILD b/test/unit/BUILD
index 68c32da..10d537e 100644
--- a/test/unit/BUILD
+++ b/test/unit/BUILD
@@ -23,9 +23,16 @@ py_itf_unittest(
     deps = ["//score/itf/plugins/qemu:config"],
 )
 
+py_itf_unittest(
+    name = "test_docker_line_assembler",
+    srcs = ["test_docker_line_assembler.py"],
+    deps = ["//score/itf/plugins:docker"],
+)
+
 test_suite(
     name = "unit",
     tests = [
+        ":test_docker_line_assembler",
         ":test_ping",
         ":test_qemu_config_schema",
     ],
diff --git a/test/unit/test_docker_line_assembler.py b/test/unit/test_docker_line_assembler.py
new file mode 100644
index 0000000..442a696
--- /dev/null
+++ b/test/unit/test_docker_line_assembler.py
@@ -0,0 +1,104 @@
+# *******************************************************************************
+# Copyright (c) 2026 Contributors to the Eclipse Foundation
+#
+# See the NOTICE file(s) distributed with this work for additional
+# information regarding copyright ownership.
+#
+# This program and the accompanying materials are made available under the
+# terms of the Apache License Version 2.0 which is available at
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# SPDX-License-Identifier: Apache-2.0
+# *******************************************************************************
+from score.itf.plugins.docker import _LineAssembler
+
+
+def _collect():
+    emitted = []
+    return emitted, _LineAssembler(emitted.append)
+
+
+def test_complete_lines_in_single_chunk_are_emitted_individually():
+    emitted, assembler = _collect()
+    assembler.feed("line1\nline2\nline3\n")
+    assert emitted == ["line1", "line2", "line3"]
+
+
+def test_line_split_across_chunks_is_emitted_once():
+    """Regression: a single line arriving in two chunks must not be split.
+
+    Mirrors the observed DLT output where a chunk boundary fell mid-line
+    and produced two separate log records.
+    """
+    emitted, assembler = _collect()
+    assembler.feed("LM log fatal verbose 3")
+    assert emitted == []  # no newline yet, nothing emitted
+    assembler.feed(" clock() at failed initial state transition: 38.712000 ms\n")
+    assert emitted == ["LM log fatal verbose 3 clock() at failed initial state transition: 38.712000 ms"]
+
+
+def test_partial_line_is_held_until_newline():
+    emitted, assembler = _collect()
+    assembler.feed("partial")
+    assert emitted == []
+    assembler.feed(" still partial")
+    assert emitted == []
+    assembler.feed("\n")
+    assert emitted == ["partial still partial"]
+
+
+def test_flush_emits_trailing_unterminated_line():
+    emitted, assembler = _collect()
+    assembler.feed("no trailing newline")
+    assert emitted == []
+    assembler.flush()
+    assert emitted == ["no trailing newline"]
+
+
+def test_flush_without_buffered_data_emits_nothing():
+    emitted, assembler = _collect()
+    assembler.feed("done\n")
+    assembler.flush()
+    assert emitted == ["done"]
+
+
+def test_blank_lines_are_skipped():
+    emitted, assembler = _collect()
+    assembler.feed("a\n\n\nb\n")
+    assert emitted == ["a", "b"]
+
+
+def test_carriage_returns_are_stripped():
+    emitted, assembler = _collect()
+    assembler.feed("windows\r\nunix\n")
+    assert emitted == ["windows", "unix"]
+
+
+def test_byte_at_a_time_reassembles_full_line():
+    emitted, assembler = _collect()
+    for ch in "hello world\n":
+        assembler.feed(ch)
+    assert emitted == ["hello world"]
+
+
+def test_bytes_chunks_are_decoded_and_reassembled():
+    emitted, assembler = _collect()
+    assembler.feed(b"line1\nli")
+    assembler.feed(b"ne2\n")
+    assert emitted == ["line1", "line2"]
+
+
+def test_multibyte_character_split_across_chunks_is_reassembled():
+    """Regression: a chunk boundary inside a UTF-8 sequence must not raise."""
+    emitted, assembler = _collect()
+    assembler.feed(b"21 \xc2")  # first byte of U+00B0 (degree sign)
+    assert emitted == []
+    assembler.feed(b"\xb0C\n")
+    assert emitted == ["21 \u00b0C"]
+
+
+def test_flush_replaces_truncated_trailing_character():
+    emitted, assembler = _collect()
+    assembler.feed(b"caf\xc3")  # lead byte of a two-byte sequence, never completed
+    assembler.flush()
+    assert emitted == ["caf\ufffd"]