@externally_idempotent(False)
class IcebergSinkUpsert(Check):
    """Check Iceberg sinks (MODE UPSERT) fed by a view over a Kafka upsert source.

    Three sinks are created over the same materialized view, one per phase
    (initialize and each of the two manipulate steps); validate expects all
    three Iceberg tables to end up with the same final contents.
    """

    def _can_run(self, e: Executor) -> bool:
        """Return True only when the base version is at least v26.10.0-dev."""
        return self.base_version >= MzVersion.parse_mz("v26.10.0-dev")

    def initialize(self) -> Testdrive:
        """Ingest the initial data and create the source, view and first sink.

        Ingests 1000 records for each of the key prefixes U2, D2, U3 and D3
        (all with value prefix A), builds a view that counts rows per
        (2-char key prefix, 1-char value prefix), creates `iceberg_sink_sink1`
        on it and waits until the sink reports at least one committed message.
        """
        return Testdrive(schemas() + dedent("""
            $ kafka-create-topic topic=iceberg-sink-source

            $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "U3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
            {"key1": "D3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

            > CREATE SOURCE iceberg_sink_source_src
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-iceberg-sink-source-${testdrive.seed}')
            > CREATE TABLE iceberg_sink_source FROM SOURCE iceberg_sink_source_src (REFERENCE "testdrive-iceberg-sink-source-${testdrive.seed}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT

            > CREATE MATERIALIZED VIEW iceberg_sink_source_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM iceberg_sink_source GROUP BY LEFT(key1, 2), LEFT(f1, 1);

            > CREATE SINK iceberg_sink_sink1 FROM iceberg_sink_source_view
              INTO ICEBERG CATALOG CONNECTION polaris_conn (
                NAMESPACE 'default_namespace',
                TABLE 'iceberg_sink_sink1'
              )
              USING AWS CONNECTION aws_conn
              KEY (l_k, l_v)
              MODE UPSERT
              WITH (COMMIT INTERVAL '1s');

            > SELECT * FROM iceberg_sink_source_view;
            D2 A 1000
            D3 A 1000
            U2 A 1000
            U3 A 1000

            > SELECT messages_committed >= 1
              FROM mz_internal.mz_sink_statistics
              JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
              WHERE mz_sinks.name = 'iceberg_sink_sink1';
            true
            """))

    def manipulate(self) -> list[Testdrive]:
        """Insert, update and delete keys in two steps, adding a sink in each.

        Each step ingests new I* keys, rewrites the U* keys with a new value
        prefix, and sends key-only records for the D* keys (after which that
        group no longer appears in the view), then creates another sink over
        the same view.
        """
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                # Step 1: I2 inserted, U2 updated to B, D2 removed; sink2 created.
                """
                $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK iceberg_sink_sink2 FROM iceberg_sink_source_view
                  INTO ICEBERG CATALOG CONNECTION polaris_conn (
                    NAMESPACE 'default_namespace',
                    TABLE 'iceberg_sink_sink2'
                  )
                  USING AWS CONNECTION aws_conn
                  KEY (l_k, l_v)
                  MODE UPSERT
                  WITH (COMMIT INTERVAL '1s');

                > SELECT * FROM iceberg_sink_source_view;
                I2 B 1000
                U3 A 1000
                U2 B 1000
                D3 A 1000

                > SELECT messages_committed >= 1
                  FROM mz_internal.mz_sink_statistics
                  JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
                  WHERE mz_sinks.name = 'iceberg_sink_sink2';
                true
                """,
                # Step 2: I3 inserted, U3 updated to C, D3 removed; sink3 created.
                """
                $ kafka-ingest format=avro key-format=avro topic=iceberg-sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "D3${kafka-ingest.iteration}"}

                > CREATE SINK iceberg_sink_sink3 FROM iceberg_sink_source_view
                  INTO ICEBERG CATALOG CONNECTION polaris_conn (
                    NAMESPACE 'default_namespace',
                    TABLE 'iceberg_sink_sink3'
                  )
                  USING AWS CONNECTION aws_conn
                  KEY (l_k, l_v)
                  MODE UPSERT
                  WITH (COMMIT INTERVAL '1s');

                > SELECT * FROM iceberg_sink_source_view;
                I2 B 1000
                I3 C 1000
                U2 B 1000
                U3 C 1000

                > SELECT messages_committed >= 1
                  FROM mz_internal.mz_sink_statistics
                  JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
                  WHERE mz_sinks.name = 'iceberg_sink_sink3';
                true
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check the view, then read all three Iceberg tables back through DuckDB.

        The MinIO credentials are read from the `iceberg_credentials` relation
        and used to register a DuckDB S3 secret before each table is scanned
        with `iceberg_scan`; every table must contain the same four rows.
        """
        return Testdrive(dedent("""
            $ set-sql-timeout duration=60s
            > SELECT * FROM iceberg_sink_source_view;
            I2 B 1000
            I3 C 1000
            U2 B 1000
            U3 C 1000

            $ set-from-sql var=iceberg_user
            SELECT user FROM iceberg_credentials

            $ set-from-sql var=iceberg_key
            SELECT key FROM iceberg_credentials

            $ duckdb-execute name=iceberg
            CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio');
            SET unsafe_enable_version_guessing = true;

            $ duckdb-query name=iceberg
            SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/iceberg_sink_sink1') ORDER BY 1
            I2 B 1000
            I3 C 1000
            U2 B 1000
            U3 C 1000

            $ duckdb-query name=iceberg
            SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/iceberg_sink_sink2') ORDER BY 1
            I2 B 1000
            I3 C 1000
            U2 B 1000
            U3 C 1000

            $ duckdb-query name=iceberg
            SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/iceberg_sink_sink3') ORDER BY 1
            I2 B 1000
            I3 C 1000
            U2 B 1000
            U3 C 1000
            """))
mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 'iceberg_sink_large_transaction_sink'; + true + """)) + + def manipulate(self) -> list[Testdrive]: + return [ + Testdrive(schemas() + dedent(s)) + for s in [ + """ + $ set-sql-timeout duration=120s + + $ set-from-sql var=committed + SELECT messages_committed::text FROM mz_internal.mz_sink_statistics JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id WHERE mz_sinks.name = 'iceberg_sink_large_transaction_sink' + + > UPDATE iceberg_sink_large_transaction_table SET f2 = REPEAT('y', 1024) + + > SELECT messages_committed >= ${committed} + 200000 + FROM mz_internal.mz_sink_statistics + JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 'iceberg_sink_large_transaction_sink'; + true + """, + """ + $ set-sql-timeout duration=120s + + $ set-from-sql var=committed + SELECT messages_committed::text FROM mz_internal.mz_sink_statistics JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id WHERE mz_sinks.name = 'iceberg_sink_large_transaction_sink' + > UPDATE iceberg_sink_large_transaction_table SET f2 = REPEAT('z', 1024) + + > SELECT messages_committed >= ${committed} + 200000 + FROM mz_internal.mz_sink_statistics + JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 'iceberg_sink_large_transaction_sink'; + true + """, + ] + ] + + def validate(self) -> Testdrive: + return Testdrive(dedent(""" + $ set-from-sql var=iceberg_user + SELECT user FROM iceberg_credentials + + $ set-from-sql var=iceberg_key + SELECT key FROM iceberg_credentials + + $ duckdb-execute name=iceberg + CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio'); + SET unsafe_enable_version_guessing = true; + + # TODO: How to verify the data? 
@externally_idempotent(False)
class AlterIcebergSinkMv(Check):
    """Check ALTER SINK with materialized views"""

    def _can_run(self, e: Executor) -> bool:
        """Return True only when the base version is at least v26.10.0-dev."""
        return self.base_version >= MzVersion.parse_mz("v26.10.0-dev")

    def initialize(self) -> Testdrive:
        """Create table 1 (holding the value 0), its view, and the sink on that view."""
        return Testdrive(dedent("""
            > CREATE TABLE table_alter_iceberg_mv1 (a INT);
            > INSERT INTO table_alter_iceberg_mv1 VALUES (0)
            > CREATE MATERIALIZED VIEW mv_alter_iceberg1 AS SELECT * FROM table_alter_iceberg_mv1
            > CREATE SINK sink_alter_iceberg_mv FROM mv_alter_iceberg1
              INTO ICEBERG CATALOG CONNECTION polaris_conn (
                NAMESPACE 'default_namespace',
                TABLE 'sink_alter_iceberg_mv'
              )
              USING AWS CONNECTION aws_conn
              KEY (a) NOT ENFORCED
              MODE UPSERT
              WITH (COMMIT INTERVAL '1s');
            """))

    def manipulate(self) -> list[Testdrive]:
        """Repoint the sink at a new materialized view in each step.

        Each step records the number of status-history rows for the sink,
        runs ALTER SINK ... SET FROM, waits until that number has grown, and
        then inserts into every table created so far.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Step 1: switch the sink to mv2, then write 10 to table 1 and 11 to table 2.
                """
                > CREATE TABLE table_alter_iceberg_mv2 (a INT);
                > CREATE MATERIALIZED VIEW mv_alter_iceberg2 AS SELECT * FROM table_alter_iceberg_mv2

                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_iceberg_mv';

                > ALTER SINK sink_alter_iceberg_mv SET FROM mv_alter_iceberg2;

                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_iceberg_mv';
                true

                > INSERT INTO table_alter_iceberg_mv1 VALUES (10)
                > INSERT INTO table_alter_iceberg_mv2 VALUES (11)
                """,
                # Step 2: switch the sink to mv3, then write 100/101/102 to tables 1/2/3.
                """
                > CREATE TABLE table_alter_iceberg_mv3 (a INT);
                > CREATE MATERIALIZED VIEW mv_alter_iceberg3 AS SELECT * FROM table_alter_iceberg_mv3

                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_iceberg_mv';

                > ALTER SINK sink_alter_iceberg_mv SET FROM mv_alter_iceberg3;

                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_iceberg_mv';
                true

                > INSERT INTO table_alter_iceberg_mv1 VALUES (100)
                > INSERT INTO table_alter_iceberg_mv2 VALUES (101)
                > INSERT INTO table_alter_iceberg_mv3 VALUES (102)
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Read the Iceberg table through DuckDB and expect the rows 0, 11 and 102.

        Those are the values written to each table while its view was the
        sink's upstream; 10, 100 and 101 were written to tables whose view the
        sink had already been switched away from.
        """
        return Testdrive(dedent("""
            $ set-from-sql var=iceberg_user
            SELECT user FROM iceberg_credentials

            $ set-from-sql var=iceberg_key
            SELECT key FROM iceberg_credentials

            $ duckdb-execute name=iceberg
            CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio');
            SET unsafe_enable_version_guessing = true;

            $ duckdb-query name=iceberg
            SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/sink_alter_iceberg_mv') ORDER BY 1
            0
            11
            102
            """))
@externally_idempotent(False)
class AlterIcebergSinkOrder(Check):
    """Check ALTER SINK with a table created after the sink, see incident 131"""

    def _can_run(self, e: Executor) -> bool:
        """Return True only when the base version is at least v26.10.0-dev."""
        return self.base_version >= MzVersion.parse_mz("v26.10.0-dev")

    def initialize(self) -> Testdrive:
        """Create table 1 and a sink directly on it, then insert (0, 'a')."""
        return Testdrive(dedent("""
            > CREATE TABLE iceberg_table_alter_order1 (x int, y string)
            > CREATE SINK iceberg_sink_alter_order FROM iceberg_table_alter_order1
              INTO ICEBERG CATALOG CONNECTION polaris_conn (
                NAMESPACE 'default_namespace',
                TABLE 'iceberg_sink_alter_order'
              )
              USING AWS CONNECTION aws_conn
              KEY (x, y) NOT ENFORCED
              MODE UPSERT
              WITH (COMMIT INTERVAL '1s');
            > INSERT INTO iceberg_table_alter_order1 VALUES (0, 'a')
            """))

    def manipulate(self) -> list[Testdrive]:
        """Repoint the sink at a table that is created after the sink, twice.

        Each step creates a new table, records the sink's status-history row
        count, runs ALTER SINK ... SET FROM the new table, waits for the count
        to grow, and then inserts into the new and the older tables.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Step 1: switch to table 2; rows go to table 2, table 1, table 2.
                """
                > CREATE TABLE iceberg_table_alter_order2 (x int, y string)

                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'iceberg_sink_alter_order';

                > ALTER SINK iceberg_sink_alter_order SET FROM iceberg_table_alter_order2;

                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'iceberg_sink_alter_order';
                true

                > INSERT INTO iceberg_table_alter_order2 VALUES (1, 'b')
                > INSERT INTO iceberg_table_alter_order1 VALUES (10, 'aa')
                > INSERT INTO iceberg_table_alter_order2 VALUES (11, 'bb')
                """,
                # Step 2: switch to table 3; rows go to table 3 and to both older tables.
                """
                > CREATE TABLE iceberg_table_alter_order3 (x int, y string)

                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'iceberg_sink_alter_order';

                > ALTER SINK iceberg_sink_alter_order SET FROM iceberg_table_alter_order3;

                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'iceberg_sink_alter_order';
                true

                > INSERT INTO iceberg_table_alter_order3 VALUES (2, 'c')
                > INSERT INTO iceberg_table_alter_order3 VALUES (12, 'cc')
                > INSERT INTO iceberg_table_alter_order1 VALUES (100, 'aaa')
                > INSERT INTO iceberg_table_alter_order2 VALUES (101, 'bbb')
                > INSERT INTO iceberg_table_alter_order3 VALUES (102, 'ccc')
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Read the Iceberg table through DuckDB and compare with the expected rows.

        The expected rows are the ones inserted into whichever table was the
        sink's upstream at the time; (10, 'aa'), (100, 'aaa') and (101, 'bbb')
        went to tables the sink had already been switched away from.
        """
        return Testdrive(dedent("""
            $ set-from-sql var=iceberg_user
            SELECT user FROM iceberg_credentials

            $ set-from-sql var=iceberg_key
            SELECT key FROM iceberg_credentials

            $ duckdb-execute name=iceberg
            CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio');
            SET unsafe_enable_version_guessing = true;

            $ duckdb-query name=iceberg
            SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/iceberg_sink_alter_order') ORDER BY 1
            0 a
            1 b
            2 c
            11 bb
            12 cc
            102 ccc
            """))
