From d887714284f10c0ae57f2cd6d442378131c2c21f Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 17 Apr 2026 23:20:32 +0000 Subject: [PATCH 01/21] fix: Respect display.progress_bar=None in background threads --- packages/bigframes/bigframes/core/events.py | 28 +++++++++++++--- .../bigframes/bigframes/formatting_helpers.py | 5 ++- .../session/_io/bigquery/__init__.py | 32 ++++++++++++++----- .../tests/unit/test_formatting_helpers.py | 25 +++++++++++++++ 4 files changed, 77 insertions(+), 13 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 0724cc5414bb..54bb58acd595 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -125,15 +125,21 @@ class BigQuerySentEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None + progress_bar: Optional[str] = "fallback_to_global" @classmethod - def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent): + def from_bqclient( + cls, + event: google.cloud.bigquery._job_helpers.QuerySentEvent, + progress_bar: Optional[str] = "fallback_to_global", + ): return cls( query=event.query, billing_project=event.billing_project, location=event.location, job_id=event.job_id, request_id=event.request_id, + progress_bar=progress_bar, ) @@ -146,15 +152,21 @@ class BigQueryRetryEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None + progress_bar: Optional[str] = "fallback_to_global" @classmethod - def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent): + def from_bqclient( + cls, + event: google.cloud.bigquery._job_helpers.QueryRetryEvent, + progress_bar: Optional[str] = "fallback_to_global", + ): return cls( query=event.query, billing_project=event.billing_project, location=event.location, job_id=event.job_id, request_id=event.request_id, + progress_bar=progress_bar, ) @@ -171,10 +183,13 @@ class BigQueryReceivedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None + progress_bar: Optional[str] = "fallback_to_global" @classmethod def from_bqclient( - cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent + cls, + event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, + progress_bar: Optional[str] = "fallback_to_global", ): return cls( billing_project=event.billing_project, @@ -186,6 +201,7 @@ def from_bqclient( created=event.created, started=event.started, ended=event.ended, + progress_bar=progress_bar, ) @@ -204,10 +220,13 @@ class BigQueryFinishedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None + progress_bar: Optional[str] = "fallback_to_global" @classmethod def from_bqclient( - cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent + cls, + event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, + progress_bar: Optional[str] = "fallback_to_global", ): return cls( billing_project=event.billing_project, @@ -221,6 +240,7 @@ def from_bqclient( created=event.created, started=event.started, ended=event.ended, + progress_bar=progress_bar, ) diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index cef14d39a3f6..834675de0d64 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -152,7 +152,10 @@ def progress_callback( # This will allow cleanup to continue. return - progress_bar = bigframes._config.options.display.progress_bar + # Prioritize progress_bar set on the event, falling back to thread-local option. + progress_bar = getattr(event, "progress_bar", "fallback_to_global") + if progress_bar == "fallback_to_global": + progress_bar = bigframes._config.options.display.progress_bar if progress_bar == "auto": progress_bar = "notebook" if in_ipython() else "terminal" diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 780ba55c50db..7a7332073cd0 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -245,15 +245,27 @@ def add_and_trim_labels(job_config, session=None): def create_bq_event_callback(publisher): + import bigframes._config + + progress_bar = bigframes._config.options.display.progress_bar + def publish_bq_event(event): if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent): - bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event) + bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient( + event, progress_bar=progress_bar + ) elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent): - bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event) + bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient( + event, progress_bar=progress_bar + ) elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent): - bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event) + bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient( + event, progress_bar=progress_bar + ) elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent): - bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event) + bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient( + event, progress_bar=progress_bar + ) else: bf_event = bigframes.core.events.BigQueryUnknownEvent(event) @@ -275,7 +287,8 @@ def start_query_with_client( query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -291,7 +304,8 @@ def start_query_with_client( query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... @overload @@ -308,7 +322,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -325,7 +340,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... def start_query_with_client( diff --git a/packages/bigframes/tests/unit/test_formatting_helpers.py b/packages/bigframes/tests/unit/test_formatting_helpers.py index ec681b36ab05..67be9398d241 100644 --- a/packages/bigframes/tests/unit/test_formatting_helpers.py +++ b/packages/bigframes/tests/unit/test_formatting_helpers.py @@ -212,3 +212,28 @@ def test_get_job_url(): job_id=job_id, location=location, project_id=project_id ) assert actual_url == expected_url + + +def test_progress_callback_respects_event_progress_bar(): + event = bfevents.BigQuerySentEvent( + query="SELECT * FROM my_table", + progress_bar=None, + ) + + with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): + with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): + with mock.patch("builtins.print") as mock_print: + formatting_helpers.progress_callback(event) + mock_print.assert_not_called() + + +def test_progress_callback_falls_back_to_global(): + event = bfevents.BigQuerySentEvent( + query="SELECT * FROM my_table", + ) + + with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): + with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): + with mock.patch("builtins.print") as mock_print: + formatting_helpers.progress_callback(event) + mock_print.assert_called_once() From 42e4a5094d5daed5fc4b8a82680f428d78a5fa14 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 17 Apr 2026 23:48:07 +0000 Subject: [PATCH 02/21] refactor: Refactor BQ event progress bar config --- packages/bigframes/bigframes/core/events.py | 19 +++++++----- .../bigframes/bigframes/formatting_helpers.py | 6 ++-- .../session/_io/bigquery/__init__.py | 31 +++++++------------ 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 54bb58acd595..2b42d4bb3bff 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -27,6 +27,9 @@ import bigframes.session.executor +_FALLBACK_TO_GLOBAL = "fallback_to_global" + + class Subscriber: def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): self._publisher = publisher @@ -125,13 +128,13 @@ class BigQuerySentEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None - progress_bar: Optional[str] = "fallback_to_global" + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent, - progress_bar: Optional[str] = "fallback_to_global", + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, ): return cls( query=event.query, @@ -152,13 +155,13 @@ class BigQueryRetryEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None - progress_bar: Optional[str] = "fallback_to_global" + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent, - progress_bar: Optional[str] = "fallback_to_global", + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, ): return cls( query=event.query, @@ -183,13 +186,13 @@ class BigQueryReceivedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = "fallback_to_global" + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, - progress_bar: Optional[str] = "fallback_to_global", + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, ): return cls( billing_project=event.billing_project, @@ -220,13 +223,13 @@ class BigQueryFinishedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = "fallback_to_global" + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, - progress_bar: Optional[str] = "fallback_to_global", + progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, ): return cls( billing_project=event.billing_project, diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 834675de0d64..3d4082578f5a 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -153,8 +153,10 @@ def progress_callback( return # Prioritize progress_bar set on the event, falling back to thread-local option. - progress_bar = getattr(event, "progress_bar", "fallback_to_global") - if progress_bar == "fallback_to_global": + progress_bar = getattr( + event, "progress_bar", bigframes.core.events._FALLBACK_TO_GLOBAL + ) + if progress_bar == bigframes.core.events._FALLBACK_TO_GLOBAL: progress_bar = bigframes._config.options.display.progress_bar if progress_bar == "auto": diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 7a7332073cd0..a9787ba12bb8 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -249,26 +249,19 @@ def create_bq_event_callback(publisher): progress_bar = bigframes._config.options.display.progress_bar - def publish_bq_event(event): - if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent): - bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient( - event, progress_bar=progress_bar - ) - elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent): - bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient( - event, progress_bar=progress_bar - ) - elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent): - bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient( - event, progress_bar=progress_bar - ) - elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent): - bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient( - event, progress_bar=progress_bar - ) - else: - bf_event = bigframes.core.events.BigQueryUnknownEvent(event) + event_map = { + google.cloud.bigquery._job_helpers.QueryFinishedEvent: bigframes.core.events.BigQueryFinishedEvent, + google.cloud.bigquery._job_helpers.QueryReceivedEvent: bigframes.core.events.BigQueryReceivedEvent, + google.cloud.bigquery._job_helpers.QueryRetryEvent: bigframes.core.events.BigQueryRetryEvent, + google.cloud.bigquery._job_helpers.QuerySentEvent: bigframes.core.events.BigQuerySentEvent, + } + def publish_bq_event(event): + bf_event = bigframes.core.events.BigQueryUnknownEvent(event) + for bq_type, bf_type in event_map.items(): + if isinstance(event, bq_type): + bf_event = bf_type.from_bqclient(event, progress_bar=progress_bar) + break publisher.publish(bf_event) return publish_bq_event From d6757bc61c5a31c4850e0730ab7aec0f628ad83d Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 17 Apr 2026 23:54:59 +0000 Subject: [PATCH 03/21] Refactor BQ event progress bar config and add system test --- .../tests/system/small/test_progress_bar.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/bigframes/tests/system/small/test_progress_bar.py b/packages/bigframes/tests/system/small/test_progress_bar.py index bc247f6078ce..a179e18332af 100644 --- a/packages/bigframes/tests/system/small/test_progress_bar.py +++ b/packages/bigframes/tests/system/small/test_progress_bar.py @@ -104,6 +104,23 @@ def test_progress_bar_load_jobs( assert_loading_msg_exist(capsys.readouterr().out, pattern="Load") +def test_progress_bar_uniqueness_check(session: bf.Session, capsys): + # Ensure strictly_ordered is True (default) to trigger uniqueness check + assert session._strictly_ordered + + capsys.readouterr() # clear output + + with bf.option_context("display.progress_bar", "terminal"): + # Read a table and specify a non-unique index_col to trigger the check. + # We use a public table to make it a "real" test. + session.read_gbq_table( + "bigquery-public-data.ml_datasets.penguins", + index_col="island", + ) + + assert_loading_msg_exist(capsys.readouterr().out) + + def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex): num_loading_msg = 0 lines = capstdout.split("\n") From 3e8ddde0bc6f79dd76c9befb94dffc11d3eb8ce3 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 17 Apr 2026 23:59:22 +0000 Subject: [PATCH 04/21] refactor: refactor code --- .../bigframes/session/_io/bigquery/__init__.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index a9787ba12bb8..afef219f2f23 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -280,8 +280,7 @@ def start_query_with_client( query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @overload @@ -297,8 +296,7 @@ def start_query_with_client( query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @overload @@ -315,8 +313,7 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @overload @@ -333,8 +330,7 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... def start_query_with_client( From 96aaff385992f5e0a3dad00bd440ed4068b3543c Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 20 Apr 2026 18:32:33 +0000 Subject: [PATCH 05/21] docs: add ignore --- .../bigframes/session/_io/bigquery/__init__.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index afef219f2f23..fb87c611d7bb 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -260,7 +260,7 @@ def publish_bq_event(event): bf_event = bigframes.core.events.BigQueryUnknownEvent(event) for bq_type, bf_type in event_map.items(): if isinstance(event, bq_type): - bf_event = bf_type.from_bqclient(event, progress_bar=progress_bar) + bf_event = bf_type.from_bqclient(event, progress_bar=progress_bar) # type: ignore break publisher.publish(bf_event) @@ -280,7 +280,8 @@ def start_query_with_client( query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -296,7 +297,8 @@ def start_query_with_client( query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... @overload @@ -313,7 +315,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -330,7 +333,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... def start_query_with_client( From bfba71905e23671e24ad5ebfebc5807010c448f5 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 20 Apr 2026 19:04:01 +0000 Subject: [PATCH 06/21] format file --- .pre-commit-config.yaml | 6 +++--- packages/bigframes/bigframes/core/events.py | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5405cc8ff1f3..f0d84681dfc3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,16 +16,16 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.0.1 + rev: v6.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 26.3.1 hooks: - id: black - repo: https://github.com/pycqa/flake8 - rev: 3.9.2 + rev: 7.3.0 hooks: - id: flake8 diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 2b42d4bb3bff..579621a107f0 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -22,16 +22,18 @@ import google.cloud.bigquery._job_helpers import google.cloud.bigquery.job.query +from google.cloud.bigquery.job.query import QueryPlanEntry import google.cloud.bigquery.table import bigframes.session.executor - _FALLBACK_TO_GLOBAL = "fallback_to_global" class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): + def __init__( + self, callback: Callable[[Event], None], *, publisher: Publisher + ): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() @@ -182,7 +184,7 @@ class BigQueryReceivedEvent(ExecutionRunning): job_id: Optional[str] = None statement_type: Optional[str] = None state: Optional[str] = None - query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None + query_plan: Optional[list[QueryPlanEntry]] = None created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None From 4ffd647d8eb08a56e93288addcf06f6d7e330c31 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 20 Apr 2026 22:52:36 +0000 Subject: [PATCH 07/21] format file --- packages/bigframes/bigframes/core/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 579621a107f0..220e0e9a535f 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -22,8 +22,8 @@ import google.cloud.bigquery._job_helpers import google.cloud.bigquery.job.query -from google.cloud.bigquery.job.query import QueryPlanEntry import google.cloud.bigquery.table +from google.cloud.bigquery.job.query import QueryPlanEntry import bigframes.session.executor From ffbc3974718c460ba1e4211c29c8e3ba9f4037d6 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 20 Apr 2026 22:53:44 +0000 Subject: [PATCH 08/21] Roll back .pre-commit-config.yaml changes --- .pre-commit-config.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f0d84681dfc3..5405cc8ff1f3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,16 +16,16 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 + rev: v4.0.1 hooks: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - repo: https://github.com/psf/black - rev: 26.3.1 + rev: 22.3.0 hooks: - id: black - repo: https://github.com/pycqa/flake8 - rev: 7.3.0 + rev: 3.9.2 hooks: - id: flake8 From a6c42aff42e449fdedf438c49d727238948a9b4a Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 22 Apr 2026 18:39:57 +0000 Subject: [PATCH 09/21] chore: format files --- packages/bigframes/bigframes/core/events.py | 4 +--- .../bigframes/session/_io/bigquery/__init__.py | 12 ++++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 220e0e9a535f..6471cce58733 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -31,9 +31,7 @@ class Subscriber: - def __init__( - self, callback: Callable[[Event], None], *, publisher: Publisher - ): # noqa: E501 + def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index fb87c611d7bb..703cb4704fec 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -280,8 +280,7 @@ def start_query_with_client( query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @overload @@ -297,8 +296,7 @@ def start_query_with_client( query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @overload @@ -315,8 +313,7 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @overload @@ -333,8 +330,7 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: - ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... def start_query_with_client( From cdcbf0c609a9389f3228808d34bab73881aa5ddb Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 29 Apr 2026 17:55:36 +0000 Subject: [PATCH 10/21] rename _FALLBACK_TO_GLOBAL to _DEFAULT --- packages/bigframes/bigframes/core/events.py | 18 +++++++++--------- .../bigframes/bigframes/formatting_helpers.py | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 6471cce58733..ef843763e85a 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -27,7 +27,7 @@ import bigframes.session.executor -_FALLBACK_TO_GLOBAL = "fallback_to_global" +_DEFAULT = "default" class Subscriber: @@ -128,13 +128,13 @@ class BigQuerySentEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL + progress_bar: Optional[str] = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent, - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, + progress_bar: Optional[str] = _DEFAULT, ): return cls( query=event.query, @@ -155,13 +155,13 @@ class BigQueryRetryEvent(ExecutionRunning): location: Optional[str] = None job_id: Optional[str] = None request_id: Optional[str] = None - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL + progress_bar: Optional[str] = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent, - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, + progress_bar: Optional[str] = _DEFAULT, ): return cls( query=event.query, @@ -186,13 +186,13 @@ class BigQueryReceivedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL + progress_bar: Optional[str] = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, + progress_bar: Optional[str] = _DEFAULT, ): return cls( billing_project=event.billing_project, @@ -223,13 +223,13 @@ class BigQueryFinishedEvent(ExecutionRunning): created: Optional[datetime.datetime] = None started: Optional[datetime.datetime] = None ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL + progress_bar: Optional[str] = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, - progress_bar: Optional[str] = _FALLBACK_TO_GLOBAL, + progress_bar: Optional[str] = _DEFAULT, ): return cls( billing_project=event.billing_project, diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 3d4082578f5a..4b86b9816c07 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -154,9 +154,9 @@ def progress_callback( # Prioritize progress_bar set on the event, falling back to thread-local option. progress_bar = getattr( - event, "progress_bar", bigframes.core.events._FALLBACK_TO_GLOBAL + event, "progress_bar", bigframes.core.events._DEFAULT ) - if progress_bar == bigframes.core.events._FALLBACK_TO_GLOBAL: + if progress_bar == bigframes.core.events._DEFAULT: progress_bar = bigframes._config.options.display.progress_bar if progress_bar == "auto": From a139c658752706e3283b7fc1b7bcdf0002b1c5c9 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 1 May 2026 23:40:19 +0000 Subject: [PATCH 11/21] format: format code --- packages/bigframes/bigframes/formatting_helpers.py | 4 +--- packages/bigframes/bigframes/pandas/io/api.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 4b86b9816c07..8b78e775d56c 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -153,9 +153,7 @@ def progress_callback( return # Prioritize progress_bar set on the event, falling back to thread-local option. - progress_bar = getattr( - event, "progress_bar", bigframes.core.events._DEFAULT - ) + progress_bar = getattr(event, "progress_bar", bigframes.core.events._DEFAULT) if progress_bar == bigframes.core.events._DEFAULT: progress_bar = bigframes._config.options.display.progress_bar diff --git a/packages/bigframes/bigframes/pandas/io/api.py b/packages/bigframes/bigframes/pandas/io/api.py index e412f5f2798a..fd175459fa4b 100644 --- a/packages/bigframes/bigframes/pandas/io/api.py +++ b/packages/bigframes/bigframes/pandas/io/api.py @@ -654,8 +654,8 @@ def from_glob_path( def _get_bqclient_and_project() -> Tuple[bigquery.Client, str]: # Address circular imports in doctest due to bigframes/session/__init__.py # containing a lot of logic and samples. - from bigframes.session import clients import bigframes._config.auth + from bigframes.session import clients credentials, project = bigframes._config.auth.resolve_credentials_and_project( config.options.bigquery From 67c16c660b1a7a54f946756e4acc94896ff06080 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:00:27 +0000 Subject: [PATCH 12/21] Apply review comments in events.py --- packages/bigframes/bigframes/core/events.py | 85 +++++++++++---------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index ef843763e85a..fa1f86e3d568 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -18,12 +18,11 @@ import datetime import threading import uuid -from typing import Any, Callable, Optional, Set +from typing import Any, Callable, Literal, Set import google.cloud.bigquery._job_helpers import google.cloud.bigquery.job.query import google.cloud.bigquery.table -from google.cloud.bigquery.job.query import QueryPlanEntry import bigframes.session.executor @@ -31,7 +30,9 @@ class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 + def __init__( + self, callback: Callable[[Event], None], *, publisher: Publisher + ): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() @@ -109,7 +110,7 @@ class ExecutionRunning(Event): @dataclasses.dataclass(frozen=True) class ExecutionFinished(Event): - result: Optional[bigframes.session.executor.ExecuteResult] = None + result: bigframes.session.executor.ExecuteResult | None = None @dataclasses.dataclass(frozen=True) @@ -124,17 +125,18 @@ class BigQuerySentEvent(ExecutionRunning): """Query sent to BigQuery.""" query: str - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - request_id: Optional[str] = None - progress_bar: Optional[str] = _DEFAULT + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + request_id: str | None = None + progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent, - progress_bar: Optional[str] = _DEFAULT, + progress_bar: Literal["default", "auto", "notebook", "terminal"] + | None = _DEFAULT, ): return cls( query=event.query, @@ -151,17 +153,18 @@ class BigQueryRetryEvent(ExecutionRunning): """Query sent another time because the previous attempt failed.""" query: str - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - request_id: Optional[str] = None - progress_bar: Optional[str] = _DEFAULT + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + request_id: str | None = None + progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent, - progress_bar: Optional[str] = _DEFAULT, + progress_bar: Literal["default", "auto", "notebook", "terminal"] + | None = _DEFAULT, ): return cls( query=event.query, @@ -177,22 +180,23 @@ def from_bqclient( class BigQueryReceivedEvent(ExecutionRunning): """Query received and acknowledged by the BigQuery API.""" - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - statement_type: Optional[str] = None - state: Optional[str] = None - query_plan: Optional[list[QueryPlanEntry]] = None - created: Optional[datetime.datetime] = None - started: Optional[datetime.datetime] = None - ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = _DEFAULT + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + statement_type: str | None = None + state: str | None = None + query_plan: list[google.cloud.bigquery.job.query.QueryPlanEntry] | None = None + created: datetime.datetime | None = None + started: datetime.datetime | None = None + ended: datetime.datetime | None = None + progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, - progress_bar: Optional[str] = _DEFAULT, + progress_bar: Literal["default", "auto", "notebook", "terminal"] + | None = _DEFAULT, ): return cls( billing_project=event.billing_project, @@ -212,24 +216,25 @@ def from_bqclient( class BigQueryFinishedEvent(ExecutionRunning): """Query finished successfully.""" - billing_project: Optional[str] = None - location: Optional[str] = None - query_id: Optional[str] = None - job_id: Optional[str] = None - destination: Optional[google.cloud.bigquery.table.TableReference] = None - total_rows: Optional[int] = None - total_bytes_processed: Optional[int] = None - slot_millis: Optional[int] = None - created: Optional[datetime.datetime] = None - started: Optional[datetime.datetime] = None - ended: Optional[datetime.datetime] = None - progress_bar: Optional[str] = _DEFAULT + billing_project: str | None = None + location: str | None = None + query_id: str | None = None + job_id: str | None = None + destination: google.cloud.bigquery.table.TableReference | None = None + total_rows: int | None = None + total_bytes_processed: int | None = None + slot_millis: int | None = None + created: datetime.datetime | None = None + started: datetime.datetime | None = None + ended: datetime.datetime | None = None + progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, - progress_bar: Optional[str] = _DEFAULT, + progress_bar: Literal["default", "auto", "notebook", "terminal"] + | None = _DEFAULT, ): return cls( billing_project=event.billing_project, From 13b75e7019519bd87700aafe8146f9263bf7dee3 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:06:31 +0000 Subject: [PATCH 13/21] format code --- packages/bigframes/bigframes/core/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index fa1f86e3d568..4d17fe24a487 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -30,9 +30,7 @@ class Subscriber: - def __init__( - self, callback: Callable[[Event], None], *, publisher: Publisher - ): # noqa: E501 + def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() From 7b9f24940fef514ddb926572a866d93aaf3a7228 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:08:36 +0000 Subject: [PATCH 14/21] format code --- packages/bigframes/bigframes/core/events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 4d17fe24a487..fa1f86e3d568 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -30,7 +30,9 @@ class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 + def __init__( + self, callback: Callable[[Event], None], *, publisher: Publisher + ): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() From dd69a8d3a89c3f551dfcfa5136245da81a75b78c Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:24:38 +0000 Subject: [PATCH 15/21] fix mypy --- packages/bigframes/bigframes/core/events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index fa1f86e3d568..28204736e785 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -26,13 +26,11 @@ import bigframes.session.executor -_DEFAULT = "default" +_DEFAULT: Literal["default"] = "default" class Subscriber: - def __init__( - self, callback: Callable[[Event], None], *, publisher: Publisher - ): # noqa: E501 + def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() From a3e4ac5321d9ec0a9f8c4e7e990f525c0535c167 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:25:57 +0000 Subject: [PATCH 16/21] update code for lint --- packages/bigframes/bigframes/core/events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 28204736e785..1e3330771858 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -30,7 +30,9 @@ class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 + def __init__( + self, callback: Callable[[Event], None], *, publisher: Publisher + ): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() From f393a0dbca159ca2b91cef3c31971b5e4c4aac42 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Fri, 8 May 2026 21:31:45 +0000 Subject: [PATCH 17/21] format code --- packages/bigframes/bigframes/core/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 1e3330771858..28204736e785 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -30,9 +30,7 @@ class Subscriber: - def __init__( - self, callback: Callable[[Event], None], *, publisher: Publisher - ): # noqa: E501 + def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() From d001a9b9a6898fafa0f4308cbe44fa04c1b4f5af Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 11 May 2026 22:15:55 +0000 Subject: [PATCH 18/21] refactor: remove progress_bar from events --- packages/bigframes/bigframes/core/events.py | 16 ---------------- .../bigframes/bigframes/formatting_helpers.py | 5 +---- .../bigframes/session/_io/bigquery/__init__.py | 6 +----- .../tests/unit/test_formatting_helpers.py | 13 ------------- 4 files changed, 2 insertions(+), 38 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 28204736e785..80f82297deae 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -127,14 +127,11 @@ class BigQuerySentEvent(ExecutionRunning): location: str | None = None job_id: str | None = None request_id: str | None = None - progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent, - progress_bar: Literal["default", "auto", "notebook", "terminal"] - | None = _DEFAULT, ): return cls( query=event.query, @@ -142,7 +139,6 @@ def from_bqclient( location=event.location, job_id=event.job_id, request_id=event.request_id, - progress_bar=progress_bar, ) @@ -155,14 +151,11 @@ class BigQueryRetryEvent(ExecutionRunning): location: str | None = None job_id: str | None = None request_id: str | None = None - progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent, - progress_bar: Literal["default", "auto", "notebook", "terminal"] - | None = _DEFAULT, ): return cls( query=event.query, @@ -170,7 +163,6 @@ def from_bqclient( location=event.location, job_id=event.job_id, request_id=event.request_id, - progress_bar=progress_bar, ) @@ -187,14 +179,11 @@ class BigQueryReceivedEvent(ExecutionRunning): created: datetime.datetime | None = None started: datetime.datetime | None = None ended: datetime.datetime | None = None - progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, - progress_bar: Literal["default", "auto", "notebook", "terminal"] - | None = _DEFAULT, ): return cls( billing_project=event.billing_project, @@ -206,7 +195,6 @@ def from_bqclient( created=event.created, started=event.started, ended=event.ended, - progress_bar=progress_bar, ) @@ -225,14 +213,11 @@ class BigQueryFinishedEvent(ExecutionRunning): created: datetime.datetime | None = None started: datetime.datetime | None = None ended: datetime.datetime | None = None - progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = _DEFAULT @classmethod def from_bqclient( cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, - progress_bar: Literal["default", "auto", "notebook", "terminal"] - | None = _DEFAULT, ): return cls( billing_project=event.billing_project, @@ -246,7 +231,6 @@ def from_bqclient( created=event.created, started=event.started, ended=event.ended, - progress_bar=progress_bar, ) diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 8b78e775d56c..cef14d39a3f6 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -152,10 +152,7 @@ def progress_callback( # This will allow cleanup to continue. return - # Prioritize progress_bar set on the event, falling back to thread-local option. - progress_bar = getattr(event, "progress_bar", bigframes.core.events._DEFAULT) - if progress_bar == bigframes.core.events._DEFAULT: - progress_bar = bigframes._config.options.display.progress_bar + progress_bar = bigframes._config.options.display.progress_bar if progress_bar == "auto": progress_bar = "notebook" if in_ipython() else "terminal" diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 703cb4704fec..9db639161844 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -245,10 +245,6 @@ def add_and_trim_labels(job_config, session=None): def create_bq_event_callback(publisher): - import bigframes._config - - progress_bar = bigframes._config.options.display.progress_bar - event_map = { google.cloud.bigquery._job_helpers.QueryFinishedEvent: bigframes.core.events.BigQueryFinishedEvent, google.cloud.bigquery._job_helpers.QueryReceivedEvent: bigframes.core.events.BigQueryReceivedEvent, @@ -260,7 +256,7 @@ def publish_bq_event(event): bf_event = bigframes.core.events.BigQueryUnknownEvent(event) for bq_type, bf_type in event_map.items(): if isinstance(event, bq_type): - bf_event = bf_type.from_bqclient(event, progress_bar=progress_bar) # type: ignore + bf_event = bf_type.from_bqclient(event) # type: ignore break publisher.publish(bf_event) diff --git a/packages/bigframes/tests/unit/test_formatting_helpers.py b/packages/bigframes/tests/unit/test_formatting_helpers.py index 67be9398d241..d539e6d5bee5 100644 --- a/packages/bigframes/tests/unit/test_formatting_helpers.py +++ b/packages/bigframes/tests/unit/test_formatting_helpers.py @@ -214,19 +214,6 @@ def test_get_job_url(): assert actual_url == expected_url -def test_progress_callback_respects_event_progress_bar(): - event = bfevents.BigQuerySentEvent( - query="SELECT * FROM my_table", - progress_bar=None, - ) - - with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): - with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): - with mock.patch("builtins.print") as mock_print: - formatting_helpers.progress_callback(event) - mock_print.assert_not_called() - - def test_progress_callback_falls_back_to_global(): event = bfevents.BigQuerySentEvent( query="SELECT * FROM my_table", From 63778404c4d8e1b41393b059976ea6646913b7a9 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 11 May 2026 23:15:31 +0000 Subject: [PATCH 19/21] refactor: use EventEnvelope for progress_bar context --- packages/bigframes/bigframes/core/events.py | 14 +++++++++++--- .../bigframes/bigframes/formatting_helpers.py | 7 +++++-- .../session/_io/bigquery/__init__.py | 19 ++++++++++++++----- .../bigframes/bigframes/session/metrics.py | 7 ++++++- .../tests/unit/test_formatting_helpers.py | 16 +++++++++++++++- 5 files changed, 51 insertions(+), 12 deletions(-) diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 80f82297deae..3b3e6013f7a3 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -30,7 +30,9 @@ class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): # noqa: E501 + def __init__( + self, callback: Callable[[Event], None], *, publisher: Publisher + ): # noqa: E501 self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() @@ -83,16 +85,22 @@ def unsubscribe(self, subscriber: Subscriber): with self._subscribers_lock: self._subscribers.remove(subscriber) - def publish(self, event: Event): + def publish(self, envelope: "EventEnvelope"): with self._subscribers_lock: for subscriber in self._subscribers: - subscriber(event) + subscriber(envelope) class Event: pass +@dataclasses.dataclass(frozen=True) +class EventEnvelope: + event: Event + progress_bar: Literal["default", "auto", "notebook", "terminal"] | None = None + + @dataclasses.dataclass(frozen=True) class SessionClosed(Event): session_id: str diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index cef14d39a3f6..ce0112a74a50 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -138,7 +138,7 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): def progress_callback( - event: bigframes.core.events.Event, + envelope: bigframes.core.events.EventEnvelope, ): """Displays a progress bar while the query is running""" global current_display_id @@ -152,7 +152,10 @@ def progress_callback( # This will allow cleanup to continue. return - progress_bar = bigframes._config.options.display.progress_bar + event = envelope.event + progress_bar = envelope.progress_bar + if progress_bar == bigframes.core.events._DEFAULT: + progress_bar = bigframes._config.options.display.progress_bar if progress_bar == "auto": progress_bar = "notebook" if in_ipython() else "terminal" diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 9db639161844..ed80c3f3f08f 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -245,6 +245,10 @@ def add_and_trim_labels(job_config, session=None): def create_bq_event_callback(publisher): + import bigframes._config + + progress_bar = bigframes._config.options.display.progress_bar + event_map = { google.cloud.bigquery._job_helpers.QueryFinishedEvent: bigframes.core.events.BigQueryFinishedEvent, google.cloud.bigquery._job_helpers.QueryReceivedEvent: bigframes.core.events.BigQueryReceivedEvent, @@ -258,7 +262,8 @@ def publish_bq_event(event): if isinstance(event, bq_type): bf_event = bf_type.from_bqclient(event) # type: ignore break - publisher.publish(bf_event) + envelope = bigframes.core.events.EventEnvelope(event=bf_event, progress_bar=progress_bar) + publisher.publish(envelope) return publish_bq_event @@ -276,7 +281,8 @@ def start_query_with_client( query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -292,7 +298,8 @@ def start_query_with_client( query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... @overload @@ -309,7 +316,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + ... @overload @@ -326,7 +334,8 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... def start_query_with_client( diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index d2682bbcaf7f..206d3da2f4d8 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -236,13 +236,18 @@ def count_job_stats( exec_seconds=exec_seconds, ) - def on_event(self, event: Any): + def on_event(self, envelope: Any): try: import bigframes.core.events from bigframes.session.executor import LocalExecuteResult except ImportError: return + if isinstance(envelope, bigframes.core.events.EventEnvelope): + event = envelope.event + else: + event = envelope + if isinstance(event, bigframes.core.events.ExecutionFinished): if event.result and isinstance(event.result, LocalExecuteResult): self.execution_count += 1 diff --git a/packages/bigframes/tests/unit/test_formatting_helpers.py b/packages/bigframes/tests/unit/test_formatting_helpers.py index d539e6d5bee5..a90d372e9dbe 100644 --- a/packages/bigframes/tests/unit/test_formatting_helpers.py +++ b/packages/bigframes/tests/unit/test_formatting_helpers.py @@ -218,9 +218,23 @@ def test_progress_callback_falls_back_to_global(): event = bfevents.BigQuerySentEvent( query="SELECT * FROM my_table", ) + envelope = bfevents.EventEnvelope(event=event, progress_bar=bfevents._DEFAULT) with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): with mock.patch("builtins.print") as mock_print: - formatting_helpers.progress_callback(event) + formatting_helpers.progress_callback(envelope) mock_print.assert_called_once() + + +def test_progress_callback_respects_envelope_progress_bar(): + event = bfevents.BigQuerySentEvent( + query="SELECT * FROM my_table", + ) + envelope = bfevents.EventEnvelope(event=event, progress_bar=None) + + with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): + with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): + with mock.patch("builtins.print") as mock_print: + formatting_helpers.progress_callback(envelope) + mock_print.assert_not_called() From 55845312dbbfa06df1eb1d3ff46f0b09f2e742ff Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 11 May 2026 23:29:45 +0000 Subject: [PATCH 20/21] style: fix line length lints in formatting_helpers and __init__ --- .../bigframes/bigframes/formatting_helpers.py | 75 ++++++++++++----- .../bigframes/functions/_function_session.py | 8 +- .../bigframes/bigframes/functions/function.py | 12 +-- .../session/_io/bigquery/__init__.py | 81 ++++++++++++++----- 4 files changed, 127 insertions(+), 49 deletions(-) diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index ce0112a74a50..86e50967e52f 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -71,7 +71,10 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]): if query_job is None: return "No job information available" if query_job.dry_run: - return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}" + return ( + f"Computation deferred. Computation will process " + f"{get_formatted_bytes(query_job.total_bytes_processed)}" + ) res = "Query Job Info" for key, value in query_job_prop_pairs.items(): job_val = getattr(query_job, value) @@ -105,11 +108,15 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): if query_job is None: return "No job information available" if query_job.dry_run: - return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}" + return ( + f"Computation deferred. Computation will process " + f"{get_formatted_bytes(query_job.total_bytes_processed)}" + ) # We can reuse the plaintext repr for now or make a nicer table. - # For deferred mode consistency, let's just wrap the text in a pre block or similar, - # but the request implies we want a distinct HTML representation if possible. + # For deferred mode consistency, let's just wrap the text in a pre + # block or similar, but the request implies we want a distinct HTML + # representation if possible. # However, existing repr_query_job returns a simple string. # Let's format it as a simple table or list. @@ -123,7 +130,10 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): location=query_job.location, job_id=query_job.job_id, ) - res += f'
  • Job: {query_job.job_id}
  • ' + res += ( + f'
  • Job: ' + f"{query_job.job_id}
  • " + ) elif key == "Slot Time": res += f"
  • {key}: {get_formatted_time(job_val)}
  • " elif key == "Bytes Processed": @@ -138,7 +148,7 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): def progress_callback( - envelope: bigframes.core.events.EventEnvelope, + envelope: Any, ): """Displays a progress bar while the query is running""" global current_display_id @@ -147,13 +157,19 @@ def progress_callback( import bigframes._config import bigframes.core.events except ImportError: - # Since this gets called from __del__, skip if the import fails to avoid + # Since this gets called from __del__, skip if the import fails + # to avoid # ImportError: sys.meta_path is None, Python is likely shutting down. # This will allow cleanup to continue. return - event = envelope.event - progress_bar = envelope.progress_bar + if isinstance(envelope, bigframes.core.events.EventEnvelope): + event = envelope.event + progress_bar = envelope.progress_bar + else: + event = envelope + progress_bar = bigframes.core.events._DEFAULT + if progress_bar == bigframes.core.events._DEFAULT: progress_bar = bigframes._config.options.display.progress_bar @@ -235,7 +251,8 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): job.result() job.reload() display.update_display( - display.HTML(get_base_job_loading_html(job)), display_id=display_id + display.HTML(get_base_job_loading_html(job)), + display_id=display_id, ) elif progress_bar == "terminal": inital_loading_bar = get_base_job_loading_string(job) @@ -289,7 +306,10 @@ def render_job_link_html( job_id=job_id, ) if job_url: - job_link = f' [Job {project_id}:{location}.{job_id} details]' + job_link = ( + f' [' + f"Job {project_id}:{location}.{job_id} details]" + ) else: job_link = "" return job_link @@ -326,7 +346,10 @@ def get_job_url( """ if project_id is None or location is None or job_id is None: return None - return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults""" + return ( + f"https://console.cloud.google.com/bigquery?project={project_id}" + f"&j=bq:{location}:{job_id}&page=queryresults" + ) def render_bqquery_sent_event_html( @@ -351,7 +374,10 @@ def render_bqquery_sent_event_html( job_id=event.job_id, request_id=event.request_id, ) - query_text_details = f"
    SQL
    {html.escape(event.query)}
    " + query_text_details = ( + f"
    SQL
    "
    +        f"{html.escape(event.query)}
    " + ) return f""" Query started{query_id}.{job_link}{query_text_details} @@ -400,7 +426,10 @@ def render_bqquery_retry_event_html( job_id=event.job_id, request_id=event.request_id, ) - query_text_details = f"
    SQL
    {html.escape(event.query)}
    " + query_text_details = ( + f"
    SQL
    "
    +        f"{html.escape(event.query)}
    " + ) return f""" Retrying query{query_id}.{job_link}{query_text_details} @@ -446,7 +475,10 @@ def render_bqquery_received_event_html( query_plan_details = "" if event.query_plan: plan_str = "\n".join([str(entry) for entry in event.query_plan]) - query_plan_details = f"
    Query Plan
    {html.escape(plan_str)}
    " + query_plan_details = ( + f"
    Query Plan
    "
    +            f"{html.escape(plan_str)}
    " + ) return f""" Query{query_id} is {event.state}.{job_link}{query_plan_details} @@ -509,7 +541,9 @@ def render_bqquery_finished_event_plaintext( bytes_str = "" if event.total_bytes_processed is not None: - bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed." + bytes_str = ( + f" {humanize.naturalsize(event.total_bytes_processed)} processed." + ) slot_time_str = "" if event.slot_millis is not None: @@ -575,7 +609,9 @@ def get_formatted_time(val): Duration string """ try: - return humanize.naturaldelta(datetime.timedelta(milliseconds=float(val))) + return humanize.naturaldelta( + datetime.timedelta(milliseconds=float(val)) + ) except Exception: return val @@ -594,7 +630,10 @@ def get_formatted_bytes(val): def get_bytes_processed_string(val: Any): - """Try to get bytes processed string. Return empty if passed non int value""" + """Try to get bytes processed string. + + Return empty if passed non int value. + """ bytes_processed_string = "" if isinstance(val, int): bytes_processed_string = f"""{get_formatted_bytes(val)} processed. """ diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index d715c650486d..b4773b93e2fa 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -20,18 +20,18 @@ import inspect import sys import threading +import warnings from typing import ( + TYPE_CHECKING, Any, - cast, Dict, Literal, Mapping, Optional, Sequence, - TYPE_CHECKING, Union, + cast, ) -import warnings import google.api_core.exceptions from google.cloud import ( @@ -41,9 +41,9 @@ resourcemanager_v3, ) -from bigframes import clients import bigframes.exceptions as bfe import bigframes.formatting_helpers as bf_formatting +from bigframes import clients from bigframes.functions import function as bq_functions from bigframes.functions import udf_def diff --git a/packages/bigframes/bigframes/functions/function.py b/packages/bigframes/bigframes/functions/function.py index f197579113ae..ac6d3c541dbd 100644 --- a/packages/bigframes/bigframes/functions/function.py +++ b/packages/bigframes/bigframes/functions/function.py @@ -14,10 +14,9 @@ from __future__ import annotations -import logging -from typing import Callable, Optional, Protocol, runtime_checkable, TYPE_CHECKING - import dataclasses +import logging +from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable import google.api_core.exceptions from google.cloud import bigquery @@ -28,8 +27,8 @@ if TYPE_CHECKING: import bigframes.core.col - from bigframes.session import Session import bigframes.series + from bigframes.session import Session logger = logging.getLogger(__name__) @@ -163,7 +162,8 @@ class Udf(Protocol): """ @property - def udf_def(self) -> udf_def.BigqueryUdf: ... + def udf_def(self) -> udf_def.BigqueryUdf: + ... class BigqueryCallableRoutine: @@ -192,8 +192,8 @@ def __call__(self, *args, **kwargs): if self._local_fun: return self._local_fun(*args, **kwargs) # avoid circular imports - from bigframes.core.compile.sqlglot import sql as sg_sql import bigframes.session._io.bigquery as bf_io_bigquery + from bigframes.core.compile.sqlglot import sql as sg_sql args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args]) sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})" diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index ed80c3f3f08f..b6e3b72af7fc 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -22,7 +22,16 @@ import textwrap import types import typing -from typing import Dict, Iterable, Literal, Mapping, Optional, Tuple, Union, overload +from typing import ( + Dict, + Iterable, + Literal, + Mapping, + Optional, + Tuple, + Union, + overload, +) import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq @@ -38,7 +47,10 @@ from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.logging import log_adapter -CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." +CHECK_DRIVE_PERMISSIONS = ( + "\nCheck https://cloud.google.com/bigquery/docs/" + "query-drive-data#Google_Drive_permissions." +) IO_ORDERING_ID = "bqdf_row_nums" @@ -85,7 +97,10 @@ def create_job_configs_labels( def create_export_data_statement( - table_id: str, uri: str, format: str, export_options: Dict[str, Union[bool, str]] + table_id: str, + uri: str, + format: str, + export_options: Dict[str, Union[bool, str]], ) -> str: all_options: Dict[str, Union[bool, str]] = { "uri": uri, @@ -142,7 +157,8 @@ def create_temp_table( destination.encryption_configuration = bigquery.EncryptionConfiguration( kms_key_name=kms_key ) - # Ok if already exists, since this will only happen from retries internal to this method + # Ok if already exists, since this will only happen from retries + # internal to this method # as the requested table id has a random UUID4 component. bqclient.create_table(destination, exists_ok=True) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" @@ -165,7 +181,8 @@ def create_temp_view( destination.expires = expiration destination.view_query = sql - # Ok if already exists, since this will only happen from retries internal to this method + # Ok if already exists, since this will only happen from retries + # internal to this method # as the requested table id has a random UUID4 component. bqclient.create_table(destination, exists_ok=True) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" @@ -199,7 +216,10 @@ def bq_field_to_type_sql(field: bigquery.SchemaField): if field.mode == "REPEATED": nested_type = bq_field_to_type_sql( bigquery.SchemaField( - field.name, field.field_type, mode="NULLABLE", fields=field.fields + field.name, + field.field_type, + mode="NULLABLE", + fields=field.fields, ) ) return f"ARRAY<{nested_type}>" @@ -232,8 +252,9 @@ def format_option(key: str, value: Union[bool, str]) -> str: def add_and_trim_labels(job_config, session=None): """ - Add additional labels to the job configuration and trim the total number of labels - to ensure they do not exceed MAX_LABELS_COUNT labels per job. + Add additional labels to the job configuration and trim the total + number of labels to ensure they do not exceed MAX_LABELS_COUNT labels + per job. """ api_methods = log_adapter.get_and_reset_api_methods( dry_run=job_config.dry_run, session=session @@ -250,10 +271,18 @@ def create_bq_event_callback(publisher): progress_bar = bigframes._config.options.display.progress_bar event_map = { - google.cloud.bigquery._job_helpers.QueryFinishedEvent: bigframes.core.events.BigQueryFinishedEvent, - google.cloud.bigquery._job_helpers.QueryReceivedEvent: bigframes.core.events.BigQueryReceivedEvent, - google.cloud.bigquery._job_helpers.QueryRetryEvent: bigframes.core.events.BigQueryRetryEvent, - google.cloud.bigquery._job_helpers.QuerySentEvent: bigframes.core.events.BigQuerySentEvent, + google.cloud.bigquery._job_helpers.QueryFinishedEvent: ( + bigframes.core.events.BigQueryFinishedEvent + ), + google.cloud.bigquery._job_helpers.QueryReceivedEvent: ( + bigframes.core.events.BigQueryReceivedEvent + ), + google.cloud.bigquery._job_helpers.QueryRetryEvent: ( + bigframes.core.events.BigQueryRetryEvent + ), + google.cloud.bigquery._job_helpers.QuerySentEvent: ( + bigframes.core.events.BigQuerySentEvent + ), } def publish_bq_event(event): @@ -262,7 +291,9 @@ def publish_bq_event(event): if isinstance(event, bq_type): bf_event = bf_type.from_bqclient(event) # type: ignore break - envelope = bigframes.core.events.EventEnvelope(event=bf_event, progress_bar=progress_bar) + envelope = bigframes.core.events.EventEnvelope( + event=bf_event, progress_bar=progress_bar + ) publisher.publish(envelope) return publish_bq_event @@ -352,7 +383,7 @@ def start_query_with_client( # google-cloud-bigquery version with # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely # version 3.36.0 or later. - job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, + job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), publisher: bigframes.core.events.Publisher, session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: @@ -425,7 +456,9 @@ def start_query_with_client( def delete_tables_matching_session_id( - client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str + client: bigquery.Client, + dataset: bigquery.DatasetReference, + session_id: str, ) -> None: """Searches within the dataset for tables conforming to the expected session_id form, and instructs bigquery to delete them. @@ -479,7 +512,8 @@ def create_bq_dataset_reference( The project id of the project to create the dataset in. Returns: - bigquery.DatasetReference: The constructed reference to the anonymous dataset. + bigquery.DatasetReference: The constructed reference to the + anonymous dataset. """ job_config = google.cloud.bigquery.QueryJobConfig() @@ -513,7 +547,8 @@ def is_query(query_or_table: str) -> bool: def is_table_with_wildcard_suffix(query_or_table: str) -> bool: - """Determine if `query_or_table` is a table and contains a wildcard suffix.""" + """Determine if `query_or_table` is a table and contains a wildcard + suffix.""" return not is_query(query_or_table) and query_or_table.endswith("*") @@ -529,7 +564,8 @@ def to_query( from_item = f"({query_or_table})" else: # Table ID can have 1, 2, 3, or 4 parts. Quoting all parts to be safe. - # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers + # See: + # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers parts = query_or_table.split(".") from_item = ".".join(f"`{part}`" for part in parts) @@ -579,7 +615,8 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str: "!=": "!=", } - # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. + # If single layer filter, add another pseudo layer. So the single + # layer represents "and" logic. filters_list: list = list(filters) if isinstance(filters_list[0], tuple) and ( len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple) @@ -596,14 +633,16 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str: for filter_item in group: if not isinstance(filter_item, tuple) or (len(filter_item) != 3): raise ValueError( - f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.", + f"Elements of filters must be tuples of length 3, " + f"but got {repr(filter_item)}.", ) column, operator, value = filter_item if not isinstance(column, str): raise ValueError( - f"Column name should be a string, but received '{column}' of type {type(column).__name__}." + f"Column name should be a string, but received " + f"'{column}' of type {type(column).__name__}." ) if operator not in valid_operators: From d372472b339a93cc7f1cbf3f4668ee56363d5d88 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Tue, 12 May 2026 02:47:26 +0000 Subject: [PATCH 21/21] format file --- packages/bigframes/bigframes/formatting_helpers.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 86e50967e52f..1676d6dc35a1 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -541,9 +541,8 @@ def render_bqquery_finished_event_plaintext( bytes_str = "" if event.total_bytes_processed is not None: - bytes_str = ( - f" {humanize.naturalsize(event.total_bytes_processed)} processed." - ) + size_str = humanize.naturalsize(event.total_bytes_processed) + bytes_str = f" {size_str} processed." slot_time_str = "" if event.slot_millis is not None: @@ -609,9 +608,8 @@ def get_formatted_time(val): Duration string """ try: - return humanize.naturaldelta( - datetime.timedelta(milliseconds=float(val)) - ) + delta = datetime.timedelta(milliseconds=float(val)) + return humanize.naturaldelta(delta) except Exception: return val