Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion skills/ytstudio/references/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Examples:
ytstudio analytics query -m views,shares -d country --sort -views -n 10

ytstudio analytics query -m views,estimatedMinutesWatched -d video \
--sort -views -n 5 -o json
--sort -views -n 5 --resolve -o json

ytstudio analytics query -m videoThumbnailImpressions,videoThumbnailImpressionsClickRate \
-d video --sort -videoThumbnailImpressions -n 10
Expand All @@ -316,6 +316,7 @@ $ ytstudio analytics query [OPTIONS]
* `--currency TEXT`: Currency code for revenue (e.g. EUR)
* `-o, --output TEXT`: Output format: table, json, csv [default: table]
* `--raw`: Show raw numbers instead of human-readable
* `--resolve`: Resolve video/playlist dimension IDs to title columns
* `--help`: Show this message and exit.

### `ytstudio analytics metrics`
Expand Down
71 changes: 70 additions & 1 deletion src/ytstudio/commands/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,67 @@ def _snap_to_week_start(value: str) -> str:
return (d - timedelta(days=(d.weekday() + 1) % 7)).isoformat()


def _chunks(values: list[str], size: int) -> list[list[str]]:
return [values[i : i + size] for i in range(0, len(values), size)]


def _fetch_snippet_titles(data_service, resource: str, ids: list[str]) -> dict[str, str]:
"""Fetch snippet titles for YouTube Data API resources by id."""
if not ids:
return {}

titles: dict[str, str] = {}
for batch in _chunks(ids, 50):
if resource == "video":
response = api(data_service.videos().list(part="snippet", id=",".join(batch)))
elif resource == "playlist":
response = api(data_service.playlists().list(part="snippet", id=",".join(batch)))
else:
raise ValueError(f"Unsupported resource for title resolution: {resource}")

for item in response.get("items", []):
item_id = item.get("id")
title = item.get("snippet", {}).get("title")
if item_id and title is not None:
titles[item_id] = title
return titles


def _resolve_query_dimension_titles(data_service, response: dict) -> dict:
"""Add title columns for video/playlist dimensions in an analytics query response."""
headers = list(response.get("columnHeaders", []))
rows = [list(row) for row in response.get("rows", [])]
header_names = [h.get("name") for h in headers]

resolvable = sorted(
(name for name in (DimensionName.VIDEO, DimensionName.PLAYLIST) if name in header_names),
key=header_names.index,
)
if not resolvable or not rows:
return response

title_maps: dict[str, dict[str, str]] = {}
for name in resolvable:
idx = header_names.index(name)
ids = list(dict.fromkeys(str(row[idx]) for row in rows if idx < len(row) and row[idx]))
title_maps[name] = _fetch_snippet_titles(data_service, name, ids)

# Insert title columns immediately after each resolved dimension. Iterate in
# reverse so earlier indices remain valid while mutating rows/headers.
for name in reversed(resolvable):
idx = header_names.index(name)
title_header = f"{name}Title"
headers.insert(
idx + 1,
{"name": title_header, "columnType": "DIMENSION", "dataType": "STRING"},
)
for row in rows:
resource_id = str(row[idx]) if idx < len(row) and row[idx] is not None else ""
row.insert(idx + 1, title_maps[name].get(resource_id))

return {**response, "columnHeaders": headers, "rows": rows}


