From e7df5dc5551a3b601054fcc2354cfc8fe843efe0 Mon Sep 17 00:00:00 2001 From: Jelmer de Wit <1598297+jdwit@users.noreply.github.com> Date: Thu, 18 Jun 2026 12:43:04 +0200 Subject: [PATCH] Add analytics dimension title resolution --- skills/ytstudio/references/reference.md | 3 +- src/ytstudio/commands/analytics.py | 71 +++++++++- tests/test_analytics.py | 177 +++++++++++++++++++++++- 3 files changed, 248 insertions(+), 3 deletions(-) diff --git a/skills/ytstudio/references/reference.md b/skills/ytstudio/references/reference.md index e5c4baa..d47c540 100644 --- a/skills/ytstudio/references/reference.md +++ b/skills/ytstudio/references/reference.md @@ -289,7 +289,7 @@ Examples: ytstudio analytics query -m views,shares -d country --sort -views -n 10 ytstudio analytics query -m views,estimatedMinutesWatched -d video \ - --sort -views -n 5 -o json + --sort -views -n 5 --resolve -o json ytstudio analytics query -m videoThumbnailImpressions,videoThumbnailImpressionsClickRate \ -d video --sort -videoThumbnailImpressions -n 10 @@ -316,6 +316,7 @@ $ ytstudio analytics query [OPTIONS] * `--currency TEXT`: Currency code for revenue (e.g. EUR) * `-o, --output TEXT`: Output format: table, json, csv [default: table] * `--raw`: Show raw numbers instead of human-readable +* `--resolve`: Resolve video/playlist dimension IDs to title columns * `--help`: Show this message and exit. 
### `ytstudio analytics metrics`
diff --git a/src/ytstudio/commands/analytics.py b/src/ytstudio/commands/analytics.py
index 0aea481..6bfff94 100644
--- a/src/ytstudio/commands/analytics.py
+++ b/src/ytstudio/commands/analytics.py
@@ -347,6 +347,110 @@ def _snap_to_week_start(value: str) -> str:
     return (d - timedelta(days=(d.weekday() + 1) % 7)).isoformat()
 
 
