Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
534 changes: 534 additions & 0 deletions misc/python/materialize/checks/all_checks/iceberg_sink.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def schemas_null() -> str:


@externally_idempotent(False)
class SinkUpsert(Check):
class KafkaSinkUpsert(Check):
"""Basic Check on sinks from an upsert source"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -194,7 +194,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkTables(Check):
class KafkaSinkTables(Check):
"""Sink and re-ingest a large transaction from a table source"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -289,7 +289,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkNullDefaults(Check):
class KafkaSinkNullDefaults(Check):
"""Check on an Avro sink with NULL DEFAULTS"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -526,7 +526,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkComments(Check):
class KafkaSinkComments(Check):
"""Check on an Avro sink with comments"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -792,7 +792,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkAutoCreatedTopicConfig(Check):
class KafkaSinkAutoCreatedTopicConfig(Check):
"""Check on a sink with auto-created topic configuration"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -1348,7 +1348,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkFormat(Check):
class KafkaSinkFormat(Check):
"""Check SINK with KEY FORMAT and VALUE FORMAT"""

def initialize(self) -> Testdrive:
Expand Down Expand Up @@ -1402,7 +1402,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkPartitionByDebezium(Check):
class KafkaSinkPartitionByDebezium(Check):
"""Check SINK with ENVELOPE DEBEZIUM and PARTITION BY"""

def initialize(self) -> Testdrive:
Expand Down
24 changes: 24 additions & 0 deletions misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from materialize.checks.executors import Executor
from materialize.docker import image_registry
from materialize.mz_version import MzVersion
from materialize.mzcompose.helpers.iceberg import setup_polaris_for_iceberg
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
from materialize.mzcompose.services.ssh_bastion_host import (
Expand Down Expand Up @@ -228,6 +229,29 @@ def join(self, e: Executor) -> None:
e.join(self.handle)


class SetupIcebergTesting(MzcomposeAction):
    """Prepare Polaris for Iceberg and register the matching objects in Materialize.

    Executing the action calls ``setup_polaris_for_iceberg`` on the executor's
    composition and then runs a testdrive script that creates a view and a
    secret holding the returned credentials, an AWS connection pointing at
    MinIO, and an Iceberg REST catalog connection pointing at Polaris.
    """

    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
        self.scenario = scenario
        self.mz_service = mz_service
        # Handle of the testdrive run started by execute(); consumed by join().
        self.handle: Any | None = None

    def execute(self, e: Executor) -> None:
        """Set up Polaris and start the testdrive script that creates the connections."""
        composition = e.mzcompose_composition()
        credentials = setup_polaris_for_iceberg(composition)
        # Element 0 is used as the access key id / user name, element 1 as the
        # secret access key.
        access_key_id = credentials[0]
        secret_access_key = credentials[1]
        testdrive_script = dedent(
            f"""
            > CREATE VIEW iceberg_credentials AS SELECT '{access_key_id}' AS user, '{secret_access_key}' AS key;
            > CREATE SECRET iceberg_secret AS '{secret_access_key}'
            > CREATE CONNECTION aws_conn TO AWS (ACCESS KEY ID = '{access_key_id}', SECRET ACCESS KEY = SECRET iceberg_secret, ENDPOINT = 'http://minio:9000/', REGION = 'us-east-1');
            > CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', CREDENTIAL = 'root:root', WAREHOUSE = 'default_catalog', SCOPE = 'PRINCIPAL_ROLE:ALL');"""
        )

        self.handle = e.testdrive(input=testdrive_script, mz_service=self.mz_service)

    def join(self, e: Executor) -> None:
        """Wait for the testdrive run started by execute()."""
        e.join(self.handle)


class KillMz(MzcomposeAction):
def __init__(
self, mz_service: str = "materialized", capture_logs: bool = False
Expand Down
9 changes: 9 additions & 0 deletions misc/python/materialize/checks/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ConfigureMz,
KillClusterdCompute,
KillMz,
SetupIcebergTesting,
SetupSqlServerTesting,
StartClusterdCompute,
StartMz,
Expand Down Expand Up @@ -90,6 +91,7 @@ def run(self) -> None:
actions.insert(0, ConfigureMz(self))

sql_server_testing_setup = False
iceberg_setup = self.base_version() < MzVersion.parse_mz("v26.10.0-dev")
for index, action in enumerate(actions):
# Implicitly call configure to raise version-dependent limits
if isinstance(action, StartMz) and not action.deploy_generation:
Expand All @@ -103,6 +105,13 @@ def run(self) -> None:
SetupSqlServerTesting(self, mz_service=action.mz_service),
)
sql_server_testing_setup = True
if not iceberg_setup:
# Can only be run once
actions.insert(
index + 1,
SetupIcebergTesting(self, mz_service=action.mz_service),
)
iceberg_setup = True
elif isinstance(action, ReplaceEnvironmentdStatefulSet):
actions.insert(index + 1, ConfigureMz(self))

