Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d887714
fix: Respect display.progress_bar=None in background threads
shuoweil Apr 17, 2026
42e4a50
refactor: Refactor BQ event progress bar config
shuoweil Apr 17, 2026
d6757bc
Refactor BQ event progress bar config and add system test
shuoweil Apr 17, 2026
3e8ddde
refactor: refactor code
shuoweil Apr 17, 2026
be825b5
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 20, 2026
96aaff3
docs: add ignore
shuoweil Apr 20, 2026
bfba719
format file
shuoweil Apr 20, 2026
4ffd647
format file
shuoweil Apr 20, 2026
ffbc397
Roll back .pre-commit-config.yaml changes
shuoweil Apr 20, 2026
27010d3
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 22, 2026
a6c42af
chore: format files
shuoweil Apr 22, 2026
9742165
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 29, 2026
cdcbf0c
rename _FALLBACK_TO_GLOBAL to _DEFAULT
shuoweil Apr 29, 2026
c20751b
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 1, 2026
a139c65
format: format code
shuoweil May 1, 2026
85bf48f
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 8, 2026
67c16c6
Apply review comments in events.py
shuoweil May 8, 2026
13b75e7
format code
shuoweil May 8, 2026
7b9f249
format code
shuoweil May 8, 2026
dd69a8d
fix mypy
shuoweil May 8, 2026
a3e4ac5
update code for lint
shuoweil May 8, 2026
f393a0d
format code
shuoweil May 8, 2026
193d33c
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d001a9b
refactor: remove progress_bar from events
shuoweil May 11, 2026
6377840
refactor: use EventEnvelope for progress_bar context
shuoweil May 11, 2026
5584531
style: fix line length lints in formatting_helpers and __init__
shuoweil May 11, 2026
a7d0171
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d372472
format file
shuoweil May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 55 additions & 37 deletions packages/bigframes/bigframes/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
import datetime
import threading
import uuid
from typing import Any, Callable, Optional, Set
from typing import Any, Callable, Literal, Set

import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.job.query
import google.cloud.bigquery.table

import bigframes.session.executor

_DEFAULT: Literal["default"] = "default"


class Subscriber:
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
def __init__(
self, callback: Callable[[Event], None], *, publisher: Publisher
): # noqa: E501
self._publisher = publisher
self._callback = callback
self._subscriber_id = uuid.uuid4()
Expand Down Expand Up @@ -81,16 +85,22 @@ def unsubscribe(self, subscriber: Subscriber):
with self._subscribers_lock:
self._subscribers.remove(subscriber)

def publish(self, event: Event):
def publish(self, envelope: "EventEnvelope"):
with self._subscribers_lock:
for subscriber in self._subscribers:
subscriber(event)
subscriber(envelope)


class Event:
pass


@dataclasses.dataclass(frozen=True)
class EventEnvelope:
event: Event
progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = None


@dataclasses.dataclass(frozen=True)
class SessionClosed(Event):
session_id: str
Expand All @@ -106,7 +116,7 @@ class ExecutionRunning(Event):

@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
result: Optional[bigframes.session.executor.ExecuteResult] = None
result: bigframes.session.executor.ExecuteResult | None = None


@dataclasses.dataclass(frozen=True)
Expand All @@ -121,13 +131,16 @@ class BigQuerySentEvent(ExecutionRunning):
"""Query sent to BigQuery."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None

@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QuerySentEvent,
):
return cls(
query=event.query,
billing_project=event.billing_project,
Expand All @@ -142,13 +155,16 @@ class BigQueryRetryEvent(ExecutionRunning):
"""Query sent another time because the previous attempt failed."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None

@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QueryRetryEvent,
):
return cls(
query=event.query,
billing_project=event.billing_project,
Expand All @@ -162,19 +178,20 @@ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent
class BigQueryReceivedEvent(ExecutionRunning):
"""Query received and acknowledged by the BigQuery API."""

billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
statement_type: Optional[str] = None
state: Optional[str] = None
query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
statement_type: str | None = None
state: str | None = None
query_plan: list[google.cloud.bigquery.job.query.QueryPlanEntry] | None = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryReceivedEvent,
):
return cls(
billing_project=event.billing_project,
Expand All @@ -193,21 +210,22 @@ def from_bqclient(
class BigQueryFinishedEvent(ExecutionRunning):
"""Query finished successfully."""

billing_project: Optional[str] = None
location: Optional[str] = None
query_id: Optional[str] = None
job_id: Optional[str] = None
destination: Optional[google.cloud.bigquery.table.TableReference] = None
total_rows: Optional[int] = None
total_bytes_processed: Optional[int] = None
slot_millis: Optional[int] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
query_id: str | None = None
job_id: str | None = None
destination: google.cloud.bigquery.table.TableReference | None = None
total_rows: int | None = None
total_bytes_processed: int | None = None
slot_millis: int | None = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryFinishedEvent,
):
return cls(
billing_project=event.billing_project,
Expand Down
74 changes: 57 additions & 17 deletions packages/bigframes/bigframes/formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
return (
f"Computation deferred. Computation will process "
f"{get_formatted_bytes(query_job.total_bytes_processed)}"
)
res = "Query Job Info"
for key, value in query_job_prop_pairs.items():
job_val = getattr(query_job, value)
Expand Down Expand Up @@ -105,11 +108,15 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
return (
f"Computation deferred. Computation will process "
f"{get_formatted_bytes(query_job.total_bytes_processed)}"
)

# We can reuse the plaintext repr for now or make a nicer table.
# For deferred mode consistency, let's just wrap the text in a pre block or similar,
# but the request implies we want a distinct HTML representation if possible.
# For deferred mode consistency, let's just wrap the text in a pre
# block or similar, but the request implies we want a distinct HTML
# representation if possible.
# However, existing repr_query_job returns a simple string.
# Let's format it as a simple table or list.

Expand All @@ -123,7 +130,10 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
location=query_job.location,
job_id=query_job.job_id,
)
res += f'<li>Job: <a target="_blank" href="{url}">{query_job.job_id}</a></li>'
res += (
f'<li>Job: <a target="_blank" href="{url}">'
f"{query_job.job_id}</a></li>"
)
elif key == "Slot Time":
res += f"<li>{key}: {get_formatted_time(job_val)}</li>"
elif key == "Bytes Processed":
Expand All @@ -138,7 +148,7 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):