class SetupIcebergTesting(MzcomposeAction):
    """Set up Polaris for Iceberg and create the SQL objects the Iceberg checks use.

    Running the action creates, through testdrive:

    * the view ``iceberg_credentials`` exposing the credentials as columns
      ``user`` and ``key`` (the checks read it to configure DuckDB),
    * the secret ``iceberg_secret``,
    * the AWS connection ``aws_conn`` pointing at ``http://minio:9000/``,
    * the Iceberg catalog connection ``polaris_conn`` pointing at
      ``http://polaris:8181/api/catalog``.
    """

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        """Remember the scenario and the Materialize service to run against."""
        # Set by execute(); join() waits on it.
        self.handle: Any | None = None
        self.mz_service = mz_service
        self.scenario = scenario

    def execute(self, e: Executor) -> None:
        """Prepare Polaris and submit the testdrive script that creates the objects."""
        c = e.mzcompose_composition()
        iceberg_credentials = setup_polaris_for_iceberg(c)
        # Element 0 is used as the access key id / user name, element 1 as the
        # secret key. NOTE(review): presumably these are the MinIO user and key
        # returned by the helper -- confirm against setup_polaris_for_iceberg.
        access_key_id = iceberg_credentials[0]
        secret_access_key = iceberg_credentials[1]
        # Named `script` rather than `input` so the builtin is not shadowed.
        script = dedent(
            f"""
            > CREATE VIEW iceberg_credentials AS SELECT '{access_key_id}' AS user, '{secret_access_key}' AS key;
            > CREATE SECRET iceberg_secret AS '{secret_access_key}'
            > CREATE CONNECTION aws_conn TO AWS (ACCESS KEY ID = '{access_key_id}', SECRET ACCESS KEY = SECRET iceberg_secret, ENDPOINT = 'http://minio:9000/', REGION = 'us-east-1');
            > CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', CREDENTIAL = 'root:root', WAREHOUSE = 'default_catalog', SCOPE = 'PRINCIPAL_ROLE:ALL');"""
        )

        self.handle = e.testdrive(input=script, mz_service=self.mz_service)

    def join(self, e: Executor) -> None:
        """Wait for the testdrive run started by execute() to finish."""
        e.join(self.handle)