Expand Down
5 changes: 5 additions & 0 deletions misc/python/materialize/mzcompose/helpers/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def setup_polaris_for_iceberg(

key = create_minio_user(username, minio_alias, c)

# Create a dedicated database for Polaris so its internal tables don't
# interfere with FOR ALL TABLES publications in PG CDC tests.
c.up("postgres")
c.exec("postgres", "psql", "-U", "postgres", "-c", "CREATE DATABASE polaris")

with c.override(
Polaris(
extra_environment=[
Expand Down
4 changes: 2 additions & 2 deletions misc/python/materialize/mzcompose/services/polaris.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(
"POLARIS_PERSISTENCE_TYPE=relational-jdbc",
"QUARKUS_DATASOURCE_USERNAME=postgres",
"QUARKUS_DATASOURCE_PASSWORD=postgres",
"QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/postgres",
"QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/polaris",
"POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5",
"POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100",
"POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS=5000",
Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(
environment: list[str] = [
"QUARKUS_DATASOURCE_USERNAME=postgres",
"QUARKUS_DATASOURCE_PASSWORD=postgres",
"QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/postgres",
"QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://postgres:5432/polaris",
"POLARIS_PERSISTENCE_TYPE=relational-jdbc",
"POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES=5",
"POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS=100",
Expand Down
72 changes: 51 additions & 21 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ def __init__(self, rng: random.Random, composition: Composition | None):
def run(self, exe: Executor) -> bool:
raise NotImplementedError

def create_system_connection(
    self, exe: Executor, num_attempts: int = 10
) -> Connection:
    """Open an autocommit connection to Materialize as the ``mz_system`` user.

    The port is looked up in ``exe.db.ports`` under ``"mz_system"`` when the
    executor targets the ``materialized`` service and under ``"mz_system2"``
    otherwise.

    Args:
        exe: Executor whose database host, ports and service name are used.
        num_attempts: Number of retries after the first failed attempt, so up
            to ``num_attempts + 1`` connection attempts are made, with a one
            second pause between them. Zero or a negative value means a
            single attempt.

    Returns:
        The open connection with ``autocommit`` enabled.

    Raises:
        Exception: The error of the last attempt, once all retries are used up.
    """
    retries_left = num_attempts
    # Iterate instead of recursing so the final error is not buried under one
    # nested frame and exception context per failed attempt.
    while True:
        try:
            conn = psycopg.connect(
                host=exe.db.host,
                port=exe.db.ports[
                    "mz_system" if exe.mz_service == "materialized" else "mz_system2"
                ],
                user="mz_system",
                dbname="materialize",
            )
            conn.autocommit = True
            return conn
        except Exception:
            # Deliberately not a bare `except:` so that KeyboardInterrupt and
            # SystemExit abort immediately instead of being retried.
            if retries_left <= 0:
                raise
            retries_left -= 1
            time.sleep(1)

def errors_to_ignore(self, exe: Executor) -> list[str]:
result = [
"permission denied for",
Expand Down Expand Up @@ -1867,27 +1888,6 @@ def run(self, exe: Executor) -> bool:
except Exception as e:
raise QueryError(str(e), "FlipFlags")

def create_system_connection(
    self, exe: Executor, num_attempts: int = 10
) -> Connection:
    """Open an autocommit connection to Materialize as the ``mz_system`` user.

    The port is looked up in ``exe.db.ports`` under ``"mz_system"`` when the
    executor targets the ``materialized`` service and under ``"mz_system2"``
    otherwise.

    Args:
        exe: Executor whose database host, ports and service name are used.
        num_attempts: Number of retries after the first failed attempt, so up
            to ``num_attempts + 1`` connection attempts are made, with a one
            second pause between them. Zero or a negative value means a
            single attempt.

    Returns:
        The open connection with ``autocommit`` enabled.

    Raises:
        Exception: The error of the last attempt, once all retries are used up.
    """
    retries_left = num_attempts
    # Iterate instead of recursing so the final error is not buried under one
    # nested frame and exception context per failed attempt.
    while True:
        try:
            conn = psycopg.connect(
                host=exe.db.host,
                port=exe.db.ports[
                    "mz_system" if exe.mz_service == "materialized" else "mz_system2"
                ],
                user="mz_system",
                dbname="materialize",
            )
            conn.autocommit = True
            return conn
        except Exception:
            # Deliberately not a bare `except:` so that KeyboardInterrupt and
            # SystemExit abort immediately instead of being retried.
            if retries_left <= 0:
                raise
            retries_left -= 1
            time.sleep(1)

def flip_flag(self, conn: Connection, flag_name: str, flag_value: str) -> None:
with conn.cursor() as cur:
cur.execute(
Expand Down Expand Up @@ -2999,6 +2999,35 @@ def run(self, exe: Executor) -> bool:
return True


class CheckSinkAction(Action):
    """Fail the workload when any sink reports a status other than running/starting."""

    def run(self, exe: Executor) -> bool:
        """Query ``mz_internal.mz_sink_statuses`` and raise if a sink is unhealthy.

        Args:
            exe: Executor used to open a system connection and to read the
                active scenario.

        Returns:
            Always ``True``.

        Raises:
            ValueError: If at least one sink has a status other than
                ``running`` or ``starting``. Any error, including this one,
                is swallowed when the scenario is ``Kill`` or
                ``ZeroDowntimeDeploy``.
        """
        try:
            conn = self.create_system_connection(exe)
            try:
                with conn.cursor() as cur:
                    # NULL must not appear in the NOT IN list: `x NOT IN (..., NULL)`
                    # evaluates to NULL instead of TRUE for every non-matching
                    # row, so the query could never return anything. Rows
                    # whose status is NULL are still excluded, because the
                    # predicate is NULL for them.
                    cur.execute(
                        "SELECT name, type, last_status_change_at, status, error, details FROM mz_internal.mz_sink_statuses WHERE status NOT IN ('running', 'starting')"
                    )
                    results = cur.fetchall()
                if results:
                    results_str = "\n".join(
                        f"{name} ({sink_type}) changed status at {last_status_change_at} to {status}: {error} (details: {details})"
                        for name, sink_type, last_status_change_at, status, error, details in results
                    )
                    raise ValueError(f"Sinks are in a bad state:\n{results_str}")
            finally:
                conn.close()
        except Exception:
            # NOTE(review): presumably failures are expected while the server
            # is being killed or redeployed -- confirm. In those scenarios the
            # error is dropped; everywhere else it propagates.
            if exe.db.scenario not in (
                Scenario.Kill,
                Scenario.ZeroDowntimeDeploy,
            ):
                raise
        return True


class DropIcebergSinkAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
return [
Expand Down Expand Up @@ -3278,6 +3307,7 @@ def __init__(
(DropIcebergSinkAction, 4),
(CreateKafkaSourceAction, 4),
(DropKafkaSourceAction, 4),
(CheckSinkAction, 1),
# TODO: Reenable when database-issues#8237 is fixed
# (CreateMySqlSourceAction, 4),
# (DropMySqlSourceAction, 4),
Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ def parse_pg_conn_string(conn_string: str) -> PgConnInfo:
"aws-access-key-id=",
"aws-secret-access-key=",
"default-sql-server-password=",
"Authorization: Bearer ",
"client_secret=",
# Not a secret, but too spammy, filter too
"CLUSTER_REPLICA_SIZES",
"cluster-replica-sizes=",
Expand Down
1 change: 1 addition & 0 deletions misc/shlib/shlib.bash
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ trufflehog_jq_filter_common() {
.Raw != "postgresql://materialize:AbC123dEf@ep-cool-darkness-123456.us-east-2.aws.neon.tech:5432" and
.Raw != "d3aa325086974cdfb3912f28e5a8c168" and
.Raw != "jdbc:postgresql://postgres:5432/postgres" and
.Raw != "jdbc:postgresql://postgres:5432/polaris" and
.Raw != "RPSsql12345" and
.Raw != "RPSsql1234" and
.Raw != "RPSsql123" and
Expand Down
Loading
Loading