Skip to content

V2 rewrite (beta): DuckDB Engine Support with Benchmark#255

Open
chenliu0831 wants to merge 14 commits into
v2_rewritefrom
v2_engine
Open

V2 rewrite (beta): DuckDB Engine Support with Benchmark#255
chenliu0831 wants to merge 14 commits into
v2_rewritefrom
v2_engine

Conversation

@chenliu0831
Copy link
Copy Markdown
Contributor

@chenliu0831 chenliu0831 commented Jan 20, 2026

Issue #, if available: #128

Description of changes:

Adds DuckDB as a lightweight, JVM-free backend for PyDeequ 2.0 with optional dependency installation support. The overall design is inspired by DuckDQ project mentioned in #128 (actually most credit needs to go to that project). The stateful aggregation for streaming DQ monitoring is not implemented yet (i.e. MetricsRepository).

Other notable changes:

  • Restructured pyproject.toml to support optional dependencies. pip install pydeequ[duckdb] - DuckDB backend (no JVM required). Core package now has minimal dependencies (numpy, pandas, protobuf)
  • Engine Parity tests between Spark and DuckDB engine. Some HLL/quantile differences exists because of algorithm difference. More details in Engines.md
  • Benchmark tooling.
  • Comprehensive test suite.

See https://github.com/awslabs/python-deequ/blob/v2_engine/README.md and https://github.com/awslabs/python-deequ/blob/v2_engine/docs/architecture.md for more background.

Benchmark

See https://github.com/awslabs/python-deequ/blob/v2_engine/BENCHMARK.md for more details.

benchmark_chart

chenliu0831 and others added 10 commits January 19, 2026 22:13
- All runners take engine in constructor, onData(table=, dataframe=) for data binding
- Spark protobuf builders become private (_Spark*RunBuilder)
- Refactor connect() to use isinstance() instead of string matching
- Fix ApproxQuantile alias collision and metric name mismatch
- Scale up test datasets for reliable HLL tolerance validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Poetry 1.7.1 doesn't support PEP 621 [project] table format,
causing 'name' parse error. Replace with uv pip install.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Handle Spark Connect session type in connect() (separate class
  from pyspark.sql.SparkSession)
- Remove manual server start/stop from CI; conftest fixture handles it
- Accept NaN for non-numeric profile stats from Spark

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 8c93b14f) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

tests/helpers/spark_server.py:42 — Same hardcoded developer-machine-specific paths as benchmark/config.py. The spark_home default /Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3 and deequ_jar default will fail for any other developer. Consider requiring these via environment variables with no fallback, or raising a clear error when not set.

pydeequ/engines/operators/scan_operators.py:21SizeOperator does not inherit from ScanOperator — it inherits directly from the mixins. This means it doesn't have the column attribute or the abstract method contracts from ScanOperator. While it works because it implements the same interface manually, this breaks the type hierarchy. Consider either inheriting from ScanOperator (passing column='*') or documenting why it's intentionally different.

pydeequ/engines/operators/scan_operators.py:355ComplianceOperator also does not inherit from ScanOperator, same issue as SizeOperator. It inherits directly from the mixins, bypassing the base class contract.

pydeequ/engines/operators/scan_operators.py:393CorrelationOperator does not inherit from ScanOperator either. Same structural inconsistency.

pydeequ/v2/profiles.py:100 — The EngineColumnProfilerRunBuilder has no withKLLProfiling() or setKLLParameters() methods, but the ColumnProfilerRunner docstring (line 86) shows .withKLLProfiling() as an example. Users following the docstring will get an AttributeError. Either add stub methods that raise NotImplementedError with a clear message, or update the docstring.

pydeequ/v2/verification.py:97EngineVerificationRunBuilder has an addAnalyzer method but run() only calls self._engine.run_checks(self._checks) — the self._analyzers list is collected but never used. Either remove addAnalyzer or pass analyzers to the engine.