def progress_callback(
event: bigframes.core.events.Event,
envelope: Any,
):
"""Displays a progress bar while the query is running"""
global current_display_id
Expand All @@ -147,12 +157,21 @@ def progress_callback(
import bigframes._config
import bigframes.core.events
except ImportError:
# Since this gets called from __del__, skip if the import fails to avoid
# Since this gets called from __del__, skip if the import fails
# to avoid
# ImportError: sys.meta_path is None, Python is likely shutting down.
# This will allow cleanup to continue.
return

progress_bar = bigframes._config.options.display.progress_bar
if isinstance(envelope, bigframes.core.events.EventEnvelope):
event = envelope.event
progress_bar = envelope.progress_bar
else:
event = envelope
progress_bar = bigframes.core.events._DEFAULT

if progress_bar == bigframes.core.events._DEFAULT:
progress_bar = bigframes._config.options.display.progress_bar

if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"
Expand Down Expand Up @@ -232,7 +251,8 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):
job.result()
job.reload()
display.update_display(
display.HTML(get_base_job_loading_html(job)), display_id=display_id
display.HTML(get_base_job_loading_html(job)),
display_id=display_id,
)
elif progress_bar == "terminal":
inital_loading_bar = get_base_job_loading_string(job)
Expand Down Expand Up @@ -286,7 +306,10 @@ def render_job_link_html(
job_id=job_id,
)
if job_url:
job_link = f' [<a target="_blank" href="{job_url}">Job {project_id}:{location}.{job_id} details</a>]'
job_link = (
f' [<a target="_blank" href="{job_url}">'
f"Job {project_id}:{location}.{job_id} details</a>]"
)
else:
job_link = ""
return job_link
Expand Down Expand Up @@ -323,7 +346,10 @@ def get_job_url(
"""
if project_id is None or location is None or job_id is None:
return None
return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"""
return (
f"https://console.cloud.google.com/bigquery?project={project_id}"
f"&j=bq:{location}:{job_id}&page=queryresults"
)


def render_bqquery_sent_event_html(
Expand All @@ -348,7 +374,10 @@ def render_bqquery_sent_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
query_text_details = f"<details><summary>SQL</summary><pre>{html.escape(event.query)}</pre></details>"
query_text_details = (
f"<details><summary>SQL</summary><pre>"
f"{html.escape(event.query)}</pre></details>"
)

return f"""
Query started{query_id}.{job_link}{query_text_details}
Expand Down Expand Up @@ -397,7 +426,10 @@ def render_bqquery_retry_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
query_text_details = f"<details><summary>SQL</summary><pre>{html.escape(event.query)}</pre></details>"
query_text_details = (
f"<details><summary>SQL</summary><pre>"
f"{html.escape(event.query)}</pre></details>"
)

return f"""
Retrying query{query_id}.{job_link}{query_text_details}
Expand Down Expand Up @@ -443,7 +475,10 @@ def render_bqquery_received_event_html(
query_plan_details = ""
if event.query_plan:
plan_str = "\n".join([str(entry) for entry in event.query_plan])
query_plan_details = f"<details><summary>Query Plan</summary><pre>{html.escape(plan_str)}</pre></details>"
query_plan_details = (
f"<details><summary>Query Plan</summary><pre>"
f"{html.escape(plan_str)}</pre></details>"
)

return f"""
Query{query_id} is {event.state}.{job_link}{query_plan_details}
Expand Down Expand Up @@ -506,7 +541,8 @@ def render_bqquery_finished_event_plaintext(

bytes_str = ""
if event.total_bytes_processed is not None:
bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed."
size_str = humanize.naturalsize(event.total_bytes_processed)
bytes_str = f" {size_str} processed."

slot_time_str = ""
if event.slot_millis is not None:
Expand Down Expand Up @@ -572,7 +608,8 @@ def get_formatted_time(val):
Duration string
"""
try:
return humanize.naturaldelta(datetime.timedelta(milliseconds=float(val)))
delta = datetime.timedelta(milliseconds=float(val))
return humanize.naturaldelta(delta)
except Exception:
return val

Expand All @@ -591,7 +628,10 @@ def get_formatted_bytes(val):


def get_bytes_processed_string(val: Any):
"""Try to get bytes processed string. Return empty if passed non int value"""
"""Try to get bytes processed string.

Return empty if passed non int value.
"""
bytes_processed_string = ""
if isinstance(val, int):
bytes_processed_string = f"""{get_formatted_bytes(val)} processed. """
Expand Down
3 changes: 2 additions & 1 deletion packages/bigframes/bigframes/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class Udf(Protocol):
"""

@property
def udf_def(self) -> udf_def.BigqueryUdf: ...
def udf_def(self) -> udf_def.BigqueryUdf:
...


class BigqueryCallableRoutine:
Expand Down
Loading
Loading