+def _chunks(values: list[str], size: int) -> list[list[str]]:
+    """Split ``values`` into consecutive lists of at most ``size`` items.
+
+    Raises:
+        ValueError: If ``size`` is smaller than 1. ``range`` would otherwise
+            reject 0 with an unrelated message and yield nothing for negative
+            sizes, silently dropping every value.
+    """
+    if size < 1:
+        raise ValueError(f"Chunk size must be at least 1, got {size}")
+    return [values[i : i + size] for i in range(0, len(values), size)]
+
+
+def _fetch_snippet_titles(data_service, resource: str, ids: list[str]) -> dict[str, str]:
+    """Fetch snippet titles for YouTube Data API resources by id.
+
+    Args:
+        data_service: YouTube Data API client; its ``videos()`` or
+            ``playlists()`` collection is queried with ``part="snippet"``.
+        resource: Either ``"video"`` or ``"playlist"``.
+        ids: Resource ids to look up. Duplicates are requested only once.
+
+    Returns:
+        Mapping of resource id to title. Ids that the API does not return, or
+        returns without a title, are left out.
+
+    Raises:
+        ValueError: If ``resource`` is neither ``"video"`` nor ``"playlist"``.
+    """
+    # Validate before the empty-ids shortcut so an unsupported resource is
+    # always reported, not only when there is something to fetch.
+    if resource not in ("video", "playlist"):
+        raise ValueError(f"Unsupported resource for title resolution: {resource}")
+    if not ids:
+        return {}
+
+    # The collection does not change between batches, so pick it once.
+    if resource == "video":
+        collection = data_service.videos()
+    else:
+        collection = data_service.playlists()
+
+    titles: dict[str, str] = {}
+    # dict.fromkeys drops duplicate ids while keeping their original order.
+    for batch in _chunks(list(dict.fromkeys(ids)), 50):
+        response = api(collection.list(part="snippet", id=",".join(batch)))
+        for item in response.get("items", []):
+            item_id = item.get("id")
+            title = item.get("snippet", {}).get("title")
+            if item_id and title is not None:
+                titles[item_id] = title
+    return titles
+
+
+def _resolve_query_dimension_titles(data_service, response: dict) -> dict:
+    """Add title columns for video/playlist dimensions in an analytics query response.
+
+    A ``<dimension>Title`` column is inserted directly after every ``video`` or
+    ``playlist`` dimension column. ``response`` itself is not mutated.
+
+    Args:
+        data_service: YouTube Data API client used to look up the titles.
+        response: Query response holding ``columnHeaders`` and ``rows``.
+
+    Returns:
+        A new response dict with the extra headers and row values, or
+        ``response`` unchanged when it has no rows or no resolvable dimension.
+        Ids without a known title get ``None`` in the title column.
+    """
+    headers = list(response.get("columnHeaders", []))
+    rows = [list(row) for row in response.get("rows", [])]
+    header_names = [h.get("name") for h in headers]
+
+    # Work with the plain string value of each dimension: how an enum member is
+    # rendered inside an f-string differs between Python versions, and the
+    # "<name>Title" header must not depend on that.
+    columns: list[tuple[int, str]] = []
+    for dimension in (DimensionName.VIDEO, DimensionName.PLAYLIST):
+        name = str(getattr(dimension, "value", dimension))
+        if name in header_names:
+            columns.append((header_names.index(name), name))
+    columns.sort()
+    if not columns or not rows:
+        return response
+
+    title_maps: dict[str, dict[str, str]] = {}
+    for idx, name in columns:
+        ids = list(dict.fromkeys(str(row[idx]) for row in rows if idx < len(row) and row[idx]))
+        title_maps[name] = _fetch_snippet_titles(data_service, name, ids)
+
+    # Insert title columns immediately after each resolved dimension. Iterate in
+    # reverse so earlier indices remain valid while mutating rows/headers.
+    for idx, name in reversed(columns):
+        headers.insert(
+            idx + 1,
+            {"name": f"{name}Title", "columnType": "DIMENSION", "dataType": "STRING"},
+        )
+        for row in rows:
+            resource_id = str(row[idx]) if idx < len(row) and row[idx] is not None else ""
+            row.insert(idx + 1, title_maps[name].get(resource_id))
+
+    return {**response, "columnHeaders": headers, "rows": rows}
+
+
 def _format_query_response(response: dict, output: str) -> None:
     headers = [h["name"] for h in response.get("columnHeaders", [])]
     rows = response.get("rows", [])