pydeequ/engines/suggestions/registry.py:82_register_default_rules() is called at module import time, populating the class-level _rules list. But _rules is defined as a class variable _rules: List[SuggestionRule] = []. If _register_default_rules() is called multiple times (e.g., during test reloads), rules will be duplicated. Add a guard or use clear() before registering.

tests/engines/fixtures/datasets.py:62create_df_missing generates 1200 rows with att1 having a pattern of 2 non-null then 2 null (50% complete), but the EXPECTED_VALUES dict on line 432 says ("df_missing", "Completeness", "att1"): 0.5 and the comment says # 6/12. The comment 6/12 is stale from an older 12-row version — the actual ratio is 600/1200. Update the comment.

Comment thread benchmark/config.py Outdated
Comment thread pydeequ/engines/__init__.py
Comment thread pydeequ/engines/__init__.py
Comment thread pydeequ/engines/duckdb.py
Comment thread pydeequ/engines/constraints/base.py
Comment thread pydeequ/engines/constraints/batch_evaluator.py
Review fixes: drop duplicate enum aliases, require env vars instead of
hardcoded paths, register a view for read_parquet, use epsilon comparison
for the default evaluator, fix scan operator inheritance, drop dead
addAnalyzer, guard duplicate rule registration.

Architecture: add DuckDBEngine.for_dataframe, compress shallow evaluators
behind AnalyzerEvaluator, split compute_metrics into per-family planners,
move operator construction into the factory registry, collapse the engine
run-builders into runners (with aliases for backwards compat).

Update v2 example script to use pandas DataFrame API.
Copy link
Copy Markdown

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 416310f3) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

pydeequ/engines/operators/factory.py:56BUG: The _by_column builder passes getattr(an, "column", None) to operators like EntropyOperator which is a GroupingOperator expecting a single string column. However, GroupingOperator.__init__ takes columns: List[str]. Passing a single string (or None) instead of a list will break EntropyOperator when created via this factory path.

In base.py line 108: class GroupingOperator has def __init__(self, columns: List[str], ...). The _by_column helper returns lambda an: operator_cls(getattr(an, "column", None), where=...). In the registry: "Entropy": _by_column(EntropyOperator). So EntropyOperator("some_col", where=None) is called, but EntropyOperator inherits from GroupingOperator which expects columns to be a List[str]. Looking at grouping_operators.py (referenced in __init__.py), EntropyOperator likely takes a single column string in its own __init__ (since it's listed separately from multi-column operators), but the base class signature mismatch means self.columns would be set to a string rather than a list, breaking get_grouping_columns() and instance property which does ",".join(self.columns) — joining characters of the string.

pydeequ/engines/suggestions/registry.py:55EDGE_CASE: The deduplication check any(type(r) is rule_type for r in cls._rules) uses is identity check on types, which works for normal classes but would fail for dynamically-created classes or mocks. More importantly, if _rules is a class variable shared across all instances/imports, and the module is imported from different paths (e.g., relative vs absolute), the type(r) is rule_type check could fail because the class objects would be different. However, this is a minor concern — the real issue is that this guard silently swallows legitimate re-registrations if someone intentionally wants to replace a rule.

Line 55: if any(type(r) is rule_type for r in cls._rules): return. The docstring says this prevents duplicates during pytest reloads, which is a valid use case. But there's no way to force re-registration or replace a rule if needed.

pydeequ/v2/verification.py:90DESIGN: Collapsing the runner and builder into a single mutable class means calling onData() mutates self._engine in place. If a user reuses the same VerificationSuite instance with different tables (e.g., suite.onData(table='a').run() then suite.onData(table='b').run()), the second call overwrites the engine from the first. This is fine for the fluent one-shot pattern shown in examples, but surprising if someone stores the suite object and reuses it.

Line 100: self._engine = _bind_engine(self._engine, table=table, dataframe=dataframe) mutates the instance. Similarly, self._checks accumulates across calls to addCheck without being reset between run() calls.

pydeequ/v2/verification.py:106DESIGN: The VerificationSuite no longer has addAnalyzer (it was removed in this refactor), but the backwards-compatible alias EngineVerificationRunBuilder = VerificationSuite means any code that previously called EngineVerificationRunBuilder.addAnalyzer() will now get an AttributeError. The old class had both addCheck and addAnalyzer.

The incremental diff removes addAnalyzer from the class (previously on EngineVerificationRunBuilder). The alias at line 145 (EngineVerificationRunBuilder = VerificationSuite) means existing callers of .addAnalyzer() will break.

@chenliu0831
Copy link
Copy Markdown
Contributor Author

Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 416310f3) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

