Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions score/itf/plugins/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ def get_output(self) -> str:
return "\n".join(self._output_lines) + ("\n" if self._output_lines else "")


class _LineAssembler:
"""Buffer chunked stream data and emit only complete lines.

Docker streams exec output in byte chunks that do not align with
newlines, so a single line can arrive split across two chunks. A
partial line is held until its terminating newline arrives; call
:meth:`flush` at end of stream to emit any unterminated remainder.
"""

def __init__(self, emit):
self._emit = emit
self._pending = ""

def feed(self, text: str) -> None:
"""Buffer *text* and emit each line completed by a newline."""
*lines, self._pending = (self._pending + text).split("\n")
for line in lines:
line = line.rstrip("\r")
if line:
self._emit(line)

def flush(self) -> None:
"""Emit any buffered remainder that was not newline-terminated."""
self.feed("\n")


class DockerTarget(Target):
def __init__(self, container, network=None):
super().__init__()
Expand Down Expand Up @@ -181,21 +207,27 @@ def execute_async(self, binary_path, args=None, cwd="/", **kwargs) -> DockerAsyn
cmd_logger = logging.getLogger(os.path.basename(command.split()[0]))
output_lines = []

def _process_text(text):
for line in text.strip().split("\n"):
if line:
cmd_logger.info(line)
output_lines.append(line)
def _emit(line):
cmd_logger.info(line)
output_lines.append(line)

# Docker delivers stdout/stderr in chunks that do not align with line
# boundaries, so a single output line can arrive split across two
# chunks. Buffer each stream separately so a partial line is only
# emitted once its terminating newline has been received, instead of
# being logged as two separate records.
stdout_assembler = _LineAssembler(_emit)
stderr_assembler = _LineAssembler(_emit)

pid = None
for stdout_chunk, stderr_chunk in stream:
if stderr_chunk:
_process_text(stderr_chunk.decode())
stderr_assembler.feed(stderr_chunk.decode())
if stdout_chunk:
pid_line, _, remainder = stdout_chunk.decode().partition("\n")
pid = int(pid_line.strip())
if remainder.strip():
_process_text(remainder)
if remainder:
stdout_assembler.feed(remainder)
break

if pid is None:
Expand All @@ -204,9 +236,12 @@ def _process_text(text):
def _async_log(log_stream):
for stdout_chunk, stderr_chunk in log_stream:
if stdout_chunk:
_process_text(stdout_chunk.decode())
stdout_assembler.feed(stdout_chunk.decode())
if stderr_chunk:
_process_text(stderr_chunk.decode())
stderr_assembler.feed(stderr_chunk.decode())
# Emit any trailing output that was not newline-terminated.
stdout_assembler.flush()
stderr_assembler.flush()

output_thread = threading.Thread(target=_async_log, args=(stream,), daemon=True)
output_thread.start()
Expand Down
7 changes: 7 additions & 0 deletions test/unit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ py_itf_unittest(
deps = ["//score/itf/plugins/qemu:config"],
)

py_itf_unittest(
name = "test_docker_line_assembler",
srcs = ["test_docker_line_assembler.py"],
deps = ["//score/itf/plugins:docker"],
)

test_suite(
name = "unit",
tests = [
":test_docker_line_assembler",
":test_ping",
":test_qemu_config_schema",
],
Expand Down
81 changes: 81 additions & 0 deletions test/unit/test_docker_line_assembler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# *******************************************************************************
# Copyright (c) 2026 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************
from score.itf.plugins.docker import _LineAssembler


def _collect():
emitted = []
return emitted, _LineAssembler(emitted.append)


def test_complete_lines_in_single_chunk_are_emitted_individually():
emitted, assembler = _collect()
assembler.feed("line1\nline2\nline3\n")
assert emitted == ["line1", "line2", "line3"]


def test_line_split_across_chunks_is_emitted_once():
"""Regression: a single line arriving in two chunks must not be split.

Mirrors the observed DLT output where a chunk boundary fell mid-line
and produced two separate log records.
"""
emitted, assembler = _collect()
assembler.feed("LM log fatal verbose 3")
assert emitted == [] # no newline yet, nothing emitted
assembler.feed(" clock() at failed initial state transition: 38.712000 ms\n")
assert emitted == ["LM log fatal verbose 3 clock() at failed initial state transition: 38.712000 ms"]


def test_partial_line_is_held_until_newline():
emitted, assembler = _collect()
assembler.feed("partial")
assert emitted == []
assembler.feed(" still partial")
assert emitted == []
assembler.feed("\n")
assert emitted == ["partial still partial"]


def test_flush_emits_trailing_unterminated_line():
emitted, assembler = _collect()
assembler.feed("no trailing newline")
assert emitted == []
assembler.flush()
assert emitted == ["no trailing newline"]


def test_flush_without_buffered_data_emits_nothing():
emitted, assembler = _collect()
assembler.feed("done\n")
assembler.flush()
assert emitted == ["done"]


def test_blank_lines_are_skipped():
emitted, assembler = _collect()
assembler.feed("a\n\n\nb\n")
assert emitted == ["a", "b"]


def test_carriage_returns_are_stripped():
emitted, assembler = _collect()
assembler.feed("windows\r\nunix\n")
assert emitted == ["windows", "unix"]


def test_byte_at_a_time_reassembles_full_line():
emitted, assembler = _collect()
for ch in "hello world\n":
assembler.feed(ch)
assert emitted == ["hello world"]
Loading