diff --git a/desktop-sidecar/README.md b/desktop-sidecar/README.md index aeb420b..3f91369 100644 --- a/desktop-sidecar/README.md +++ b/desktop-sidecar/README.md @@ -2,11 +2,15 @@ Minimal Compose Multiplatform desktop app for watching the local `agent-task-queue` database in real time. -The sidecar reads the existing SQLite queue DB directly and shows: +The sidecar reads the existing SQLite queue DB directly and also inspects live task queue +server args plus `adb devices -l` when available. It shows: - running tasks - waiting tasks - exact queues grouped by root scope so hierarchical queue activity is easier to understand +- configured `--queue-capacity` scopes, including empty queues and slot counts +- agent/context identity for live servers and tasks when queue metadata is available, with process inspection as a fallback +- connected ADB devices so emulator queue plans can be compared with reality It is intentionally read-only. There is no new MCP protocol or server surface. @@ -33,4 +37,4 @@ Use a specific queue directory with: ## Notes - `./gradlew` in this directory delegates to the checked-in Gradle wrapper under `../intellij-plugin/` so the sidecar stays lightweight. -- Queue capacities configured with `--queue-capacity` are process-local and are not persisted in `queue.db`, so the app visualizes live tasks and queue layout rather than stored capacity numbers. +- Queue capacities configured with `--queue-capacity` are still process-local and are not persisted in `queue.db`; the app surfaces them by reading the args of live task queue server processes for the selected data dir. diff --git a/desktop-sidecar/build.gradle.kts b/desktop-sidecar/build.gradle.kts index 6a52402..381fa3b 100644 --- a/desktop-sidecar/build.gradle.kts +++ b/desktop-sidecar/build.gradle.kts @@ -20,6 +20,7 @@ kotlin { implementation(compose.foundation) implementation(compose.material3) implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2") + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.3") implementation("org.xerial:sqlite-jdbc:3.53.0.0") } } diff --git a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshot.kt b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshot.kt new file mode 100644 index 0000000..ee13925 --- /dev/null +++ b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshot.kt @@ -0,0 +1,571 @@ +package com.block.agenttaskqueue.sidecar + +import java.nio.file.Path +import java.nio.file.Paths +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +private val DEFAULT_QUEUE_DATA_DIR: Path = Paths.get("/tmp/agent-task-queue").toAbsolutePath().normalize() +private const val COMMAND_TIMEOUT_SECONDS = 5L +private val EMULATOR_SERIAL_PATTERN = Regex("^(?:emu|emulator)-(\\d+)$", RegexOption.IGNORE_CASE) +private val LOCALHOST_EMULATOR_PATTERN = Regex("^(?:127\\.0\\.0\\.1|localhost):(\\d+)$", RegexOption.IGNORE_CASE) +private val TASK_QUEUE_ENTRYPOINT_NAMES = setOf("agent-task-queue", "task_queue", "task_queue.py") + +data class QueueConfigurationSnapshot( + val serverProcesses: List, + val configuredScopes: List, + val statusMessage: String? = null, + val errorMessage: String? = null, +) { + val serverCount: Int = serverProcesses.size + val totalSlots: Int = configuredScopes.sumOf { it.capacity ?: 0 } + val configuredEmulatorScopeCount: Int = configuredScopes.count { it.isEmulatorLike } + + companion object { + val EMPTY = QueueConfigurationSnapshot( + serverProcesses = emptyList(), + configuredScopes = emptyList(), + ) + } +} + +data class QueueServerProcess( + val pid: Int, + val parentPid: Int? = null, + val commandLine: String, + val dataDir: Path, + val queueCapacities: Map, + val agentLabel: String = "Task Queue", + val contextLabel: String? = null, +) + +data class ConfiguredQueueScope( + val scopeName: String, + val capacities: Set, + val sourcePids: List, +) { + val capacity: Int? = capacities.singleOrNull() + val hasConflict: Boolean = capacities.size > 1 + val rootScope: String = scopeName.substringBefore('/') + val leafName: String = scopeName.substringAfterLast('/') + val emulatorPort: String? = extractEmulatorPort(leafName) + val isEmulatorLike: Boolean = emulatorPort != null + + val displayCapacityLabel: String = when { + capacity != null -> capacity.toString() + else -> capacities.sorted().joinToString(" / ") + } +} + +data class AdbSnapshot( + val devices: List, + val statusMessage: String? = null, + val errorMessage: String? = null, +) { + val connectedDevices: Int = devices.count { it.isConnected } + val connectedEmulators: Int = devices.count { it.isConnected && it.isEmulator } + + companion object { + val EMPTY = AdbSnapshot(devices = emptyList()) + } +} + +data class AdbDevice( + val serial: String, + val state: String, + val details: Map, +) { + val isConnected: Boolean = state.equals("device", ignoreCase = true) + val emulatorPort: String? = extractEmulatorPort(serial) + val isEmulator: Boolean = emulatorPort != null || + details["device"]?.contains("emu", ignoreCase = true) == true || + details["model"]?.contains("sdk", ignoreCase = true) == true + + val detailLine: String = buildList { + if (!isConnected) add(state) + details["model"]?.takeIf { it.isNotBlank() }?.let(::add) + details["device"]?.takeIf { it.isNotBlank() }?.let(::add) + details["transport_id"]?.takeIf { it.isNotBlank() }?.let { add("transport $it") } + }.joinToString(" · ") +} + +object TaskQueueProcessInspector { + fun loadConfiguration(dataDir: Path): QueueConfigurationSnapshot { + val normalizedDataDir = dataDir.toAbsolutePath().normalize() + val commandResult = runCommandCandidates( + listOf( + listOf("ps", "eww", "-axo", "pid=,ppid=,command="), + listOf("ps", "eww", "axo", "pid=,ppid=,command="), + ) + ) + + if (commandResult.errorMessage != null) { + return QueueConfigurationSnapshot( + serverProcesses = emptyList(), + configuredScopes = emptyList(), + errorMessage = commandResult.errorMessage, + ) + } + + if (commandResult.exitCode != 0) { + return QueueConfigurationSnapshot( + serverProcesses = emptyList(), + configuredScopes = emptyList(), + errorMessage = commandResult.output.ifBlank { "Failed to inspect task queue processes." }, + ) + } + + val serverProcesses = parseTaskQueueProcesses(commandResult.output) + .filter { it.dataDir == normalizedDataDir } + + if (serverProcesses.isEmpty()) { + return QueueConfigurationSnapshot( + serverProcesses = emptyList(), + configuredScopes = emptyList(), + statusMessage = "No live task queue server detected for $normalizedDataDir. Exact queues default to capacity 1 unless a matching server is running.", + ) + } + + val scopesByName = linkedMapOf>() + val scopePids = linkedMapOf>() + serverProcesses.forEach { process -> + process.queueCapacities.forEach { (scopeName, capacity) -> + scopesByName.getOrPut(scopeName) { linkedSetOf() }.add(capacity) + scopePids.getOrPut(scopeName) { linkedSetOf() }.add(process.pid) + } + } + + val configuredScopes = scopesByName.entries + .sortedBy { it.key } + .map { (scopeName, capacities) -> + ConfiguredQueueScope( + scopeName = scopeName, + capacities = capacities.toSortedSet(), + sourcePids = scopePids[scopeName].orEmpty().sorted(), + ) + } + + val conflictingScopes = configuredScopes.filter { it.hasConflict } + val statusMessage = when { + configuredScopes.isEmpty() -> "Detected ${serverProcesses.size} live task queue server(s), but none advertise --queue-capacity overrides." + conflictingScopes.isEmpty() -> "Detected ${serverProcesses.size} live task queue server(s) with ${configuredScopes.size} configured scope(s) for this data dir." + else -> null + } + val errorMessage = conflictingScopes + .takeIf { it.isNotEmpty() } + ?.joinToString( + prefix = "Conflicting queue-capacity values detected for: ", + separator = ", ", + ) { "${it.scopeName} (${it.displayCapacityLabel})" } + + return QueueConfigurationSnapshot( + serverProcesses = serverProcesses, + configuredScopes = configuredScopes, + statusMessage = statusMessage, + errorMessage = errorMessage, + ) + } +} + +object AdbInspector { + fun loadSnapshot(): AdbSnapshot { + val commandResult = runCommand("adb", "devices", "-l") + if (commandResult.errorMessage != null) { + return AdbSnapshot( + devices = emptyList(), + statusMessage = commandResult.errorMessage, + ) + } + + if (commandResult.exitCode != 0) { + return AdbSnapshot( + devices = emptyList(), + errorMessage = commandResult.output.ifBlank { "`adb devices -l` failed." }, + ) + } + + return parseAdbSnapshot(commandResult.output) + } +} + +internal data class CommandResult( + val output: String, + val exitCode: Int, + val errorMessage: String? = null, +) + +internal fun parseTaskQueueProcesses(output: String): List { + val processEntries = output.lineSequence() + .mapNotNull(::parseProcessLine) + .toList() + val processesByPid = processEntries.associateBy { it.pid } + + return processEntries + .filter { looksLikeTaskQueueServer(it.tokens) } + .map { processEntry -> + val dataDir = resolveTaskQueueDataDir(processEntry.tokens) + QueueServerProcess( + pid = processEntry.pid, + parentPid = processEntry.parentPid, + commandLine = processEntry.commandLine, + dataDir = dataDir, + queueCapacities = parseQueueCapacities(processEntry.tokens), + agentLabel = inferProcessAgentLabel(processEntry, processesByPid), + contextLabel = inferProcessContextLabel(processEntry, processesByPid), + ) + } + .sortedBy { it.pid } + .toList() +} + +internal fun parseAdbSnapshot(output: String): AdbSnapshot { + val cleanedLines = output.lineSequence() + .map { it.trim() } + .filter { it.isNotBlank() } + .filterNot { it.startsWith("*") } + .toList() + + val headerIndex = cleanedLines.indexOfFirst { it.startsWith("List of devices attached") } + if (headerIndex == -1) { + return AdbSnapshot( + devices = emptyList(), + errorMessage = output.ifBlank { "Unexpected output from `adb devices -l`." }, + ) + } + + val devices = cleanedLines.drop(headerIndex + 1) + .mapNotNull(::parseAdbDevice) + .sortedWith(compareBy({ !it.isConnected }, { it.serial })) + + return AdbSnapshot( + devices = devices, + statusMessage = if (devices.isEmpty()) "No ADB devices detected." else null, + ) +} + +internal fun extractEmulatorPort(value: String): String? { + val trimmed = value.trim() + EMULATOR_SERIAL_PATTERN.matchEntire(trimmed)?.let { + return it.groupValues[1] + } + LOCALHOST_EMULATOR_PATTERN.matchEntire(trimmed)?.let { + return it.groupValues[1] + } + return null +} + +private data class ProcessEntry( + val pid: Int, + val parentPid: Int?, + val tokens: List, + val commandLine: String, +) + +private fun parseProcessLine(line: String): ProcessEntry? { + val trimmed = line.trim() + if (trimmed.isEmpty()) return null + + val firstSpace = trimmed.indexOfFirst { it.isWhitespace() } + if (firstSpace <= 0) return null + + val pid = trimmed.substring(0, firstSpace).toIntOrNull() ?: return null + val remainder = trimmed.substring(firstSpace).trimStart() + if (remainder.isEmpty()) return null + + val secondSpace = remainder.indexOfFirst { it.isWhitespace() } + val secondToken = if (secondSpace > 0) remainder.substring(0, secondSpace) else null + val parentPid = secondToken?.toIntOrNull() + val commandLine = if (parentPid != null && secondSpace > 0) { + remainder.substring(secondSpace).trim() + } else { + remainder.trim() + } + if (commandLine.isEmpty()) return null + + return ProcessEntry( + pid = pid, + parentPid = parentPid, + tokens = shellSplit(commandLine), + commandLine = commandLine, + ) +} + +private fun looksLikeTaskQueueServer(tokens: List): Boolean { + if (tokens.isEmpty()) return false + + val executable = tokens.first().substringAfterLast('/') + if (executable.startsWith("python")) { + return firstPythonEntrypointToken(tokens.drop(1)) + ?.let(::looksLikeTaskQueueEntrypoint) + ?: false + } + + return tokens.first().let(::looksLikeTaskQueueEntrypoint) +} + +private fun looksLikeTaskQueueEntrypoint(token: String): Boolean { + return token.substringAfterLast('/') in TASK_QUEUE_ENTRYPOINT_NAMES +} + +private fun firstPythonEntrypointToken(tokens: List): String? { + var index = 0 + while (index < tokens.size) { + val token = tokens[index] + when { + token == "-m" -> return tokens.getOrNull(index + 1) + token == "-c" -> return null + token == "-W" || token == "-X" -> index += 2 + token.startsWith('-') -> index += 1 + looksLikeEnvironmentAssignment(token) -> index += 1 + else -> return token + } + } + return null +} + +private fun looksLikeEnvironmentAssignment(token: String): Boolean { + val separator = token.indexOf('=') + if (separator <= 0) return false + + val name = token.substring(0, separator) + val startsLikeEnvName = name.firstOrNull()?.let { it == '_' || it.isLetter() } == true + return startsLikeEnvName && name.all { it == '_' || it.isLetterOrDigit() } +} + +private fun inferProcessAgentLabel( + processEntry: ProcessEntry, + processesByPid: Map, +): String { + return (listOf(processEntry) + ancestorChain(processEntry, processesByPid)) + .mapNotNull(::knownAgentLabel) + .firstOrNull() + ?: "Task Queue" +} + +private fun inferProcessContextLabel( + processEntry: ProcessEntry, + processesByPid: Map, +): String? { + return (listOf(processEntry) + ancestorChain(processEntry, processesByPid)) + .mapNotNull { entry -> findProcessDirectory(entry.tokens) } + .map(::compactPathLabel) + .firstOrNull() +} + +private fun ancestorChain( + processEntry: ProcessEntry, + processesByPid: Map, +): List { + val ancestors = mutableListOf() + val visited = mutableSetOf() + var currentPid = processEntry.parentPid + while (currentPid != null && visited.add(currentPid)) { + val ancestor = processesByPid[currentPid] ?: break + ancestors += ancestor + currentPid = ancestor.parentPid + } + return ancestors +} + +private fun knownAgentLabel(processEntry: ProcessEntry): String? { + val executable = processEntry.tokens.firstOrNull()?.substringAfterLast('/') ?: return null + return when { + executable == "amp" -> buildAmpLabel(processEntry.tokens) + executable.startsWith("claude") -> "Claude" + executable.startsWith("codex") -> "Codex" + executable.startsWith("cursor") -> "Cursor" + executable == "zed" -> "Zed" + executable == "windsurf" -> "Windsurf" + else -> null + } +} + +private fun buildAmpLabel(tokens: List): String { + val mode = tokens.zipWithNext().firstOrNull { (current, _) -> + current == "-m" || current == "--mode" + }?.second ?: tokens.firstOrNull { it.startsWith("--mode=") }?.substringAfter('=') + + return if (mode.isNullOrBlank()) { + "Amp" + } else { + "Amp ${mode.trim()}" + } +} + +private fun findProcessDirectory(tokens: List): String? { + var index = 0 + while (index < tokens.size) { + val token = tokens[index] + when { + token.startsWith("--directory=") -> return token.substringAfter('=') + token == "--directory" && index + 1 < tokens.size -> return tokens[index + 1] + token.startsWith("--cwd=") -> return token.substringAfter('=') + token == "--cwd" && index + 1 < tokens.size -> return tokens[index + 1] + } + index += 1 + } + return null +} + +private fun resolveTaskQueueDataDir(tokens: List): Path { + var index = 0 + while (index < tokens.size) { + val token = tokens[index] + when { + token.startsWith("--data-dir=") -> { + return Paths.get(token.substringAfter('=')) + .toAbsolutePath() + .normalize() + } + token == "--data-dir" && index + 1 < tokens.size -> { + return Paths.get(tokens[index + 1]) + .toAbsolutePath() + .normalize() + } + } + index += 1 + } + + tokens.firstOrNull { it.startsWith("TASK_QUEUE_DATA_DIR=") } + ?.substringAfter('=') + ?.takeIf { it.isNotBlank() } + ?.let { configuredPath -> + return Paths.get(configuredPath) + .toAbsolutePath() + .normalize() + } + + return DEFAULT_QUEUE_DATA_DIR +} + +private fun parseQueueCapacities(tokens: List): Map { + val capacities = linkedMapOf() + var index = 0 + while (index < tokens.size) { + val token = tokens[index] + val rawSpec = when { + token.startsWith("--queue-capacity=") -> token.substringAfter("--queue-capacity=") + token == "--queue-capacity" && index + 1 < tokens.size -> { + index += 1 + tokens[index] + } + else -> null + } + + if (rawSpec != null) { + val separator = rawSpec.indexOf('=') + if (separator > 0 && separator < rawSpec.lastIndex) { + val scopeName = rawSpec.substring(0, separator) + val capacity = rawSpec.substring(separator + 1).toIntOrNull() + if (capacity != null) { + capacities[scopeName] = capacity + } + } + } + + index += 1 + } + + return capacities +} + +private fun parseAdbDevice(line: String): AdbDevice? { + val parts = line.split(Regex("\\s+")) + if (parts.size < 2) return null + + val details = parts.drop(2) + .mapNotNull { segment -> + val separator = segment.indexOf(':') + if (separator <= 0) { + null + } else { + segment.substring(0, separator) to segment.substring(separator + 1) + } + } + .toMap() + + return AdbDevice( + serial = parts[0], + state = parts[1], + details = details, + ) +} + +internal fun runCommand(vararg command: String): CommandResult { + val outputReader = Executors.newSingleThreadExecutor() + return try { + val process = ProcessBuilder(*command) + .redirectErrorStream(true) + .start() + val outputFuture = outputReader.submit { + process.inputStream.bufferedReader().use { it.readText() } + } + if (!process.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + process.destroyForcibly() + process.waitFor() + outputFuture.cancel(true) + return CommandResult( + output = "", + exitCode = -1, + errorMessage = "`${command.joinToString(" ")}` timed out after ${COMMAND_TIMEOUT_SECONDS}s.", + ) + } + + val output = outputFuture.get() + CommandResult(output = output.trim(), exitCode = process.exitValue()) + } catch (error: Exception) { + CommandResult( + output = "", + exitCode = -1, + errorMessage = error.message ?: "Failed to run `${command.joinToString(" ")}`.", + ) + } finally { + outputReader.shutdownNow() + } +} + +internal fun runCommandCandidates( + candidates: List>, + runner: (List) -> CommandResult = { runCommand(*it.toTypedArray()) }, +): CommandResult { + var lastResult = CommandResult(output = "", exitCode = -1, errorMessage = "No command candidates provided.") + candidates.forEach { command -> + val result = runner(command) + if (result.errorMessage == null && result.exitCode == 0) { + return result + } + lastResult = result + } + return lastResult +} + +private fun shellSplit(commandLine: String): List { + val tokens = mutableListOf() + val current = StringBuilder() + var quote: Char? = null + var escaping = false + + fun flush() { + if (current.isNotEmpty()) { + tokens += current.toString() + current.clear() + } + } + + commandLine.forEach { ch -> + when { + escaping -> { + current.append(ch) + escaping = false + } + ch == '\\' && quote != '\'' -> escaping = true + quote != null && ch == quote -> quote = null + quote == null && (ch == '"' || ch == '\'') -> quote = ch + quote == null && ch.isWhitespace() -> flush() + else -> current.append(ch) + } + } + flush() + + return tokens +} diff --git a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/Main.kt b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/Main.kt index 01513dd..6ab82d4 100644 --- a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/Main.kt +++ b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/Main.kt @@ -1,5 +1,7 @@ package com.block.agenttaskqueue.sidecar +import androidx.compose.foundation.ExperimentalFoundationApi +import androidx.compose.foundation.TooltipArea import androidx.compose.foundation.background import androidx.compose.foundation.border import androidx.compose.foundation.horizontalScroll @@ -7,21 +9,25 @@ import androidx.compose.foundation.layout.Arrangement import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.ColumnScope +import androidx.compose.foundation.layout.IntrinsicSize import androidx.compose.foundation.layout.Row import androidx.compose.foundation.layout.Spacer +import androidx.compose.foundation.layout.fillMaxHeight import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxWidth +import androidx.compose.foundation.layout.height import androidx.compose.foundation.layout.padding +import androidx.compose.foundation.layout.size import androidx.compose.foundation.layout.width import androidx.compose.foundation.layout.widthIn import androidx.compose.foundation.rememberScrollState +import androidx.compose.foundation.shape.CircleShape import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.foundation.verticalScroll import androidx.compose.material3.Button import androidx.compose.material3.Card import androidx.compose.material3.CardDefaults import androidx.compose.material3.ExperimentalMaterial3Api -import androidx.compose.material3.HorizontalDivider import androidx.compose.material3.MaterialTheme import androidx.compose.material3.Scaffold import androidx.compose.material3.Surface @@ -39,9 +45,12 @@ import androidx.compose.ui.Modifier import androidx.compose.ui.draw.clip import androidx.compose.ui.graphics.Brush import androidx.compose.ui.graphics.Color +import androidx.compose.ui.text.font.FontStyle import androidx.compose.ui.text.font.FontWeight +import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.text.style.TextOverflow import androidx.compose.ui.unit.dp +import androidx.compose.ui.unit.sp import androidx.compose.ui.window.Window import androidx.compose.ui.window.application import androidx.compose.ui.window.rememberWindowState @@ -51,22 +60,65 @@ import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull import java.nio.file.Path import java.nio.file.Paths +import kotlin.math.max import kotlin.system.exitProcess private const val ACTIVE_INTERVAL_MS = 1000L private const val IDLE_INTERVAL_MS = 3000L +// Neutral surfaces +private val Background = Color(0xFFF4EEE5) +private val BackgroundGradientEnd = Color(0xFFEDE3D4) +private val SurfaceCard = Color(0xFFFFFCF7) +private val SurfaceElevated = Color(0xFFFFFFFF) +private val DividerColor = Color(0x14000000) + +// Text +private val TextPrimary = Color(0xFF1F262D) +private val TextSecondary = Color(0xFF5E6670) +private val TextMuted = Color(0xFF8A9099) + +// Status accents +private val AccentRunning = Color(0xFFC96A3D) +private val AccentWaiting = Color(0xFF3F7698) +private val AccentSuccess = Color(0xFF4E8A5A) +private val AccentWarning = Color(0xFFD08A2E) +private val AccentDanger = Color(0xFFB8472E) +private val AccentIdle = Color(0xFFB0A99E) + +private val TooltipColor = Color(0xFF2B2F35) + +// Stable per-agent color palette so a developer running multiple agents can +// visually pick out "which agent is that" at a glance. +private val AgentPalette = linkedMapOf( + "Amp" to Color(0xFF2A7E76), + "Claude" to Color(0xFFB8742E), + "Codex" to Color(0xFF6B4AA8), + "Cursor" to Color(0xFFA83F6C), + "Zed" to Color(0xFF2E6BA8), + "Windsurf" to Color(0xFF5E8A2E), +) +private val UnknownAgentColor = Color(0xFF5A6370) + +private fun agentColor(label: String?): Color { + if (label.isNullOrBlank()) return UnknownAgentColor + AgentPalette.forEach { (name, color) -> + if (label.startsWith(name, ignoreCase = true)) return color + } + return UnknownAgentColor +} + private val DashboardColors = lightColorScheme( primary = Color(0xFF305B78), - secondary = Color(0xFFB35C33), - tertiary = Color(0xFF46705C), - background = Color(0xFFF7F1E8), - surface = Color(0xFFFFFCF8), - surfaceVariant = Color(0xFFE9DFCf), - onBackground = Color(0xFF1F262D), - onSurface = Color(0xFF1F262D), + secondary = AccentRunning, + tertiary = AccentSuccess, + background = Background, + surface = SurfaceCard, + surfaceVariant = Color(0xFFE9DFCF), + onBackground = TextPrimary, + onSurface = TextPrimary, outline = Color(0xFF877F74), - error = Color(0xFF8D2C2C), + error = AccentDanger, ) fun main(args: Array) = application { @@ -75,7 +127,7 @@ fun main(args: Array) = application { Window( onCloseRequest = ::exitApplication, title = "Agent Task Queue Sidecar", - state = rememberWindowState(width = 1320.dp, height = 900.dp), + state = rememberWindowState(width = 1440.dp, height = 920.dp), ) { MaterialTheme(colorScheme = DashboardColors) { QueueDashboard(dataDir = dataDir) @@ -83,7 +135,6 @@ fun main(args: Array) = application { } } -@OptIn(ExperimentalMaterial3Api::class) @Composable private fun QueueDashboard(dataDir: Path) { val refreshRequests = remember(dataDir) { Channel(Channel.CONFLATED) } @@ -102,187 +153,288 @@ private fun QueueDashboard(dataDir: Path) { } Scaffold( - topBar = { - TopAppBar( - title = { - Column(verticalArrangement = Arrangement.spacedBy(2.dp)) { - Text("Agent Task Queue", fontWeight = FontWeight.SemiBold) - Text( - text = snapshot.dataDir.toString(), - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), - maxLines = 1, - overflow = TextOverflow.Ellipsis, - ) - } - }, - actions = { - Text( - text = "Updated ${formatRefreshTime(snapshot.refreshedAt)}", - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), - ) - Spacer(Modifier.width(12.dp)) - Button(onClick = { refreshRequests.trySend(Unit) }) { - Text("Refresh") - } - Spacer(Modifier.width(16.dp)) - }, - ) - }, + containerColor = MaterialTheme.colorScheme.background, + topBar = { DashboardTopBar(snapshot) { refreshRequests.trySend(Unit) } }, ) { innerPadding -> Column( modifier = Modifier .fillMaxSize() .background( - Brush.verticalGradient( - listOf( - MaterialTheme.colorScheme.background, - Color(0xFFF3ECE2), - ) - ) + Brush.verticalGradient(listOf(Background, BackgroundGradientEnd)) ) - .verticalScroll(rememberScrollState()) .padding(innerPadding) - .padding(horizontal = 24.dp, vertical = 20.dp), - verticalArrangement = Arrangement.spacedBy(20.dp), + .padding(horizontal = 20.dp, vertical = 12.dp), + verticalArrangement = Arrangement.spacedBy(12.dp), ) { - SummaryRow(snapshot) - ScopeOverview(snapshot.scopeGroups) - - snapshot.errorMessage?.let { ErrorBanner(it) } - snapshot.statusMessage?.let { InfoBanner(it) } - - TaskSection( - title = "Running Now", - subtitle = "Tasks currently holding queue slots.", - tasks = snapshot.runningTasks, - emptyLabel = "No running tasks.", - ) - - TaskSection( - title = "Queued / Waiting", - subtitle = "Tasks blocked behind older work in their exact queue.", - tasks = snapshot.waitingTasks, - emptyLabel = "No waiting tasks.", - ) - - ScopeDetails(snapshot.scopeGroups) + HeroStatStrip(snapshot) + BannerStack(snapshot) + Row( + modifier = Modifier.fillMaxWidth().weight(1f, fill = true), + horizontalArrangement = Arrangement.spacedBy(14.dp), + ) { + Column( + modifier = Modifier + .weight(1f) + .fillMaxHeight() + .verticalScroll(rememberScrollState()), + verticalArrangement = Arrangement.spacedBy(12.dp), + ) { + QueueActivityPane(snapshot) + LegendFooter() + } + Column( + modifier = Modifier + .width(400.dp) + .fillMaxHeight() + .verticalScroll(rememberScrollState()), + verticalArrangement = Arrangement.spacedBy(12.dp), + ) { + AgentsPanel(snapshot) + EmulatorsPanel(snapshot) + ServersPanel(snapshot) + } + } + } + } +} +@OptIn(ExperimentalMaterial3Api::class) +@Composable +private fun DashboardTopBar(snapshot: QueueSnapshot, onRefresh: () -> Unit) { + TopAppBar( + title = { + Column(verticalArrangement = Arrangement.spacedBy(2.dp)) { + Text("Agent Task Queue", fontWeight = FontWeight.SemiBold) + Text( + text = snapshot.dataDir.toString(), + style = MaterialTheme.typography.bodySmall, + color = TextSecondary, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + } + }, + actions = { Text( - text = "Live view from queue.db. Queue capacities set with --queue-capacity are process-local and not persisted in SQLite.", + text = "Updated ${formatRefreshTime(snapshot.refreshedAt)}", style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.65f), + color = TextSecondary, ) - } - } + Spacer(Modifier.width(12.dp)) + Button(onClick = onRefresh) { Text("Refresh") } + Spacer(Modifier.width(16.dp)) + }, + ) } +// ---------- Hero strip ---------- + @Composable -private fun SummaryRow(snapshot: QueueSnapshot) { +private fun HeroStatStrip(snapshot: QueueSnapshot) { + val running = snapshot.summary.running + val waiting = snapshot.summary.waiting + val activeAgents = snapshot.runningTasks + .mapNotNull { it.displayAgentLabel } + .distinct() + .size + val configuredEmu = snapshot.emulatorAlignment.configuredQueues.size + val matchedEmu = snapshot.emulatorAlignment.matchedPorts.size + val connectedEmu = snapshot.adb.connectedEmulators + Row( modifier = Modifier.fillMaxWidth(), - horizontalArrangement = Arrangement.spacedBy(16.dp), + horizontalArrangement = Arrangement.spacedBy(12.dp), ) { - SummaryCard( - title = "Running", - value = snapshot.summary.running.toString(), - caption = "Active commands", - accent = Color(0xFFD06A3A), + StatTile( modifier = Modifier.weight(1f), + label = "RUNNING", + value = running.toString(), + caption = if (running == 0) "Nothing active" else if (running == 1) "Active task" else "Active tasks", + accent = if (running > 0) AccentRunning else AccentIdle, ) - SummaryCard( - title = "Waiting", - value = snapshot.summary.waiting.toString(), - caption = "Queued tasks", - accent = Color(0xFF3D7EA6), + StatTile( modifier = Modifier.weight(1f), + label = "WAITING", + value = waiting.toString(), + caption = when { + waiting == 0 -> "Queue clear" + waiting >= 5 -> "Queue backed up" + waiting == 1 -> "Queued task" + else -> "Queued tasks" + }, + accent = when { + waiting >= 5 -> AccentDanger + waiting > 0 -> AccentWarning + else -> AccentIdle + }, ) - SummaryCard( - title = "Exact Queues", - value = snapshot.queueLanes.size.toString(), - caption = "Distinct queue_name values", - accent = Color(0xFF5B8A67), + StatTile( modifier = Modifier.weight(1f), + label = "AGENTS", + value = activeAgents.toString(), + caption = if (activeAgents == 0) "No agents running" else "With running tasks", + accent = if (activeAgents > 0) AccentSuccess else AccentIdle, ) - SummaryCard( - title = "Root Scopes", - value = snapshot.scopeGroups.size.toString(), - caption = "Top-level queue groups", - accent = Color(0xFF8B5F8C), + StatTile( modifier = Modifier.weight(1f), + label = "EMULATORS", + value = if (configuredEmu == 0) connectedEmu.toString() else "$matchedEmu/$configuredEmu", + caption = when { + configuredEmu == 0 && connectedEmu == 0 -> "None connected" + configuredEmu == 0 -> "Connected, no lanes" + matchedEmu < configuredEmu -> "Lane missing device" + else -> "Lanes matched" + }, + accent = when { + configuredEmu > 0 && matchedEmu < configuredEmu -> AccentDanger + configuredEmu == 0 && connectedEmu == 0 -> AccentIdle + else -> AccentSuccess + }, ) } } @Composable -private fun SummaryCard( - title: String, +private fun StatTile( + modifier: Modifier, + label: String, value: String, caption: String, accent: Color, - modifier: Modifier = Modifier, ) { Card( modifier = modifier, - colors = CardDefaults.cardColors(containerColor = MaterialTheme.colorScheme.surface), + colors = CardDefaults.cardColors(containerColor = SurfaceElevated), + elevation = CardDefaults.cardElevation(defaultElevation = 1.dp), ) { - Column( + Row( modifier = Modifier .fillMaxWidth() - .padding(18.dp), - verticalArrangement = Arrangement.spacedBy(10.dp), + .height(IntrinsicSize.Min) + .padding(horizontal = 14.dp, vertical = 12.dp), + verticalAlignment = Alignment.CenterVertically, ) { Box( modifier = Modifier - .clip(RoundedCornerShape(999.dp)) - .background(accent.copy(alpha = 0.16f)) - .padding(horizontal = 10.dp, vertical = 5.dp), - ) { - Text(title, color = accent, style = MaterialTheme.typography.labelLarge) - } - Text(value, style = MaterialTheme.typography.headlineMedium, fontWeight = FontWeight.Bold) - Text( - caption, - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), + .width(4.dp) + .height(44.dp) + .clip(RoundedCornerShape(2.dp)) + .background(accent), ) + Spacer(Modifier.width(12.dp)) + Column(verticalArrangement = Arrangement.spacedBy(1.dp)) { + Text( + text = label, + style = MaterialTheme.typography.labelSmall, + color = accent, + fontWeight = FontWeight.SemiBold, + letterSpacing = 1.2.sp, + ) + Text( + text = value, + style = MaterialTheme.typography.headlineLarge, + fontWeight = FontWeight.Bold, + color = TextPrimary, + ) + Text( + text = caption, + style = MaterialTheme.typography.bodySmall, + color = TextSecondary, + ) + } } } } +// ---------- Queue activity pane ---------- + @Composable -private fun ScopeOverview(scopeGroups: List) { - if (scopeGroups.isEmpty()) { +private fun QueueActivityPane(snapshot: QueueSnapshot) { + if (snapshot.scopeGroups.isEmpty()) { + EmptyState( + title = "No queue activity", + message = snapshot.statusMessage ?: "No queues are visible yet.", + ) return } - SectionCard(title = "Scope Activity", subtitle = "Each card rolls up descendant exact queues under a shared root scope.") { + val sortedScopes = snapshot.scopeGroups.sortedWith( + compareByDescending { it.waitingCount > 0 } + .thenByDescending { scopePressure(it, snapshot) } + .thenByDescending { it.runningCount } + .thenBy { it.scopeName } + ) + Column(verticalArrangement = Arrangement.spacedBy(12.dp)) { + sortedScopes.forEach { scope -> + ScopeCard(scope = scope, snapshot = snapshot) + } + } +} + +private fun scopePressure(scope: ScopeGroup, snapshot: QueueSnapshot): Double { + val usage = snapshot.configuredScopeUsage.firstOrNull { it.scopeName == scope.scopeName } + val cap = usage?.capacity + if (cap == null || cap == 0) { + return if (scope.runningCount == 0) 0.0 else 1.0 + } + return scope.runningCount.toDouble() / cap +} + +@Composable +private fun ScopeCard(scope: ScopeGroup, snapshot: QueueSnapshot) { + val usage = snapshot.configuredScopeUsage.firstOrNull { it.scopeName == scope.scopeName } + val capacity = usage?.capacity + val used = usage?.usedSlots ?: scope.runningCount + val hasBackup = scope.waitingCount > 0 + val capFull = capacity != null && used >= capacity && capacity > 0 + + val accent = when { + hasBackup && capFull -> AccentDanger + hasBackup -> AccentWarning + capFull -> AccentWarning + used > 0 -> AccentRunning + else -> AccentIdle + } + + Card( + colors = CardDefaults.cardColors(containerColor = SurfaceCard), + elevation = CardDefaults.cardElevation(defaultElevation = 1.dp), + ) { Row( modifier = Modifier .fillMaxWidth() - .horizontalScroll(rememberScrollState()), - horizontalArrangement = Arrangement.spacedBy(12.dp), + .height(IntrinsicSize.Min), ) { - scopeGroups.forEach { scope -> - Card( - modifier = Modifier.widthIn(min = 220.dp), - colors = CardDefaults.cardColors(containerColor = Color(0xFFF7F0E4)), - ) { - Column( - modifier = Modifier.padding(16.dp), - verticalArrangement = Arrangement.spacedBy(8.dp), - ) { - Text(scope.scopeName, style = MaterialTheme.typography.titleMedium, fontWeight = FontWeight.SemiBold) - Text( - text = "${scope.runningCount} running · ${scope.waitingCount} waiting", - style = MaterialTheme.typography.bodyMedium, - ) - Text( - text = "${scope.lanes.size} exact queues · ${scope.taskCount} total tasks", - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.68f), - ) + Box( + modifier = Modifier + .width(5.dp) + .fillMaxHeight() + .background(accent), + ) + Column( + modifier = Modifier + .fillMaxWidth() + .padding(horizontal = 16.dp, vertical = 14.dp), + verticalArrangement = Arrangement.spacedBy(12.dp), + ) { + ScopeHeader( + scope = scope, + usage = usage, + accent = accent, + hasBackup = hasBackup, + capFull = capFull, + ) + val sortedLanes = scope.lanes.sortedWith( + compareByDescending { it.waitingCount > 0 } + .thenByDescending { it.runningCount > 0 } + .thenByDescending { it.configuredScope != null } + .thenBy { it.queueName } + ) + Column(verticalArrangement = Arrangement.spacedBy(10.dp)) { + sortedLanes.forEach { lane -> + val matched = lane.emulatorPort != null && + lane.emulatorPort in snapshot.emulatorAlignment.matchedPorts + LaneRow(lane = lane, emulatorMatched = matched) } } } @@ -291,183 +443,906 @@ private fun ScopeOverview(scopeGroups: List) { } @Composable -private fun TaskSection( - title: String, - subtitle: String, - tasks: List, - emptyLabel: String, +private fun ScopeHeader( + scope: ScopeGroup, + usage: ConfiguredScopeUsage?, + accent: Color, + hasBackup: Boolean, + capFull: Boolean, ) { - SectionCard(title = title, subtitle = subtitle) { - if (tasks.isEmpty()) { - Text(emptyLabel, color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.65f)) - return@SectionCard - } - - Column(verticalArrangement = Arrangement.spacedBy(10.dp)) { - tasks.forEach { task -> - TaskRow(task = task, showQueue = true) + Row( + modifier = Modifier.fillMaxWidth(), + horizontalArrangement = Arrangement.SpaceBetween, + verticalAlignment = Alignment.Top, + ) { + Column( + modifier = Modifier.weight(1f), + verticalArrangement = Arrangement.spacedBy(4.dp), + ) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(8.dp), + ) { + Text( + text = scope.scopeName, + style = MaterialTheme.typography.headlineSmall, + fontWeight = FontWeight.SemiBold, + ) + if (hasBackup) { + PressureChip("BACKUP", AccentDanger) + } + if (capFull && !hasBackup) { + PressureChip("AT CAPACITY", AccentWarning) + } } + Text( + text = "${scope.lanes.size} lane${if (scope.lanes.size == 1) "" else "s"} · " + + "${scope.runningCount} running · ${scope.waitingCount} waiting", + style = MaterialTheme.typography.bodySmall, + color = TextSecondary, + ) } + CapacityMeter( + capacity = usage?.capacity, + used = usage?.usedSlots ?: scope.runningCount, + accent = accent, + configured = usage != null, + ) } } @Composable -private fun ScopeDetails(scopeGroups: List) { - SectionCard( - title = "Queues By Scope", - subtitle = "Exact queues stay FIFO; grouping them here makes hierarchical queue families easier to scan.", +private fun CapacityMeter( + capacity: Int?, + used: Int, + accent: Color, + configured: Boolean, +) { + Column( + horizontalAlignment = Alignment.End, + verticalArrangement = Arrangement.spacedBy(4.dp), ) { - if (scopeGroups.isEmpty()) { - Text("No active queues.", color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.65f)) - return@SectionCard + if (capacity == null) { + Text( + text = if (configured) "cap conflict" else "default per-lane", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + ) + return + } + Text( + text = "$used / $capacity slots", + style = MaterialTheme.typography.labelLarge, + fontWeight = FontWeight.SemiBold, + color = accent, + ) + Row(horizontalArrangement = Arrangement.spacedBy(3.dp)) { + repeat(capacity) { i -> + Box( + modifier = Modifier + .size(width = 18.dp, height = 10.dp) + .clip(RoundedCornerShape(3.dp)) + .background(if (i < used) accent else accent.copy(alpha = 0.15f)) + .border(1.dp, accent.copy(alpha = 0.35f), RoundedCornerShape(3.dp)), + ) + } } + } +} + +@Composable +private fun LaneRow(lane: QueueLane, emulatorMatched: Boolean) { + val leaf = lane.queueName.substringAfterLast('/') + val showFullPath = leaf != lane.queueName + val cap = lane.exactCapacity + val running = lane.tasks.filter { it.status.equals("running", ignoreCase = true) } + val waiting = lane.tasks.filter { it.status.equals("waiting", ignoreCase = true) } + val hasBackup = waiting.isNotEmpty() + val accent = when { + hasBackup -> AccentWarning + running.isNotEmpty() -> AccentRunning + else -> AccentIdle + } - Column(verticalArrangement = Arrangement.spacedBy(18.dp)) { - scopeGroups.forEach { scope -> - Column(verticalArrangement = Arrangement.spacedBy(12.dp)) { - Text(scope.scopeName, style = MaterialTheme.typography.titleLarge, fontWeight = FontWeight.SemiBold) - scope.lanes.forEach { lane -> - QueueLaneCard(lane) + Surface( + shape = RoundedCornerShape(14.dp), + color = SurfaceElevated, + tonalElevation = 0.dp, + ) { + Row( + modifier = Modifier + .fillMaxWidth() + .height(IntrinsicSize.Min) + .border(1.dp, accent.copy(alpha = 0.2f), RoundedCornerShape(14.dp)), + ) { + Box( + modifier = Modifier + .width(3.dp) + .fillMaxHeight() + .background(accent), + ) + Column( + modifier = Modifier + .fillMaxWidth() + .padding(horizontal = 12.dp, vertical = 10.dp), + verticalArrangement = Arrangement.spacedBy(8.dp), + ) { + Row( + modifier = Modifier.fillMaxWidth(), + horizontalArrangement = Arrangement.SpaceBetween, + verticalAlignment = Alignment.CenterVertically, + ) { + Column( + modifier = Modifier.weight(1f), + verticalArrangement = Arrangement.spacedBy(1.dp), + ) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(8.dp), + ) { + Text( + text = leaf, + style = MaterialTheme.typography.titleSmall, + fontWeight = FontWeight.SemiBold, + ) + if (lane.isEmulatorLike) { + EmulatorDot( + matched = emulatorMatched, + port = lane.emulatorPort, + ) + } + if (lane.hasCapacityConflict) { + PressureChip("CAP CONFLICT", AccentDanger) + } + } + if (showFullPath) { + Text( + text = lane.queueName, + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + } } + LaneCountBadge(running = running.size, waiting = waiting.size, cap = cap) } + + LaneTimeline(cap = cap, running = running, waiting = waiting) } } } } @Composable -private fun QueueLaneCard(lane: QueueLane) { - Card(colors = CardDefaults.cardColors(containerColor = Color(0xFFFFFBF5))) { - Column(modifier = Modifier.fillMaxWidth().padding(16.dp), verticalArrangement = Arrangement.spacedBy(12.dp)) { +private fun LaneCountBadge(running: Int, waiting: Int, cap: Int) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(8.dp), + ) { + if (running > 0) { Row( - modifier = Modifier.fillMaxWidth(), - horizontalArrangement = Arrangement.SpaceBetween, verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(4.dp), ) { - Column(verticalArrangement = Arrangement.spacedBy(4.dp)) { - Text(lane.queueName, style = MaterialTheme.typography.titleMedium, fontWeight = FontWeight.SemiBold) - Text( - text = "${lane.runningCount} running · ${lane.waitingCount} waiting · ${lane.tasks.size} task(s)", - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), - ) - } + Box( + Modifier + .size(7.dp) + .clip(CircleShape) + .background(AccentRunning), + ) + Text( + text = "$running/$cap", + style = MaterialTheme.typography.labelSmall, + color = TextSecondary, + fontWeight = FontWeight.Medium, + ) } + } + if (waiting > 0) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(4.dp), + ) { + Box( + Modifier + .size(7.dp) + .clip(CircleShape) + .background(AccentWarning), + ) + Text( + text = "+$waiting", + style = MaterialTheme.typography.labelSmall, + color = TextSecondary, + fontWeight = FontWeight.Medium, + ) + } + } + if (running == 0 && waiting == 0) { + Text( + text = "idle", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + ) + } + } +} - HorizontalDivider(color = MaterialTheme.colorScheme.outline.copy(alpha = 0.3f)) - - Column(verticalArrangement = Arrangement.spacedBy(8.dp)) { - lane.tasks.forEach { task -> - TaskRow(task = task, showQueue = false) - } +@Composable +private fun LaneTimeline(cap: Int, running: List, waiting: List) { + if (running.isEmpty() && waiting.isEmpty()) { + Text( + text = "Lane idle", + style = MaterialTheme.typography.bodySmall, + color = TextMuted, + fontStyle = FontStyle.Italic, + ) + return + } + val slotCount = max(cap, running.size).coerceAtLeast(1) + Row( + modifier = Modifier + .fillMaxWidth() + .horizontalScroll(rememberScrollState()), + horizontalArrangement = Arrangement.spacedBy(8.dp), + verticalAlignment = Alignment.CenterVertically, + ) { + repeat(slotCount) { i -> + val task = running.getOrNull(i) + if (task != null) { + TaskPill(task = task, running = true) + } else { + EmptySlotPill() + } + } + if (waiting.isNotEmpty()) { + Box( + modifier = Modifier + .width(1.dp) + .height(44.dp) + .background(DividerColor), + ) + waiting.forEach { task -> + TaskPill(task = task, running = false) } } } } @Composable -private fun TaskRow(task: QueueTask, showQueue: Boolean) { - val accent = if (task.status.equals("running", ignoreCase = true)) { - Color(0xFFD06A3A) - } else { - Color(0xFF3D7EA6) +private fun EmptySlotPill() { + Box( + modifier = Modifier + .size(width = 200.dp, height = 62.dp) + .clip(RoundedCornerShape(10.dp)) + .background(Color(0xFFF2ECE0)) + .border(1.dp, DividerColor, RoundedCornerShape(10.dp)), + contentAlignment = Alignment.Center, + ) { + Text( + text = "open slot", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + fontStyle = FontStyle.Italic, + ) } +} - Surface( - modifier = Modifier.fillMaxWidth(), - shape = RoundedCornerShape(18.dp), - color = MaterialTheme.colorScheme.surface, - tonalElevation = 1.dp, +@OptIn(ExperimentalFoundationApi::class) +@Composable +private fun TaskPill(task: QueueTask, running: Boolean) { + val accent = agentColor(task.displayAgentLabel) + val bg = if (running) accent.copy(alpha = 0.14f) else accent.copy(alpha = 0.06f) + val border = if (running) accent.copy(alpha = 0.55f) else accent.copy(alpha = 0.3f) + + TooltipArea( + tooltip = { TooltipBubble(buildTaskTooltip(task)) }, + delayMillis = 200, ) { Column( modifier = Modifier - .fillMaxWidth() - .border(1.dp, accent.copy(alpha = 0.18f), RoundedCornerShape(18.dp)) - .padding(14.dp), - verticalArrangement = Arrangement.spacedBy(6.dp), + .widthIn(min = 200.dp, max = 260.dp) + .clip(RoundedCornerShape(10.dp)) + .background(bg) + .border(1.dp, border, RoundedCornerShape(10.dp)) + .padding(horizontal = 10.dp, vertical = 8.dp), + verticalArrangement = Arrangement.spacedBy(3.dp), ) { Row( - modifier = Modifier.fillMaxWidth(), - horizontalArrangement = Arrangement.SpaceBetween, verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), ) { - Row(horizontalArrangement = Arrangement.spacedBy(8.dp), verticalAlignment = Alignment.CenterVertically) { - StatusBadge(task.status, accent) - Text("#${task.id}", fontWeight = FontWeight.Medium) - if (showQueue) { - Text( - task.queueName, - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), - ) - } - } + Box( + Modifier + .size(8.dp) + .clip(CircleShape) + .background(accent), + ) Text( - task.statusAge(), - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), + text = task.displayAgentLabel ?: "unknown agent", + style = MaterialTheme.typography.labelSmall, + color = accent, + fontWeight = FontWeight.SemiBold, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + modifier = Modifier.weight(1f, fill = false), + ) + Spacer(Modifier.weight(1f)) + Text( + text = "#${task.id}", + style = MaterialTheme.typography.labelSmall, + color = TextSecondary, ) } - Text( text = task.displayCommand, - style = MaterialTheme.typography.bodyLarge, - maxLines = 2, + style = MaterialTheme.typography.bodySmall, + fontWeight = if (running) FontWeight.Medium else FontWeight.Normal, + color = TextPrimary, + maxLines = 1, overflow = TextOverflow.Ellipsis, ) - - val processLine = buildString { - task.pid?.let { append("server pid $it") } - task.childPid?.let { - if (isNotEmpty()) append(" · ") - append("child pid $it") + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), + ) { + task.displayContextLabel?.let { + Text( + text = it, + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + modifier = Modifier.weight(1f, fill = false), + ) } - } - if (processLine.isNotEmpty()) { + Spacer(Modifier.weight(1f)) Text( - processLine, - style = MaterialTheme.typography.bodySmall, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.6f), + text = task.statusAge(), + style = MaterialTheme.typography.labelSmall, + color = TextMuted, ) } } } } +private fun buildTaskTooltip(task: QueueTask): String = buildString { + append("#${task.id} ") + append(task.status.uppercase()) + append('\n') + append(task.displayCommand) + task.displayIdentityLabel?.let { + append("\n\n") + append(it) + } + append("\n\nQueue: ") + append(task.queueName) + append('\n') + append(task.statusAge()) + val pidParts = buildList { + task.pid?.let { add("server pid $it") } + task.childPid?.let { add("child pid $it") } + } + if (pidParts.isNotEmpty()) { + append('\n') + append(pidParts.joinToString(" · ")) + } +} + +@OptIn(ExperimentalFoundationApi::class) +@Composable +private fun EmulatorDot(matched: Boolean, port: String?) { + val color = if (matched) AccentSuccess else AccentDanger + val tooltip = if (matched) { + "Queue lane maps to ADB emulator on port ${port ?: "?"}." + } else { + "Lane expects emulator on port ${port ?: "?"}, but `adb devices -l` does not show one." + } + TooltipArea( + tooltip = { TooltipBubble(tooltip) }, + delayMillis = 200, + ) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(4.dp), + modifier = Modifier + .clip(RoundedCornerShape(999.dp)) + .background(color.copy(alpha = 0.12f)) + .padding(horizontal = 6.dp, vertical = 2.dp), + ) { + Box( + Modifier + .size(6.dp) + .clip(CircleShape) + .background(color), + ) + Text( + text = if (matched) "ADB :${port ?: "?"}" else "ADB missing", + style = MaterialTheme.typography.labelSmall, + color = color, + fontWeight = FontWeight.Medium, + ) + } + } +} + @Composable -private fun StatusBadge(text: String, accent: Color) { +private fun PressureChip(text: String, accent: Color) { Box( modifier = Modifier .clip(RoundedCornerShape(999.dp)) - .background(accent.copy(alpha = 0.16f)) - .padding(horizontal = 10.dp, vertical = 4.dp), + .background(accent.copy(alpha = 0.15f)) + .border(1.dp, accent.copy(alpha = 0.55f), RoundedCornerShape(999.dp)) + .padding(horizontal = 8.dp, vertical = 3.dp), ) { Text( - text = text.uppercase(), + text = text, color = accent, + style = MaterialTheme.typography.labelSmall, + fontWeight = FontWeight.SemiBold, + letterSpacing = 0.8.sp, + ) + } +} + +// ---------- Environment pane: agents ---------- + +private data class AgentContextSummary( + val context: String, + val runningCount: Int, + val waitingCount: Int, +) + +private data class AgentSummary( + val agentLabel: String, + val contexts: List, + val runningTotal: Int, + val waitingTotal: Int, +) + +private fun buildAgentSummaries(snapshot: QueueSnapshot): List { + return snapshot.tasks + .groupBy { it.displayAgentLabel ?: "Unknown agent" } + .map { (agentLabel, tasks) -> + val contexts = tasks + .groupBy { it.displayContextLabel ?: "no context" } + .map { (ctx, ctxTasks) -> + AgentContextSummary( + context = ctx, + runningCount = ctxTasks.count { it.status.equals("running", ignoreCase = true) }, + waitingCount = ctxTasks.count { it.status.equals("waiting", ignoreCase = true) }, + ) + } + .sortedWith( + compareByDescending { it.runningCount } + .thenByDescending { it.waitingCount } + .thenBy { it.context } + ) + AgentSummary( + agentLabel = agentLabel, + contexts = contexts, + runningTotal = tasks.count { it.status.equals("running", ignoreCase = true) }, + waitingTotal = tasks.count { it.status.equals("waiting", ignoreCase = true) }, + ) + } + .sortedWith( + compareByDescending { it.runningTotal } + .thenByDescending { it.waitingTotal } + .thenBy { it.agentLabel } + ) +} + +@Composable +private fun AgentsPanel(snapshot: QueueSnapshot) { + val summaries = buildAgentSummaries(snapshot) + PaneSection( + title = "Agents", + subtitle = when { + summaries.isEmpty() -> "No agent activity" + summaries.size == 1 -> "1 agent with live tasks" + else -> "${summaries.size} agents with live tasks" + }, + ) { + if (summaries.isEmpty()) { + Text( + text = "No running or queued tasks.", + style = MaterialTheme.typography.bodySmall, + color = TextMuted, + ) + return@PaneSection + } + summaries.forEach { AgentCard(it) } + } +} + +@Composable +private fun AgentCard(summary: AgentSummary) { + val accent = agentColor(summary.agentLabel) + Surface( + shape = RoundedCornerShape(12.dp), + color = SurfaceElevated, + ) { + Row( + modifier = Modifier + .fillMaxWidth() + .height(IntrinsicSize.Min) + .border(1.dp, accent.copy(alpha = 0.22f), RoundedCornerShape(12.dp)), + ) { + Box( + modifier = Modifier + .width(4.dp) + .fillMaxHeight() + .background(accent), + ) + Column( + modifier = Modifier + .fillMaxWidth() + .padding(horizontal = 12.dp, vertical = 10.dp), + verticalArrangement = Arrangement.spacedBy(6.dp), + ) { + Row(verticalAlignment = Alignment.CenterVertically) { + Box( + Modifier + .size(10.dp) + .clip(CircleShape) + .background(accent), + ) + Spacer(Modifier.width(8.dp)) + Text( + text = summary.agentLabel, + style = MaterialTheme.typography.titleMedium, + fontWeight = FontWeight.SemiBold, + color = accent, + modifier = Modifier.weight(1f), + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + AgentCountBadges( + running = summary.runningTotal, + waiting = summary.waitingTotal, + ) + } + if (summary.contexts.isNotEmpty()) { + Column(verticalArrangement = Arrangement.spacedBy(3.dp)) { + summary.contexts.forEach { ctx -> + Row( + modifier = Modifier.fillMaxWidth(), + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), + ) { + Box( + Modifier + .size(5.dp) + .clip(CircleShape) + .background(accent.copy(alpha = 0.45f)), + ) + Text( + text = ctx.context, + style = MaterialTheme.typography.bodySmall, + color = TextPrimary, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + modifier = Modifier.weight(1f), + ) + if (ctx.runningCount > 0) { + Text( + text = "${ctx.runningCount} run", + style = MaterialTheme.typography.labelSmall, + color = AccentRunning, + fontWeight = FontWeight.Medium, + ) + } + if (ctx.waitingCount > 0) { + Text( + text = "${ctx.waitingCount} wait", + style = MaterialTheme.typography.labelSmall, + color = AccentWarning, + fontWeight = FontWeight.Medium, + ) + } + } + } + } + } + } + } + } +} + +@Composable +private fun AgentCountBadges(running: Int, waiting: Int) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), + ) { + CountBubble(label = running.toString(), accent = AccentRunning, caption = "run") + CountBubble(label = waiting.toString(), accent = AccentWarning, caption = "wait") + } +} + +@Composable +private fun CountBubble(label: String, accent: Color, caption: String) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(3.dp), + modifier = Modifier + .clip(RoundedCornerShape(999.dp)) + .background(accent.copy(alpha = 0.12f)) + .padding(horizontal = 7.dp, vertical = 2.dp), + ) { + Text( + text = label, style = MaterialTheme.typography.labelMedium, + color = accent, fontWeight = FontWeight.SemiBold, ) + Text( + text = caption, + style = MaterialTheme.typography.labelSmall, + color = accent.copy(alpha = 0.75f), + ) } } +// ---------- Environment pane: emulators ---------- + @Composable -private fun SectionCard( +private fun EmulatorsPanel(snapshot: QueueSnapshot) { + val alignment = snapshot.emulatorAlignment + val matchedPorts = alignment.matchedPorts + val configured = alignment.configuredQueues + val connected = snapshot.adb.devices.filter { it.isConnected && it.isEmulator } + val devicesByPort = connected.associateBy { it.emulatorPort } + val extraDevices = connected.filter { it.emulatorPort == null || it.emulatorPort !in matchedPorts } + + PaneSection( + title = "Emulators", + subtitle = when { + configured.isEmpty() && connected.isEmpty() -> "No configured lanes · no emulators" + configured.isEmpty() -> "${connected.size} emulator(s) connected · no lanes configured" + else -> "${matchedPorts.size}/${configured.size} lanes matched to devices" + }, + ) { + if (configured.isEmpty() && connected.isEmpty()) { + Text( + text = "Nothing to show. Start an emulator or configure an emulator queue lane.", + style = MaterialTheme.typography.bodySmall, + color = TextMuted, + ) + return@PaneSection + } + + val sortedConfigured = configured.sortedWith( + compareByDescending { it.emulatorPort != null && it.emulatorPort !in matchedPorts } + .thenByDescending { it.runningCount } + .thenBy { it.queueName } + ) + sortedConfigured.forEach { lane -> + val port = lane.emulatorPort + val device = port?.let { devicesByPort[it] } + val matched = port != null && port in matchedPorts + EmulatorPairRow(lane = lane, device = device, matched = matched) + } + if (extraDevices.isNotEmpty()) { + Text( + text = "Unbound emulators", + style = MaterialTheme.typography.labelMedium, + color = TextSecondary, + fontWeight = FontWeight.SemiBold, + modifier = Modifier.padding(top = 4.dp), + ) + extraDevices.forEach { device -> + EmulatorPairRow(lane = null, device = device, matched = false) + } + } + } +} + +@Composable +private fun EmulatorPairRow(lane: QueueLane?, device: AdbDevice?, matched: Boolean) { + val accent = when { + matched -> AccentSuccess + lane != null -> AccentDanger + else -> AccentWarning + } + Surface( + shape = RoundedCornerShape(10.dp), + color = SurfaceElevated, + ) { + Row( + modifier = Modifier + .fillMaxWidth() + .border(1.dp, accent.copy(alpha = 0.28f), RoundedCornerShape(10.dp)) + .padding(horizontal = 10.dp, vertical = 8.dp), + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(8.dp), + ) { + Column( + modifier = Modifier.weight(1f), + verticalArrangement = Arrangement.spacedBy(1.dp), + ) { + if (lane != null) { + Text( + text = lane.queueName.substringAfterLast('/'), + style = MaterialTheme.typography.labelLarge, + fontWeight = FontWeight.SemiBold, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + Text( + text = "lane · ${lane.runningCount}/${lane.exactCapacity} run · ${lane.waitingCount} wait", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 1, + ) + } else { + Text( + text = "no configured lane", + style = MaterialTheme.typography.labelMedium, + color = TextMuted, + fontStyle = FontStyle.Italic, + ) + } + } + Text( + text = when { + matched -> "↔" + lane == null -> "→" + else -> "✕" + }, + color = accent, + style = MaterialTheme.typography.titleMedium, + fontWeight = FontWeight.Bold, + ) + Column( + modifier = Modifier.weight(1f), + horizontalAlignment = Alignment.End, + verticalArrangement = Arrangement.spacedBy(1.dp), + ) { + if (device != null) { + Text( + text = device.serial, + style = MaterialTheme.typography.labelLarge, + fontWeight = FontWeight.SemiBold, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + Text( + text = device.detailLine.ifBlank { "state ${device.state}" }, + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + } else { + Text( + text = "no device on :${lane?.emulatorPort ?: "?"}", + style = MaterialTheme.typography.labelMedium, + color = AccentDanger, + fontWeight = FontWeight.Medium, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + Text( + text = "start emulator or remove lane", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + } + } + } + } +} + +// ---------- Environment pane: servers ---------- + +@Composable +private fun ServersPanel(snapshot: QueueSnapshot) { + val servers = snapshot.configuration.serverProcesses + PaneSection( + title = "Queue Servers", + subtitle = when { + servers.isEmpty() -> "No live servers for this data dir" + servers.size == 1 -> "1 task-queue server" + else -> "${servers.size} task-queue servers" + }, + ) { + if (servers.isEmpty()) { + Text( + text = snapshot.configuration.statusMessage + ?: "No matching task queue server processes.", + style = MaterialTheme.typography.bodySmall, + color = TextMuted, + ) + return@PaneSection + } + servers.forEach { proc -> + val identity = snapshot.serverIdentityByPid[proc.pid] + val accentSource = identity?.primaryLabel ?: proc.agentLabel + val accent = agentColor(accentSource) + Surface(shape = RoundedCornerShape(10.dp), color = SurfaceElevated) { + Column( + modifier = Modifier + .fillMaxWidth() + .border(1.dp, accent.copy(alpha = 0.2f), RoundedCornerShape(10.dp)) + .padding(horizontal = 10.dp, vertical = 8.dp), + verticalArrangement = Arrangement.spacedBy(4.dp), + ) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), + ) { + Box( + Modifier + .size(8.dp) + .clip(CircleShape) + .background(accent), + ) + Text( + text = identity?.displayLabel ?: proc.agentLabel, + style = MaterialTheme.typography.labelLarge, + fontWeight = FontWeight.SemiBold, + maxLines = 1, + overflow = TextOverflow.Ellipsis, + modifier = Modifier.weight(1f), + ) + Text( + text = "pid ${proc.pid}", + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + ) + } + identity?.detailLabel?.let { + Text( + text = it, + style = MaterialTheme.typography.labelSmall, + color = TextSecondary, + ) + } + if (proc.queueCapacities.isNotEmpty()) { + Text( + text = proc.queueCapacities.entries.joinToString(" · ") { "${it.key}=${it.value}" }, + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + maxLines = 2, + overflow = TextOverflow.Ellipsis, + ) + } + } + } + } + } +} + +// ---------- Shared pane/utility composables ---------- + +@Composable +private fun PaneSection( title: String, subtitle: String, content: @Composable ColumnScope.() -> Unit, ) { - Card(colors = CardDefaults.cardColors(containerColor = MaterialTheme.colorScheme.surface)) { + Card( + colors = CardDefaults.cardColors(containerColor = SurfaceCard), + elevation = CardDefaults.cardElevation(defaultElevation = 1.dp), + ) { Column( - modifier = Modifier.fillMaxWidth().padding(20.dp), - verticalArrangement = Arrangement.spacedBy(14.dp), + modifier = Modifier.fillMaxWidth().padding(14.dp), + verticalArrangement = Arrangement.spacedBy(10.dp), content = { - Column(verticalArrangement = Arrangement.spacedBy(4.dp)) { - Text(title, style = MaterialTheme.typography.headlineSmall, fontWeight = FontWeight.SemiBold) + Column(verticalArrangement = Arrangement.spacedBy(2.dp)) { + Text( + text = title, + style = MaterialTheme.typography.titleMedium, + fontWeight = FontWeight.SemiBold, + ) Text( - subtitle, - style = MaterialTheme.typography.bodyMedium, - color = MaterialTheme.colorScheme.onSurface.copy(alpha = 0.7f), + text = subtitle, + style = MaterialTheme.typography.bodySmall, + color = TextSecondary, ) } content() @@ -477,26 +1352,113 @@ private fun SectionCard( } @Composable -private fun ErrorBanner(message: String) { - Banner(message = message, background = Color(0xFFFBE4E3), foreground = MaterialTheme.colorScheme.error) +private fun BannerStack(snapshot: QueueSnapshot) { + val errors = listOfNotNull( + snapshot.errorMessage, + snapshot.configuration.errorMessage, + snapshot.adb.errorMessage, + ) + if (errors.isEmpty()) return + Column(verticalArrangement = Arrangement.spacedBy(6.dp)) { + errors.forEach { ErrorBanner(it) } + } } @Composable -private fun InfoBanner(message: String) { - Banner(message = message, background = Color(0xFFEAF1F6), foreground = MaterialTheme.colorScheme.primary) +private fun EmptyState(title: String, message: String) { + Card( + modifier = Modifier.fillMaxWidth(), + colors = CardDefaults.cardColors(containerColor = SurfaceCard), + elevation = CardDefaults.cardElevation(defaultElevation = 1.dp), + ) { + Column( + modifier = Modifier.fillMaxWidth().padding(32.dp), + horizontalAlignment = Alignment.CenterHorizontally, + verticalArrangement = Arrangement.spacedBy(8.dp), + ) { + Text( + text = title, + style = MaterialTheme.typography.titleLarge, + fontWeight = FontWeight.SemiBold, + color = TextSecondary, + ) + Text( + text = message, + style = MaterialTheme.typography.bodyMedium, + color = TextMuted, + textAlign = TextAlign.Center, + ) + } + } } @Composable -private fun Banner(message: String, background: Color, foreground: Color) { - Surface(shape = RoundedCornerShape(16.dp), color = background) { +private fun LegendFooter() { + Row( + modifier = Modifier.fillMaxWidth(), + horizontalArrangement = Arrangement.spacedBy(14.dp), + verticalAlignment = Alignment.CenterVertically, + ) { + LegendDot(color = AccentRunning, label = "running") + LegendDot(color = AccentWarning, label = "waiting / backup") + LegendDot(color = AccentDanger, label = "backup + full") + LegendDot(color = AccentSuccess, label = "emulator matched") + LegendDot(color = AccentIdle, label = "idle") + } +} + +@Composable +private fun LegendDot(color: Color, label: String) { + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(5.dp), + ) { + Box( + Modifier + .size(8.dp) + .clip(CircleShape) + .background(color), + ) + Text( + text = label, + style = MaterialTheme.typography.labelSmall, + color = TextMuted, + ) + } +} + +@Composable +private fun TooltipBubble(text: String) { + Surface( + shape = RoundedCornerShape(10.dp), + color = TooltipColor, + shadowElevation = 8.dp, + ) { + Text( + text = text, + modifier = Modifier + .widthIn(max = 320.dp) + .padding(horizontal = 10.dp, vertical = 8.dp), + color = Color.White, + style = MaterialTheme.typography.bodySmall, + ) + } +} + +@Composable +private fun ErrorBanner(message: String) { + Surface(shape = RoundedCornerShape(14.dp), color = Color(0xFFFBE4E3)) { Text( text = message, modifier = Modifier.fillMaxWidth().padding(14.dp), - color = foreground, + color = MaterialTheme.colorScheme.error, + style = MaterialTheme.typography.bodyMedium, ) } } +// ---------- CLI args ---------- + private fun resolveDataDir(args: Array): Path { if (args.any { it == "-h" || it == "--help" }) { println("Usage: ./gradlew run --args=\"[--data-dir PATH]\"") diff --git a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshot.kt b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshot.kt new file mode 100644 index 0000000..8a78ccf --- /dev/null +++ b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshot.kt @@ -0,0 +1,96 @@ +package com.block.agenttaskqueue.sidecar + +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.contentOrNull +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import java.nio.file.Path + +data class HistoricalTaskUsage( + val pid: Int, + val timestamp: String, + val workingDirectory: String? = null, + val worktreeRoot: String? = null, + val repoName: String? = null, + val gitBranch: String? = null, + val agentName: String? = null, +) { + val displayAgentLabel: String? + get() = agentName + ?.trim() + ?.takeIf { it.isNotBlank() } + ?.let(::normalizeAgentLabel) + + val displayContextLabel: String? + get() = buildList { + repoName?.trim()?.takeIf { it.isNotBlank() }?.let(::add) + gitBranch?.trim()?.takeIf { it.isNotBlank() }?.let(::add) + + if (gitBranch.isNullOrBlank()) { + worktreeRoot?.trim()?.takeIf { it.isNotBlank() }?.let(::compactPathLabel)?.let(::add) + } + if (isEmpty()) { + workingDirectory?.trim()?.takeIf { it.isNotBlank() }?.let(::compactPathLabel)?.let(::add) + } + } + .distinct() + .joinToString(" · ") + .takeIf { it.isNotBlank() } +} + +data class QueueMetricsSnapshot( + val latestUsageByPid: Map, +) { + companion object { + val EMPTY = QueueMetricsSnapshot(latestUsageByPid = emptyMap()) + } +} + +object TaskQueueMetrics { + fun loadSnapshot(dataDir: Path): QueueMetricsSnapshot { + val metricsPath = dataDir.resolve("agent-task-queue-logs.json") + if (!metricsPath.toFile().exists()) { + return QueueMetricsSnapshot.EMPTY + } + + return runCatching { + parseMetricsSnapshot(metricsPath.toFile().readText()) + }.getOrElse { + QueueMetricsSnapshot.EMPTY + } + } +} + +internal fun parseMetricsSnapshot(output: String): QueueMetricsSnapshot { + val latestUsageByPid = linkedMapOf() + + output.lineSequence() + .map(String::trim) + .filter(String::isNotBlank) + .forEach { line -> + val entry = runCatching { Json.parseToJsonElement(line).jsonObject }.getOrNull() ?: return@forEach + val pid = entry["pid"]?.jsonPrimitive?.content?.toIntOrNull() ?: return@forEach + val timestamp = entry["timestamp"]?.jsonPrimitive?.contentOrNull ?: return@forEach + + val usage = HistoricalTaskUsage( + pid = pid, + timestamp = timestamp, + workingDirectory = entry["working_directory"]?.jsonPrimitive?.contentOrNull, + worktreeRoot = entry["worktree_root"]?.jsonPrimitive?.contentOrNull, + repoName = entry["repo_name"]?.jsonPrimitive?.contentOrNull, + gitBranch = entry["git_branch"]?.jsonPrimitive?.contentOrNull, + agentName = entry["agent_name"]?.jsonPrimitive?.contentOrNull, + ) + + if (usage.displayAgentLabel == null && usage.displayContextLabel == null) { + return@forEach + } + + val existing = latestUsageByPid[pid] + if (existing == null || usage.timestamp > existing.timestamp) { + latestUsageByPid[pid] = usage + } + } + + return QueueMetricsSnapshot(latestUsageByPid = latestUsageByPid) +} diff --git a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshot.kt b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshot.kt index 75c9faf..5c62ab6 100644 --- a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshot.kt +++ b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshot.kt @@ -1,6 +1,7 @@ package com.block.agenttaskqueue.sidecar import java.nio.file.Path +import java.nio.file.Paths import java.time.Duration import java.time.Instant import java.time.LocalDateTime @@ -16,10 +17,42 @@ data class QueueTask( val childPid: Int?, val createdAt: String?, val updatedAt: String?, + val workingDirectory: String? = null, + val worktreeRoot: String? = null, + val repoName: String? = null, + val gitBranch: String? = null, + val agentName: String? = null, ) { val displayCommand: String get() = (command ?: "unknown").replace(Regex("^(\\w+=\\S+\\s+)+"), "") + val displayAgentLabel: String? + get() = agentName + ?.trim() + ?.takeIf { it.isNotBlank() } + ?.let(::normalizeAgentLabel) + + val displayContextLabel: String? + get() = buildList { + repoName?.trim()?.takeIf { it.isNotBlank() }?.let(::add) + gitBranch?.trim()?.takeIf { it.isNotBlank() }?.let(::add) + + if (gitBranch.isNullOrBlank()) { + worktreeRoot?.trim()?.takeIf { it.isNotBlank() }?.let(::compactPathLabel)?.let(::add) + } + if (isEmpty()) { + workingDirectory?.trim()?.takeIf { it.isNotBlank() }?.let(::compactPathLabel)?.let(::add) + } + } + .distinct() + .joinToString(" · ") + .takeIf { it.isNotBlank() } + + val displayIdentityLabel: String? + get() = listOfNotNull(displayAgentLabel, displayContextLabel) + .joinToString(" · ") + .takeIf { it.isNotBlank() } + fun statusAge(now: Instant = Instant.now()): String { val reference = when (status.lowercase()) { "running" -> parseQueueInstant(updatedAt, ZoneId.systemDefault()) ?: parseQueueInstant(createdAt) @@ -50,9 +83,15 @@ data class QueueSummary( data class QueueLane( val queueName: String, val tasks: List, + val configuredScope: ConfiguredQueueScope? = null, ) { val runningCount: Int = tasks.count { it.status.equals("running", ignoreCase = true) } val waitingCount: Int = tasks.count { it.status.equals("waiting", ignoreCase = true) } + val configuredCapacity: Int? = configuredScope?.capacity + val hasCapacityConflict: Boolean = configuredScope?.hasConflict == true + val exactCapacity: Int = configuredCapacity ?: 1 + val emulatorPort: String? = extractEmulatorPort(queueName.substringAfterLast('/')) + val isEmulatorLike: Boolean = emulatorPort != null } data class ScopeGroup( @@ -64,28 +103,97 @@ data class ScopeGroup( val waitingCount: Int = lanes.sumOf { it.waitingCount } } +data class ConfiguredScopeUsage( + val configuredScope: ConfiguredQueueScope, + val runningCount: Int, + val waitingCount: Int, + val descendantQueueCount: Int, + val sourceServerLabels: List = emptyList(), +) { + val scopeName: String = configuredScope.scopeName + val capacity: Int? = configuredScope.capacity + val displayCapacityLabel: String = configuredScope.displayCapacityLabel + val usedSlots: Int = capacity?.let { runningCount.coerceAtMost(it) } ?: runningCount +} + +data class ServerIdentity( + val primaryLabel: String, + val contextLabel: String? = null, + val launchContextLabel: String? = null, + val detailLabel: String? = null, +) { + val displayLabel: String = listOfNotNull(primaryLabel, contextLabel) + .joinToString(" · ") + .takeIf { it.isNotBlank() } + ?: primaryLabel +} + +data class EmulatorAlignment( + val configuredQueues: List, + val connectedDevices: List, + val matchedPorts: Set, +) { + val unmatchedConfiguredQueues: List = configuredQueues.filter { lane -> + lane.emulatorPort == null || lane.emulatorPort !in matchedPorts + } + val unmatchedDevices: List = connectedDevices.filter { device -> + device.emulatorPort == null || device.emulatorPort !in matchedPorts + } +} + data class QueueSnapshot( val dataDir: Path, val tasks: List, val refreshedAt: Instant, + val configuration: QueueConfigurationSnapshot = QueueConfigurationSnapshot.EMPTY, + val adb: AdbSnapshot = AdbSnapshot.EMPTY, + val metrics: QueueMetricsSnapshot = QueueMetricsSnapshot.EMPTY, val statusMessage: String? = null, val errorMessage: String? = null, ) { val summary: QueueSummary = QueueSummary.fromTasks(tasks) val runningTasks: List = tasks.filter { it.status.equals("running", ignoreCase = true) } val waitingTasks: List = tasks.filter { it.status.equals("waiting", ignoreCase = true) } - val queueLanes: List = tasks - .groupBy { it.queueName } - .toSortedMap() - .map { (queueName, queuedTasks) -> QueueLane(queueName, queuedTasks.sortedBy { it.id }) } + val queueLanes: List = buildQueueLanes(tasks, configuration) + private val tasksByServerPid: Map> = tasks + .mapNotNull { task -> task.pid?.let { pid -> pid to task } } + .groupBy(keySelector = { it.first }, valueTransform = { it.second }) + val serverIdentityByPid: Map = configuration.serverProcesses.associate { process -> + process.pid to buildServerIdentity( + process = process, + tasks = tasksByServerPid[process.pid].orEmpty(), + historicalUsage = metrics.latestUsageByPid[process.pid], + ) + } val scopeGroups: List = queueLanes .groupBy { rootScope(it.queueName) } .toSortedMap() .map { (scopeName, lanes) -> ScopeGroup(scopeName, lanes) } + val configuredScopeUsage: List = configuration.configuredScopes + .map { configuredScope -> + ConfiguredScopeUsage( + configuredScope = configuredScope, + runningCount = tasks.count { task -> + task.status.equals("running", ignoreCase = true) && inScope(task.queueName, configuredScope.scopeName) + }, + waitingCount = tasks.count { task -> + task.status.equals("waiting", ignoreCase = true) && inScope(task.queueName, configuredScope.scopeName) + }, + descendantQueueCount = queueLanes.count { lane -> inScope(lane.queueName, configuredScope.scopeName) }, + sourceServerLabels = configuredScope.sourcePids.map { sourcePid -> + serverIdentityByPid[sourcePid]?.displayLabel ?: "pid $sourcePid" + }, + ) + } + .sortedBy { it.scopeName } + val emulatorAlignment: EmulatorAlignment = buildEmulatorAlignment(queueLanes, adb) companion object { fun empty( dataDir: Path, + configuration: QueueConfigurationSnapshot = QueueConfigurationSnapshot.EMPTY, + adb: AdbSnapshot = AdbSnapshot.EMPTY, + metrics: QueueMetricsSnapshot = QueueMetricsSnapshot.EMPTY, statusMessage: String? = null, errorMessage: String? = null, ): QueueSnapshot { @@ -93,6 +201,9 @@ data class QueueSnapshot( dataDir = dataDir, tasks = emptyList(), refreshedAt = Instant.now(), + configuration = configuration, + adb = adb, + metrics = metrics, statusMessage = statusMessage, errorMessage = errorMessage, ) @@ -101,12 +212,18 @@ data class QueueSnapshot( fun fromTasks( dataDir: Path, tasks: List, + configuration: QueueConfigurationSnapshot = QueueConfigurationSnapshot.EMPTY, + adb: AdbSnapshot = AdbSnapshot.EMPTY, + metrics: QueueMetricsSnapshot = QueueMetricsSnapshot.EMPTY, statusMessage: String? = null, ): QueueSnapshot { return QueueSnapshot( dataDir = dataDir, tasks = tasks.sortedWith(compareBy({ it.queueName }, { it.id })), refreshedAt = Instant.now(), + configuration = configuration, + adb = adb, + metrics = metrics, statusMessage = statusMessage, ) } @@ -129,8 +246,91 @@ fun formatRefreshTime(instant: Instant): String { return localTime.truncatedTo(java.time.temporal.ChronoUnit.SECONDS).toString() } +internal fun normalizeAgentLabel(raw: String): String { + return when (raw.trim().lowercase()) { + "amp" -> "Amp" + "claude" -> "Claude" + "codex" -> "Codex" + "cursor" -> "Cursor" + else -> raw.trim() + } +} + +internal fun compactPathLabel(raw: String): String { + return runCatching { + Paths.get(raw).normalize().fileName?.toString() ?: raw + }.getOrElse { raw } +} + private fun rootScope(queueName: String): String = queueName.substringBefore('/') +private fun buildServerIdentity( + process: QueueServerProcess, + tasks: List, + historicalUsage: HistoricalTaskUsage?, +): ServerIdentity { + val agentLabels = tasks.mapNotNull { it.displayAgentLabel }.distinct() + val contextLabels = tasks.mapNotNull { it.displayContextLabel }.distinct() + val hasVisibleUsage = tasks.isNotEmpty() + return ServerIdentity( + primaryLabel = summarizeIdentityLabels(agentLabels) + ?: historicalUsage?.displayAgentLabel + ?: process.agentLabel, + contextLabel = summarizeIdentityLabels(contextLabels) + ?: historicalUsage?.displayContextLabel, + launchContextLabel = process.contextLabel, + detailLabel = if (hasVisibleUsage) { + "${tasks.size} visible task${if (tasks.size == 1) "" else "s"}" + } else { + "idle server" + }, + ) +} + +private fun summarizeIdentityLabels(labels: List): String? { + return when { + labels.isEmpty() -> null + labels.size <= 2 -> labels.joinToString(" + ") + else -> "${labels.first()} +${labels.size - 1} more" + } +} + +private fun buildQueueLanes( + tasks: List, + configuration: QueueConfigurationSnapshot, +): List { + val tasksByQueue = tasks.groupBy { it.queueName } + val configuredScopes = configuration.configuredScopes.associateBy { it.scopeName } + return (tasksByQueue.keys + configuredScopes.keys) + .toSortedSet() + .map { queueName -> + QueueLane( + queueName = queueName, + tasks = tasksByQueue[queueName].orEmpty().sortedBy { it.id }, + configuredScope = configuredScopes[queueName], + ) + } +} + +private fun buildEmulatorAlignment( + queueLanes: List, + adb: AdbSnapshot, +): EmulatorAlignment { + val configuredQueues = queueLanes.filter { it.configuredScope?.isEmulatorLike == true } + val connectedDevices = adb.devices.filter { it.isConnected && it.isEmulator } + val matchedPorts = configuredQueues.mapNotNull { it.emulatorPort }.toSet() + .intersect(connectedDevices.mapNotNull { it.emulatorPort }.toSet()) + return EmulatorAlignment( + configuredQueues = configuredQueues, + connectedDevices = connectedDevices, + matchedPorts = matchedPorts, + ) +} + +private fun inScope(queueName: String, scopeName: String): Boolean { + return queueName == scopeName || queueName.startsWith("$scopeName/") +} + private fun relativeDuration(now: Instant, then: Instant): String { val elapsed = Duration.between(then, now).seconds.coerceAtLeast(0) return when { diff --git a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabase.kt b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabase.kt index b5e475c..6487c40 100644 --- a/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabase.kt +++ b/desktop-sidecar/src/desktopMain/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabase.kt @@ -1,5 +1,6 @@ package com.block.agenttaskqueue.sidecar +import java.sql.Connection import java.nio.file.Path import java.sql.DriverManager import java.sql.ResultSet @@ -10,10 +11,16 @@ object TaskQueueDatabase { } fun loadSnapshot(dataDir: Path): QueueSnapshot { + val configuration = TaskQueueProcessInspector.loadConfiguration(dataDir) + val adb = AdbInspector.loadSnapshot() + val metrics = TaskQueueMetrics.loadSnapshot(dataDir) val dbPath = dataDir.resolve("queue.db") if (!dbPath.toFile().exists()) { return QueueSnapshot.empty( dataDir = dataDir, + configuration = configuration, + adb = adb, + metrics = metrics, statusMessage = "Waiting for queue database at $dbPath", ) } @@ -25,8 +32,19 @@ object TaskQueueDatabase { statement.execute("PRAGMA busy_timeout=5000") } + if (!connection.hasTable("queue")) { + return QueueSnapshot.empty( + dataDir = dataDir, + configuration = configuration, + adb = adb, + metrics = metrics, + statusMessage = "Waiting for queue schema at $dbPath", + ) + } + connection.createStatement().use { statement -> statement.executeQuery("SELECT * FROM queue ORDER BY queue_name, id").use { rs -> + val availableColumns = rs.columnNames() val tasks = mutableListOf() while (rs.next()) { tasks += QueueTask( @@ -38,12 +56,20 @@ object TaskQueueDatabase { childPid = rs.getNullableInt("child_pid"), createdAt = rs.getString("created_at"), updatedAt = rs.getString("updated_at"), + workingDirectory = rs.getOptionalString(availableColumns, "working_directory"), + worktreeRoot = rs.getOptionalString(availableColumns, "worktree_root"), + repoName = rs.getOptionalString(availableColumns, "repo_name"), + gitBranch = rs.getOptionalString(availableColumns, "git_branch"), + agentName = rs.getOptionalString(availableColumns, "agent_name"), ) } QueueSnapshot.fromTasks( dataDir = dataDir, tasks = tasks, + configuration = configuration, + adb = adb, + metrics = metrics, statusMessage = if (tasks.isEmpty()) "Queue is empty" else null, ) } @@ -52,18 +78,60 @@ object TaskQueueDatabase { }.getOrElse { error -> QueueSnapshot.empty( dataDir = dataDir, + configuration = configuration, + adb = adb, + metrics = metrics, errorMessage = error.message ?: "Failed to read $dbPath", ) } } } +private fun Connection.hasTable(tableName: String): Boolean { + prepareStatement( + "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ? LIMIT 1" + ).use { statement -> + statement.setString(1, tableName) + statement.executeQuery().use { resultSet -> + return resultSet.next() + } + } +} + private fun ResultSet.getNullableInt(columnName: String): Int? { - val value = getObject(columnName) ?: return null - return when (value) { - is Int -> value - is Long -> value.toInt() - is Number -> value.toInt() - else -> value.toString().toIntOrNull() + return coerceNullableInt(getObject(columnName)) +} + +internal fun coerceNullableInt(value: Any?): Int? { + val longValue = when (value) { + null -> return null + is Int -> return value + is Long -> value + is Short -> value.toLong() + is Byte -> value.toLong() + is Number -> value.toLong() + else -> value.toString().toLongOrNull() ?: return null } + + return longValue + .takeIf { it in Int.MIN_VALUE.toLong()..Int.MAX_VALUE.toLong() } + ?.toInt() +} + +private fun ResultSet.columnNames(): Set { + val metadata = metaData + return (1..metadata.columnCount) + .map { index -> metadata.getColumnLabel(index).lowercase() } + .toSet() +} + +private fun ResultSet.getOptionalString( + availableColumns: Set, + columnName: String, +): String? { + if (columnName.lowercase() !in availableColumns) { + return null + } + + return getString(columnName)?.takeIf { it.isNotBlank() } } diff --git a/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshotTest.kt b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshotTest.kt new file mode 100644 index 0000000..4b4a9a8 --- /dev/null +++ b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/EnvironmentSnapshotTest.kt @@ -0,0 +1,154 @@ +package com.block.agenttaskqueue.sidecar + +import java.nio.file.Paths +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class EnvironmentSnapshotTest { + @Test + fun commandRunnerFallsBackToLinuxPsFlags() { + val attemptedCommands = mutableListOf>() + val result = runCommandCandidates( + listOf( + listOf("ps", "eww", "-axo", "pid=,ppid=,command="), + listOf("ps", "eww", "axo", "pid=,ppid=,command="), + ) + ) { command -> + attemptedCommands += command + when (command) { + listOf("ps", "eww", "-axo", "pid=,ppid=,command=") -> CommandResult( + output = "must set personality to get -x option", + exitCode = 1, + ) + listOf("ps", "eww", "axo", "pid=,ppid=,command=") -> CommandResult( + output = "902 1 python task_queue.py", + exitCode = 0, + ) + else -> error("Unexpected command: $command") + } + } + + assertEquals( + listOf( + listOf("ps", "eww", "-axo", "pid=,ppid=,command="), + listOf("ps", "eww", "axo", "pid=,ppid=,command="), + ), + attemptedCommands, + ) + assertEquals(0, result.exitCode) + assertEquals("902 1 python task_queue.py", result.output) + } + + @Test + fun commandRunnerDrainsLargeStdoutWithoutTimingOut() { + val result = runCommand( + "python3", + "-c", + "import sys; sys.stdout.write('x' * 200000)", + ) + + assertEquals(null, result.errorMessage) + assertEquals(0, result.exitCode) + assertEquals(200000, result.output.length) + } + + @Test + fun parsesLiveTaskQueueServerCapacitiesFromPsOutput() { + val processes = parseTaskQueueProcesses( + """ + 73781 /Users/me/.venv/bin/python3 task_queue.py --data-dir /tmp/agent-task-queue --queue-capacity=gradle=2 --queue-capacity=gradle/emulator-5554=1 + 73782 uv run --directory /repo python task_queue.py --queue-capacity=gradle=2 + 73783 /Users/me/.venv/bin/python3 task_queue.py --data-dir=/tmp/other-queue --queue-capacity=web=3 + """.trimIndent() + ) + + assertEquals(2, processes.size) + assertEquals(Paths.get("/tmp/agent-task-queue"), processes.first().dataDir) + assertEquals(2, processes.first().queueCapacities["gradle"]) + assertEquals(1, processes.first().queueCapacities["gradle/emulator-5554"]) + assertEquals(Paths.get("/tmp/other-queue"), processes.last().dataDir) + } + + @Test + fun fallsBackToTaskQueueDataDirEnvironmentWhenFlagIsOmitted() { + val process = parseTaskQueueProcesses( + """ + 73781 1 /Users/me/.venv/bin/python3 task_queue.py --queue-capacity=gradle=2 TASK_QUEUE_DATA_DIR=/tmp/custom-queue + """.trimIndent() + ).single() + + assertEquals(Paths.get("/tmp/custom-queue"), process.dataDir) + assertEquals(2, process.queueCapacities["gradle"]) + } + + @Test + fun parsesInstalledAgentTaskQueueEntrypointLaunchedUnderPython() { + val process = parseTaskQueueProcesses( + """ + 73781 1 /Users/me/.venv/bin/python3 /Users/me/.venv/bin/agent-task-queue --queue-capacity=gradle=2 --queue-capacity=gradle/emulator-5554=1 TASK_QUEUE_DATA_DIR=/tmp/custom-queue + """.trimIndent() + ).single() + + assertEquals(Paths.get("/tmp/custom-queue"), process.dataDir) + assertEquals(2, process.queueCapacities["gradle"]) + assertEquals(1, process.queueCapacities["gradle/emulator-5554"]) + } + + @Test + fun parsesInstalledEntrypointWhenPythonUsesFlagValuesWithEquals() { + val process = parseTaskQueueProcesses( + """ + 73781 1 /Users/me/.venv/bin/python3 -X faulthandler=1 /Users/me/.venv/bin/agent-task-queue --queue-capacity=gradle=2 TASK_QUEUE_DATA_DIR=/tmp/custom-queue + """.trimIndent() + ).single() + + assertEquals(Paths.get("/tmp/custom-queue"), process.dataDir) + assertEquals(2, process.queueCapacities["gradle"]) + } + + @Test + fun parsesAdbDevicesAndMatchesEmulatorPorts() { + val adb = parseAdbSnapshot( + """ + List of devices attached + emulator-5554 device product:sdk_gphone64_arm64 model:sdk_gphone64_arm64 device:emu64a transport_id:1 + 127.0.0.1:5557 device transport_id:2 + R58N12345AB unauthorized transport_id:3 + """.trimIndent() + ) + + assertEquals(3, adb.devices.size) + assertEquals(2, adb.connectedDevices) + assertEquals(2, adb.connectedEmulators) + val devicesBySerial = adb.devices.associateBy { it.serial } + assertEquals("5554", devicesBySerial.getValue("emulator-5554").emulatorPort) + assertTrue(devicesBySerial.getValue("127.0.0.1:5557").isEmulator) + assertNull(devicesBySerial.getValue("R58N12345AB").emulatorPort) + } + + @Test + fun extractsEmulatorPortFromQueueAndAdbLabels() { + assertEquals("5554", extractEmulatorPort("emulator-5554")) + assertEquals("5557", extractEmulatorPort("emu-5557")) + assertEquals("5559", extractEmulatorPort("127.0.0.1:5559")) + assertNull(extractEmulatorPort("pixel-9-pro")) + } + + @Test + fun infersAgentAndContextFromServerParentProcesses() { + val process = parseTaskQueueProcesses( + """ + 900 1 amp -m deep + 901 900 uv run --directory /Users/example/Development/sample-repo-worktrees/feature-queue-view python task_queue.py --data-dir /tmp/agent-task-queue --queue-capacity=gradle=2 + 902 901 /Users/me/.venv/bin/python3 task_queue.py --data-dir /tmp/agent-task-queue --queue-capacity=gradle=2 + """.trimIndent() + ).single() + + assertEquals(902, process.pid) + assertEquals(901, process.parentPid) + assertEquals("Amp deep", process.agentLabel) + assertEquals("feature-queue-view", process.contextLabel) + } +} diff --git a/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshotTest.kt b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshotTest.kt new file mode 100644 index 0000000..425b1f4 --- /dev/null +++ b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/MetricsSnapshotTest.kt @@ -0,0 +1,25 @@ +package com.block.agenttaskqueue.sidecar + +import kotlin.test.Test +import kotlin.test.assertEquals + +class MetricsSnapshotTest { + @Test + fun parseMetricsSnapshotKeepsLatestUsagePerPid() { + val snapshot = parseMetricsSnapshot( + """ + {"event":"task_queued","timestamp":"2026-04-24T11:00:01.389560","task_id":74,"queue_name":"global","pid":88226,"agent_name":"amp","repo_name":"sample-repo","git_branch":"feature/queue-visibility"} + {"event":"task_completed","timestamp":"2026-04-24T11:10:13.561185","task_id":77,"queue_name":"global","pid":88226,"agent_name":"amp","repo_name":"sample-mobile-app","git_branch":"feature/real-work"} + {"event":"task_completed","timestamp":"2026-04-24T11:10:33.478135","task_id":78,"queue_name":"gradle","pid":74067,"agent_name":"claude","repo_name":"payments-app","git_branch":"feature/payments"} + """.trimIndent() + ) + + assertEquals(2, snapshot.latestUsageByPid.size) + assertEquals( + "sample-mobile-app · feature/real-work", + snapshot.latestUsageByPid.getValue(88226).displayContextLabel, + ) + assertEquals("Amp", snapshot.latestUsageByPid.getValue(88226).displayAgentLabel) + assertEquals("Claude", snapshot.latestUsageByPid.getValue(74067).displayAgentLabel) + } +} diff --git a/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshotTest.kt b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshotTest.kt index 66a62fa..bf9c9b5 100644 --- a/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshotTest.kt +++ b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/QueueSnapshotTest.kt @@ -1,9 +1,11 @@ package com.block.agenttaskqueue.sidecar import java.time.Instant +import java.nio.file.Paths import java.util.TimeZone import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertTrue class QueueSnapshotTest { @Test @@ -42,6 +44,161 @@ class QueueSnapshotTest { } } + @Test + fun configuredQueuesStayVisibleWhenIdle() { + val snapshot = QueueSnapshot.fromTasks( + dataDir = Paths.get("/tmp/agent-task-queue"), + tasks = emptyList(), + configuration = QueueConfigurationSnapshot( + serverProcesses = emptyList(), + configuredScopes = listOf( + ConfiguredQueueScope( + scopeName = "gradle", + capacities = setOf(2), + sourcePids = listOf(1234), + ), + ConfiguredQueueScope( + scopeName = "gradle/emulator-5554", + capacities = setOf(1), + sourcePids = listOf(1234), + ), + ), + ), + ) + + assertEquals(listOf("gradle", "gradle/emulator-5554"), snapshot.queueLanes.map { it.queueName }) + assertEquals(1, snapshot.scopeGroups.size) + assertEquals(listOf(2, 1), snapshot.configuredScopeUsage.mapNotNull { it.capacity }) + assertTrue(snapshot.queueLanes.all { it.tasks.isEmpty() }) + } + + @Test + fun taskIdentityPrefersAgentRepoAndBranchMetadata() { + val task = QueueTask( + id = 7, + queueName = "gradle/emulator-5554", + status = "running", + command = "./gradlew connectedDebugAndroidTest", + pid = 902, + childPid = 8112, + createdAt = null, + updatedAt = null, + workingDirectory = "/Users/example/Development/sample-repo", + worktreeRoot = "/Users/example/Development/sample-repo-worktrees/feature-queue-view", + repoName = "sample-repo", + gitBranch = "feature/queue-view", + agentName = "amp", + ) + + assertEquals("Amp", task.displayAgentLabel) + assertEquals("sample-repo · feature/queue-view", task.displayContextLabel) + assertEquals("Amp · sample-repo · feature/queue-view", task.displayIdentityLabel) + } + + @Test + fun serverIdentityUsesVisibleTaskContextInsteadOfLaunchBranch() { + val snapshot = QueueSnapshot.fromTasks( + dataDir = Paths.get("/tmp/agent-task-queue"), + tasks = listOf( + QueueTask( + id = 7, + queueName = "gradle", + status = "running", + command = "./gradlew test", + pid = 902, + childPid = null, + createdAt = null, + updatedAt = null, + repoName = "sample-mobile-app", + gitBranch = "feature/real-work", + agentName = "amp", + ) + ), + configuration = QueueConfigurationSnapshot( + serverProcesses = listOf( + QueueServerProcess( + pid = 902, + commandLine = "python task_queue.py", + dataDir = Paths.get("/tmp/agent-task-queue"), + queueCapacities = emptyMap(), + agentLabel = "Amp deep", + contextLabel = "desktop-sidecar", + ) + ), + configuredScopes = emptyList(), + ), + ) + + val identity = snapshot.serverIdentityByPid.getValue(902) + assertEquals("Amp", identity.primaryLabel) + assertEquals("sample-mobile-app · feature/real-work", identity.contextLabel) + assertEquals("desktop-sidecar", identity.launchContextLabel) + } + + @Test + fun idleServerDoesNotPretendLaunchBranchIsQueueUsage() { + val snapshot = QueueSnapshot.fromTasks( + dataDir = Paths.get("/tmp/agent-task-queue"), + tasks = emptyList(), + configuration = QueueConfigurationSnapshot( + serverProcesses = listOf( + QueueServerProcess( + pid = 902, + commandLine = "python task_queue.py", + dataDir = Paths.get("/tmp/agent-task-queue"), + queueCapacities = emptyMap(), + agentLabel = "Amp deep", + contextLabel = "desktop-sidecar", + ) + ), + configuredScopes = emptyList(), + ), + ) + + val identity = snapshot.serverIdentityByPid.getValue(902) + assertEquals("Amp deep", identity.displayLabel) + assertEquals(null, identity.contextLabel) + assertEquals("desktop-sidecar", identity.launchContextLabel) + assertEquals("idle server", identity.detailLabel) + } + + @Test + fun idleServerFallsBackToHistoricalMetricsUsage() { + val snapshot = QueueSnapshot.fromTasks( + dataDir = Paths.get("/tmp/agent-task-queue"), + tasks = emptyList(), + configuration = QueueConfigurationSnapshot( + serverProcesses = listOf( + QueueServerProcess( + pid = 902, + commandLine = "python task_queue.py", + dataDir = Paths.get("/tmp/agent-task-queue"), + queueCapacities = emptyMap(), + agentLabel = "Amp deep", + contextLabel = "desktop-sidecar", + ) + ), + configuredScopes = emptyList(), + ), + metrics = QueueMetricsSnapshot( + latestUsageByPid = mapOf( + 902 to HistoricalTaskUsage( + pid = 902, + timestamp = "2026-04-24T11:10:13.561185", + repoName = "sample-mobile-app", + gitBranch = "feature/real-work", + agentName = "amp", + ) + ) + ), + ) + + val identity = snapshot.serverIdentityByPid.getValue(902) + assertEquals("Amp · sample-mobile-app · feature/real-work", identity.displayLabel) + assertEquals("desktop-sidecar", identity.launchContextLabel) + assertEquals("idle server", identity.detailLabel) + } + private fun withDefaultTimeZone(timeZoneId: String, block: () -> Unit) { val original = TimeZone.getDefault() try { diff --git a/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabaseTest.kt b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabaseTest.kt new file mode 100644 index 0000000..f89db6e --- /dev/null +++ b/desktop-sidecar/src/desktopTest/kotlin/com/block/agenttaskqueue/sidecar/TaskQueueDatabaseTest.kt @@ -0,0 +1,42 @@ +package com.block.agenttaskqueue.sidecar + +import java.nio.file.Files +import java.sql.DriverManager +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class TaskQueueDatabaseTest { + @Test + fun missingQueueTableIsTreatedAsWaitingForSchema() { + val tempDir = Files.createTempDirectory("task-queue-db-test") + try { + val dbPath = tempDir.resolve("queue.db") + DriverManager.getConnection("jdbc:sqlite:$dbPath").use { connection -> + connection.createStatement().use { statement -> + statement.execute("CREATE TABLE metadata (id INTEGER PRIMARY KEY)") + } + } + + val snapshot = TaskQueueDatabase.loadSnapshot(tempDir) + + assertEquals(emptyList(), snapshot.tasks) + assertEquals("Waiting for queue schema at $dbPath", snapshot.statusMessage) + assertNull(snapshot.errorMessage) + } finally { + tempDir.toFile().deleteRecursively() + } + } + + @Test + fun coerceNullableIntKeepsInRangeValues() { + assertEquals(42, coerceNullableInt(42L)) + assertEquals(7, coerceNullableInt("7")) + } + + @Test + fun coerceNullableIntRejectsOutOfRangeValues() { + assertNull(coerceNullableInt(Long.MAX_VALUE)) + assertNull(coerceNullableInt("2147483648")) + } +} diff --git a/queue_core.py b/queue_core.py index f671903..821b354 100644 --- a/queue_core.py +++ b/queue_core.py @@ -11,6 +11,7 @@ import signal import shlex import sqlite3 +import subprocess from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timedelta @@ -23,6 +24,7 @@ DEFAULT_MAX_LOCK_AGE_MINUTES = 120 DEFAULT_MAX_METRICS_SIZE_MB = 5 QUEUE_SCOPE_SEPARATOR = "/" +GIT_METADATA_TIMEOUT_SECONDS = 2 @dataclass @@ -44,6 +46,65 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": ) +@dataclass(frozen=True) +class TaskOrigin: + """Optional metadata about where a queued task came from.""" + + working_directory: str + worktree_root: str | None = None + repo_name: str | None = None + git_branch: str | None = None + agent_name: str | None = None + + +def task_origin_kwargs(task_origin: "TaskOrigin | None") -> dict[str, str]: + """Return non-empty task origin fields as kwargs suitable for DB/metrics inserts.""" + if task_origin is None: + return {} + + return { + key: value + for key, value in { + "working_directory": task_origin.working_directory, + "worktree_root": task_origin.worktree_root, + "repo_name": task_origin.repo_name, + "git_branch": task_origin.git_branch, + "agent_name": task_origin.agent_name, + }.items() + if value + } + + +def insert_waiting_task( + conn, + queue_name: str, + pid: int, + server_id: str, + command: str | None = None, + task_origin: "TaskOrigin | None" = None, +) -> int: + """Insert a waiting task row and return its task ID.""" + cursor = conn.execute( + """INSERT INTO queue ( + queue_name, status, pid, server_id, command, + working_directory, worktree_root, repo_name, git_branch, agent_name + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + queue_name, + "waiting", + pid, + server_id, + command, + task_origin.working_directory if task_origin else None, + task_origin.worktree_root if task_origin else None, + task_origin.repo_name if task_origin else None, + task_origin.git_branch if task_origin else None, + task_origin.agent_name if task_origin else None, + ), + ) + return cursor.lastrowid + + # --- Database Schema --- QUEUE_SCHEMA = """ CREATE TABLE IF NOT EXISTS queue ( @@ -54,6 +115,11 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": server_id TEXT, child_pid INTEGER, command TEXT, + working_directory TEXT, + worktree_root TEXT, + repo_name TEXT, + git_branch TEXT, + agent_name TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -69,6 +135,26 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": ALTER TABLE queue ADD COLUMN command TEXT """ +QUEUE_MIGRATION_WORKING_DIRECTORY = """ +ALTER TABLE queue ADD COLUMN working_directory TEXT +""" + +QUEUE_MIGRATION_WORKTREE_ROOT = """ +ALTER TABLE queue ADD COLUMN worktree_root TEXT +""" + +QUEUE_MIGRATION_REPO_NAME = """ +ALTER TABLE queue ADD COLUMN repo_name TEXT +""" + +QUEUE_MIGRATION_GIT_BRANCH = """ +ALTER TABLE queue ADD COLUMN git_branch TEXT +""" + +QUEUE_MIGRATION_AGENT_NAME = """ +ALTER TABLE queue ADD COLUMN agent_name TEXT +""" + QUEUE_INDEX = """ CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(queue_name, status) """ @@ -99,11 +185,99 @@ def init_db(paths: QueuePaths): conn.execute(QUEUE_SCHEMA) conn.execute(QUEUE_INDEX) # Run migrations for existing databases - for migration in [QUEUE_MIGRATION_SERVER_ID, QUEUE_MIGRATION_COMMAND]: + for migration in [ + QUEUE_MIGRATION_SERVER_ID, + QUEUE_MIGRATION_COMMAND, + QUEUE_MIGRATION_WORKING_DIRECTORY, + QUEUE_MIGRATION_WORKTREE_ROOT, + QUEUE_MIGRATION_REPO_NAME, + QUEUE_MIGRATION_GIT_BRANCH, + QUEUE_MIGRATION_AGENT_NAME, + ]: try: conn.execute(migration) - except sqlite3.OperationalError: - pass # Column already exists + except sqlite3.OperationalError as exc: + if "duplicate column name" not in str(exc).lower(): + raise + + +def collect_task_origin(working_directory: str, agent_name: str | None = None) -> TaskOrigin: + """Capture stable repo/worktree metadata for a queued task.""" + resolved_working_directory = str(Path(working_directory).resolve()) + worktree_root, repo_name, git_branch = _git_context(resolved_working_directory) + + return TaskOrigin( + working_directory=resolved_working_directory, + worktree_root=worktree_root, + repo_name=repo_name, + git_branch=git_branch, + agent_name=agent_name, + ) + + +def _git_context(working_directory: str) -> tuple[str | None, str | None, str | None]: + try: + result = subprocess.run( + [ + "git", + "-C", + working_directory, + "rev-parse", + "--show-toplevel", + "--git-common-dir", + "HEAD", + "--symbolic-full-name", + "HEAD", + ], + capture_output=True, + text=True, + timeout=GIT_METADATA_TIMEOUT_SECONDS, + ) + except ( + FileNotFoundError, + NotADirectoryError, + PermissionError, + subprocess.SubprocessError, + ): + return None, None, None + + if result.returncode != 0: + return None, None, None + + lines = [line.strip() for line in result.stdout.splitlines() if line.strip()] + if len(lines) < 4: + return None, None, None + + worktree_root, git_common_dir, head_oid, head_ref = lines[:4] + repo_name = _git_repo_name_from_common_dir( + working_directory, + worktree_root, + git_common_dir, + ) + git_branch = ( + head_ref.removeprefix("refs/heads/") + if head_ref.startswith("refs/heads/") + else head_oid[:7] or None + ) + return worktree_root or None, repo_name, git_branch + + +def _git_repo_name_from_common_dir( + working_directory: str, + worktree_root: str | None, + git_common_dir: str, +) -> str | None: + common_dir_path = Path(git_common_dir) + if not common_dir_path.is_absolute(): + common_dir_path = (Path(working_directory) / common_dir_path).resolve() + + if common_dir_path.name == ".git": + return common_dir_path.parent.name or None + + if worktree_root: + return Path(worktree_root).name + + return None def normalize_queue_name(queue_name: str) -> str: diff --git a/task_queue.py b/task_queue.py index e44076a..4590efa 100644 --- a/task_queue.py +++ b/task_queue.py @@ -28,18 +28,22 @@ # Import shared queue infrastructure from queue_core import ( QueuePaths, + TaskOrigin, get_db as _get_db, init_db as _init_db, ensure_db as _ensure_db, cleanup_queue as _cleanup_queue, cleanup_targets_for_queue, + collect_task_origin, log_metric as _log_metric, log_fmt, is_process_alive, kill_process_tree, + insert_waiting_task, normalize_queue_name, parse_queue_capacities, attempt_task_start, + task_origin_kwargs, POLL_INTERVAL_WAITING, ) @@ -156,6 +160,19 @@ def log_metric(event: str, **kwargs): _log_metric(PATHS.metrics_path, event, MAX_METRICS_SIZE_MB, **kwargs) +def _current_context(): + """Best-effort FastMCP request context; unavailable in tests and background codepaths.""" + try: + return get_context() + except LookupError: + return None + + +def _current_client_id() -> str | None: + ctx = _current_context() + return ctx.client_id if ctx and ctx.client_id else None + + def cleanup_queue(conn, queue_name: str, queue_capacities: dict[str, int] | None = None): """Clean up queue using configured paths and detect orphaned tasks.""" if queue_capacities is None: @@ -269,7 +286,11 @@ def get_memory_mb() -> float: # --- Core Queue Logic --- -async def wait_for_turn(queue_name: str, command: str | None = None) -> int: +async def wait_for_turn( + queue_name: str, + command: str | None = None, + task_origin: TaskOrigin | None = None, +) -> int: """Register task, wait for turn, return task ID when acquired.""" queue_name = normalize_queue_name(queue_name) @@ -282,24 +303,29 @@ async def wait_for_turn(queue_name: str, command: str | None = None) -> int: cleanup_queue(conn, queue_name, QUEUE_CAPACITIES) my_pid = os.getpid() - ctx = None - try: - ctx = get_context() - except LookupError: - pass # Running outside request context (e.g., in tests) + ctx = _current_context() with get_db() as conn: - cursor = conn.execute( - "INSERT INTO queue (queue_name, status, pid, server_id, command) VALUES (?, ?, ?, ?, ?)", - (queue_name, "waiting", my_pid, SERVER_INSTANCE_ID, command), + task_id = insert_waiting_task( + conn, + queue_name, + my_pid, + SERVER_INSTANCE_ID, + command=command, + task_origin=task_origin, ) - task_id = cursor.lastrowid # Track this task as active for orphan detection with _active_task_ids_lock: _active_task_ids.add(task_id) - log_metric("task_queued", task_id=task_id, queue_name=queue_name, pid=my_pid) + log_metric( + "task_queued", + task_id=task_id, + queue_name=queue_name, + pid=my_pid, + **task_origin_kwargs(task_origin), + ) queued_at = time.time() if ctx: @@ -330,7 +356,9 @@ async def wait_for_turn(queue_name: str, command: str | None = None) -> int: "task_started", task_id=task_id, queue_name=queue_name, + pid=my_pid, wait_time_seconds=round(wait_time, 2), + **task_origin_kwargs(task_origin), ) if ctx: await ctx.info(log_fmt("Lock ACQUIRED. Starting execution.")) @@ -361,7 +389,9 @@ async def wait_for_turn(queue_name: str, command: str | None = None) -> int: "task_cancelled", task_id=task_id, queue_name=queue_name, + pid=my_pid, reason="client_disconnected", + **task_origin_kwargs(task_origin), ) with get_db() as conn: conn.execute("DELETE FROM queue WHERE id = ?", (task_id,)) @@ -374,11 +404,7 @@ async def release_lock(task_id: int): with _active_task_ids_lock: _active_task_ids.discard(task_id) - ctx = None - try: - ctx = get_context() - except LookupError: - pass + ctx = _current_context() try: with get_db() as conn: @@ -406,6 +432,7 @@ async def run_task( queue_name: str = "global", timeout_seconds: int = 1200, env_vars: str = "", + agent_name: str = "", ): """ Execute a command through the task queue for sequential processing. @@ -458,6 +485,7 @@ async def run_task( timeout_seconds: Max **execution** time before killing the task (default: 1200 = 20 mins). Queue wait time does NOT count against this timeout. env_vars: Environment variables to set, format: "KEY1=value1,KEY2=value2" + agent_name: Optional friendly caller label (for example `amp` or `claude-code`). Returns: Command output including stdout, stderr, and exit code. @@ -481,7 +509,10 @@ async def run_task( key, value = pair.split("=", 1) env[key.strip()] = value.strip() - task_id = await wait_for_turn(queue_name, command) + caller_name = agent_name.strip() or _current_client_id() + task_origin = collect_task_origin(working_directory, caller_name) + + task_id = await wait_for_turn(queue_name, command, task_origin=task_origin) mem_before = get_memory_mb() start = time.time() @@ -580,9 +611,11 @@ async def stream_to_file(stream, tail_buffer: deque, label: str): "task_timeout", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, timeout_seconds=timeout_seconds, memory_mb=round(get_memory_mb(), 1), + **task_origin_kwargs(task_origin), ) cleanup_output_files() @@ -607,6 +640,7 @@ async def stream_to_file(stream, tail_buffer: deque, label: str): "task_completed", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, exit_code=proc.returncode, duration_seconds=round(duration, 2), @@ -614,6 +648,7 @@ async def stream_to_file(stream, tail_buffer: deque, label: str): stderr_lines=stderr_count, memory_before_mb=round(mem_before, 1), memory_after_mb=round(mem_after, 1), + **task_origin_kwargs(task_origin), ) cleanup_output_files() @@ -654,8 +689,10 @@ async def stream_to_file(stream, tail_buffer: deque, label: str): "task_cancelled", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, reason="client_disconnected_during_execution", + **task_origin_kwargs(task_origin), ) try: os.killpg(proc.pid, signal.SIGTERM) @@ -672,8 +709,10 @@ async def stream_to_file(stream, tail_buffer: deque, label: str): "task_error", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, error=str(e), + **task_origin_kwargs(task_origin), ) return f"ERROR: {str(e)}" diff --git a/tests/test_queue.py b/tests/test_queue.py index d2f4339..9ca0a83 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -5,10 +5,12 @@ import pytest import asyncio +import json import os import subprocess import time from pathlib import Path +from types import SimpleNamespace # Set fast polling intervals for tests BEFORE importing task_queue os.environ["TASK_QUEUE_POLL_WAITING"] = "0.1" @@ -43,6 +45,8 @@ def clean_db(): """Clean database before each test.""" if DB_PATH.exists(): DB_PATH.unlink() + if PATHS.metrics_path.exists(): + PATHS.metrics_path.unlink() # Also remove WAL files if present wal_path = Path(str(DB_PATH) + "-wal") shm_path = Path(str(DB_PATH) + "-shm") @@ -81,6 +85,116 @@ def read_output_file(result_str: str) -> str: return "" +def create_git_repo(tmp_path: Path, name: str) -> Path: + repo_dir = tmp_path / name + repo_dir.mkdir() + subprocess.run(["git", "init", "-b", "main"], cwd=repo_dir, check=True, capture_output=True) + (repo_dir / "README.md").write_text("test repo\n") + subprocess.run(["git", "add", "README.md"], cwd=repo_dir, check=True, capture_output=True) + subprocess.run( + [ + "git", + "-c", + "user.name=Test User", + "-c", + "user.email=test@example.com", + "commit", + "-m", + "init", + ], + cwd=repo_dir, + check=True, + capture_output=True, + ) + return repo_dir + + +def test_git_context_uses_commit_hash_for_detached_head(monkeypatch): + calls = [] + + def fake_run(command, capture_output, text, timeout): + calls.append(command) + return SimpleNamespace( + returncode=0, + stdout="/tmp/repo\n/tmp/repo/.git\n0123456789abcdef0123456789abcdef01234567\nHEAD\n", + ) + + monkeypatch.setattr(queue_core.subprocess, "run", fake_run) + monkeypatch.setattr(queue_core, "_git_repo_name_from_common_dir", lambda *_: "sample-repo") + + assert queue_core._git_context("/tmp/repo") == ( + "/tmp/repo", + "sample-repo", + "0123456", + ) + assert calls == [[ + "git", + "-C", + "/tmp/repo", + "rev-parse", + "--show-toplevel", + "--git-common-dir", + "HEAD", + "--symbolic-full-name", + "HEAD", + ]] + + +@pytest.mark.asyncio +async def test_task_origin_is_persisted_in_queue_and_metrics(client, tmp_path): + repo_dir = create_git_repo(tmp_path, "metadata-repo") + expected_origin = queue_core.collect_task_origin(str(repo_dir), "amp") + + async with client: + result_task = asyncio.create_task( + client.call_tool( + "run_task", + { + "command": "sleep 1", + "working_directory": str(repo_dir), + "queue_name": "metadata_test", + "agent_name": "amp", + }, + ) + ) + + await asyncio.sleep(0.2) + + with get_db() as conn: + row = conn.execute( + """SELECT id, pid, working_directory, worktree_root, repo_name, git_branch, agent_name + FROM queue + WHERE queue_name = ? AND status = 'running'""", + ("metadata_test",), + ).fetchone() + + assert row is not None + assert row["working_directory"] == expected_origin.working_directory + assert row["worktree_root"] == expected_origin.worktree_root + assert row["repo_name"] == expected_origin.repo_name + assert row["git_branch"] == expected_origin.git_branch + assert row["agent_name"] == expected_origin.agent_name + + task_id = row["id"] + pid = row["pid"] + result = await result_task + + assert "SUCCESS" in str(result) + + entries = [json.loads(line) for line in PATHS.metrics_path.read_text().splitlines() if line.strip()] + task_entries = [entry for entry in entries if entry.get("task_id") == task_id] + + assert {entry["event"] for entry in task_entries} >= {"task_queued", "task_started", "task_completed"} + for entry in task_entries: + if entry["event"] in {"task_queued", "task_started", "task_completed"}: + assert entry["pid"] == pid + assert entry["working_directory"] == expected_origin.working_directory + assert entry["worktree_root"] == expected_origin.worktree_root + assert entry["repo_name"] == expected_origin.repo_name + assert entry["git_branch"] == expected_origin.git_branch + assert entry["agent_name"] == expected_origin.agent_name + + @pytest.mark.asyncio async def test_single_task_execution(client): """Test that a single task executes successfully.""" diff --git a/tq.py b/tq.py index caef9d7..fbfd235 100644 --- a/tq.py +++ b/tq.py @@ -21,6 +21,8 @@ # Import shared queue infrastructure from queue_core import ( QueuePaths, + TaskOrigin, + collect_task_origin, get_db, init_db, ensure_db, @@ -30,9 +32,11 @@ release_lock, is_process_alive, kill_process_tree, + insert_waiting_task, normalize_queue_name, parse_queue_capacities, attempt_task_start, + task_origin_kwargs, POLL_INTERVAL_WAITING, DEFAULT_MAX_LOCK_AGE_MINUTES, DEFAULT_MAX_METRICS_SIZE_MB, @@ -297,23 +301,46 @@ def cleanup_queue( conn.commit() -def register_task(conn, queue_name: str, paths: QueuePaths, command: str = None) -> int: +def register_task( + conn, + queue_name: str, + paths: QueuePaths, + command: str = None, + task_origin: TaskOrigin | None = None, +) -> int: """Register a task in the queue. Returns task_id immediately.""" my_pid = os.getpid() - cursor = conn.execute( - "INSERT INTO queue (queue_name, status, pid, server_id, command) VALUES (?, ?, ?, ?, ?)", - (queue_name, "waiting", my_pid, CLI_INSTANCE_ID, command), + task_id = insert_waiting_task( + conn, + queue_name, + my_pid, + CLI_INSTANCE_ID, + command=command, + task_origin=task_origin, ) conn.commit() - task_id = cursor.lastrowid - log_metric(paths, "task_queued", task_id=task_id, queue_name=queue_name, pid=my_pid) + log_metric( + paths, + "task_queued", + task_id=task_id, + queue_name=queue_name, + pid=my_pid, + **task_origin_kwargs(task_origin), + ) print(f"[tq] Task #{task_id} queued in '{queue_name}'") return task_id -def wait_for_turn(conn, queue_name: str, task_id: int, paths: QueuePaths, queue_capacities: dict[str, int]) -> None: +def wait_for_turn( + conn, + queue_name: str, + task_id: int, + paths: QueuePaths, + queue_capacities: dict[str, int], + task_origin: TaskOrigin | None = None, +) -> None: """Wait for the task's turn to run. Task must already be registered.""" my_pid = os.getpid() queued_at = time.time() @@ -347,7 +374,9 @@ def wait_for_turn(conn, queue_name: str, task_id: int, paths: QueuePaths, queue_ "task_started", task_id=task_id, queue_name=queue_name, + pid=my_pid, wait_time_seconds=round(wait_time, 2), + **task_origin_kwargs(task_origin), ) if wait_time > 1: print(f"[tq] Lock acquired after {wait_time:.1f}s wait") @@ -385,6 +414,7 @@ def cmd_run(args): paths = get_paths(args) paths.data_dir.mkdir(parents=True, exist_ok=True) + task_origin = collect_task_origin(working_dir) # Ensure database exists and is valid (recover if corrupted) ensure_db(paths) @@ -441,8 +471,8 @@ def cleanup_handler(signum, frame): cleanup_queue(conn, queue_name, paths, queue_capacities) # Register task first so task_id is available for cleanup if interrupted - task_id = register_task(conn, queue_name, paths, command=command) - wait_for_turn(conn, queue_name, task_id, paths, queue_capacities) + task_id = register_task(conn, queue_name, paths, command=command, task_origin=task_origin) + wait_for_turn(conn, queue_name, task_id, paths, queue_capacities, task_origin=task_origin) print(f"[tq] Running: {command}") print(f"[tq] Directory: {working_dir}") @@ -483,8 +513,10 @@ def cleanup_handler(signum, frame): "task_timeout", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, timeout_seconds=timeout, + **task_origin_kwargs(task_origin), ) return 124 # Standard timeout exit code @@ -502,9 +534,11 @@ def cleanup_handler(signum, frame): "task_completed", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), command=command, exit_code=exit_code, duration_seconds=round(duration, 2), + **task_origin_kwargs(task_origin), ) return exit_code @@ -517,7 +551,9 @@ def cleanup_handler(signum, frame): "task_error", task_id=task_id, queue_name=queue_name, + pid=os.getpid(), error=str(e), + **task_origin_kwargs(task_origin), ) return 1