b/misc/python/materialize/mzcompose/helpers/iceberg.py index 4a5af574293c7..b31391f28293d 100644 --- a/misc/python/materialize/mzcompose/helpers/iceberg.py +++ b/misc/python/materialize/mzcompose/helpers/iceberg.py @@ -187,6 +187,11 @@ def setup_polaris_for_iceberg( key = create_minio_user(username, minio_alias, c) + # Create a dedicated database for Polaris so its internal tables don't + # interfere with FOR ALL TABLES publications in PG CDC tests. + c.up("postgres") + c.exec("postgres", "psql", "-U", "postgres", "-c", "CREATE DATABASE polaris") + with c.override( Polaris( extra_environment=[ diff --git a/misc/python/materialize/mzcompose/services/polaris.py b/misc/python/materialize/mzcompose/services/polaris.py index 6c37b49d680a6..662ea809185c5 100644 --- a/misc/python/materialize/mzcompose/services/polaris.py +++ b/misc/python/materialize/mzcompose/services/polaris.py @@ -24,7 +24,7 @@ def __init__( "POLARIS_PERSISTENCE_TYPE=relational-jdbc", "QUARKUS_DATASOURCE_USERNAME=postgres", "QUARKUS_DATASOURCE_PASSWORD=postgres", - "QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/postgres", + "QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/polaris", "POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5", "POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100", "POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS=5000", @@ -57,7 +57,7 @@ def __init__( environment: list[str] = [ "QUARKUS_DATASOURCE_USERNAME=postgres", "QUARKUS_DATASOURCE_PASSWORD=postgres", - "QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/postgres", + "QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/polaris", "POLARIS_PERSISTENCE_TYPE=relational-jdbc", "POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5", "POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 210b66072f338..9b28cd840b4de 100644 --- 
class CheckSinkAction(Action):
    """Fail the workload when any sink reports a status other than running/starting."""

    def run(self, exe: Executor) -> bool:
        """Query ``mz_internal.mz_sink_statuses`` and raise if a sink is unhealthy.

        Opens a fresh ``mz_system`` connection, looks for sinks whose status
        is neither ``running`` nor ``starting`` and raises ``ValueError``
        listing them. In the ``Kill`` and ``ZeroDowntimeDeploy`` scenarios any
        error (connection failures as well as unhealthy sinks) is ignored.

        Returns:
            Always ``True`` when no exception propagates.

        Raises:
            ValueError: if at least one sink is in an unexpected status and
                the scenario is not one of the tolerated ones.
        """
        try:
            conn = self.create_system_connection(exe)
            try:
                with conn.cursor() as cur:
                    # The list must not contain NULL: `x NOT IN (..., NULL)`
                    # evaluates to NULL instead of true for every non-matching
                    # x, so the filter would never select a row. Rows whose
                    # status is NULL are still skipped, because
                    # `NULL NOT IN (...)` is NULL as well.
                    # NOTE(review): any other status (e.g. a transient one)
                    # is now reported -- confirm that is acceptable here.
                    cur.execute(
                        "SELECT name, type, last_status_change_at, status, error, details FROM mz_internal.mz_sink_statuses WHERE status NOT IN ('running', 'starting')"
                    )
                    results = cur.fetchall()
                    if results:
                        results_str = "\n".join(
                            f"{name} ({sink_type}) changed status at {last_status_change_at} to {status}: {error} (details: {details})"
                            for name, sink_type, last_status_change_at, status, error, details in results
                        )
                        raise ValueError(f"Sinks are in a bad state:\n{results_str}")
            finally:
                conn.close()
        except Exception:
            # `Exception` rather than a bare `except:` so that
            # KeyboardInterrupt/SystemExit are never swallowed.
            if exe.db.scenario not in (
                Scenario.Kill,
                Scenario.ZeroDowntimeDeploy,
            ):
                raise
        return True
@@ Service, WorkflowArgumentParser, ) +from materialize.mzcompose.helpers.iceberg import setup_polaris_for_iceberg from materialize.mzcompose.services.balancerd import Balancerd from materialize.mzcompose.services.clusterd import Clusterd from materialize.mzcompose.services.cockroach import Cockroach from materialize.mzcompose.services.frontegg import FronteggMock from materialize.mzcompose.services.kafka import Kafka from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.minio import Mc, Minio from materialize.mzcompose.services.mysql import MySql from materialize.mzcompose.services.mz import Mz +from materialize.mzcompose.services.polaris import Polaris, PolarisBootstrap from materialize.mzcompose.services.postgres import Postgres from materialize.mzcompose.services.schema_registry import SchemaRegistry from materialize.mzcompose.services.sql_server import ( @@ -98,17 +101,22 @@ def header(cls) -> None: print("$ postgres-execute connection=mz_system") print("DROP SCHEMA IF EXISTS public CASCADE;") print(f"CREATE SCHEMA public /* {cls} */;") - print("GRANT ALL PRIVILEGES ON SCHEMA public TO materialize") - print(f'GRANT ALL PRIVILEGES ON SCHEMA public TO "{ADMIN_USER}"') - print( - f'GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO "{ADMIN_USER}";', - ) - print( - f'GRANT ALL PRIVILEGES ON CLUSTER single_worker_cluster TO "{ADMIN_USER}";', - ) - print( - f'GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}";', - ) + for user in ["materialize", ADMIN_USER]: + print(f'GRANT ALL PRIVILEGES ON SCHEMA public TO "{user}"') + print(f'GRANT ALL PRIVILEGES ON SCHEMA public2 TO "{user}"') + print( + f'GRANT ALL PRIVILEGES ON TABLE public2.iceberg_credentials TO "{user}"' + ) + print(f'GRANT ALL PRIVILEGES ON SECRET public2.iceberg_secret TO "{user}"') + print( + f'GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO "{user}";', + ) + print( + f'GRANT ALL PRIVILEGES ON CLUSTER single_worker_cluster TO
"{user}";', + ) + print( + f'GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{user}";', + ) @classmethod def body(cls) -> None: @@ -678,6 +686,140 @@ def body(cls) -> None: ) + + +class IcebergSinks(Generator): + """Emit a testdrive script that creates one Iceberg sink per table and reads each sink back through DuckDB.""" + + COUNT = min(Generator.COUNT, 100) # polaris gets overloaded easily + + @classmethod + def body(cls) -> None: + print("$ set-sql-timeout duration=300s") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_materialized_views = {cls.COUNT * 10};") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_sinks = {cls.COUNT * 10};") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};") + for i in cls.all(): + print(f"> CREATE TABLE t{i} (f1 INT)") + print(f"> INSERT INTO t{i} VALUES ({i})") + + # Plain (non-f) string: testdrive only expands ${var}, a bare {var} would be sent verbatim. + print(dedent(""" + $ set-from-sql var=iceberg_user + SELECT user FROM public2.iceberg_credentials + + $ set-from-sql var=iceberg_key + SELECT key FROM public2.iceberg_credentials + + > CREATE CONNECTION aws_conn TO AWS (ACCESS KEY ID = '${iceberg_user}', SECRET ACCESS KEY = SECRET public2.iceberg_secret, ENDPOINT = 'http://minio:9000/', REGION = 'us-east-1'); + + > CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', CREDENTIAL = 'root:root', WAREHOUSE = 'default_catalog', SCOPE = 'PRINCIPAL_ROLE:ALL'); + + $ duckdb-execute name=iceberg + CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio'); + SET unsafe_enable_version_guessing = true; + """)) + + for i in cls.all(): + print(dedent(f""" + > CREATE SINK s{i} + IN CLUSTER single_replica_cluster + FROM t{i} + INTO ICEBERG CATALOG CONNECTION polaris_conn ( + NAMESPACE 'default_namespace', + TABLE 'sink-{i}' + ) + USING AWS CONNECTION
aws_conn + KEY (f1) NOT ENFORCED + MODE UPSERT + WITH (COMMIT INTERVAL '1s'); + """)) + + for i in cls.all(): + print(dedent(f""" + > SELECT messages_committed >= 1 + FROM mz_internal.mz_sink_statistics + JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 's{i}'; + true + + $ set-from-sql var=iceberg_user + SELECT user FROM public2.iceberg_credentials + + $ duckdb-query name=iceberg + SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/sink-{i}') + {i} + """)) + + +class IcebergSinksSameSource(Generator): + """Emit a testdrive script that creates many Iceberg sinks on the single table t1 and reads each sink back through DuckDB.""" + + COUNT = min(Generator.COUNT, 100) # polaris gets overloaded easily + + @classmethod + def body(cls) -> None: + print("$ set-sql-timeout duration=300s") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_sinks = {cls.COUNT * 10};") + print("$ postgres-execute connection=mz_system") + print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};") + print("> CREATE TABLE t1 (f1 INT)") + print("> INSERT INTO t1 VALUES (1)") + + # Plain (non-f) string: testdrive only expands ${var}, a bare {var} would be sent verbatim. + print(dedent(""" + $ set-from-sql var=iceberg_user + SELECT user FROM public2.iceberg_credentials + + $ set-from-sql var=iceberg_key + SELECT key FROM public2.iceberg_credentials + + > CREATE CONNECTION aws_conn TO AWS (ACCESS KEY ID = '${iceberg_user}', SECRET ACCESS KEY = SECRET public2.iceberg_secret, ENDPOINT = 'http://minio:9000/', REGION = 'us-east-1'); + + > CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', CREDENTIAL = 'root:root', WAREHOUSE = 'default_catalog', SCOPE = 'PRINCIPAL_ROLE:ALL'); + + $ duckdb-execute name=iceberg + CREATE SECRET s3_secret (TYPE S3, KEY_ID '${iceberg_user}', SECRET '${iceberg_key}', ENDPOINT 'minio:9000', URL_STYLE 'path', USE_SSL false, REGION 'minio'); + SET unsafe_enable_version_guessing = true; + + """)) + + for i in cls.all(): + print(dedent(f""" + > CREATE SINK s{i} + IN CLUSTER single_replica_cluster + FROM t1 + INTO ICEBERG CATALOG CONNECTION polaris_conn ( +
NAMESPACE 'default_namespace', + TABLE 'sink-{i}' + ) + USING AWS CONNECTION aws_conn + KEY (f1) NOT ENFORCED + MODE UPSERT + WITH (COMMIT INTERVAL '1s'); + """)) + + for i in cls.all(): + print(dedent(f""" + > SELECT messages_committed >= 1 + FROM mz_internal.mz_sink_statistics + JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id + WHERE mz_sinks.name = 's{i}'; + true + + $ set-from-sql var=iceberg_user + SELECT user FROM public2.iceberg_credentials + + $ duckdb-query name=iceberg + SELECT * FROM iceberg_scan('s3://test-bucket/default_namespace/sink-{i}') + 1 + """)) + + class Columns(Generator): @classmethod def body(cls) -> None: @@ -1567,7 +1703,7 @@ def body(cls) -> None: print(f"CREATE TABLE t{i} (c int);") print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;") print(f"INSERT INTO t{i} VALUES ({i});") - print("CREATE PUBLICATION mz_source FOR ALL TABLES;") + print("CREATE PUBLICATION mz_source FOR TABLES IN SCHEMA public;") print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'") print("""> CREATE CONNECTION pg TO POSTGRES ( HOST postgres, @@ -1607,7 +1743,7 @@ def body(cls) -> None: print(f"CREATE TABLE t{i} (c int);") print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;") print(f"INSERT INTO t{i} VALUES ({i});") - print("CREATE PUBLICATION mz_source FOR ALL TABLES;") + print("CREATE PUBLICATION mz_source FOR TABLES IN SCHEMA public;") print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'") print("""> CREATE CONNECTION pg TO POSTGRES ( HOST postgres, @@ -1645,7 +1781,7 @@ def body(cls) -> None: print(f"CREATE TABLE t{i} (c int);") print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;") print(f"INSERT INTO t{i} VALUES ({i});") - print("CREATE PUBLICATION mz_source FOR ALL TABLES;") + print("CREATE PUBLICATION mz_source FOR TABLES IN SCHEMA public;") print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'") print("""> CREATE CONNECTION pg TO POSTGRES ( HOST postgres, @@ -1828,6 +1964,10 @@ def app_password(email: str) -> str: ), MySql(), SqlServer(), + Polaris(), + 
PolarisBootstrap(), + Minio(), + Mc(), SchemaRegistry(), # We create all sources, sinks and dataflows by default with SIZE 'scale=1,workers=1' # The workflow_instance_size workflow is testing multi-process clusters @@ -1912,6 +2052,7 @@ def app_password(email: str) -> str: "postgres", "mysql", "sql-server", + "minio", "materialized", "balancerd", "frontegg-mock", @@ -1930,6 +2071,8 @@ def setup(c: Composition, workers: int) -> None: c.up(*service_names) setup_sql_server_testing(c) + iceberg_credentials = setup_polaris_for_iceberg(c) + c.sql( "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;", port=6877, @@ -1986,6 +2129,10 @@ def setup(c: Composition, workers: int) -> None: GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO materialize; GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO "{ADMIN_USER}"; GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}"; + + CREATE SCHEMA public2; + CREATE VIEW public2.iceberg_credentials AS SELECT '{iceberg_credentials[0]}' AS user, '{iceberg_credentials[1]}' AS key; + CREATE SECRET public2.iceberg_secret AS '{iceberg_credentials[1]}'; """, port=6877, user="mz_system", diff --git a/test/platform-checks/mzcompose.py b/test/platform-checks/mzcompose.py index 741eb7c3bc7fb..4583021622033 100644 --- a/test/platform-checks/mzcompose.py +++ b/test/platform-checks/mzcompose.py @@ -40,6 +40,7 @@ from materialize.mzcompose.services.minio import Mc, Minio from materialize.mzcompose.services.mysql import MySql from materialize.mzcompose.services.persistcli import Persistcli +from materialize.mzcompose.services.polaris import Polaris, PolarisBootstrap from materialize.mzcompose.services.postgres import Postgres, PostgresMetadata from materialize.mzcompose.services.schema_registry import SchemaRegistry from materialize.mzcompose.services.sql_server import SqlServer @@ -104,6 +105,8 @@ def create_mzs( Postgres(volumes=["secrets:/certs:ro"]), MySql(), SqlServer(), + Polaris(), + PolarisBootstrap(), 
Kafka( # The Self-Managed upgrade scenarios exercise historical Mz versions # whose embedded librdkafka cannot SCRAM-auth against Kafka 4.x. Pin