diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index be02547a2d..6d8415fd7e 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -91,6 +91,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Transpose; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Union; @@ -301,6 +302,10 @@ public T visitChart(Chart node, C context) { return visitChildren(node, context); } + public T visitTimewrap(Timewrap node, C context) { + return visitChildren(node, context); + } + public T visitRegex(Regex node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java new file mode 100644 index 0000000000..88c151db33 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.SpanUnit; + +/** AST node representing the timewrap command. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +public class Timewrap extends UnresolvedPlan { + private final SpanUnit unit; + private final int value; + private final String align; // "end" or "now" + private final String series; // "relative", "short", or "exact" + private final String timeFormat; // format string for series=exact, nullable + private final Literal spanLiteral; // original span literal for display + + private UnresolvedPlan child; + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitTimewrap(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 7ca7ab0930..3f81cdbae4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -44,6 +44,18 @@ public class CalcitePlanContext { /** This thread local variable is only used to skip script encoding in script pushdown. */ public static final ThreadLocal skipEncoding = ThreadLocal.withInitial(() -> false); + /** When true, the execution engine strips all-null columns from the result (used by timewrap). */ + public static final ThreadLocal stripNullColumns = ThreadLocal.withInitial(() -> false); + + /** + * Timewrap span unit name for column renaming in the execution engine. When set, the execution + * engine uses __base_offset__ to compute absolute period names (e.g., "501days_before"). + */ + public static final ThreadLocal timewrapUnitName = new ThreadLocal<>(); + + /** Timewrap series mode: "relative", "short", or "exact". */ + public static final ThreadLocal timewrapSeries = new ThreadLocal<>(); + /** Thread-local switch that tells whether the current query prefers legacy behavior. */ private static final ThreadLocal legacyPreferredFlag = ThreadLocal.withInitial(() -> true); @@ -169,6 +181,7 @@ public static void run(Runnable action, Settings settings) { action.run(); } finally { legacyPreferredFlag.remove(); + clearTimewrapSignals(); } } @@ -179,6 +192,17 @@ public static boolean isLegacyPreferred() { return legacyPreferredFlag.get(); } + /** + * Resets the timewrap thread-locals set by {@code CalciteRelNodeVisitor.visitTimewrap}. Called + * from the query lifecycle's {@code finally} on every path (execute, explain, and exceptions) so + * the signals never leak onto the next query that reuses this pooled worker thread. + */ + public static void clearTimewrapSignals() { + stripNullColumns.set(false); + timewrapUnitName.set(null); + timewrapSeries.set(null); + } + public void putRexLambdaRefMap(Map candidateMap) { this.rexLambdaRefMap.putAll(candidateMap); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index c4bb8bcd91..13ccdf1519 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -160,6 +161,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Trendline.TrendlineType; import org.opensearch.sql.ast.tree.Union; @@ -176,6 +178,7 @@ import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.calcite.utils.PlanUtils; +import org.opensearch.sql.calcite.utils.TimewrapUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.calcite.utils.WildcardUtils; import org.opensearch.sql.common.error.ErrorCode; @@ -187,6 +190,7 @@ import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.HighlightExpression; import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.PPLBuiltinOperators; import org.opensearch.sql.expression.function.PPLFuncImpTable; import org.opensearch.sql.expression.parse.RegexCommonUtils; import org.opensearch.sql.utils.ParseUtils; @@ -3823,6 +3827,148 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) { return relBuilder.peek(); } + @Override + public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { + visitChildren(node, context); + + // Signal the execution engine to strip all-null columns and rename with absolute offsets + CalcitePlanContext.stripNullColumns.set(true); + CalcitePlanContext.timewrapUnitName.set( + TimewrapUtils.unitBaseName(node.getUnit(), node.getValue()) + "|_before"); + CalcitePlanContext.timewrapSeries.set(node.getSeries()); + + RelBuilder b = context.relBuilder; + RexBuilder rx = context.rexBuilder; + + List fieldNames = + b.peek().getRowType().getFieldNames().stream().filter(f -> !isMetadataField(f)).toList(); + String tsFieldName = fieldNames.get(0); + List valueFieldNames = fieldNames.subList(1, fieldNames.size()); + + boolean variableLength = TimewrapUtils.isVariableLengthUnit(node.getUnit()); + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + + RexNode periodNum; + RexNode displayTimestamp; + RexNode baseOffset; + + if (variableLength) { + // --- Variable-length units (month, quarter, year): EXTRACT-based calendar arithmetic --- + RexNode tsField = b.field(tsFieldName); + RexNode tsUnitNum = + TimewrapUtils.calendarUnitNumber(rx, tsField, node.getUnit(), node.getValue()); + + b.projectPlus(b.aggregateCall(SqlStdOperatorTable.MAX, tsField).over().as("__max_ts__")); + RexNode maxTs = b.field("__max_ts__"); + RexNode maxUnitNum = + TimewrapUtils.calendarUnitNumber(rx, maxTs, node.getUnit(), node.getValue()); + + periodNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall(SqlStdOperatorTable.MINUS, maxUnitNum, tsUnitNum), + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); + + RexNode tsEpoch = + rx.makeCast(bigintType, rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, tsField), true); + RexNode unitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, tsField, node.getUnit()); + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MINUS, tsEpoch, unitStartEpoch); + RexNode maxUnitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, maxTs, node.getUnit()); + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, maxUnitStartEpoch, offsetSec); + displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); + + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; + Long referenceEpoch = null; + if ("end".equals(node.getAlign())) { + referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node); + } + if (referenceEpoch == null) { + referenceEpoch = nowEpochSec; + } + long refUnitNum = + TimewrapUtils.calendarUnitNumberFromEpoch( + referenceEpoch, node.getUnit(), node.getValue()); + RexNode refUnitNumLit = rx.makeBigintLiteral(BigDecimal.valueOf(refUnitNum)); + baseOffset = rx.makeCall(SqlStdOperatorTable.MINUS, refUnitNumLit, maxUnitNum); + + } else { + // --- Fixed-length units (sec, min, hr, day, week): epoch-based arithmetic --- + long spanSec = TimewrapUtils.spanToSeconds(node.getUnit(), node.getValue()); + + RexNode tsEpochExpr = + rx.makeCast( + bigintType, + rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)), + true); + b.projectPlus( + b.alias(tsEpochExpr, "__ts_epoch__"), + b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__")); + + RexNode tsEpoch = b.field("__ts_epoch__"); + RexNode maxEpoch = b.field("__max_epoch__"); + RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec)); + + RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch); + periodNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit), + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); + + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit); + RexNode latestPeriodStart = + rx.makeCall( + SqlStdOperatorTable.MINUS, + maxEpoch, + rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit)); + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec); + displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); + + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; + Long referenceEpoch = null; + if ("end".equals(node.getAlign())) { + referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node); + } + if (referenceEpoch == null) { + referenceEpoch = nowEpochSec; + } + RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch)); + // Floor-divide (ref - maxEpoch) by span: integer DIVIDE truncates toward zero, which is wrong + // when the reference is below maxEpoch (e.g. align=now over future-dated data) — it would + // shift period labels by one across the latest/before/after boundary. Cast to DOUBLE and + // FLOOR + // to get true floor division, then back to BIGINT. + RelDataType doubleType = rx.getTypeFactory().createSqlType(SqlTypeName.DOUBLE); + RexNode refDiff = rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch); + RexNode refDiffDouble = rx.makeCast(doubleType, refDiff, true); + baseOffset = + rx.makeCast( + bigintType, + rx.makeCall( + SqlStdOperatorTable.FLOOR, + rx.makeCall(SqlStdOperatorTable.DIVIDE, refDiffDouble, spanLit)), + true); + } + + // Step 3: Project [display_timestamp, value_columns..., base_offset, period] + // base_offset is included in the group key so it survives the PIVOT + List projections = new ArrayList<>(); + projections.add(b.alias(displayTimestamp, tsFieldName)); + for (String vf : valueFieldNames) { + projections.add(b.field(vf)); + } + projections.add(b.alias(baseOffset, "__base_offset__")); + projections.add(b.alias(periodNum, "__period__")); + b.project(projections); + + // Step 4: Sort by offset, then period (execution engine will pivot) + // No Calcite PIVOT -- the execution engine pivots dynamically after reading all rows. + // Output schema: [display_timestamp, value_columns..., __base_offset__, __period__] + b.sort(b.field(tsFieldName), b.field("__period__")); + + return b.peek(); + } + /** * Aggregate by column split then rank by grand total (summed value of each category). The output * is [col-split, grand-total, row-number] diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapPivot.java b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapPivot.java new file mode 100644 index 0000000000..c4712ff957 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapPivot.java @@ -0,0 +1,208 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.data.model.ExprNullValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; + +/** + * Pivots the unpivoted rows produced by {@code CalciteRelNodeVisitor.visitTimewrap} into the + * Splunk-style period columns timewrap returns. + * + *

The timewrap RelNode intentionally does NOT pivot — the set of period columns is only known + * once all rows are read. Instead it emits rows of shape {@code [display_ts, value_col(s)..., + * __base_offset__, __period__]} and signals this post-processing step via the thread-locals on + * {@link CalcitePlanContext}. This helper performs the dynamic pivot so both execution engines (the + * v2 {@code OpenSearchExecutionEngine} and the analytics-route {@code AnalyticsExecutionEngine}) + * produce identical output. + */ +public final class TimewrapPivot { + + private TimewrapPivot() {} + + /** Result of a pivot: the rebuilt columns and rows. */ + public record Result(List columns, List values) {} + + /** + * Returns true when the current query is a timewrap query whose results need pivoting. Reads the + * {@link CalcitePlanContext} thread-locals set by {@code visitTimewrap}. + */ + public static boolean isTimewrap() { + return Boolean.TRUE.equals(CalcitePlanContext.stripNullColumns.get()) + && CalcitePlanContext.timewrapUnitName.get() != null; + } + + /** + * Pivots the unpivoted timewrap rows into period columns. If the input is empty or the + * bookkeeping columns are absent, the input is returned unchanged. + * + * @param columns the unpivoted columns {@code [display_ts, value_col(s)..., __base_offset__, + * __period__]} + * @param values the unpivoted rows + * @param unitInfo the {@code visitTimewrap} unit descriptor + * ("spanValue|singular|plural|_before"); null means this is not a timewrap query and the + * input is returned unchanged + * @param seriesMode the timewrap {@code series} mode (relative / short / exact); may be null + */ + public static Result pivot( + List columns, List values, String unitInfo, String seriesMode) { + if (unitInfo == null || values.isEmpty()) { + return new Result(columns, values); + } + + // Locate the bookkeeping and value columns. visitTimewrap always emits + // [display_ts, value_col(s)..., __base_offset__, __period__], so column 0 is the timestamp, + // __period__/__base_offset__ are bookkeeping, and every other column is a value column. + int tsIdx = 0; + int periodIdx = -1; + int baseOffsetIdx = -1; + List valueIdxs = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + String name = columns.get(i).getName(); + if ("__period__".equals(name)) periodIdx = i; + else if ("__base_offset__".equals(name)) baseOffsetIdx = i; + else if (i > 0) valueIdxs.add(i); + } + if (periodIdx < 0 || baseOffsetIdx < 0) { + return new Result(columns, values); + } + + // Read __base_offset__ (constant across all rows). + long baseOffset = 0; + ExprValue boVal = values.getFirst().tupleValue().get("__base_offset__"); + if (boVal != null && !boVal.isNull()) { + baseOffset = boVal.longValue(); + } + + // Collect distinct periods (sorted descending = oldest first in output) and precompute each + // period's name once. The name depends only on (period, baseOffset, unitInfo, seriesMode), so + // computing it inside the per-row loop would repeat the same split/parse/switch + // O(rows x valueCols) times. + Set periodSet = new TreeSet<>(Collections.reverseOrder()); + for (ExprValue row : values) { + ExprValue pv = row.tupleValue().get("__period__"); + if (pv != null && !pv.isNull()) { + periodSet.add(pv.longValue()); + } + } + List periods = new ArrayList<>(periodSet); + Map periodNames = new HashMap<>(); + for (long period : periods) { + periodNames.put(period, renameTimewrapPeriod(period, baseOffset, unitInfo, seriesMode)); + } + + // Value column names. + List valueColNames = new ArrayList<>(); + for (int vi : valueIdxs) { + valueColNames.add(columns.get(vi).getName()); + } + + // Build output column names: [ts, val1_period1, val1_period2, ..., val2_period1, ...]. + // Splunk order: for each period, all value columns (oldest period first). + List outColNames = new ArrayList<>(); + outColNames.add(columns.get(tsIdx).getName()); + List outColTypes = new ArrayList<>(); + outColTypes.add(columns.get(tsIdx).getExprType()); + + for (long period : periods) { + for (int vi = 0; vi < valueColNames.size(); vi++) { + outColNames.add(valueColNames.get(vi) + "_" + periodNames.get(period)); + outColTypes.add(columns.get(valueIdxs.get(vi)).getExprType()); + } + } + + // Group rows by display_ts, pivot periods into columns. LinkedHashMap preserves the + // ts-sorted insertion order Calcite produced. + Map> pivoted = new LinkedHashMap<>(); + String tsColName = columns.get(tsIdx).getName(); + for (ExprValue row : values) { + Map tuple = row.tupleValue(); + String tsKey = tuple.get(tsColName).toString(); + long period = tuple.get("__period__").longValue(); + + Map outRow = + pivoted.computeIfAbsent( + tsKey, + k -> { + Map r = new LinkedHashMap<>(); + r.put(outColNames.get(0), tuple.get(tsColName)); + // Initialize all period columns to null. + for (int i = 1; i < outColNames.size(); i++) { + r.put(outColNames.get(i), ExprNullValue.of()); + } + return r; + }); + + // Fill in the value for this period. + String periodName = periodNames.get(period); + for (int vi = 0; vi < valueColNames.size(); vi++) { + String colName = valueColNames.get(vi) + "_" + periodName; + ExprValue val = tuple.get(valueColNames.get(vi)); + if (val != null) { + outRow.put(colName, val); + } + } + } + + // Build output. + List outColumns = new ArrayList<>(); + for (int i = 0; i < outColNames.size(); i++) { + outColumns.add(new Column(outColNames.get(i), null, outColTypes.get(i))); + } + List outValues = new ArrayList<>(); + for (Map outRow : pivoted.values()) { + outValues.add(ExprTupleValue.fromExprValueMap(outRow)); + } + return new Result(outColumns, outValues); + } + + /** + * Generates a period name from a relative period number and base offset. Returns the suffix only + * (no value prefix). E.g., "2days_before", "latest_day", "s2". unitInfo format: + * "spanValue|singular|plural|_before". + */ + private static String renameTimewrapPeriod( + long relativePeriod, long baseOffset, String unitInfo, String seriesMode) { + String[] parts = unitInfo.split("\\|", -1); + if (parts.length < 4) return String.valueOf(relativePeriod); + int spanValue = Integer.parseInt(parts[0]); + String singular = parts[1]; + String plural = parts[2]; + + long absolutePeriod = (baseOffset + relativePeriod - 1) * spanValue; + + String mode = seriesMode == null ? "relative" : seriesMode; + + return switch (mode) { + // series=exact (+ time_format) is not yet implemented; it intentionally falls back to the + // short "s" naming. TODO: format the period start date with time_format. + case "short", "exact" -> "s" + absolutePeriod; + default -> { + if (absolutePeriod == 0) { + yield "latest_" + singular; + } else if (absolutePeriod > 0) { + String unit = absolutePeriod == 1 ? singular : plural; + yield absolutePeriod + unit + "_before"; + } else { + long absPeriod = Math.abs(absolutePeriod); + String unit = absPeriod == 1 ? singular : plural; + yield absPeriod + unit + "_after"; + } + } + }; + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java new file mode 100644 index 0000000000..8c17202731 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java @@ -0,0 +1,356 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.And; +import org.opensearch.sql.ast.expression.Compare; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.SpanUnit; +import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.tree.Filter; +import org.opensearch.sql.ast.tree.Timewrap; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.function.PPLBuiltinOperators; + +/** Utility methods for the timewrap command's Calcite plan construction. */ +public class TimewrapUtils { + + /** Check if the span unit is variable-length (month, quarter, year). */ + public static boolean isVariableLengthUnit(SpanUnit unit) { + return "M".equals(unit.getName()) || "q".equals(unit.getName()) || "y".equals(unit.getName()); + } + + /** + * Convert a fixed-length span unit and value to seconds. Only fixed-length units (second, minute, + * hour, day, week) are supported; variable-length units (month, quarter, year) have no exact + * second count and must use the calendar arithmetic path ({@link #calendarUnitNumber} / {@link + * #calendarUnitStartEpoch}) instead. + */ + public static long spanToSeconds(SpanUnit unit, int value) { + return switch (unit.getName()) { + case "s" -> value; + case "m" -> value * 60L; + case "h" -> value * 3_600L; + case "d" -> value * 86_400L; + case "w" -> value * 7L * 86_400L; + case "M", "q", "y" -> + throw new IllegalArgumentException( + "Variable-length unit '" + + unit.getName() + + "' cannot be converted to a fixed number of seconds; use the calendar" + + " arithmetic path instead"); + default -> + throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName()); + }; + } + + /** + * Get the timescale base name for column naming. Returns "spanValue|singular|plural" e.g., + * "1|day|days". + */ + public static String unitBaseName(SpanUnit unit, int value) { + String singular = + switch (unit.getName()) { + case "s" -> "second"; + case "m" -> "minute"; + case "h" -> "hour"; + case "d" -> "day"; + case "w" -> "week"; + case "M" -> "month"; + case "q" -> "quarter"; + case "y" -> "year"; + default -> "period"; + }; + String plural = singular + "s"; + return value + "|" + singular + "|" + plural; + } + + /** + * Compute a calendar unit number for a timestamp as a Calcite RexNode. For months: year*12 + + * month. For quarters: year*4 + quarter. For years: year. Divided by spanValue. + */ + public static RexNode calendarUnitNumber( + RexBuilder rx, RexNode tsField, SpanUnit unit, int spanValue) { + RexNode year = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("YEAR"), tsField); + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + + RexNode unitNum; + switch (unit.getName()) { + case "M" -> { + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + unitNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + year, + rx.makeExactLiteral(BigDecimal.valueOf(12), bigintType)), + month); + } + case "q" -> { + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + RexNode quarter = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + rx.makeCall( + SqlStdOperatorTable.MINUS, + month, + rx.makeExactLiteral(BigDecimal.ONE, bigintType)), + rx.makeExactLiteral(BigDecimal.valueOf(3), bigintType)); + unitNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + year, + rx.makeExactLiteral(BigDecimal.valueOf(4), bigintType)), + quarter); + } + case "y" -> unitNum = year; + default -> throw new SemanticCheckException("Not a variable-length unit: " + unit.getName()); + } + + if (spanValue > 1) { + unitNum = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + unitNum, + rx.makeExactLiteral(BigDecimal.valueOf(spanValue), bigintType)); + } + return unitNum; + } + + /** + * Compute the epoch seconds of the start of the calendar unit containing a timestamp. Month: + * first day of the month. Quarter: first day of the quarter (precise with leap year). Year: Jan + * 1. + */ + public static RexNode calendarUnitStartEpoch(RexBuilder rx, RexNode tsField, SpanUnit unit) { + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + RexNode tsEpoch = + rx.makeCast(bigintType, rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, tsField), true); + RexNode hour = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("HOUR"), tsField); + RexNode minute = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MINUTE"), tsField); + RexNode second = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("SECOND"), tsField); + RexNode one = rx.makeExactLiteral(BigDecimal.ONE, bigintType); + RexNode sec86400 = rx.makeExactLiteral(BigDecimal.valueOf(86400), bigintType); + + RexNode timeWithinDay = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + hour, + rx.makeExactLiteral(BigDecimal.valueOf(3600), bigintType)), + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + minute, + rx.makeExactLiteral(BigDecimal.valueOf(60), bigintType))), + second); + + if ("M".equals(unit.getName())) { + RexNode dayOfMonth = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("DAY"), tsField); + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + rx.makeCall(SqlStdOperatorTable.MINUS, dayOfMonth, one), + sec86400)), + timeWithinDay); + } else if ("y".equals(unit.getName())) { + RexNode doy = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("DOY"), tsField); + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + rx.makeCall(SqlStdOperatorTable.MINUS, doy, one), + sec86400)), + timeWithinDay); + } else { + // Quarter: precise day-within-quarter via cumulative day lookup + leap year + RexNode doy = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("DOY"), tsField); + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + RexNode year = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("YEAR"), tsField); + + RexNode monthsIntoQuarter = + rx.makeCall( + SqlStdOperatorTable.MOD, + rx.makeCall(SqlStdOperatorTable.MINUS, month, one), + rx.makeExactLiteral(BigDecimal.valueOf(3), bigintType)); + RexNode quarterStartMonth = rx.makeCall(SqlStdOperatorTable.MINUS, month, monthsIntoQuarter); + + RexNode cumDaysBeforeQS = cumDaysBeforeMonth(rx, quarterStartMonth, year, bigintType); + RexNode quarterStartDOY = rx.makeCall(SqlStdOperatorTable.PLUS, cumDaysBeforeQS, one); + RexNode dayWithinQuarter = rx.makeCall(SqlStdOperatorTable.MINUS, doy, quarterStartDOY); + + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall(SqlStdOperatorTable.MULTIPLY, dayWithinQuarter, sec86400)), + timeWithinDay); + } + } + + /** + * Build a CASE expression for cumulative days before a given month, with leap year handling. + * Month 1→0, Month 2→31, Month 3→59+leap, ..., Month 12→334+leap. + */ + public static RexNode cumDaysBeforeMonth( + RexBuilder rx, RexNode month, RexNode year, RelDataType bigintType) { + RexNode mod4 = + rx.makeCall( + SqlStdOperatorTable.MOD, year, rx.makeExactLiteral(BigDecimal.valueOf(4), bigintType)); + RexNode mod100 = + rx.makeCall( + SqlStdOperatorTable.MOD, + year, + rx.makeExactLiteral(BigDecimal.valueOf(100), bigintType)); + RexNode mod400 = + rx.makeCall( + SqlStdOperatorTable.MOD, + year, + rx.makeExactLiteral(BigDecimal.valueOf(400), bigintType)); + RexNode zero = rx.makeExactLiteral(BigDecimal.ZERO, bigintType); + RexNode isLeap = + rx.makeCall( + SqlStdOperatorTable.CASE, + rx.makeCall( + SqlStdOperatorTable.AND, + rx.makeCall(SqlStdOperatorTable.EQUALS, mod4, zero), + rx.makeCall( + SqlStdOperatorTable.OR, + rx.makeCall(SqlStdOperatorTable.NOT_EQUALS, mod100, zero), + rx.makeCall(SqlStdOperatorTable.EQUALS, mod400, zero))), + rx.makeExactLiteral(BigDecimal.ONE, bigintType), + zero); + + int[] cumDays = {0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334}; + + List caseArgs = new ArrayList<>(); + for (int m = 1; m <= 12; m++) { + caseArgs.add( + rx.makeCall( + SqlStdOperatorTable.EQUALS, + month, + rx.makeExactLiteral(BigDecimal.valueOf(m), bigintType))); + RexNode days = rx.makeExactLiteral(BigDecimal.valueOf(cumDays[m - 1]), bigintType); + if (m >= 3) { + days = rx.makeCall(SqlStdOperatorTable.PLUS, days, isLeap); + } + caseArgs.add(days); + } + caseArgs.add(zero); + + return rx.makeCall(SqlStdOperatorTable.CASE, caseArgs.toArray(new RexNode[0])); + } + + /** Compute calendar unit number from an epoch at plan time (Java). */ + public static long calendarUnitNumberFromEpoch(long epochSec, SpanUnit unit, int spanValue) { + java.time.Instant instant = java.time.Instant.ofEpochSecond(epochSec); + java.time.ZonedDateTime zdt = instant.atZone(java.time.ZoneOffset.UTC); + long unitNum; + switch (unit.getName()) { + case "M" -> unitNum = zdt.getYear() * 12L + zdt.getMonthValue(); + case "q" -> unitNum = zdt.getYear() * 4L + (zdt.getMonthValue() - 1) / 3; + case "y" -> unitNum = zdt.getYear(); + default -> throw new SemanticCheckException("Not a variable-length unit: " + unit.getName()); + } + return unitNum / spanValue; + } + + /** + * Walk the AST from a Timewrap node to the deepest Filter node and extract the timestamp upper + * bound. The frontend time picker always appends the timestamp filter as the first pipe (closest + * to source), making it the deepest Filter in the AST chain: + * + *

+   *   Timewrap -> Chart -> [user filters] -> Filter(@timestamp >= X AND @timestamp <= Y) -> Source
+   * 
+ * + * We walk all Filter nodes and return the last (deepest) timestamp upper bound found. This + * ensures user filters like `where age > 30` between timechart and the time picker filter don't + * interfere. + */ + public static Long extractTimestampUpperBound(Timewrap node) { + Node current = node; + Long lastBound = null; + while (current != null && !current.getChild().isEmpty()) { + current = current.getChild().get(0); + if (current instanceof Filter filter) { + Long bound = findUpperBound(filter.getCondition()); + if (bound != null) { + lastBound = bound; + } + } + } + return lastBound; + } + + private static Long findUpperBound(UnresolvedExpression expr) { + if (expr instanceof And and) { + Long left = findUpperBound(and.getLeft()); + Long right = findUpperBound(and.getRight()); + if (left != null && right != null) return Math.min(left, right); + return left != null ? left : right; + } + if (expr instanceof Compare cmp) { + String op = cmp.getOperator(); + if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) { + return parseTimestampLiteral(cmp.getRight()); + } + if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) { + return parseTimestampLiteral(cmp.getLeft()); + } + } + return null; + } + + private static boolean isTimestampField(UnresolvedExpression expr) { + if (expr instanceof Field field) { + String name = field.getField().toString(); + return "@timestamp".equals(name) || "timestamp".equals(name); + } + return false; + } + + private static Long parseTimestampLiteral(UnresolvedExpression expr) { + if (expr instanceof Literal lit && lit.getValue() instanceof String s) { + try { + java.time.LocalDateTime ldt = + java.time.LocalDateTime.parse( + s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + return ldt.toEpochSecond(java.time.ZoneOffset.UTC); + } catch (java.time.format.DateTimeParseException e) { + try { + return java.time.Instant.parse(s).getEpochSecond(); + } catch (java.time.format.DateTimeParseException ignored) { + return null; + } + } + } + return null; + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index 18d87fc18d..b98204fb31 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -112,6 +112,11 @@ public void execute( ProfileContext profileCtx = QueryProfiling.current(); long execStart = System.nanoTime(); + // Capture the timewrap pivot signals now: they live in CalcitePlanContext thread-locals set by + // visitTimewrap on this (planning) thread, but the result callback below runs on a different + // analytics worker thread. Clearing them here keeps this thread's thread-locals clean. + TimewrapSignals timewrap = TimewrapSignals.captureAndClear(); + planExecutor.execute( plan, queryCtx, @@ -125,10 +130,12 @@ public void onResponse(Iterable rows) { List fields = plan.getRowType().getFieldList(); List results = convertRows(rows, fields); Schema schema = buildSchema(fields, results); + QueryResponse response = + timewrap.pivot(new QueryResponse(schema, results, Cursor.None)); profileCtx .getOrCreateMetric(MetricName.EXECUTE) .set(System.nanoTime() - execStart); - listener.onResponse(new QueryResponse(schema, results, Cursor.None)); + listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); } @@ -172,6 +179,9 @@ public void executeWithProfile( ProfileContext profileCtx = QueryProfiling.current(); long execStart = System.nanoTime(); + // See execute(): capture the timewrap pivot signals on this planning thread. + TimewrapSignals timewrap = TimewrapSignals.captureAndClear(); + planExecutor.executeWithProfile( plan, queryCtx, @@ -182,7 +192,7 @@ public void onResponse(ProfiledResult result) { // ProfiledResult delivers the profile on BOTH success and failure paths // so users get stage timing visibility even when a query partially fails. profileCtx.getOrCreateMetric(MetricName.EXECUTE).set(System.nanoTime() - execStart); - QueryResponse response = buildProfiledResponse(plan, result); + QueryResponse response = buildProfiledResponse(plan, result, timewrap); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); @@ -196,12 +206,13 @@ public void onFailure(Exception e) { }); } - private QueryResponse buildProfiledResponse(RelNode plan, ProfiledResult result) { + private QueryResponse buildProfiledResponse( + RelNode plan, ProfiledResult result, TimewrapSignals timewrap) { List fields = plan.getRowType().getFieldList(); List results = result.rows() != null ? convertRows(result.rows(), fields) : List.of(); Schema schema = buildSchema(fields, results); - QueryResponse response = new QueryResponse(schema, results, Cursor.None); + QueryResponse response = timewrap.pivot(new QueryResponse(schema, results, Cursor.None)); response.setProfile(result.profile()); if (!result.isSuccess()) { response.setError(result.failure()); diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/TimewrapSignals.java b/core/src/main/java/org/opensearch/sql/executor/analytics/TimewrapSignals.java new file mode 100644 index 0000000000..d7907e93d8 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/TimewrapSignals.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.utils.TimewrapPivot; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.executor.ExecutionEngine.Schema; + +/** + * A snapshot of the timewrap pivot signals that {@code CalciteRelNodeVisitor.visitTimewrap} stores + * in {@link CalcitePlanContext} thread-locals. + * + *

On the analytics route, planning and result-conversion run on different threads — the result + * callback fires on an analytics worker pool, not the SQL worker thread that planned the query. + * Capturing the signals at execute() entry (on the planning thread) and carrying them into the + * callback keeps the pivot correct across that thread hop. Capturing also clears the thread-locals + * so they don't leak onto the planning thread's next query. + */ +public final class TimewrapSignals { + + private final boolean active; + private final String unitName; + private final String series; + + private TimewrapSignals(boolean active, String unitName, String series) { + this.active = active; + this.unitName = unitName; + this.series = series; + } + + /** Captures the current thread's timewrap signals and clears the thread-locals. */ + public static TimewrapSignals captureAndClear() { + boolean active = TimewrapPivot.isTimewrap(); + String unitName = CalcitePlanContext.timewrapUnitName.get(); + String series = CalcitePlanContext.timewrapSeries.get(); + CalcitePlanContext.clearTimewrapSignals(); + return new TimewrapSignals(active, unitName, series); + } + + /** + * Applies the timewrap pivot to {@code response} if this snapshot is from a timewrap query; + * otherwise returns it unchanged. The returned response carries over the input's profile/error. + */ + public QueryResponse pivot(QueryResponse response) { + if (!active) { + return response; + } + TimewrapPivot.Result pivoted = + TimewrapPivot.pivot( + response.getSchema().getColumns(), response.getResults(), unitName, series); + QueryResponse out = + new QueryResponse(new Schema(pivoted.columns()), pivoted.values(), response.getCursor()); + out.setProfile(response.getProfile()); + out.setError(response.getError()); + return out; + } +} diff --git a/core/src/test/java/org/opensearch/sql/calcite/utils/TimewrapSignalsLeakTest.java b/core/src/test/java/org/opensearch/sql/calcite/utils/TimewrapSignalsLeakTest.java new file mode 100644 index 0000000000..eef98cdc4b --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/utils/TimewrapSignalsLeakTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.setting.Settings.Key; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; + +/** + * Leak guard for the timewrap pivot signals. {@code visitTimewrap} stashes its pivot state in the + * static {@link CalcitePlanContext} thread-locals read by {@link TimewrapPivot#isTimewrap()}. These + * are set on a pooled worker thread, so if they are not cleared at the end of the query lifecycle + * they would leak onto the next, non-timewrap query that reuses the same thread and wrongly pivot + * its result. This test asserts the lifecycle guard ({@code CalcitePlanContext.run}'s finally) + * clears them. + */ +public class TimewrapSignalsLeakTest { + + @AfterEach + public void cleanUp() { + CalcitePlanContext.clearTimewrapSignals(); + } + + @Test + public void timewrapSignalsDoNotLeakOntoNextQueryOnSameThread() { + Settings settings = mock(Settings.class); + lenient().when(settings.getSettingValue(Key.PPL_SYNTAX_LEGACY_PREFERRED)).thenReturn(false); + + // First "query" behaves like a timewrap query: it sets the pivot signals mid-flight, exactly as + // visitTimewrap does. run()'s finally must clear them before returning. + CalcitePlanContext.run( + () -> { + CalcitePlanContext.stripNullColumns.set(true); + CalcitePlanContext.timewrapUnitName.set("1|day|days|_before"); + CalcitePlanContext.timewrapSeries.set("relative"); + }, + settings); + + // Same thread, next query: the pivot gate must be closed. + assertFalse( + TimewrapPivot.isTimewrap(), + "timewrap signals leaked onto the next query on the same pooled thread"); + + // And a plain (non-timewrap) result must pass through the pivot untouched — no __base_offset__ + // or __period__ artifact columns, same row. + List columns = + List.of( + new Column("host", null, ExprCoreType.STRING), + new Column("count", null, ExprCoreType.LONG)); + List rows = + List.of(ExprValueUtils.tupleValue(ImmutableMap.of("host", "h1", "count", 42L))); + + TimewrapPivot.Result result = + TimewrapPivot.pivot( + columns, + rows, + CalcitePlanContext.timewrapUnitName.get(), + CalcitePlanContext.timewrapSeries.get()); + + assertEquals( + List.of("host", "count"), + result.columns().stream().map(Column::getName).toList(), + "non-timewrap result gained artifact columns after a prior timewrap query"); + assertEquals(rows, result.values()); + } +} diff --git a/docs/category.json b/docs/category.json index ac1dda966d..1e45f3e65b 100644 --- a/docs/category.json +++ b/docs/category.json @@ -45,6 +45,7 @@ "user/ppl/cmd/syntax.md", "user/ppl/cmd/chart.md", "user/ppl/cmd/timechart.md", + "user/ppl/cmd/timewrap.md", "user/ppl/cmd/top.md", "user/ppl/cmd/trendline.md", "user/ppl/cmd/transpose.md", diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index e4f55ef1b3..92aaa0c7db 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 24/24 + fetched rows / total rows = 25/25 +----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -48,7 +48,7 @@ SQL query:: | docTestCluster | null | events_many_hosts | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | events_null | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null | - | docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nested | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nyc_taxi | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | occupation | BASE TABLE | null | null | null | null | null | null | @@ -59,6 +59,7 @@ SQL query:: | docTestCluster | null | time_data | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | time_data2 | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | time_test | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | timewrap_test | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | weblogs | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | wildcard | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | work_information | BASE TABLE | null | null | null | null | null | null | diff --git a/docs/user/ppl/cmd/timewrap.md b/docs/user/ppl/cmd/timewrap.md new file mode 100644 index 0000000000..6e4b9414a7 --- /dev/null +++ b/docs/user/ppl/cmd/timewrap.md @@ -0,0 +1,175 @@ + +# timewrap + +The `timewrap` command reshapes `timechart` output by wrapping each time period into a separate data series. This enables side-by-side comparisons of the same metric across recurring time intervals, such as day-over-day or week-over-week analysis. + +## Syntax + +The `timewrap` command has the following syntax: + +```syntax +... | timechart ... | timewrap [align=end|now] +``` + +## Parameters + +The `timewrap` command supports the following parameters. + +| Parameter | Required/Optional | Description | +| --- | --- | --- | +| `` | Required | The wrapping interval, in the form `[int]`. If the integer is omitted, `1` is assumed. For example, `1day`, `2week`, or just `day`. For a complete list of supported time units, see [Time units](#time-units). | +| `align` | Optional | Controls the reference point for period alignment and column naming. Default is `end`. `end` aligns to the search end time (the upper bound of the `where` clause on `@timestamp`, or current time if no time filter). `now` always aligns to the current query execution time. | + +## Notes + +The following considerations apply when using the `timewrap` command: + +* The `timewrap` command must follow a `timechart` command. It is a post-processing command that reshapes timechart output. +* Column names follow the pattern `__before` for periods before the reference point, `_latest_` for the period containing the reference point, and `__after` for periods after the reference point. +* Column order is oldest first (leftmost) to newest (rightmost). +* Only columns with data are included in the output. Unused period columns are automatically removed. +* Incomplete periods (where data does not span the full wrap interval) show `null` for missing time offsets. +* Only `timechart` without the `BY` clause is currently supported. The `BY` clause is a future enhancement. + +### Time units + +The following time units are available for the `` parameter: + +* Seconds (`s`, `sec`, `second`, `secs`, `seconds`) +* Minutes (`m`, `min`, `minute`, `mins`, `minutes`) --- note: `m` means minutes, not months +* Hours (`h`, `hr`, `hour`, `hrs`, `hours`) +* Days (`d`, `day`, `days`) +* Weeks (`w`, `week`, `weeks`) + +Variable-length time units (`month`, `quarter`, `year`) are not yet supported. + +### Column naming + +Column names are constructed from the aggregation function name and an absolute time offset from the reference point: + +| Position relative to reference | Column name format | Example | +| --- | --- | --- | +| Before the reference point | `__before` | `sum(requests)_2days_before` | +| At the reference point | `_latest_` | `sum(requests)_latest_day` | +| After the reference point | `__after` | `sum(requests)_1day_after` | + +## Example 1: Day-over-day comparison + +The following query compares the sum of requests per 6-hour interval across 3 days: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h sum(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+----------------------------+---------------------------+--------------------------+ +| @timestamp | sum(requests)_2days_before | sum(requests)_1day_before | sum(requests)_latest_day | +|---------------------+----------------------------+---------------------------+--------------------------| +| 2024-07-03 00:00:00 | 180 | 205 | 165 | +| 2024-07-03 06:00:00 | 240 | 260 | 225 | +| 2024-07-03 12:00:00 | 310 | 330 | 285 | +| 2024-07-03 18:00:00 | 190 | 215 | 165 | ++---------------------+----------------------------+---------------------------+--------------------------+ +``` + +Each column represents one day of data. The `latest_day` column contains the most recent period (July 3). The `2days_before` column contains the oldest period (July 1). + +## Example 2: Comparing averages across 2 days + +The following query compares the average requests per 6-hour interval: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-02 18:00:00' +| timechart span=6h avg(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+---------------------------+--------------------------+ +| @timestamp | avg(requests)_1day_before | avg(requests)_latest_day | +|---------------------+---------------------------+--------------------------| +| 2024-07-02 00:00:00 | 90.0 | 102.5 | +| 2024-07-02 06:00:00 | 120.0 | 130.0 | +| 2024-07-02 12:00:00 | 155.0 | 165.0 | +| 2024-07-02 18:00:00 | 95.0 | 107.5 | ++---------------------+---------------------------+--------------------------+ +``` + +## Example 3: Single day produces one period + +When all data fits within a single wrap interval, only one period column is produced: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-01 18:00:00' +| timechart span=6h sum(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+--------------------------+ +| @timestamp | sum(requests)_latest_day | +|---------------------+--------------------------| +| 2024-07-01 00:00:00 | 180 | +| 2024-07-01 06:00:00 | 240 | +| 2024-07-01 12:00:00 | 310 | +| 2024-07-01 18:00:00 | 190 | ++---------------------+--------------------------+ +``` + +## Example 4: Count events day-over-day + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h count() +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+----------------------+---------------------+--------------------+ +| @timestamp | count()_2days_before | count()_1day_before | count()_latest_day | +|---------------------+----------------------+---------------------+--------------------| +| 2024-07-03 00:00:00 | 2 | 2 | 2 | +| 2024-07-03 06:00:00 | 2 | 2 | 2 | +| 2024-07-03 12:00:00 | 2 | 2 | 2 | +| 2024-07-03 18:00:00 | 2 | 2 | 2 | ++---------------------+----------------------+---------------------+--------------------+ +``` + +## Example 5: Comparing errors across 2 days + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-02 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h sum(errors) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+-------------------------+------------------------+ +| @timestamp | sum(errors)_1day_before | sum(errors)_latest_day | +|---------------------+-------------------------+------------------------| +| 2024-07-03 00:00:00 | 4 | 1 | +| 2024-07-03 06:00:00 | 6 | 3 | +| 2024-07-03 12:00:00 | 9 | 6 | +| 2024-07-03 18:00:00 | 3 | 1 | ++---------------------+-------------------------+------------------------+ +``` + +## Limitations + +The `timewrap` command has the following limitations: + +* The `timewrap` command must follow a `timechart` command. Using it after any other command results in an error. +* Only `timechart` without the `BY` clause is supported. The `BY` clause (column split) is a future enhancement. +* Variable-length time units (`month`, `quarter`, `year`) are not yet supported. Use fixed-length units (`s`, `m`, `h`, `d`, `w`). diff --git a/doctest/test_data/timewrap_test.json b/doctest/test_data/timewrap_test.json new file mode 100644 index 0000000000..d38977f629 --- /dev/null +++ b/doctest/test_data/timewrap_test.json @@ -0,0 +1,24 @@ +{"@timestamp":"2024-07-01T00:00:00Z","host":"web-01","requests":100,"errors":2} +{"@timestamp":"2024-07-01T06:00:00Z","host":"web-01","requests":150,"errors":5} +{"@timestamp":"2024-07-01T12:00:00Z","host":"web-01","requests":200,"errors":3} +{"@timestamp":"2024-07-01T18:00:00Z","host":"web-01","requests":120,"errors":1} +{"@timestamp":"2024-07-01T00:00:00Z","host":"web-02","requests":80,"errors":0} +{"@timestamp":"2024-07-01T06:00:00Z","host":"web-02","requests":90,"errors":1} +{"@timestamp":"2024-07-01T12:00:00Z","host":"web-02","requests":110,"errors":2} +{"@timestamp":"2024-07-01T18:00:00Z","host":"web-02","requests":70,"errors":0} +{"@timestamp":"2024-07-02T00:00:00Z","host":"web-01","requests":110,"errors":3} +{"@timestamp":"2024-07-02T06:00:00Z","host":"web-01","requests":160,"errors":4} +{"@timestamp":"2024-07-02T12:00:00Z","host":"web-01","requests":210,"errors":6} +{"@timestamp":"2024-07-02T18:00:00Z","host":"web-01","requests":130,"errors":2} +{"@timestamp":"2024-07-02T00:00:00Z","host":"web-02","requests":95,"errors":1} +{"@timestamp":"2024-07-02T06:00:00Z","host":"web-02","requests":100,"errors":2} +{"@timestamp":"2024-07-02T12:00:00Z","host":"web-02","requests":120,"errors":3} +{"@timestamp":"2024-07-02T18:00:00Z","host":"web-02","requests":85,"errors":1} +{"@timestamp":"2024-07-03T00:00:00Z","host":"web-01","requests":90,"errors":1} +{"@timestamp":"2024-07-03T06:00:00Z","host":"web-01","requests":140,"errors":2} +{"@timestamp":"2024-07-03T12:00:00Z","host":"web-01","requests":180,"errors":4} +{"@timestamp":"2024-07-03T18:00:00Z","host":"web-01","requests":100,"errors":1} +{"@timestamp":"2024-07-03T00:00:00Z","host":"web-02","requests":75,"errors":0} +{"@timestamp":"2024-07-03T06:00:00Z","host":"web-02","requests":85,"errors":1} +{"@timestamp":"2024-07-03T12:00:00Z","host":"web-02","requests":105,"errors":2} +{"@timestamp":"2024-07-03T18:00:00Z","host":"web-02","requests":65,"errors":0} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index 6283252065..e179c85eb5 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -59,6 +59,7 @@ 'time_data2': 'time_test_data2.json', 'time_test': 'time_test.json', 'mvcombine_data': 'mvcombine.json', + 'timewrap_test': 'timewrap_test.json', } DEBUG_MODE = os.environ.get('DOCTEST_DEBUG', 'false').lower() == 'true' diff --git a/doctest/test_mapping/timewrap_test.json b/doctest/test_mapping/timewrap_test.json new file mode 100644 index 0000000000..8222188feb --- /dev/null +++ b/doctest/test_mapping/timewrap_test.json @@ -0,0 +1,19 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "host": { + "type": "keyword" + }, + "requests": { + "type": "integer" + }, + "errors": { + "type": "integer" + } + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 95c47b9b0b..551603db72 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -563,6 +563,30 @@ public void testExplainTimechartPerDay() throws IOException { assertTrue(result.contains("per_day(cpu_usage)=[SUM($0)]")); } + @Test + public void testExplainTimewrap() throws IOException { + // Pin the align=end reference with a WHERE upper bound so the base_offset literal is + // deterministic (otherwise it falls back to the query clock). + var result = + explainQueryYaml( + "source=events | where @timestamp <= '2024-07-03 18:00:00'" + + " | timechart span=6h avg(cpu_usage) | timewrap 1day"); + String expected = loadExpectedPlan("explain_timewrap.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + @Test + public void testExplainTimewrapMonth() throws IOException { + // Variable-length unit (month) exercises the EXTRACT-based calendar arithmetic branch, which + // produces a different plan from the fixed-length epoch-based path above. + var result = + explainQueryYaml( + "source=events | where @timestamp <= '2024-07-03 18:00:00'" + + " | timechart span=1d avg(cpu_usage) | timewrap 1month"); + String expected = loadExpectedPlan("explain_timewrap_month.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + @Test public void noPushDownForAggOnWindow() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java new file mode 100644 index 0000000000..9383e56f49 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java @@ -0,0 +1,762 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.*; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteTimewrapCommandIT extends PPLIntegTestCase { + + // Standard WHERE clause covering all test data — simulates frontend time picker + private static final String WHERE_ALL = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-04 06:00:00'"; + private static final String WHERE_JUL1_TO_JUL3 = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00'"; + private static final String WHERE_JUL2_TO_JUL3 = + " | where @timestamp >= '2024-07-02 00:00:00' and @timestamp <= '2024-07-03 18:00:00'"; + private static final String WHERE_JUL1_ONLY = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-01 18:00:00'"; + private static final String WHERE_JUL1_TO_JUL2 = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-02 18:00:00'"; + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + disallowCalciteFallback(); + loadIndex(Index.TIMEWRAP_TEST); + } + + // --- Day-over-day with different aggregations --- + + @Test + public void testTimewrapDayOverDayWithSum() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_3days_before", "bigint"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + @Test + public void testTimewrapDayOverDayWithAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h avg(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_3days_before", "double"), + schema("avg(requests)_2days_before", "double"), + schema("avg(requests)_1day_before", "double"), + schema("avg(requests)_latest_day", "double")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 90.0, 102.5, 82.5, 40.0), + rows("2024-07-04 06:00:00", 120.0, 130.0, 112.5, 50.0), + rows("2024-07-04 12:00:00", null, 155.0, 165.0, 142.5), + rows("2024-07-04 18:00:00", null, 95.0, 107.5, 82.5)); + } + + @Test + public void testTimewrapDayOverDayWithCount() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + WHERE_ALL + " | timechart span=6h count() | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_3days_before", "bigint"), + schema("count()_2days_before", "bigint"), + schema("count()_1day_before", "bigint"), + schema("count()_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 2, 2, 2, 2), + rows("2024-07-04 06:00:00", 2, 2, 2, 2), + rows("2024-07-04 12:00:00", null, 2, 2, 2), + rows("2024-07-04 18:00:00", null, 2, 2, 2)); + } + + @Test + public void testTimewrapWithDifferentAggField() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(errors) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(errors)_3days_before", "bigint"), + schema("sum(errors)_2days_before", "bigint"), + schema("sum(errors)_1day_before", "bigint"), + schema("sum(errors)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 2, 4, 1, 0), + rows("2024-07-04 06:00:00", 6, 6, 3, 1), + rows("2024-07-04 12:00:00", null, 5, 9, 6), + rows("2024-07-04 18:00:00", null, 1, 3, 1)); + } + + // --- Incomplete period / null fill --- + + @Test + public void testTimewrapIncompletePeriodNullFill() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_3days_before", "bigint"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- Different timescales --- + + @Test + public void testTimewrapWeekSpanSinglePeriod() throws IOException { + // 3 days of daily data in 1 week -> single period, 3 rows + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=1day sum(requests) | timewrap 1week"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 920), + rows("2024-07-02 00:00:00", 1010), + rows("2024-07-03 00:00:00", 840)); + } + + @Test + public void testTimewrapTwelveHourSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_72hours_before", "bigint"), + schema("sum(requests)_60hours_before", "bigint"), + schema("sum(requests)_48hours_before", "bigint"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80), + rows("2024-07-04 06:00:00", 240, 190, 260, 215, 225, 165, 100)); + } + + @Test + public void testTimewrapWithMinuteSpan() throws IOException { + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:04:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_4minutes_before", "bigint"), + schema("count()_3minutes_before", "bigint"), + schema("count()_2minutes_before", "bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:04:00", 1, 1, 1, 1, 1)); + } + + // --- WHERE clause with different time ranges --- + + @Test + public void testTimewrapWithWhereThreeDays() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapWithWhereTwoDays() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 205, 165), + rows("2024-07-03 06:00:00", 260, 225), + rows("2024-07-03 12:00:00", 330, 285), + rows("2024-07-03 18:00:00", 215, 165)); + } + + @Test + public void testTimewrapWithWhereSingleDay() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_ONLY + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 180), + rows("2024-07-01 06:00:00", 240), + rows("2024-07-01 12:00:00", 310), + rows("2024-07-01 18:00:00", 190)); + } + + @Test + public void testTimewrapWithWhereAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h avg(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_1day_before", "double"), + schema("avg(requests)_latest_day", "double")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 00:00:00", 90.0, 102.5), + rows("2024-07-02 06:00:00", 120.0, 130.0), + rows("2024-07-02 12:00:00", 155.0, 165.0), + rows("2024-07-02 18:00:00", 95.0, 107.5)); + } + + @Test + public void testTimewrapWithWhere12hSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 12:00:00", 180, 310, 205, 330), + rows("2024-07-02 18:00:00", 240, 190, 260, 215)); + } + + @Test + public void testTimewrapWithWhereCount() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h count() | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_2days_before", "bigint"), + schema("count()_1day_before", "bigint"), + schema("count()_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 2, 2, 2), + rows("2024-07-03 06:00:00", 2, 2, 2), + rows("2024-07-03 12:00:00", 2, 2, 2), + rows("2024-07-03 18:00:00", 2, 2, 2)); + } + + @Test + public void testTimewrapWithWhereErrors() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h sum(errors) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(errors)_1day_before", "bigint"), + schema("sum(errors)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 4, 1), + rows("2024-07-03 06:00:00", 6, 3), + rows("2024-07-03 12:00:00", 9, 6), + rows("2024-07-03 18:00:00", 3, 1)); + } + + // --- WHERE upper bound above data: shifts column numbers --- + + @Test + public void testTimewrapWithWhereUpperBoundAboveData() throws IOException { + // WHERE upper bound = July 10 (~5.75 days after max data July 4 06:00) + // baseOffset = floor(Jul10/86400) - floor(Jul4_06/86400) = 5 + // periodFromNow for oldest(rel=4): (5+4-1)*1=8, newest(rel=1): (5+1-1)*1=5 + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-10 00:00:00' | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_8days_before", "bigint"), + schema("sum(requests)_7days_before", "bigint"), + schema("sum(requests)_6days_before", "bigint"), + schema("sum(requests)_5days_before", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- align=end vs align=now --- + + @Test + public void testTimewrapAlignEndIsDefault() throws IOException { + JSONObject resultDefault = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + JSONObject resultEnd = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day align=end"); + + verifySchema( + resultEnd, + schema("@timestamp", "timestamp"), + schema("sum(requests)_3days_before", "bigint"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultEnd, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + verifyDataRowsInOrder( + resultDefault, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + @Test + public void testTimewrapAlignNow() throws IOException { + // align=now uses current time — column names are dynamic + // Extract actual column names from the result for verification + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day align=now"); + + // Get actual column names from result schema + org.json.JSONArray schemaArr = result.getJSONArray("schema"); + String c1 = schemaArr.getJSONObject(1).getString("name"); + String c2 = schemaArr.getJSONObject(2).getString("name"); + String c3 = schemaArr.getJSONObject(3).getString("name"); + String c4 = schemaArr.getJSONObject(4).getString("name"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema(c1, "bigint"), + schema(c2, "bigint"), + schema(c3, "bigint"), + schema(c4, "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- Every timescale --- + + @Test + public void testTimewrapSecondSpan() throws IOException { + // 5 events at minute-level, wrap by 1 minute (60sec) + // timechart span=1m gives 3 buckets (00:00, 01:00, 02:00) + // timewrap 1min: each bucket is in a different 1-minute period → 1 offset row, 3 periods + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:02:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_2minutes_before", "bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:02:00", 1, 1, 1)); + } + + @Test + public void testTimewrapMinuteSpan() throws IOException { + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:04:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_4minutes_before", "bigint"), + schema("count()_3minutes_before", "bigint"), + schema("count()_2minutes_before", "bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:04:00", 1, 1, 1, 1, 1)); + } + + @Test + public void testTimewrapHourSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 12:00:00", 180, 310, 205, 330), + rows("2024-07-02 18:00:00", 240, 190, 260, 215)); + } + + @Test + public void testTimewrapDaySpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapWeekSpan() throws IOException { + // 2 days of daily data in 1 week -> single period, 2 rows + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=1day sum(requests) | timewrap 1week"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); + verifyDataRowsInOrder( + result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010)); + } + + @Test + public void testTimewrapMonthSpan() throws IOException { + // Jul 1-4 only: all data within same month → single month period + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-04 06:00:00' | timechart span=1day sum(requests) | timewrap 1month"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_month", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 920), + rows("2024-07-02 00:00:00", 1010), + rows("2024-07-03 00:00:00", 840), + rows("2024-07-04 00:00:00", 180)); + } + + @Test + public void testTimewrapQuarterSpan() throws IOException { + // Jan 15 (Q1) and Apr 15 (Q2) → 2 quarter periods + // With precise day-within-quarter offset: Jan 15 = day 15 of Q1, Apr 15 = day 15 of Q2 + // Both are at the same offset (day 15) → they align on the same row + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" + + " '2024-04-15 12:00:00' | timechart span=1day sum(requests) | timewrap 1quarter"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1quarter_before", "bigint"), + schema("sum(requests)_latest_quarter", "bigint")); + // Day 15 of each quarter aligns -- both values on the same row + verifyDataRows(result, rows("2024-04-15 00:00:00", 300, 350)); + } + + @Test + public void testTimewrapYearSpan() throws IOException { + // Jan 15 2024 (300) and Jan 15 2025 (400) -- 2 data points in 2 different years + // timechart span=1year: 2 yearly buckets + // timewrap 1year: 2 periods, 1 offset row + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-01-15 12:00:00' and @timestamp <=" + + " '2025-01-15 12:00:00' | timechart span=1year sum(requests) | timewrap 1year"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1year_before", "bigint"), + schema("sum(requests)_latest_year", "bigint")); + // 2024 yearly sum = all 2024 data in WHERE range; 2025 = Jan 15 only (400) + verifyDataRows(result, rows("2025-01-01 00:00:00", 4050, 400)); + } + + // --- series parameter --- + + @Test + public void testTimewrapSeriesRelativeIsDefault() throws IOException { + // series=relative is the default — same as no series parameter + JSONObject resultDefault = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + JSONObject resultRelative = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day series=relative"); + + verifySchema( + resultRelative, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultRelative, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + verifySchema( + resultDefault, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultDefault, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapSeriesShort() throws IOException { + // series=short: columns named _s + // With align=end and WHERE upper bound = Jul 3 18:00, baseOffset=0 + // Periods: oldest=2, middle=1, newest=0 + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_s2", "bigint"), + schema("sum(requests)_s1", "bigint"), + schema("sum(requests)_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapSeriesShortWithCount() throws IOException { + // series=short with count aggregation + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h count() | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_s1", "bigint"), + schema("count()_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 2, 2), + rows("2024-07-03 06:00:00", 2, 2), + rows("2024-07-03 12:00:00", 2, 2), + rows("2024-07-03 18:00:00", 2, 2)); + } + + @Test + public void testTimewrapSeriesShortWeekSpan() throws IOException { + // series=short with week span, single period = s0, daily buckets + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=1day sum(requests) | timewrap 1week series=short"); + + verifySchema(result, schema("@timestamp", "timestamp"), schema("sum(requests)_s0", "bigint")); + verifyDataRowsInOrder( + result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010)); + } + + @Test + public void testTimewrapSeriesShortWithAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h avg(requests) | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_s1", "double"), + schema("avg(requests)_s0", "double")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 00:00:00", 90.0, 102.5), + rows("2024-07-02 06:00:00", 120.0, 130.0), + rows("2024-07-02 12:00:00", 155.0, 165.0), + rows("2024-07-02 18:00:00", 95.0, 107.5)); + } + + @Test + public void testTimewrapSeriesShortWithErrors() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h sum(errors) | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(errors)_s1", "bigint"), + schema("sum(errors)_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 4, 1), + rows("2024-07-03 06:00:00", 6, 3), + rows("2024-07-03 12:00:00", 9, 6), + rows("2024-07-03 18:00:00", 3, 1)); + } + + // BY clause tests are pending -- blocked by timechart BY output format gap. + // See docs/dev/ppl-timewrap-command.md for design options. +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index ef05923b06..47d7494b3e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -984,7 +984,12 @@ public enum Index { "events_traffic", "events_traffic", getMappingFile("events_traffic_index_mapping.json"), - "src/test/resources/events_traffic.json"); + "src/test/resources/events_traffic.json"), + TIMEWRAP_TEST( + "timewrap_test", + "timewrap_test", + "{\"mappings\":{\"properties\":{\"@timestamp\":{\"type\":\"date\"},\"host\":{\"type\":\"keyword\"},\"requests\":{\"type\":\"integer\"},\"errors\":{\"type\":\"integer\"}}}}", + "src/test/resources/timewrap_test.json"); private final String name; private final String type; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap.yaml new file mode 100644 index 0000000000..62b38aafcf --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + LogicalProject(@timestamp=[FROM_UNIXTIME(+(-(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), MOD(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), 86400:BIGINT)), MOD(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, 86400:BIGINT)))], avg(cpu_usage)=[$1], __base_offset__=[CAST(FLOOR(/(CAST(-(1720029600, MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER ())):DOUBLE NOT NULL, 86400:BIGINT))):BIGINT NOT NULL], __period__=[+(/(-(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL), 86400), 1)]) + LogicalSort(sort0=[$0], dir0=[ASC]) + LogicalProject(@timestamp=[$0], avg(cpu_usage)=[$1]) + LogicalAggregate(group=[{1}], avg(cpu_usage)=[AVG($0)]) + LogicalProject(cpu_usage=[$7], @timestamp0=[SPAN($1, 6, 'h')]) + LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($7))]) + LogicalFilter(condition=[<=($1, TIMESTAMP('2024-07-03 18:00:00':VARCHAR))]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + CalciteEnumerableTopK(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[86400:BIGINT], expr#5=[MOD($t3, $t4)], expr#6=[-($t3, $t5)], expr#7=[+($t6, $t2)], expr#8=[FROM_UNIXTIME($t7)], expr#9=[1720029600:BIGINT], expr#10=[-($t9, $t3)], expr#11=[CAST($t10):DOUBLE NOT NULL], expr#12=[/($t11, $t4)], expr#13=[FLOOR($t12)], expr#14=[CAST($t13):BIGINT NOT NULL], expr#15=[-($t3, $t1)], expr#16=[/($t15, $t4)], expr#17=[1:BIGINT], expr#18=[+($t16, $t17)], @timestamp=[$t8], avg(cpu_usage)=[$t0], __base_offset__=[$t14], __period__=[$t18]) + EnumerableWindow(window#0=[window(aggs [MAX($1)])]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[UNIX_TIMESTAMP($t0)], expr#3=[CAST($t2):BIGINT NOT NULL], expr#4=[86400:BIGINT], expr#5=[MOD($t3, $t4)], avg(cpu_usage)=[$t1], $1=[$t3], $2=[$t5]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[FILTER->AND(<=($0, '2024-07-03 18:00:00'), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},avg(cpu_usage)=AVG($0)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":null,"to":"2024-07-03T18:00:00.000Z","include_lower":true,"include_upper":true,"format":"date_time","boost":1.0}}},{"exists":{"field":"cpu_usage","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"@timestamp0":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"6h"}}}]},"aggregations":{"avg(cpu_usage)":{"avg":{"field":"cpu_usage"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap_month.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap_month.yaml new file mode 100644 index 0000000000..dc9b7eb142 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_timewrap_month.yaml @@ -0,0 +1,18 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + LogicalProject(@timestamp=[FROM_UNIXTIME(+(-(-(CAST(UNIX_TIMESTAMP(MAX($0) OVER ())):BIGINT NOT NULL, *(-(EXTRACT('DAY', MAX($0) OVER ()), 1), 86400)), +(+(*(EXTRACT('HOUR', MAX($0) OVER ()), 3600), *(EXTRACT('MINUTE', MAX($0) OVER ()), 60)), EXTRACT('SECOND', MAX($0) OVER ()))), -(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, -(-(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, *(-(EXTRACT('DAY', $0), 1), 86400)), +(+(*(EXTRACT('HOUR', $0), 3600), *(EXTRACT('MINUTE', $0), 60)), EXTRACT('SECOND', $0))))))], avg(cpu_usage)=[$1], __base_offset__=[-(24295, +(*(EXTRACT('YEAR', MAX($0) OVER ()), 12), EXTRACT('MONTH', MAX($0) OVER ())))], __period__=[+(-(+(*(EXTRACT('YEAR', MAX($0) OVER ()), 12), EXTRACT('MONTH', MAX($0) OVER ())), +(*(EXTRACT('YEAR', $0), 12), EXTRACT('MONTH', $0))), 1)]) + LogicalSort(sort0=[$0], dir0=[ASC]) + LogicalProject(@timestamp=[$0], avg(cpu_usage)=[$1]) + LogicalAggregate(group=[{1}], avg(cpu_usage)=[AVG($0)]) + LogicalProject(cpu_usage=[$7], @timestamp0=[SPAN($1, 1, 'd')]) + LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($7))]) + LogicalFilter(condition=[<=($1, TIMESTAMP('2024-07-03 18:00:00':VARCHAR))]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + CalciteEnumerableTopK(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[UNIX_TIMESTAMP($t4)], expr#6=[CAST($t5):BIGINT NOT NULL], expr#7=['DAY'], expr#8=[EXTRACT($t7, $t4)], expr#9=[1:BIGINT], expr#10=[-($t8, $t9)], expr#11=[86400:BIGINT], expr#12=[*($t10, $t11)], expr#13=[-($t6, $t12)], expr#14=['HOUR'], expr#15=[EXTRACT($t14, $t4)], expr#16=[3600:BIGINT], expr#17=[*($t15, $t16)], expr#18=['MINUTE'], expr#19=[EXTRACT($t18, $t4)], expr#20=[60:BIGINT], expr#21=[*($t19, $t20)], expr#22=[+($t17, $t21)], expr#23=['SECOND'], expr#24=[EXTRACT($t23, $t4)], expr#25=[+($t22, $t24)], expr#26=[-($t13, $t25)], expr#27=[+($t26, $t2)], expr#28=[FROM_UNIXTIME($t27)], expr#29=[24295:BIGINT], expr#30=['YEAR'], expr#31=[EXTRACT($t30, $t4)], expr#32=[12:BIGINT], expr#33=[*($t31, $t32)], expr#34=['MONTH'], expr#35=[EXTRACT($t34, $t4)], expr#36=[+($t33, $t35)], expr#37=[-($t29, $t36)], expr#38=[-($t36, $t3)], expr#39=[+($t38, $t9)], @timestamp=[$t28], avg(cpu_usage)=[$t1], __base_offset__=[$t37], __period__=[$t39]) + EnumerableWindow(window#0=[window(aggs [MAX($0)])]) + EnumerableCalc(expr#0..1=[{inputs}], expr#2=[UNIX_TIMESTAMP($t0)], expr#3=[CAST($t2):BIGINT NOT NULL], expr#4=['DAY'], expr#5=[EXTRACT($t4, $t0)], expr#6=[1:BIGINT], expr#7=[-($t5, $t6)], expr#8=[86400:BIGINT], expr#9=[*($t7, $t8)], expr#10=[-($t3, $t9)], expr#11=['HOUR'], expr#12=[EXTRACT($t11, $t0)], expr#13=[3600:BIGINT], expr#14=[*($t12, $t13)], expr#15=['MINUTE'], expr#16=[EXTRACT($t15, $t0)], expr#17=[60:BIGINT], expr#18=[*($t16, $t17)], expr#19=[+($t14, $t18)], expr#20=['SECOND'], expr#21=[EXTRACT($t20, $t0)], expr#22=[+($t19, $t21)], expr#23=[-($t10, $t22)], expr#24=[-($t3, $t23)], expr#25=['YEAR'], expr#26=[EXTRACT($t25, $t0)], expr#27=[12:BIGINT], expr#28=[*($t26, $t27)], expr#29=['MONTH'], expr#30=[EXTRACT($t29, $t0)], expr#31=[+($t28, $t30)], proj#0..1=[{exprs}], $2=[$t24], $3=[$t31]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[FILTER->AND(<=($0, '2024-07-03 18:00:00'), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},avg(cpu_usage)=AVG($0)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":null,"to":"2024-07-03T18:00:00.000Z","include_lower":true,"include_upper":true,"format":"date_time","boost":1.0}}},{"exists":{"field":"cpu_usage","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"@timestamp0":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1d"}}}]},"aggregations":{"avg(cpu_usage)":{"avg":{"field":"cpu_usage"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap.yaml new file mode 100644 index 0000000000..1775322b2c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + LogicalProject(@timestamp=[FROM_UNIXTIME(+(-(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), MOD(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), 86400:BIGINT)), MOD(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, 86400:BIGINT)))], avg(cpu_usage)=[$1], __base_offset__=[CAST(FLOOR(/(CAST(-(1720029600, MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER ())):DOUBLE NOT NULL, 86400:BIGINT))):BIGINT NOT NULL], __period__=[+(/(-(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER (), CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL), 86400), 1)]) + LogicalSort(sort0=[$0], dir0=[ASC]) + LogicalProject(@timestamp=[$0], avg(cpu_usage)=[$1]) + LogicalAggregate(group=[{1}], avg(cpu_usage)=[AVG($0)]) + LogicalProject(cpu_usage=[$7], @timestamp0=[SPAN($1, 6, 'h')]) + LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($7))]) + LogicalFilter(condition=[<=($1, TIMESTAMP('2024-07-03 18:00:00':VARCHAR))]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[86400:BIGINT], expr#5=[MOD($t3, $t4)], expr#6=[-($t3, $t5)], expr#7=[+($t6, $t2)], expr#8=[FROM_UNIXTIME($t7)], expr#9=[1720029600:BIGINT], expr#10=[-($t9, $t3)], expr#11=[CAST($t10):DOUBLE NOT NULL], expr#12=[/($t11, $t4)], expr#13=[FLOOR($t12)], expr#14=[CAST($t13):BIGINT NOT NULL], expr#15=[-($t3, $t1)], expr#16=[/($t15, $t4)], expr#17=[1:BIGINT], expr#18=[+($t16, $t17)], @timestamp=[$t8], avg(cpu_usage)=[$t0], __base_offset__=[$t14], __period__=[$t18]) + EnumerableWindow(window#0=[window(aggs [MAX($1)])]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], expr#8=[UNIX_TIMESTAMP($t0)], expr#9=[CAST($t8):BIGINT NOT NULL], expr#10=[86400:BIGINT], expr#11=[MOD($t9, $t10)], avg(cpu_usage)=[$t7], $1=[$t9], $2=[$t11]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableAggregate(group=[{1}], agg#0=[$SUM0($0)], agg#1=[COUNT($0)]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[6], expr#17=['h'], expr#18=[SPAN($t1, $t16, $t17)], expr#19=['2024-07-03 18:00:00':EXPR_TIMESTAMP VARCHAR], expr#20=[<=($t1, $t19)], expr#21=[IS NOT NULL($t7)], expr#22=[AND($t20, $t21)], cpu_usage=[$t7], @timestamp0=[$t18], $condition=[$t22]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap_month.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap_month.yaml new file mode 100644 index 0000000000..df3e91e740 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timewrap_month.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + LogicalProject(@timestamp=[FROM_UNIXTIME(+(-(-(CAST(UNIX_TIMESTAMP(MAX($0) OVER ())):BIGINT NOT NULL, *(-(EXTRACT('DAY', MAX($0) OVER ()), 1), 86400)), +(+(*(EXTRACT('HOUR', MAX($0) OVER ()), 3600), *(EXTRACT('MINUTE', MAX($0) OVER ()), 60)), EXTRACT('SECOND', MAX($0) OVER ()))), -(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, -(-(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, *(-(EXTRACT('DAY', $0), 1), 86400)), +(+(*(EXTRACT('HOUR', $0), 3600), *(EXTRACT('MINUTE', $0), 60)), EXTRACT('SECOND', $0))))))], avg(cpu_usage)=[$1], __base_offset__=[-(24295, +(*(EXTRACT('YEAR', MAX($0) OVER ()), 12), EXTRACT('MONTH', MAX($0) OVER ())))], __period__=[+(-(+(*(EXTRACT('YEAR', MAX($0) OVER ()), 12), EXTRACT('MONTH', MAX($0) OVER ())), +(*(EXTRACT('YEAR', $0), 12), EXTRACT('MONTH', $0))), 1)]) + LogicalSort(sort0=[$0], dir0=[ASC]) + LogicalProject(@timestamp=[$0], avg(cpu_usage)=[$1]) + LogicalAggregate(group=[{1}], avg(cpu_usage)=[AVG($0)]) + LogicalProject(cpu_usage=[$7], @timestamp0=[SPAN($1, 1, 'd')]) + LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($7))]) + LogicalFilter(condition=[<=($1, TIMESTAMP('2024-07-03 18:00:00':VARCHAR))]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[UNIX_TIMESTAMP($t4)], expr#6=[CAST($t5):BIGINT NOT NULL], expr#7=['DAY'], expr#8=[EXTRACT($t7, $t4)], expr#9=[1:BIGINT], expr#10=[-($t8, $t9)], expr#11=[86400:BIGINT], expr#12=[*($t10, $t11)], expr#13=[-($t6, $t12)], expr#14=['HOUR'], expr#15=[EXTRACT($t14, $t4)], expr#16=[3600:BIGINT], expr#17=[*($t15, $t16)], expr#18=['MINUTE'], expr#19=[EXTRACT($t18, $t4)], expr#20=[60:BIGINT], expr#21=[*($t19, $t20)], expr#22=[+($t17, $t21)], expr#23=['SECOND'], expr#24=[EXTRACT($t23, $t4)], expr#25=[+($t22, $t24)], expr#26=[-($t13, $t25)], expr#27=[+($t26, $t2)], expr#28=[FROM_UNIXTIME($t27)], expr#29=[24295:BIGINT], expr#30=['YEAR'], expr#31=[EXTRACT($t30, $t4)], expr#32=[12:BIGINT], expr#33=[*($t31, $t32)], expr#34=['MONTH'], expr#35=[EXTRACT($t34, $t4)], expr#36=[+($t33, $t35)], expr#37=[-($t29, $t36)], expr#38=[-($t36, $t3)], expr#39=[+($t38, $t9)], @timestamp=[$t28], avg(cpu_usage)=[$t1], __base_offset__=[$t37], __period__=[$t39]) + EnumerableWindow(window#0=[window(aggs [MAX($0)])]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], expr#8=[UNIX_TIMESTAMP($t0)], expr#9=[CAST($t8):BIGINT NOT NULL], expr#10=['DAY'], expr#11=[EXTRACT($t10, $t0)], expr#12=[1:BIGINT], expr#13=[-($t11, $t12)], expr#14=[86400:BIGINT], expr#15=[*($t13, $t14)], expr#16=[-($t9, $t15)], expr#17=['HOUR'], expr#18=[EXTRACT($t17, $t0)], expr#19=[3600:BIGINT], expr#20=[*($t18, $t19)], expr#21=['MINUTE'], expr#22=[EXTRACT($t21, $t0)], expr#23=[60:BIGINT], expr#24=[*($t22, $t23)], expr#25=[+($t20, $t24)], expr#26=['SECOND'], expr#27=[EXTRACT($t26, $t0)], expr#28=[+($t25, $t27)], expr#29=[-($t16, $t28)], expr#30=[-($t9, $t29)], expr#31=['YEAR'], expr#32=[EXTRACT($t31, $t0)], expr#33=[12:BIGINT], expr#34=[*($t32, $t33)], expr#35=['MONTH'], expr#36=[EXTRACT($t35, $t0)], expr#37=[+($t34, $t36)], @timestamp0=[$t0], avg(cpu_usage)=[$t7], $2=[$t30], $3=[$t37]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableAggregate(group=[{1}], agg#0=[$SUM0($0)], agg#1=[COUNT($0)]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[1], expr#17=['d'], expr#18=[SPAN($t1, $t16, $t17)], expr#19=['2024-07-03 18:00:00':EXPR_TIMESTAMP VARCHAR], expr#20=[<=($t1, $t19)], expr#21=[IS NOT NULL($t7)], expr#22=[AND($t20, $t21)], cpu_usage=[$t7], @timestamp0=[$t18], $condition=[$t22]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) diff --git a/integ-test/src/test/resources/timewrap_test.json b/integ-test/src/test/resources/timewrap_test.json new file mode 100644 index 0000000000..5ba82f8fab --- /dev/null +++ b/integ-test/src/test/resources/timewrap_test.json @@ -0,0 +1,66 @@ +{"index":{"_id":"1"}} +{"@timestamp":"2024-07-01T00:00:00","host":"web-01","requests":100,"errors":2} +{"index":{"_id":"2"}} +{"@timestamp":"2024-07-01T06:00:00","host":"web-01","requests":150,"errors":5} +{"index":{"_id":"3"}} +{"@timestamp":"2024-07-01T12:00:00","host":"web-01","requests":200,"errors":3} +{"index":{"_id":"4"}} +{"@timestamp":"2024-07-01T18:00:00","host":"web-01","requests":120,"errors":1} +{"index":{"_id":"5"}} +{"@timestamp":"2024-07-01T00:00:00","host":"web-02","requests":80,"errors":0} +{"index":{"_id":"6"}} +{"@timestamp":"2024-07-01T06:00:00","host":"web-02","requests":90,"errors":1} +{"index":{"_id":"7"}} +{"@timestamp":"2024-07-01T12:00:00","host":"web-02","requests":110,"errors":2} +{"index":{"_id":"8"}} +{"@timestamp":"2024-07-01T18:00:00","host":"web-02","requests":70,"errors":0} +{"index":{"_id":"9"}} +{"@timestamp":"2024-07-02T00:00:00","host":"web-01","requests":110,"errors":3} +{"index":{"_id":"10"}} +{"@timestamp":"2024-07-02T06:00:00","host":"web-01","requests":160,"errors":4} +{"index":{"_id":"11"}} +{"@timestamp":"2024-07-02T12:00:00","host":"web-01","requests":210,"errors":6} +{"index":{"_id":"12"}} +{"@timestamp":"2024-07-02T18:00:00","host":"web-01","requests":130,"errors":2} +{"index":{"_id":"13"}} +{"@timestamp":"2024-07-02T00:00:00","host":"web-02","requests":95,"errors":1} +{"index":{"_id":"14"}} +{"@timestamp":"2024-07-02T06:00:00","host":"web-02","requests":100,"errors":2} +{"index":{"_id":"15"}} +{"@timestamp":"2024-07-02T12:00:00","host":"web-02","requests":120,"errors":3} +{"index":{"_id":"16"}} +{"@timestamp":"2024-07-02T18:00:00","host":"web-02","requests":85,"errors":1} +{"index":{"_id":"17"}} +{"@timestamp":"2024-07-03T00:00:00","host":"web-01","requests":90,"errors":1} +{"index":{"_id":"18"}} +{"@timestamp":"2024-07-03T06:00:00","host":"web-01","requests":140,"errors":2} +{"index":{"_id":"19"}} +{"@timestamp":"2024-07-03T12:00:00","host":"web-01","requests":180,"errors":4} +{"index":{"_id":"20"}} +{"@timestamp":"2024-07-03T18:00:00","host":"web-01","requests":100,"errors":1} +{"index":{"_id":"21"}} +{"@timestamp":"2024-07-03T00:00:00","host":"web-02","requests":75,"errors":0} +{"index":{"_id":"22"}} +{"@timestamp":"2024-07-03T06:00:00","host":"web-02","requests":85,"errors":1} +{"index":{"_id":"23"}} +{"@timestamp":"2024-07-03T12:00:00","host":"web-02","requests":105,"errors":2} +{"index":{"_id":"24"}} +{"@timestamp":"2024-07-03T18:00:00","host":"web-02","requests":65,"errors":0} +{"index":{"_id":"25"}} +{"@timestamp":"2024-07-04T00:00:00","host":"web-01","requests":50,"errors":0} +{"index":{"_id":"26"}} +{"@timestamp":"2024-07-04T06:00:00","host":"web-01","requests":60,"errors":1} +{"index":{"_id":"27"}} +{"@timestamp":"2024-07-04T00:00:00","host":"web-02","requests":30,"errors":0} +{"index":{"_id":"28"}} +{"@timestamp":"2024-07-04T06:00:00","host":"web-02","requests":40,"errors":0} +{"index":{"_id":"29"}} +{"@timestamp":"2024-01-15T12:00:00","host":"web-01","requests":300,"errors":5} +{"index":{"_id":"30"}} +{"@timestamp":"2024-04-15T12:00:00","host":"web-01","requests":350,"errors":3} +{"index":{"_id":"31"}} +{"@timestamp":"2024-06-15T12:00:00","host":"web-01","requests":200,"errors":2} +{"index":{"_id":"32"}} +{"@timestamp":"2024-08-15T12:00:00","host":"web-01","requests":250,"errors":4} +{"index":{"_id":"33"}} +{"@timestamp":"2025-01-15T12:00:00","host":"web-01","requests":400,"errors":6} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 32b7891d34..d72c07a5f7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -42,6 +42,7 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.calcite.utils.TimewrapPivot; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.data.model.ExprTupleValue; @@ -320,6 +321,23 @@ private QueryResponse buildResultSet( } columns.add(new Column(columnName, null, exprType)); } + // Timewrap post-processing: pivot unpivoted rows into period columns. The pivot is shared with + // the analytics route (AnalyticsExecutionEngine) so both engines produce identical output. + if (TimewrapPivot.isTimewrap()) { + try { + TimewrapPivot.Result pivoted = + TimewrapPivot.pivot( + columns, + values, + CalcitePlanContext.timewrapUnitName.get(), + CalcitePlanContext.timewrapSeries.get()); + columns = pivoted.columns(); + values = pivoted.values(); + } finally { + CalcitePlanContext.clearTimewrapSignals(); + } + } + Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); return response; diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 84ee147d34..36cc60d58a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -222,6 +222,12 @@ private void doExecute( } } catch (Exception e) { closingListener.onFailure(e); + } finally { + // visitTimewrap (run inside planner.plan) sets timewrap thread-locals on this + // worker thread. execute()/executeWithProfile() capture-and-clear them on the + // happy path, but a planning exception bypasses that — clear here so the + // signals never leak onto the next query reusing this pooled thread. + CalcitePlanContext.clearTimewrapSignals(); } }), new TimeValue(0), @@ -269,6 +275,11 @@ private void doExplain( analyticsEngine.explain(plan, mode, planContext, listener); } catch (Exception e) { listener.onFailure(e); + } finally { + // explain plans a timewrap query (visitTimewrap sets thread-locals) but never + // executes, so nothing captures-and-clears them — clear here to avoid leaking + // onto the next query on this pooled thread. + CalcitePlanContext.clearTimewrapSignals(); } }), new TimeValue(0), diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 4bc69a8f29..4f712042ed 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -55,6 +55,12 @@ APPENDCOL: 'APPENDCOL'; ADDTOTALS: 'ADDTOTALS'; ADDCOLTOTALS: 'ADDCOLTOTALS'; GRAPHLOOKUP: 'GRAPHLOOKUP'; +TIMEWRAP: 'TIMEWRAP'; +ALIGN: 'ALIGN'; +SERIES: 'SERIES'; +RELATIVE: 'RELATIVE'; +SHORT: 'SHORT'; +EXACT: 'EXACT'; EDGE: 'EDGE'; MAX_DEPTH: 'MAXDEPTH'; DEPTH_FIELD: 'DEPTHFIELD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 98f85b0828..1465554206 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -98,6 +98,7 @@ commands | nomvCommand | graphLookupCommand | unionCommand + | timewrapCommand ; commandName @@ -149,6 +150,7 @@ commandName | NOMV | TRANSPOSE | GRAPHLOOKUP + | TIMEWRAP ; searchCommand @@ -355,6 +357,27 @@ transposeParameter | (COLUMN_NAME EQUAL stringLiteral) ; +timewrapCommand + : TIMEWRAP spanLiteral timewrapParameter* + ; + +timewrapParameter + : ALIGN EQUAL timewrapAlign + | SERIES EQUAL timewrapSeries + | TIME_FORMAT EQUAL stringLiteral + ; + +timewrapAlign + : NOW + | END + ; + +timewrapSeries + : RELATIVE + | SHORT + | EXACT + ; + timechartParameter : LIMIT EQUAL integerLiteral diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 3741137f5a..a74423cd41 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -64,6 +64,7 @@ import org.opensearch.sql.ast.expression.SearchAnd; import org.opensearch.sql.ast.expression.SearchExpression; import org.opensearch.sql.ast.expression.SearchGroup; +import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedArgument; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.expression.WindowFrame; @@ -118,6 +119,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Transpose; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Union; @@ -822,6 +824,39 @@ public UnresolvedPlan visitTimechartCommand(OpenSearchPPLParser.TimechartCommand .build(); } + /** Timewrap command. */ + @Override + public UnresolvedPlan visitTimewrapCommand(OpenSearchPPLParser.TimewrapCommandContext ctx) { + Literal spanLiteral = (Literal) expressionBuilder.visit(ctx.spanLiteral()); + String spanText = spanLiteral.getValue().toString(); + String valueStr = spanText.replaceAll("[^0-9]", ""); + String unitStr = spanText.replaceAll("[0-9]", ""); + int value = valueStr.isEmpty() ? 1 : Integer.parseInt(valueStr); + SpanUnit unit = SpanUnit.of(unitStr); + if (unit == SpanUnit.UNKNOWN || unit == SpanUnit.NONE) { + throw new SemanticCheckException("Invalid timewrap span unit: " + unitStr); + } + String align = "end"; + String series = "relative"; + String timeFormat = null; + for (var param : ctx.timewrapParameter()) { + if (param.timewrapAlign() != null) { + align = param.timewrapAlign().getText().toLowerCase(); + } else if (param.timewrapSeries() != null) { + series = param.timewrapSeries().getText().toLowerCase(); + } else if (param.TIME_FORMAT() != null) { + timeFormat = param.stringLiteral().getText(); + // Strip surrounding quotes + if (timeFormat.length() >= 2 + && ((timeFormat.startsWith("\"") && timeFormat.endsWith("\"")) + || (timeFormat.startsWith("'") && timeFormat.endsWith("'")))) { + timeFormat = timeFormat.substring(1, timeFormat.length() - 1); + } + } + } + return new Timewrap(unit, value, align, series, timeFormat, spanLiteral); + } + /** Eval command. */ @Override public UnresolvedPlan visitEvalCommand(EvalCommandContext ctx) { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 4b75d44446..eb99a4b438 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -105,6 +105,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Transpose; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Union; @@ -626,6 +627,25 @@ public String visitReverse(Reverse node, String context) { return StringUtils.format("%s | reverse", child); } + @Override + public String visitTimewrap(Timewrap node, String context) { + String child = node.getChild().get(0).accept(this, context); + StringBuilder command = new StringBuilder(); + // span magnitude is masked like other span literals (see visitChart); align/series are + // constrained keywords, not user data, so they are rendered verbatim. + command.append(" | timewrap ").append(MASK_LITERAL); + if (node.getAlign() != null) { + command.append(" align=").append(node.getAlign()); + } + if (node.getSeries() != null) { + command.append(" series=").append(node.getSeries()); + } + if (node.getTimeFormat() != null) { + command.append(" time_format=").append(MASK_LITERAL); + } + return StringUtils.format("%s%s", child, command); + } + @Override public String visitChart(Chart node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimewrapTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimewrapTest.java new file mode 100644 index 0000000000..66027839f8 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimewrapTest.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** + * Unit tests for the {@code timewrap} command's Calcite plan construction. Timewrap reshapes {@code + * timechart} output, so every query pipes through {@code timechart span=...} first. The pivot + * itself is post-processing in the execution engine (see {@code TimewrapPivot}); these tests cover + * the {@link org.opensearch.sql.calcite.CalciteRelNodeVisitor#visitTimewrap} lowering — the + * unpivoted RelNode and its generated Spark SQL. + */ +public class CalcitePPLTimewrapTest extends CalcitePPLAbstractTest { + + public CalcitePPLTimewrapTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + ImmutableList rows = + ImmutableList.of( + new Object[] {java.sql.Timestamp.valueOf("2024-07-01 00:00:00"), 180}, + new Object[] {java.sql.Timestamp.valueOf("2024-07-01 06:00:00"), 240}, + new Object[] {java.sql.Timestamp.valueOf("2024-07-02 00:00:00"), 205}, + new Object[] {java.sql.Timestamp.valueOf("2024-07-03 00:00:00"), 165}); + schema.add("events", new EventsTable(rows)); + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + // align=end query with a deterministic WHERE upper bound (2024-07-03 18:00:00 = epoch + // 1720029600), so the base_offset reference is stable across runs rather than the query clock. + private static final String TIMEWRAP_DAY = + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-03 18:00:00' | timechart span=6h sum(value) | timewrap 1day align=end"; + + @Test + public void testTimewrapDayProducesUnpivotedPlan() { + RelNode root = getRelNode(TIMEWRAP_DAY); + // The pivot is post-processing in the execution engine; the RelNode is intentionally unpivoted: + // [display_ts, value, __base_offset__, __period__], sorted by (display_ts, period). base_offset + // uses FLOOR(.../span) (not truncating integer divide) so future-dated align=now stays correct. + String expectedLogical = + "LogicalSort(sort0=[$0], sort1=[$3], dir0=[ASC], dir1=[ASC])\n" + + " LogicalProject(@timestamp=[FROM_UNIXTIME(+(-(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT" + + " NOT NULL) OVER (), MOD(MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER ()," + + " 86400:BIGINT)), MOD(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL, 86400:BIGINT)))]," + + " sum(value)=[$1], __base_offset__=[CAST(FLOOR(/(CAST(-(1720029600," + + " MAX(CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL) OVER ())):DOUBLE NOT NULL," + + " 86400:BIGINT))):BIGINT NOT NULL], __period__=[+(/(-(MAX(CAST(UNIX_TIMESTAMP($0))" + + ":BIGINT NOT NULL) OVER (), CAST(UNIX_TIMESTAMP($0)):BIGINT NOT NULL), 86400), 1)])\n" + + " LogicalSort(sort0=[$0], dir0=[ASC])\n" + + " LogicalProject(@timestamp=[$0], sum(value)=[$1])\n" + + " LogicalAggregate(group=[{1}], sum(value)=[SUM($0)])\n" + + " LogicalProject(value=[$1], @timestamp0=[SPAN($0, 6, 'h')])\n" + + " LogicalFilter(condition=[AND(>=($0, TIMESTAMP('2024-07-01" + + " 00:00:00':VARCHAR)), <=($0, TIMESTAMP('2024-07-03 18:00:00':VARCHAR)), IS NOT" + + " NULL($1))])\n" + + " LogicalTableScan(table=[[scott, events]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testTimewrapDaySparkSql() { + RelNode root = getRelNode(TIMEWRAP_DAY); + String expectedSparkSql = + "SELECT FROM_UNIXTIME((MAX(CAST(UNIX_TIMESTAMP(`@timestamp`) AS BIGINT)) OVER (RANGE" + + " BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) -" + + " MOD(MAX(CAST(UNIX_TIMESTAMP(`@timestamp`) AS BIGINT)) OVER (RANGE BETWEEN UNBOUNDED" + + " PRECEDING AND UNBOUNDED FOLLOWING), 86400) + MOD(CAST(UNIX_TIMESTAMP(`@timestamp`)" + + " AS BIGINT), 86400)) `@timestamp`, `sum(value)`, CAST(FLOOR(CAST(1720029600 -" + + " (MAX(CAST(UNIX_TIMESTAMP(`@timestamp`) AS BIGINT)) OVER (RANGE BETWEEN UNBOUNDED" + + " PRECEDING AND UNBOUNDED FOLLOWING)) AS DOUBLE) / 86400) AS BIGINT)" + + " `__base_offset__`, ((MAX(CAST(UNIX_TIMESTAMP(`@timestamp`) AS BIGINT)) OVER (RANGE" + + " BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) -" + + " CAST(UNIX_TIMESTAMP(`@timestamp`) AS BIGINT)) / 86400 + 1 `__period__`\n" + + "FROM (SELECT SPAN(`@timestamp`, 6, 'h') `@timestamp`, SUM(`value`) `sum(value)`\n" + + "FROM `scott`.`events`\n" + + "WHERE `@timestamp` >= TIMESTAMP('2024-07-01 00:00:00') AND `@timestamp` <=" + + " TIMESTAMP('2024-07-03 18:00:00') AND `value` IS NOT NULL\n" + + "GROUP BY SPAN(`@timestamp`, 6, 'h')\n" + + "ORDER BY 1 NULLS LAST) `t3`\n" + + "ORDER BY 1 NULLS LAST, 4 NULLS LAST"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + /** Minimal time-series table: a nullable timestamp and an integer measure. */ + public static class EventsTable implements ScannableTable { + private final ImmutableList rows; + + public EventsTable(ImmutableList rows) { + this.rows = rows; + } + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("@timestamp", SqlTypeName.TIMESTAMP) + .nullable(true) + .add("value", SqlTypeName.INTEGER) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 6756cdc198..9ea21684ed 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -283,6 +283,17 @@ public void testTimechartCommand() { anonymize("source=t | timechart timefield=month max(revenue)")); } + @Test + public void testTimewrapCommand() { + assertEquals( + "source=table | timechart count() | timewrap *** align=end series=relative", + anonymize("source=t | timechart count() | timewrap 1day")); + + assertEquals( + "source=table | timechart count() | timewrap *** align=now series=short", + anonymize("source=t | timechart count() | timewrap 1week align=now series=short")); + } + @Test public void testChartCommand() { assertEquals(