def _format_query_response(response: dict, output: str) -> None:
headers = [h["name"] for h in response.get("columnHeaders", [])]
rows = response.get("rows", [])
Expand Down Expand Up @@ -422,6 +483,11 @@ def query(
currency: str = typer.Option(None, "--currency", help="Currency code for revenue (e.g. EUR)"),
output: str = typer.Option("table", "--output", "-o", help="Output format: table, json, csv"),
raw: bool = typer.Option(False, "--raw", help="Show raw numbers instead of human-readable"),
resolve: bool = typer.Option(
False,
"--resolve",
help="Resolve video/playlist dimension IDs to title columns",
),
):
"""Run a custom analytics query with any metrics and dimensions.

Expand All @@ -435,7 +501,7 @@ def query(
ytstudio analytics query -m views,shares -d country --sort -views -n 10

ytstudio analytics query -m views,estimatedMinutesWatched -d video \\
--sort -views -n 5 -o json
--sort -views -n 5 --resolve -o json

ytstudio analytics query -m videoThumbnailImpressions,videoThumbnailImpressionsClickRate \\
-d video --sort -videoThumbnailImpressions -n 10
Expand Down Expand Up @@ -507,6 +573,9 @@ def query(
currency=currency,
)

if resolve:
response = _resolve_query_dimension_titles(data_service, response)

_format_query_response(response, output)


Expand Down
177 changes: 176 additions & 1 deletion tests/test_analytics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json
from unittest.mock import MagicMock, patch

import pytest
import typer
from typer.testing import CliRunner

from ytstudio.commands.analytics import _align_date_range
from ytstudio.commands.analytics import (
_align_date_range,
_fetch_snippet_titles,
_resolve_query_dimension_titles,
)
from ytstudio.main import app
from ytstudio.ui import format_number, set_raw_output

Expand Down Expand Up @@ -283,6 +288,176 @@ def test_query_csv_output(self):
assert lines[0] == "day,views,likes"
assert "2026-01-01" in lines[1]

def test_query_resolve_video_titles_json(self):
data_svc, analytics_svc = self._mock_services()
analytics_svc.reports.return_value.query.return_value.execute.return_value = {
"columnHeaders": [
{"name": "video", "columnType": "DIMENSION", "dataType": "STRING"},
{"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
],
"rows": [["vid1", 100], ["vid2", 50]],
}
data_svc.videos.return_value.list.return_value.execute.return_value = {
"items": [
{"id": "vid1", "snippet": {"title": "First video"}},
{"id": "vid2", "snippet": {"title": "Second video"}},
]
}
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(
app,
[
"analytics",
"query",
"-m",
"views",
"-d",
"video",
"--sort",
"-views",
"-n",
"2",
"--resolve",
"-o",
"json",
],
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data[0] == {"video": "vid1", "videoTitle": "First video", "views": 100}
data_svc.videos.return_value.list.assert_called_once()
assert data_svc.videos.return_value.list.call_args.kwargs["id"] == "vid1,vid2"

def test_query_resolve_playlist_titles_csv(self):
data_svc, analytics_svc = self._mock_services()
analytics_svc.reports.return_value.query.return_value.execute.return_value = {
"columnHeaders": [
{"name": "playlist", "columnType": "DIMENSION", "dataType": "STRING"},
{"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
],
"rows": [["pl1", 100]],
}
data_svc.playlists.return_value.list.return_value.execute.return_value = {
"items": [{"id": "pl1", "snippet": {"title": "My playlist"}}]
}
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(
app,
["analytics", "query", "-m", "views", "-d", "playlist", "--resolve", "-o", "csv"],
)
assert result.exit_code == 0
assert result.output.strip().split("\n") == [
"playlist,playlistTitle,views",
"pl1,My playlist,100",
]

def test_query_does_not_resolve_titles_without_flag(self):
data_svc, analytics_svc = self._mock_services()
analytics_svc.reports.return_value.query.return_value.execute.return_value = {
"columnHeaders": [
{"name": "video", "columnType": "DIMENSION", "dataType": "STRING"},
{"name": "views", "columnType": "METRIC", "dataType": "INTEGER"},
],
"rows": [["vid1", 100]],
}
with (
patch("ytstudio.commands.analytics.get_data_service", return_value=data_svc),
patch("ytstudio.commands.analytics.get_analytics_service", return_value=analytics_svc),
):
result = runner.invoke(
app,
[
"analytics",
"query",
"-m",
"views",
"-d",
"video",
"--sort",
"-views",
"-n",
"1",
"-o",
"json",
],
)
assert result.exit_code == 0
assert json.loads(result.output) == [{"video": "vid1", "views": 100}]
data_svc.videos.return_value.list.assert_not_called()

def test_fetch_snippet_titles_empty_ids_skips_api(self):
data_svc = MagicMock()
assert _fetch_snippet_titles(data_svc, "video", []) == {}
data_svc.videos.assert_not_called()

def test_fetch_snippet_titles_rejects_unknown_resource(self):
with pytest.raises(ValueError, match="Unsupported resource"):
_fetch_snippet_titles(MagicMock(), "channel", ["UC_test"])

def test_fetch_snippet_titles_batches_large_video_lists(self):
data_svc = MagicMock()
responses = [
{"items": [{"id": "v0", "snippet": {"title": "Video 0"}}]},
{"items": [{"id": "v50", "snippet": {"title": "Video 50"}}]},
]
data_svc.videos.return_value.list.return_value.execute.side_effect = responses

titles = _fetch_snippet_titles(data_svc, "video", [f"v{i}" for i in range(51)])

assert titles == {"v0": "Video 0", "v50": "Video 50"}
calls = data_svc.videos.return_value.list.call_args_list
assert len(calls) == 2
assert calls[0].kwargs["id"] == ",".join(f"v{i}" for i in range(50))
assert calls[1].kwargs["id"] == "v50"

def test_resolve_query_dimension_titles_no_rows_returns_original_response(self):
response = {
"columnHeaders": [{"name": "video", "columnType": "DIMENSION"}],
"rows": [],
}
assert _resolve_query_dimension_titles(MagicMock(), response) is response

def test_resolve_query_dimension_titles_no_resolvable_dimension_returns_original_response(self):
response = {
"columnHeaders": [{"name": "country", "columnType": "DIMENSION"}],
"rows": [["NL"]],
}
assert _resolve_query_dimension_titles(MagicMock(), response) is response

def test_resolve_query_dimension_titles_multiple_dimensions_preserves_order(self):
data_svc = MagicMock()
data_svc.videos.return_value.list.return_value.execute.return_value = {
"items": [{"id": "vid1", "snippet": {"title": "Video one"}}]
}
data_svc.playlists.return_value.list.return_value.execute.return_value = {
"items": [{"id": "pl1", "snippet": {"title": "Playlist one"}}]
}
response = {
"columnHeaders": [
{"name": "playlist", "columnType": "DIMENSION"},
{"name": "video", "columnType": "DIMENSION"},
{"name": "views", "columnType": "METRIC"},
],
"rows": [["pl1", "vid1", 10]],
}

resolved = _resolve_query_dimension_titles(data_svc, response)

assert [h["name"] for h in resolved["columnHeaders"]] == [
"playlist",
"playlistTitle",
"video",
"videoTitle",
"views",
]
assert resolved["rows"] == [["pl1", "Playlist one", "vid1", "Video one", 10]]

def test_query_with_filter(self):
data_svc, analytics_svc = self._mock_services()
with (
Expand Down
Loading