Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/adapter/src/coord/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use mz_compute_client::controller::error::CollectionLookupError;
use mz_controller_types::ClusterId;
use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::task::spawn;
use mz_ore::{cast::CastFrom, cast::CastInto};
use mz_ore::{cast::CastFrom, cast::CastInto, soft_panic_or_log};
use mz_repr::adt::timestamp::TimestampLike;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_sql::plan::Params;
Expand Down Expand Up @@ -395,6 +395,9 @@ impl Coordinator {
/// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
/// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
/// should prevent this.
///
/// It is also an error to end the same execution twice; the duplicate end is
/// reported and ignored, keeping the first end.
pub(crate) fn end_statement_execution(
&mut self,
id: StatementLoggingId,
Expand All @@ -408,13 +411,17 @@ impl Coordinator {
ended_at: now,
};

let began_record = self
.statement_logging
.executions_begun
.remove(&uuid)
.expect(
"matched `begin_statement_execution` and `end_statement_execution` invocations",
let Some(began_record) = self.statement_logging.executions_begun.remove(&uuid) else {
// A `StatementLoggingId` is only minted when a begin is logged, so
// a missing entry means this execution was already ended: some bug
// ended it twice. That's worth a loud report, but statement
// logging must never abort environmentd in production.
soft_panic_or_log!(
"duplicate end_statement_execution for statement {uuid}, reason: {:?}",
ended_record.reason
);
return;
};
for (row, diff) in
Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
{
Expand Down
Loading