pydeequ/engines/operators/factory.py:56BUG: The _by_column builder passes getattr(an, "column", None) to operators like EntropyOperator which is a GroupingOperator expecting a single string column. However, GroupingOperator.__init__ takes columns: List[str]. Passing a single string (or None) instead of a list will break EntropyOperator when created via this factory path.

In base.py line 108: class GroupingOperator has def __init__(self, columns: List[str], ...). The _by_column helper returns lambda an: operator_cls(getattr(an, "column", None), where=...). In the registry: "Entropy": _by_column(EntropyOperator). So EntropyOperator("some_col", where=None) is called, but EntropyOperator inherits from GroupingOperator which expects columns to be a List[str]. Looking at grouping_operators.py (referenced in __init__.py), EntropyOperator likely takes a single column string in its own __init__ (since it's listed separately from multi-column operators), but the base class signature mismatch means self.columns would be set to a string rather than a list, breaking get_grouping_columns() and instance property which does ",".join(self.columns) — joining characters of the string.

This is not a valid finding.

pydeequ/engines/suggestions/registry.py:55EDGE_CASE: The deduplication check any(type(r) is rule_type for r in cls._rules) uses is identity check on types, which works for normal classes but would fail for dynamically-created classes or mocks. More importantly, if _rules is a class variable shared across all instances/imports, and the module is imported from different paths (e.g., relative vs absolute), the type(r) is rule_type check could fail because the class objects would be different. However, this is a minor concern — the real issue is that this guard silently swallows legitimate re-registrations if someone intentionally wants to replace a rule.

Line 55: if any(type(r) is rule_type for r in cls._rules): return. The docstring says this prevents duplicates during pytest reloads, which is a valid use case. But there's no way to force re-registration or replace a rule if needed.

Not worth fix.

pydeequ/v2/verification.py:106DESIGN: The VerificationSuite no longer has addAnalyzer (it was removed in this refactor), but the backwards-compatible alias EngineVerificationRunBuilder = VerificationSuite means any code that previously called EngineVerificationRunBuilder.addAnalyzer() will now get an AttributeError. The old class had both addCheck and addAnalyzer.

The incremental diff removes addAnalyzer from the class (previously on EngineVerificationRunBuilder). The alias at line 145 (EngineVerificationRunBuilder = VerificationSuite) means existing callers of .addAnalyzer() will break.

No need to think about breaking change for now.

pydeequ/v2/verification.py:90DESIGN: Collapsing the runner and builder into a single mutable class means calling onData() mutates self._engine in place. If a user reuses the same VerificationSuite instance with different tables (e.g., suite.onData(table='a').run() then suite.onData(table='b').run()), the second call overwrites the engine from the first. This is fine for the fluent one-shot pattern shown in examples, but surprising if someone stores the suite object and reuses it.

Line 100: self._engine = _bind_engine(self._engine, table=table, dataframe=dataframe) mutates the instance. Similarly, self._checks accumulates across calls to addCheck without being reset between run() calls.

This seems valid.

After the run-builder collapse, calling onData() twice on the same
VerificationSuite/AnalysisRunner/ColumnProfilerRunner/ConstraintSuggestionRunner
mutated state in place, so reusing a suite across tables leaked checks
and engine bindings between runs. Restore the pre-refactor semantics by
returning a fresh runner from onData() with the engine rebound.
Copy link
Copy Markdown

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 416310f3) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