@@ -422,6 +526,11 @@ def query(
     currency: str = typer.Option(None, "--currency", help="Currency code for revenue (e.g. EUR)"),
     output: str = typer.Option("table", "--output", "-o", help="Output format: table, json, csv"),
     raw: bool = typer.Option(False, "--raw", help="Show raw numbers instead of human-readable"),
+    resolve: bool = typer.Option(
+        False,
+        "--resolve",
+        help="Resolve video/playlist dimension IDs to title columns",
+    ),
 ):
     """Run a custom analytics query with any metrics and dimensions.
 
@@ -435,7 +501,7 @@ def query(
         ytstudio analytics query -m views,shares -d country --sort -views -n 10
 
         ytstudio analytics query -m views,estimatedMinutesWatched -d video \\
-            --sort -views -n 5 -o json
+            --sort -views -n 5 --resolve -o json
 
         ytstudio analytics query -m videoThumbnailImpressions,videoThumbnailImpressionsClickRate \\
             -d video --sort -videoThumbnailImpressions -n 10
@@ -507,6 +573,9 @@ def query(
         currency=currency,
     )
 
+    if resolve:  # opt-in: title resolution issues extra Data API requests
+        response = _resolve_query_dimension_titles(data_service, response)
+
     _format_query_response(response, output)
 
 
diff --git a/tests/test_analytics.py b/tests/test_analytics.py
index c0700e6..c952ab1 100644
--- a/tests/test_analytics.py
+++ b/tests/test_analytics.py
@@ -1,10 +1,15 @@
 import json
 from unittest.mock import MagicMock, patch
 
+import pytest
 import typer
 from typer.testing import CliRunner
 
-from ytstudio.commands.analytics import _align_date_range
+from ytstudio.commands.analytics import (
+    _align_date_range,
+    _fetch_snippet_titles,
+    _resolve_query_dimension_titles,
+)
 from ytstudio.main import app
 from ytstudio.ui import format_number, set_raw_output
 
@@ -283,6 +288,176 @@ def test_query_csv_output(self):
         assert lines[0] == "day,views,likes"
         assert "2026-01-01" in lines[1]
 
+    def test_query_resolve_video_titles_json(self):
+        data_svc, analytics_svc = self._mock_services()  # stubbed Data + Analytics clients
+        analytics_svc.reports.return_value.query.return_value.execute.return_value = {
+            "columnHeaders": [
+                {"name": "video", "columnType": "DIMENSION", "dataType": "STRING"},
+                {"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
+            ],
+            "rows": [["vid1", 100], ["vid2", 50]],
+        }
+        data_svc.videos.return_value.list.return_value.execute.return_value = {
+            "items": [
+                {"id": "vid1", "snippet": {"title": "First video"}},
+                {"id": "vid2", "snippet": {"title": "Second video"}},
+            ]
+        }
+        with (
+            patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
+            patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
+        ):
+            result = runner.invoke(
+                app,
+                [
+                    "analytics",
+                    "query",
+                    "-m",
+                    "views",
+                    "-d",
+                    "video",
+                    "--sort",
+                    "-views",
+                    "-n",
+                    "2",
+                    "--resolve",  # flag under test
+                    "-o",
+                    "json",
+                ],
+            )
+        assert result.exit_code == 0
+        data = json.loads(result.output)
+        assert data[0] == {"video": "vid1", "videoTitle": "First video", "views": 100}
+        data_svc.videos.return_value.list.assert_called_once()  # one batch, not one call per id
+        assert data_svc.videos.return_value.list.call_args.kwargs["id"] == "vid1,vid2"
+
+    def test_query_resolve_playlist_titles_csv(self):
+        data_svc, analytics_svc = self._mock_services()
+        analytics_svc.reports.return_value.query.return_value.execute.return_value = {
+            "columnHeaders": [
+                {"name": "playlist", "columnType": "DIMENSION", "dataType": "STRING"},
+                {"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
+            ],
+            "rows": [["pl1", 100]],
+        }
+        data_svc.playlists.return_value.list.return_value.execute.return_value = {
+            "items": [{"id": "pl1", "snippet": {"title": "My playlist"}}]
+        }
+        with (
+            patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
+            patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
+        ):
+            result = runner.invoke(
+                app,
+                ["analytics", "query", "-m", "views", "-d", "playlist", "--resolve", "-o", "csv"],
+            )
+        assert result.exit_code == 0
+        assert result.output.strip().split("\n") == [
+            "playlist,playlistTitle,views",  # title column directly follows the id column
+            "pl1,My playlist,100",
+        ]
+
+    def test_query_does_not_resolve_titles_without_flag(self):
+        data_svc, analytics_svc = self._mock_services()
+        analytics_svc.reports.return_value.query.return_value.execute.return_value = {
+            "columnHeaders": [
+                {"name": "video", "columnType": "DIMENSION", "dataType": "STRING"},
+                {"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
+            ],
+            "rows": [["vid1", 100]],
+        }
+        with (
+            patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
+            patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
+        ):
+            result = runner.invoke(
+                app,
+                [
+                    "analytics",
+                    "query",
+                    "-m",
+                    "views",
+                    "-d",
+                    "video",
+                    "--sort",
+                    "-views",
+                    "-n",
+                    "1",
+                    "-o",
+                    "json",
+                ],
+            )
+        assert result.exit_code == 0
+        assert json.loads(result.output) == [{"video": "vid1", "views": 100}]
+        data_svc.videos.return_value.list.assert_not_called()  # no lookup without --resolve
+
+    def test_fetch_snippet_titles_empty_ids_skips_api(self):
+        data_svc = MagicMock()
+        assert _fetch_snippet_titles(data_svc, "video", []) == {}
+        data_svc.videos.assert_not_called()  # empty id list returns before touching the API
+
+    def test_fetch_snippet_titles_rejects_unknown_resource(self):
+        with pytest.raises(ValueError, match="Unsupported resource"):  # video/playlist only
+            _fetch_snippet_titles(MagicMock(), "channel", ["UC_test"])
+
+    def test_fetch_snippet_titles_batches_large_video_lists(self):
+        data_svc = MagicMock()
+        responses = [  # one canned reply per expected batch
+            {"items": [{"id": "v0", "snippet": {"title": "Video 0"}}]},
+            {"items": [{"id": "v50", "snippet": {"title": "Video 50"}}]},
+        ]
+        data_svc.videos.return_value.list.return_value.execute.side_effect = responses
+
+        titles = _fetch_snippet_titles(data_svc, "video", [f"v{i}" for i in range(51)])
+
+        assert titles == {"v0": "Video 0", "v50": "Video 50"}
+        calls = data_svc.videos.return_value.list.call_args_list
+        assert len(calls) == 2  # 51 ids -> a batch of 50 plus a batch of 1
+        assert calls[0].kwargs["id"] == ",".join(f"v{i}" for i in range(50))
+        assert calls[1].kwargs["id"] == "v50"
+
+    def test_resolve_query_dimension_titles_no_rows_returns_original_response(self):
+        response = {
+            "columnHeaders": [{"name": "video", "columnType": "DIMENSION"}],
+            "rows": [],  # nothing to resolve
+        }
+        assert _resolve_query_dimension_titles(MagicMock(), response) is response
+
+    def test_resolve_query_dimension_titles_no_resolvable_dimension_returns_original_response(self):
+        response = {  # "country" is not a video/playlist dimension
+            "columnHeaders": [{"name": "country", "columnType": "DIMENSION"}],
+            "rows": [["NL"]],
+        }
+        assert _resolve_query_dimension_titles(MagicMock(), response) is response
+
+    def test_resolve_query_dimension_titles_multiple_dimensions_preserves_order(self):
+        data_svc = MagicMock()
+        data_svc.videos.return_value.list.return_value.execute.return_value = {
+            "items": [{"id": "vid1", "snippet": {"title": "Video one"}}]
+        }
+        data_svc.playlists.return_value.list.return_value.execute.return_value = {
+            "items": [{"id": "pl1", "snippet": {"title": "Playlist one"}}]
+        }
+        response = {
+            "columnHeaders": [
+                {"name": "playlist", "columnType": "DIMENSION"},  # playlist precedes video
+                {"name": "video", "columnType": "DIMENSION"},
+                {"name": "views", "columnType": "METRIC"},
+            ],
+            "rows": [["pl1", "vid1", 10]],
+        }
+
+        resolved = _resolve_query_dimension_titles(data_svc, response)
+
+        assert [h["name"] for h in resolved["columnHeaders"]] == [
+            "playlist",
+            "playlistTitle",
+            "video",
+            "videoTitle",
+            "views",
+        ]
+        assert resolved["rows"] == [["pl1", "Playlist one", "vid1", "Video one", 10]]
+
     def test_query_with_filter(self):
         data_svc, analytics_svc = self._mock_services()
         with (