pydeequ/v2/verification.py:108BUG: onData() now returns a fresh VerificationSuite, but addCheck() mutates self._checks on that new instance. Since the new suite is created via VerificationSuite(engine), its _checks list starts empty. This is correct for the fluent pattern VerificationSuite(engine).onData(...).addCheck(...).run(). However, if a user adds checks before calling onData(), those checks are lost because onData() returns a brand-new suite without copying self._checks.

Line 108: return VerificationSuite(_bind_engine(...)) creates a new instance. The constructor (visible in full_source_files) initializes self._checks = []. So suite = VerificationSuite(engine); suite.addCheck(check); result = suite.onData(table='t').run() — the check is lost because onData returns a new suite with an empty _checks list.

pydeequ/v2/verification.py:148BUG: Same issue as VerificationSuite.onData(): AnalysisRunner.onData() returns a fresh AnalysisRunner instance, discarding any analyzers added before onData() is called. The pattern runner.addAnalyzer(Size()).onData(table='t').run() loses the analyzer.

Line 148: return AnalysisRunner(_bind_engine(...)) creates a new instance with self._analyzers = []. Any analyzers added via addAnalyzer() before onData() are on the old instance and not carried over.

pydeequ/v2/profiles.py:106BUG: Same issue: ColumnProfilerRunner.onData() returns a fresh runner, discarding any state set before onData() (e.g., restrictToColumns, withLowCardinalityHistogramThreshold). The pattern ColumnProfilerRunner(engine).restrictToColumns(['a']).onData(table='t').run() loses the column restriction.

Line 106: return ColumnProfilerRunner(bound) creates a new instance. The new runner's _restrict_to_columns and _low_cardinality_threshold are reset to defaults (None/0). Any prior calls to restrictToColumns() or withLowCardinalityHistogramThreshold() on the original instance are lost.

pydeequ/v2/suggestions.py:102BUG: Same issue: ConstraintSuggestionRunner.onData() returns a fresh runner, discarding any rules added via addConstraintRules() before onData(). The pattern ConstraintSuggestionRunner(engine).addConstraintRules(Rules.DEFAULT).onData(table='t').run() loses the rules.

Line 102: return ConstraintSuggestionRunner(bound) creates a new instance with self._rules = []. Any rules added before onData() are on the old instance.

After making onData() return a fresh runner, calls made before onData
(addCheck, addAnalyzer, restrictToColumns, withLowCardinality,
addConstraintRules) silently dropped the configured state. Copy the
relevant lists/flags into the new instance so call order doesn't matter.
Copy link
Copy Markdown

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 416310f3) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

pydeequ/v2/verification.py:118DESIGN: The onData() method now copies self._checks into the new suite, which fixes the earlier feedback about state not being carried over. However, this creates a shallow copy of the list but not of the Check objects themselves. If a user mutates a Check object after calling onData() (e.g., adding more constraints to it), the mutation will affect both the original and the bound suite since they share the same Check references. This is likely acceptable for the fluent API pattern but worth documenting.

Line 118: bound._checks = list(self._checks) creates a new list but Check objects are shared references. The Check class accumulates _constraints via addCheck pattern in checks.py.

pydeequ/v2/suggestions.py:110EDGE_CASE: bound._rules = list(self._rules) copies the list, but if _rules contains mutable Rules enum values this is fine. However, if a user calls addConstraintRules on the original runner after onData(), the bound runner won't see those additions. This is the correct behavior for the documented pattern but differs from the VerificationSuite where addCheck is typically called before onData. The docstring says "Rules and column restrictions set before onData are carried" which is accurate.

Line 110: bound._rules = list(self._rules) and line 111: bound._restrict_to_columns = self._restrict_to_columns. The docstring at line 94-97 correctly documents this behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant