diff --git a/src/main/java/com/github/jinba1/blazedb/BlazeDB.java b/src/main/java/com/github/jinba1/blazedb/BlazeDB.java index b3a5761..1ad901a 100644 --- a/src/main/java/com/github/jinba1/blazedb/BlazeDB.java +++ b/src/main/java/com/github/jinba1/blazedb/BlazeDB.java @@ -9,6 +9,7 @@ import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; +import com.github.jinba1.blazedb.operator.LimitOperator; import com.github.jinba1.blazedb.operator.Operator; /** @@ -76,10 +77,6 @@ static int run(String[] args) { } Operator rootOp = planned.root(); - if (rootOp == null) { - System.err.println("Error: query could not be planned"); - return 1; - } if (maxTuples != null || timeoutMs != null) { rootOp.attachBudget(new QueryBudget(maxTuples, timeoutMs)); } @@ -91,6 +88,12 @@ static int run(String[] args) { } catch (IOException e) { System.err.println("Error: failed to write output: " + e.getMessage()); return 1; + } catch (RuntimeException e) { + // Engine bug, not a user error — fail with an exit code instead of an + // uncaught-exception crash; the trace stays on stderr for bug reports + System.err.println("Internal error: " + e); + e.printStackTrace(); + return 1; } } @@ -104,8 +107,9 @@ private static void usage() { * on the root object of the operator tree. Writes the result to `outputFile`. * @param root The root operator of the operator tree (assumed to be non-null). * @param outputFile The name of the file where the result will be written. + * @return Execution metadata: rows written, LIMIT truncation, refine hint. */ - public static void execute(Operator root, String outputFile) { + public static QueryResult execute(Operator root, String outputFile) { File outputFileObj = new File(outputFile); File parentDir = outputFileObj.getParentFile(); if (parentDir != null && !parentDir.exists()) { @@ -117,6 +121,7 @@ public static void execute(Operator root, String outputFile) { List headers = root.getContext().getOrderedColumnNames(root.propagateSchemaId()); CSVFormat format = CSVFormat.RFC4180.builder().setRecordSeparator("\n").build(); + long rows = 0; try { try (CSVPrinter printer = new CSVPrinter(new FileWriter(outputFile), format)) { printer.printRecord(headers); @@ -127,21 +132,30 @@ public static void execute(Operator root, String outputFile) { fields.add(v.toString()); } printer.printRecord(fields); + rows++; } } - } catch (QueryExecutionException e) { + } catch (RuntimeException e) { + // QueryExecutionException and internal errors alike: never leave a + // truncated file that looks like a complete result deletePartialOutput(outputFileObj, outputFile); throw e; } catch (IOException e) { // Disk full, permissions, output path is a directory, ... — swallowing this // would make a broken or missing file look like success to callers deletePartialOutput(outputFileObj, outputFile); - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.DATA_ERROR, "Failed to write output file '" + outputFile + "': " + e.getMessage()); } + // The planner places Limit topmost, so the root knows whether the cap cut the result + boolean truncated = root instanceof LimitOperator limitOp && limitOp.wasTruncated(); + QueryResult result = truncated ? QueryResult.truncated(rows) : QueryResult.complete(rows); + System.out.println("Query executed successfully!"); System.out.println("Output file: " + outputFile); + System.out.println("Rows: " + rows + (truncated ? " (truncated; more rows exist)" : "")); + return result; } /** Never leave a truncated file that looks like a complete result. */ diff --git a/src/main/java/com/github/jinba1/blazedb/DBCatalog.java b/src/main/java/com/github/jinba1/blazedb/DBCatalog.java index 32da284..c30c553 100644 --- a/src/main/java/com/github/jinba1/blazedb/DBCatalog.java +++ b/src/main/java/com/github/jinba1/blazedb/DBCatalog.java @@ -83,7 +83,8 @@ public static synchronized void resetDBCatalog() { private void loadDBCatalog(String dBDirectory) { Path dataPath = Paths.get(dBDirectory).resolve(Constants.DATA_DIRECTORY_NAME); if (!Files.isDirectory(dataPath)) { - throw new QueryExecutionException("Data directory not found: " + dataPath); + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Data directory not found: " + dataPath); } try (Stream files = Files.list(dataPath)) { List csvs = files @@ -96,7 +97,8 @@ private void loadDBCatalog(String dBDirectory) { loadTable(tableName, csv); } } catch (IOException e) { - throw new QueryExecutionException("Error scanning data directory " + dataPath + ": " + e.getMessage()); + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Error scanning data directory " + dataPath + ": " + e.getMessage()); } } @@ -107,7 +109,7 @@ private void loadTable(String tableName, Path csv) throws IOException { try (CSVParser parser = CSVParser.parse(csv, StandardCharsets.UTF_8, format)) { Iterator it = parser.iterator(); if (!it.hasNext()) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.DATA_ERROR, "Table '" + tableName + "' has no header row (" + csv + ")"); } CSVRecord header = it.next(); @@ -116,7 +118,7 @@ private void loadTable(String tableName, Path csv) throws IOException { for (int i = 0; i < width; i++) { String col = header.get(i).trim().toLowerCase(); if (columnMap.putIfAbsent(col, i) != null) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.DATA_ERROR, "Table '" + tableName + "' has duplicate column '" + col + "' in header"); } } @@ -128,7 +130,8 @@ private void loadTable(String tableName, Path csv) throws IOException { CSVRecord record = it.next(); rowNum++; if (record.size() != width) { - throw new QueryExecutionException("Table '" + tableName + "' row " + rowNum + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Table '" + tableName + "' row " + rowNum + ": expected " + width + " fields, found " + record.size()); } for (int i = 0; i < width; i++) { @@ -187,6 +190,30 @@ public List getColumnTypes(String tableName) { return dbColumnTypes.get(tableName); } + /** + * Returns all table names in the catalog, sorted — for agent-legible + * "Available tables: ..." error messages. + * @return The sorted list of loaded table names + */ + public List getTableNames() { + List names = new ArrayList<>(dbLocations.keySet()); + Collections.sort(names); + return names; + } + + /** + * Builds the standard unknown-table failure, listing what is available so an + * agent caller can self-correct. One construction site keeps the wording from + * drifting between the FROM-clause and WHERE-clause detection paths. + * @param tableName The table name that failed to resolve + * @return The exception to throw; never null + */ + public QueryExecutionException unknownTable(String tableName) { + return new QueryExecutionException(ErrorCode.UNKNOWN_TABLE, + "Table '" + tableName + "' not found. Available tables: " + + String.join(", ", getTableNames()) + "."); + } + /** * Checks if a specified table exists in the database. * @param tableName The name of the table to check diff --git a/src/main/java/com/github/jinba1/blazedb/ErrorCode.java b/src/main/java/com/github/jinba1/blazedb/ErrorCode.java new file mode 100644 index 0000000..509aec9 --- /dev/null +++ b/src/main/java/com/github/jinba1/blazedb/ErrorCode.java @@ -0,0 +1,25 @@ +package com.github.jinba1.blazedb; + +/** + * Machine-readable category for a {@link QueryExecutionException}. Downstream surfaces + * (CLI today, REST/MCP later) classify failures by code instead of parsing message text: + * user errors map to 4xx-style handling, {@link #INTERNAL} to 5xx. + */ +public enum ErrorCode { + /** The SQL text could not be parsed. */ + PARSE_ERROR, + /** The SQL parsed but uses constructs the engine does not support. */ + UNSUPPORTED_SQL, + /** A referenced table does not exist in the catalog. */ + UNKNOWN_TABLE, + /** A referenced column does not exist in the schema searched. */ + UNKNOWN_COLUMN, + /** Values of incompatible types were compared, joined, or aggregated. */ + TYPE_MISMATCH, + /** The query exceeded its tuple or time budget. */ + BUDGET_EXCEEDED, + /** Source-data or I/O problem: unreadable file, malformed row, numeric overflow. */ + DATA_ERROR, + /** An engine invariant broke — a bug, not a user error. */ + INTERNAL +} diff --git a/src/main/java/com/github/jinba1/blazedb/ExpressionEvaluator.java b/src/main/java/com/github/jinba1/blazedb/ExpressionEvaluator.java index 515536b..ece34f4 100644 --- a/src/main/java/com/github/jinba1/blazedb/ExpressionEvaluator.java +++ b/src/main/java/com/github/jinba1/blazedb/ExpressionEvaluator.java @@ -68,8 +68,6 @@ public boolean evaluate(Expression expression, Tuple tuple) { throw new RuntimeException("Expression evaluation did not produce a result"); } - boolean result = resultStack.peek(); - return resultStack.pop(); } @@ -205,8 +203,7 @@ public void visit(Column column) { } if (colIdx == null) { - throw new RuntimeException("Column '" + tableName + "." + columnName + - " not found in schema " + schemaId); + throw ctx.unknownColumn(tableName, columnName, schemaId); } valueStack.push(currentTuple.getAttribute(colIdx)); @@ -239,6 +236,18 @@ public void visit(net.sf.jsqlparser.expression.StringValue stringValue) { */ @Override public void visitBinaryExpression(BinaryExpression expression) { + // Reject before evaluating children: an unsupported operator (e.g. OR) has + // boolean operands that push nothing onto the value stack, so popping below + // would fail with an opaque EmptyStackException instead of this message + if (!(expression instanceof Multiplication + || expression instanceof EqualsTo || expression instanceof NotEqualsTo + || expression instanceof GreaterThan || expression instanceof GreaterThanEquals + || expression instanceof MinorThan || expression instanceof MinorThanEquals)) { + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "Unsupported condition '" + expression + + "'; supported comparators: =, !=, <, <=, >, >= combined with AND"); + } + expression.getLeftExpression().accept(this); expression.getRightExpression().accept(this); @@ -248,7 +257,8 @@ public void visitBinaryExpression(BinaryExpression expression) { if (expression instanceof Multiplication) { if (!(left instanceof IntValue l) || !(right instanceof IntValue r)) { - throw new QueryExecutionException("Arithmetic requires int operands: cannot multiply " + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, + "Arithmetic requires int operands: cannot multiply " + left.typeName() + " '" + left + "' with " + right.typeName() + " '" + right + "'"); } valueStack.push(new IntValue(l.v() * r.v())); @@ -256,7 +266,7 @@ public void visitBinaryExpression(BinaryExpression expression) { } if (left.getClass() != right.getClass()) { - throw new QueryExecutionException("Type mismatch: cannot compare " + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, "Type mismatch: cannot compare " + left.typeName() + " '" + left + "' with " + right.typeName() + " '" + right + "'"); } @@ -270,10 +280,8 @@ public void visitBinaryExpression(BinaryExpression expression) { resultStack.push(left.compareTo(right) >= 0); } else if (expression instanceof MinorThan) { resultStack.push(left.compareTo(right) < 0); - } else if (expression instanceof MinorThanEquals) { + } else { // MinorThanEquals: the only type left after the guard above resultStack.push(left.compareTo(right) <= 0); - } else { - throw new UnsupportedOperationException("Unsupported binary expression: " + expression.getClass().getName()); } } /** diff --git a/src/main/java/com/github/jinba1/blazedb/ExpressionPreprocessor.java b/src/main/java/com/github/jinba1/blazedb/ExpressionPreprocessor.java index 664d343..5e68606 100644 --- a/src/main/java/com/github/jinba1/blazedb/ExpressionPreprocessor.java +++ b/src/main/java/com/github/jinba1/blazedb/ExpressionPreprocessor.java @@ -30,6 +30,7 @@ */ public class ExpressionPreprocessor extends ExpressionVisitorAdapter { + private final PlanContext ctx; private final Stack tableStack; // track table references private final List joinExpressions; private final List selectExpressions; @@ -38,8 +39,10 @@ public class ExpressionPreprocessor extends ExpressionVisitorAdapter { /** * Constructs a new ExpressionPreprocessor. * Initializes internal data structures for tracking table references and expressions. + * @param ctx The per-query context, used to build resolution-failure messages */ - public ExpressionPreprocessor() { + public ExpressionPreprocessor(PlanContext ctx) { + this.ctx = ctx; tableStack = new Stack<>(); joinExpressions = new LinkedList<>(); selectExpressions = new LinkedList<>(); @@ -164,7 +167,7 @@ public void visit(MinorThanEquals minorThanEquals) { * Visits a column reference and pushes its table name onto the stack. * Verifies that the column exists in the database schema. * @param column The column reference to visit - * @throws UnsupportedOperationException If the column or table doesn't exist + * @throws QueryExecutionException If the column or table doesn't exist */ @Override public void visit(Column column) { @@ -174,10 +177,12 @@ public void visit(Column column) { if (DBCatalog.getInstance().columnExists(tableName, columnName)) { tableStack.push(tableName); } else { - throw new UnsupportedOperationException(String.format("Table %s does not have column %s", tableName, columnName)); + // base-table name doubles as its schema id, so the shared + // construction site applies here too + throw ctx.unknownColumn(tableName, columnName, tableName); } } else { - throw new UnsupportedOperationException(String.format("Table %s does not exist", tableName)); + throw DBCatalog.getInstance().unknownTable(tableName); } } @@ -199,22 +204,27 @@ public void visit(LongValue longValue) { */ @Override public void visitBinaryExpression(BinaryExpression expression) { + // Reject before visiting children: an unsupported operator (e.g. OR) has + // comparison operands that push nothing onto the table stack, so popping + // below would fail with an opaque EmptyStackException instead of this message + if (!(expression instanceof EqualsTo || expression instanceof NotEqualsTo + || expression instanceof GreaterThanEquals || expression instanceof GreaterThan + || expression instanceof MinorThan || expression instanceof MinorThanEquals)) { + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "Unsupported condition '" + expression + + "'; supported comparators: =, !=, <, <=, >, >= combined with AND"); + } + expression.getLeftExpression().accept(this); expression.getRightExpression().accept(this); String rightTable = tableStack.pop(); String leftTable = tableStack.pop(); - if (expression instanceof EqualsTo || expression instanceof NotEqualsTo - || expression instanceof GreaterThanEquals || expression instanceof GreaterThan - || expression instanceof MinorThan || expression instanceof MinorThanEquals) { - if ((rightTable != null && leftTable != null) && (!rightTable.equals(leftTable))) { - joinExpressions.add(expression); - } else { - selectExpressions.add(expression); - } + if ((rightTable != null && leftTable != null) && (!rightTable.equals(leftTable))) { + joinExpressions.add(expression); } else { - throw new UnsupportedOperationException("Unsupported binary expression: " + expression.getClass().getName()); + selectExpressions.add(expression); } } diff --git a/src/main/java/com/github/jinba1/blazedb/IntValue.java b/src/main/java/com/github/jinba1/blazedb/IntValue.java index a9b7e08..ca4a22d 100644 --- a/src/main/java/com/github/jinba1/blazedb/IntValue.java +++ b/src/main/java/com/github/jinba1/blazedb/IntValue.java @@ -8,7 +8,7 @@ public int compareTo(Value other) { if (other instanceof IntValue o) { return Integer.compare(v, o.v); } - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, "Type mismatch: cannot compare int value '" + v + "' with " + other.typeName() + " value '" + other + "'"); } diff --git a/src/main/java/com/github/jinba1/blazedb/PlanContext.java b/src/main/java/com/github/jinba1/blazedb/PlanContext.java index 0a6c9f1..1317527 100644 --- a/src/main/java/com/github/jinba1/blazedb/PlanContext.java +++ b/src/main/java/com/github/jinba1/blazedb/PlanContext.java @@ -117,6 +117,43 @@ private Integer smartResolveColumnIndex(String schemaId, String tableName, Strin return index; } + /** + * Comma-separated resolvable column names of a schema, in column order — for + * agent-legible error messages. Alias keys mapping to the same index are all + * listed, because each is a name the caller may legally write. + */ + public String availableColumns(String schemaId) { + return formatColumns(getSchema(schemaId)); + } + + /** + * Builds the standard unknown-column failure for a schema lookup miss, listing + * what is available so an agent caller can self-correct. One construction site + * keeps the wording aligned across projection, sort, aggregate, and evaluation. + * @param tableName The table qualifier as written in the query + * @param columnName The column name that failed to resolve + * @param schemaId The schema the lookup ran against + * @return The exception to throw; never null + */ + public QueryExecutionException unknownColumn(String tableName, String columnName, + String schemaId) { + return new QueryExecutionException(ErrorCode.UNKNOWN_COLUMN, + "Column '" + tableName + "." + columnName + "' not found. Available: " + + availableColumns(schemaId) + "."); + } + + /** Formats a schema map as a comma-separated column list, ordered by index then name. */ + public static String formatColumns(Map schema) { + if (schema == null || schema.isEmpty()) { + return "(none)"; + } + return schema.entrySet().stream() + .sorted(Map.Entry.comparingByValue() + .thenComparing(Map.Entry.comparingByKey())) + .map(Map.Entry::getKey) + .collect(java.util.stream.Collectors.joining(", ")); + } + /** * Result-header names for a schema, in column order. Ported verbatim from * DBCatalog.getOrderedColumnNames: bare-ifies plain columns, keeps aggregate diff --git a/src/main/java/com/github/jinba1/blazedb/PlannedQuery.java b/src/main/java/com/github/jinba1/blazedb/PlannedQuery.java index fa9750c..f2b4e67 100644 --- a/src/main/java/com/github/jinba1/blazedb/PlannedQuery.java +++ b/src/main/java/com/github/jinba1/blazedb/PlannedQuery.java @@ -5,7 +5,7 @@ /** * Result of planning one query file. * - * @param root the (optimized) executable operator tree, or null if planning failed + * @param root the (optimized) executable operator tree; never null (planning failures throw) * @param explainText for EXPLAIN queries, the rendered before/after plan text; null otherwise */ public record PlannedQuery(Operator root, String explainText) { diff --git a/src/main/java/com/github/jinba1/blazedb/QueryBudget.java b/src/main/java/com/github/jinba1/blazedb/QueryBudget.java index be9b461..6c51950 100644 --- a/src/main/java/com/github/jinba1/blazedb/QueryBudget.java +++ b/src/main/java/com/github/jinba1/blazedb/QueryBudget.java @@ -50,6 +50,28 @@ public void charge() { } } + /** + * Checks only the time limit, without counting a tuple. Blocking operators call this + * inside their build/drain loops, where {@link #charge()} is unreachable until the + * phase completes — otherwise a query stuck in a long build that emits nothing would + * escape the timeout. Starts the clock if nothing has yet, so a stall before the + * first emission is covered too. + * @throws QueryBudgetExceededException when the time limit is exceeded + */ + public void checkDeadline() { + if (timeoutMs == null) { + return; + } + if (!started) { + started = true; + deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000; + } + if (System.nanoTime() >= deadlineNanos) { + throw new QueryBudgetExceededException( + "Time budget exceeded: limit " + timeoutMs + " ms"); + } + } + /** Total tuples charged so far. */ public long processed() { return processed; diff --git a/src/main/java/com/github/jinba1/blazedb/QueryBudgetExceededException.java b/src/main/java/com/github/jinba1/blazedb/QueryBudgetExceededException.java index d8003d3..c43035d 100644 --- a/src/main/java/com/github/jinba1/blazedb/QueryBudgetExceededException.java +++ b/src/main/java/com/github/jinba1/blazedb/QueryBudgetExceededException.java @@ -5,7 +5,11 @@ * Subclass of QueryExecutionException so existing error handling paths apply. */ public class QueryBudgetExceededException extends QueryExecutionException { + /** + * @param message which limit was exceeded and by how much; the error code is + * always {@link ErrorCode#BUDGET_EXCEEDED} + */ public QueryBudgetExceededException(String message) { - super(message); + super(ErrorCode.BUDGET_EXCEEDED, message); } } diff --git a/src/main/java/com/github/jinba1/blazedb/QueryExecutionException.java b/src/main/java/com/github/jinba1/blazedb/QueryExecutionException.java index 8767bbc..47fbe4c 100644 --- a/src/main/java/com/github/jinba1/blazedb/QueryExecutionException.java +++ b/src/main/java/com/github/jinba1/blazedb/QueryExecutionException.java @@ -1,12 +1,28 @@ package com.github.jinba1.blazedb; /** - * Thrown when query execution fails for a data- or type-related reason. - * Messages state the operation, the column/literal involved, and both types, - * so callers (and downstream LLM agents) can self-correct. + * Thrown when a query fails for a reason the caller can act on. Messages state the + * operation, the table/column/literal involved, and what is actually available, + * so callers (and downstream LLM agents) can self-correct; the {@link ErrorCode} + * classifies the failure without message parsing. */ public class QueryExecutionException extends RuntimeException { + + private final ErrorCode code; + + /** Uncategorized failure; defaults to {@link ErrorCode#INTERNAL}. */ public QueryExecutionException(String message) { + this(ErrorCode.INTERNAL, message); + } + + /** Categorized failure; {@code code} classifies the error for programmatic handling. */ + public QueryExecutionException(ErrorCode code, String message) { super(message); + this.code = code; + } + + /** The machine-readable category of this failure. */ + public ErrorCode code() { + return code; } } diff --git a/src/main/java/com/github/jinba1/blazedb/QueryPlanner.java b/src/main/java/com/github/jinba1/blazedb/QueryPlanner.java index 92b6bde..643a03e 100644 --- a/src/main/java/com/github/jinba1/blazedb/QueryPlanner.java +++ b/src/main/java/com/github/jinba1/blazedb/QueryPlanner.java @@ -7,6 +7,7 @@ import net.sf.jsqlparser.expression.LongValue; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; import net.sf.jsqlparser.expression.operators.relational.*; +import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.schema.Column; import net.sf.jsqlparser.schema.Table; @@ -15,6 +16,7 @@ import net.sf.jsqlparser.statement.select.*; import java.io.FileReader; +import java.io.IOException; import java.util.*; /** @@ -38,7 +40,8 @@ public class QueryPlanner { * Parses an SQL statement from a file and constructs a query plan. * Kept for existing callers; EXPLAIN-aware callers should use {@link #planQuery}. * @param filename The path to a file containing a valid SQL query - * @return The root operator of the constructed query plan, or null if parsing fails + * @return The root operator of the constructed query plan + * @throws QueryExecutionException if the file is unreadable or the query cannot be planned */ public static Operator parseStatement(String filename) { return planQuery(filename).root(); @@ -48,7 +51,8 @@ public static Operator parseStatement(String filename) { * Parses an SQL statement with explicit configuration and constructs a query plan. * @param filename The path to a file containing a valid SQL query * @param config The per-query planner configuration - * @return The root operator of the constructed query plan, or null if parsing fails + * @return The root operator of the constructed query plan + * @throws QueryExecutionException if the file is unreadable or the query cannot be planned */ public static Operator parseStatement(String filename, QueryConfig config) { return planQuery(filename, config).root(); @@ -57,7 +61,8 @@ public static Operator parseStatement(String filename, QueryConfig config) { /** * Plans one query file under the production default configuration. * @param filename The path to a file containing a valid SQL query (optionally EXPLAIN-prefixed) - * @return The planned query; root is null if parsing fails + * @return The planned query; the root is never null + * @throws QueryExecutionException if the file is unreadable or the query cannot be planned */ public static PlannedQuery planQuery(String filename) { return planQuery(filename, QueryConfig.defaults()); @@ -69,45 +74,52 @@ public static PlannedQuery planQuery(String filename) { * the optimized, executable tree either way. * @param filename The path to a file containing a valid SQL query (optionally EXPLAIN-prefixed) * @param config The per-query planner configuration - * @return The planned query; root is null if parsing fails + * @return The planned query; the root is never null + * @throws QueryExecutionException if the file is unreadable or the query cannot be planned, + * with an {@link ErrorCode} and a message the caller can act on */ public static PlannedQuery planQuery(String filename, QueryConfig config) { PlanContext ctx = new PlanContext(config); - Operator rootOp = null; - boolean explain = false; + + Statement statement; try { - Statement statement = CCJSqlParserUtil.parse(new FileReader(filename)); - - Select select = null; - if (statement instanceof ExplainStatement explainStatement) { - explain = true; - select = explainStatement.getStatement(); - } else if (statement != null) { - select = (Select) statement; - } + statement = CCJSqlParserUtil.parse(new FileReader(filename)); + } catch (JSQLParserException e) { + throw new QueryExecutionException(ErrorCode.PARSE_ERROR, + "SQL syntax error: " + parserMessage(e)); + } catch (IOException e) { + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Could not read query file '" + filename + "': " + e.getMessage()); + } - if (select != null) { - rootOp = buildOperatorTree(ctx, select); - } - } catch (QueryExecutionException e) { - throw e; - } catch (Exception e) { - System.err.println("Exception occurred during parsing"); - e.printStackTrace(); + boolean explain = false; + Select select; + if (statement instanceof ExplainStatement explainStatement) { + explain = true; + select = explainStatement.getStatement(); + } else if (statement instanceof Select s) { + select = s; + } else { + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "Only SELECT queries are supported; got " + + (statement == null ? "an empty statement" + : statement.getClass().getSimpleName().toUpperCase())); } + Operator rootOp = buildOperatorTree(ctx, select); + // Ensure schemas are properly registered ensureAllSchemasRegistered(rootOp); - String beforeText = (explain && rootOp != null) ? PlanPrinter.print(rootOp) : null; + String beforeText = explain ? PlanPrinter.print(rootOp) : null; - // Apply query optimization if enabled (skip when planning failed: root is null) - if (config.useQueryOptimization() && rootOp != null) { + // Apply query optimization if enabled + if (config.useQueryOptimization()) { rootOp = QueryPlanOptimizer.optimize(ctx, rootOp); } String explainText = null; - if (explain && rootOp != null) { + if (explain) { explainText = "=== Plan (as written) ===\n" + beforeText + "\n=== Plan (optimized) ===\n" + PlanPrinter.print(rootOp); } @@ -115,6 +127,18 @@ public static PlannedQuery planQuery(String filename, QueryConfig config) { return new PlannedQuery(rootOp, explainText); } + /** + * Trims a JSqlParser failure to its useful head: the unexpected token and position. + * The full message appends every token the grammar would accept — hundreds of lines + * of noise for an agent that just needs to see what was wrong. + */ + private static String parserMessage(JSQLParserException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + String message = cause.getMessage() != null ? cause.getMessage() : "unparseable SQL"; + int cut = message.indexOf("\nWas expecting"); + return (cut > 0 ? message.substring(0, cut) : message).strip(); + } + /** * Builds the unoptimized operator tree for a SELECT (the pipeline previously inlined * in parseStatement): scan, joins/selection, aggregation, projection, distinct/order, @@ -158,7 +182,7 @@ private static Operator createScanOperator(PlanContext ctx, Select select) { * @return The root operator of the join tree */ private static Operator processJoins(PlanContext ctx, Operator rootOp, Select select) { - ExpressionPreprocessor preprocessor = new ExpressionPreprocessor(); + ExpressionPreprocessor preprocessor = new ExpressionPreprocessor(ctx); Expression whereExpression = select.getPlainSelect().getWhere(); @@ -286,14 +310,15 @@ private static Operator processLimit(PlanContext ctx, Operator rootOp, Select se return rootOp; } if (limit.getOffset() != null) { - throw new QueryExecutionException("OFFSET is not supported; use plain LIMIT n"); + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "OFFSET is not supported; use plain LIMIT n"); } if (limit.isLimitAll() || limit.isLimitNull()) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "LIMIT ALL / LIMIT NULL are not supported; use LIMIT n with n >= 0"); } if (!(limit.getRowCount() instanceof LongValue rowCount) || rowCount.getValue() < 0) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "LIMIT requires a non-negative integer literal; got '" + limit.getRowCount() + "'"); } @@ -356,23 +381,23 @@ private static void validateAggregateQuery(Select select) { continue; // aggregate call — validated by extractAggregateCalls } if (expr instanceof AllColumns) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "SELECT * cannot be combined with aggregates or GROUP BY; " + "list the columns explicitly"); } if (expr instanceof Column column) { if (!hasGroupBy) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "Non-aggregate column '" + column + "' in SELECT with aggregates requires GROUP BY"); } if (!groupByKeys.contains(columnKey(column))) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "Column '" + column + "' in SELECT must appear in GROUP BY"); } continue; } - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "Unsupported SELECT item '" + expr + "' with aggregates or GROUP BY; " + "only aggregate functions and grouped columns are allowed"); } @@ -399,9 +424,9 @@ private static List extractGroupByColumns(Select select) { if (expr instanceof Column) { Column column = (Column) expr; groupByColumns.add(column); -// column.getTable().getName() + "." + column.getColumnName()); } else { - throw new UnsupportedOperationException("Only column references are supported in GROUP BY"); + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "GROUP BY supports only column references; got '" + expr + "'"); } } @@ -436,13 +461,13 @@ private static List extractAggregateCalls(Select select) { && function.getParameters().get(0) instanceof AllColumns); if (isStar) { if (aggregateFunction != AggregateFunction.COUNT) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, function.getName() + "(*) is not supported; only COUNT(*) may use '*'"); } argument = null; } else { if (function.getParameters().size() != 1) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, function.getName() + " expects exactly one argument, got " + function.getParameters().size() + ": '" + function + "'"); } @@ -454,7 +479,7 @@ private static List extractAggregateCalls(Select select) { argument.accept(extractor); for (Column argColumn : extractor.getColumns()) { if (argColumn.getTable() == null || argColumn.getTable().getName() == null) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, "Aggregate arguments must use qualified column names " + "(table.column): '" + function + "'"); } @@ -565,7 +590,8 @@ private static List getSortCols(Select select) { Column column = (Column) exp; sortCols.add(column); } else { - throw new RuntimeException("Unexpected item: " + orderByElement + " of type " + orderByElements.getClass()); + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "ORDER BY supports only column references; got '" + orderByElement + "'"); } } return sortCols; @@ -586,7 +612,8 @@ private static List getProjectCols(Select select) { Column column = (Column) exp; projectCols.add(column); } else { - throw new RuntimeException("Unexpected item: " + item + " of type " + item.getClass()); + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "SELECT supports only column references or aggregate calls; got '" + exp + "'"); } } return projectCols; @@ -631,7 +658,9 @@ private static List getTablesInOrder(Select select) { List
tables = new ArrayList<>(); for (Join join : select.getPlainSelect().getJoins()) { if (!(join.getRightItem() instanceof Table)) { - throw new UnsupportedOperationException("All joined items must be tables"); + throw new QueryExecutionException(ErrorCode.UNSUPPORTED_SQL, + "Unsupported FROM item '" + join.getRightItem() + + "'; only plain tables can be joined"); } Table joinTable = (Table) join.getRightItem(); tables.add(joinTable); diff --git a/src/main/java/com/github/jinba1/blazedb/QueryResult.java b/src/main/java/com/github/jinba1/blazedb/QueryResult.java new file mode 100644 index 0000000..c22d1d4 --- /dev/null +++ b/src/main/java/com/github/jinba1/blazedb/QueryResult.java @@ -0,0 +1,24 @@ +package com.github.jinba1.blazedb; + +/** + * Execution metadata for one completed query: rows written, whether a LIMIT cut the + * result short, and a refinement hint for agent callers when it did. The output file + * itself stays pure data; this record travels back to the caller (CLI today, the + * REST gateway later). + * + * @param rows number of data rows written (excluding the header) + * @param truncated true when a LIMIT stopped the query although more rows existed + * @param hint how to refine the query when truncated; null otherwise + */ +public record QueryResult(long rows, boolean truncated, String hint) { + + static QueryResult complete(long rows) { + return new QueryResult(rows, false, null); + } + + static QueryResult truncated(long rows) { + return new QueryResult(rows, true, + "Result truncated at " + rows + " rows; more rows exist. " + + "Narrow with a WHERE filter or raise LIMIT."); + } +} diff --git a/src/main/java/com/github/jinba1/blazedb/StringValue.java b/src/main/java/com/github/jinba1/blazedb/StringValue.java index 1be2d1d..e32c795 100644 --- a/src/main/java/com/github/jinba1/blazedb/StringValue.java +++ b/src/main/java/com/github/jinba1/blazedb/StringValue.java @@ -8,7 +8,7 @@ public int compareTo(Value other) { if (other instanceof StringValue o) { return v.compareTo(o.v); } - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, "Type mismatch: cannot compare string value '" + v + "' with " + other.typeName() + " value '" + other + "'"); } diff --git a/src/main/java/com/github/jinba1/blazedb/operator/Accumulator.java b/src/main/java/com/github/jinba1/blazedb/operator/Accumulator.java index 40861f0..97ce115 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/Accumulator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/Accumulator.java @@ -1,6 +1,7 @@ package com.github.jinba1.blazedb.operator; import com.github.jinba1.blazedb.AggregateCall; +import com.github.jinba1.blazedb.ErrorCode; import com.github.jinba1.blazedb.IntValue; import com.github.jinba1.blazedb.QueryExecutionException; import com.github.jinba1.blazedb.Value; @@ -52,7 +53,7 @@ class IntSumAccumulator implements Accumulator { @Override public void add(Value value) { if (!(value instanceof IntValue iv)) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, (average ? "AVG" : "SUM") + " requires int values; got " + value.typeName() + " value '" + value + "' in '" + schemaKey + "'"); } @@ -64,7 +65,7 @@ public void add(Value value) { public Value result() { long raw = average ? sum / count : sum; if (raw < Integer.MIN_VALUE || raw > Integer.MAX_VALUE) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.DATA_ERROR, (average ? "AVG" : "SUM") + " overflow: " + raw + " exceeds int range in '" + schemaKey + "'"); } @@ -93,7 +94,7 @@ public void add(Value value) { @Override public Value result() { if (count > Integer.MAX_VALUE) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.DATA_ERROR, "COUNT overflow: " + count + " exceeds int range in '" + schemaKey + "'"); } return new IntValue((int) count); diff --git a/src/main/java/com/github/jinba1/blazedb/operator/AggregateOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/AggregateOperator.java index 722e958..b256d0d 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/AggregateOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/AggregateOperator.java @@ -124,7 +124,9 @@ public Tuple getNextTuple() { if (groupKeyIndex != -1) { resultAttributes.add(groupKeys.get(groupKeyIndex)); } else { - throw new RuntimeException("Output column not found in group by columns"); + // planner validation guarantees output ⊆ group-by; reaching here is a bug + throw new QueryExecutionException(ErrorCode.INTERNAL, + "Output column at index " + outputIndex + " not found in GROUP BY columns"); } } @@ -152,6 +154,7 @@ public Tuple getNextTuple() { private void processChildTuples() { Tuple tuple; while ((tuple = child.getNextTuple()) != null) { + checkBudgetDeadline(); // accumulation emits nothing; the timeout must still reach it List groupKey = new ArrayList<>(); for (Integer index : groupByIndices) { groupKey.add(tuple.getAttribute(index)); @@ -251,8 +254,7 @@ protected void registerSchema() { // Record source column Integer sourceIndex = ctx.resolveColumnWithOrigins(childSchemaId, tableName, columnName); if (sourceIndex == null) { - throw new RuntimeException("Column " + tableName + "." + columnName + - " not found in schema " + childSchemaId); + throw ctx.unknownColumn(tableName, columnName, childSchemaId); } transformationDetails.put(key, "group_by:" + sourceIndex); diff --git a/src/main/java/com/github/jinba1/blazedb/operator/DuplicateEliminationOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/DuplicateEliminationOperator.java index 228b38e..d21a951 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/DuplicateEliminationOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/DuplicateEliminationOperator.java @@ -68,6 +68,7 @@ private void processChildTuples() { // Process each tuple from the child operator Tuple childTuple; while ((childTuple = child.getNextTuple()) != null) { + checkBudgetDeadline(); // dedup emits nothing; the timeout must still reach it // The HashSet will use Tuple's equals() and hashCode() methods // to determine if a tuple is already in the set if (uniqueSet.add(childTuple)) { diff --git a/src/main/java/com/github/jinba1/blazedb/operator/HashJoinOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/HashJoinOperator.java index 088486d..7ffc8ad 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/HashJoinOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/HashJoinOperator.java @@ -1,5 +1,6 @@ package com.github.jinba1.blazedb.operator; +import com.github.jinba1.blazedb.ErrorCode; import com.github.jinba1.blazedb.ExpressionEvaluator; import com.github.jinba1.blazedb.PlanContext; import com.github.jinba1.blazedb.QueryExecutionException; @@ -64,6 +65,7 @@ public Tuple getNextTuple() { while (true) { if (currentBucket != null) { while (currentBucket.hasNext()) { + checkBudgetDeadline(); // a skewed key's bucket can spin long without emitting Tuple combined = combineTuples(currentOuterTuple, currentBucket.next()); if (probeEvaluator.evaluate(getJoinCondition(), combined)) { countTuple(); @@ -90,6 +92,7 @@ private void prepare() { buildTable = new HashMap<>(); Tuple tuple; while ((tuple = getChild().getNextTuple()) != null) { + checkBudgetDeadline(); // the build emits nothing; the timeout must still reach it buildTable.computeIfAbsent(extractKey(tuple, innerKeyIndices), k -> new ArrayList<>()) .add(tuple); } @@ -128,7 +131,8 @@ private void deriveKeys() { } if (outerKeyIndices.isEmpty()) { - throw new QueryExecutionException( + // the planner only selects hash join after hasEquiConjunct(); reaching here is a bug + throw new QueryExecutionException(ErrorCode.INTERNAL, "Hash join selected for condition without a cross-side equality: '" + getJoinCondition() + "'"); } @@ -158,7 +162,7 @@ private void checkKeyTypes(List outerKey) { List sample = buildTable.keySet().iterator().next(); for (int i = 0; i < outerKey.size(); i++) { if (!outerKey.get(i).getClass().equals(sample.get(i).getClass())) { - throw new QueryExecutionException( + throw new QueryExecutionException(ErrorCode.TYPE_MISMATCH, "Type mismatch in join key: cannot compare " + outerKey.get(i).typeName() + " with " + sample.get(i).typeName() + " in '" + getJoinCondition() + "'"); diff --git a/src/main/java/com/github/jinba1/blazedb/operator/JoinOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/JoinOperator.java index d56ce4a..c1c79f0 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/JoinOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/JoinOperator.java @@ -188,7 +188,9 @@ protected void registerSchema() { Map leftSchemaMap = getSchemaMap(leftSchemaId); Map rightSchemaMap = getSchemaMap(rightSchemaId); if (leftSchemaMap == null || rightSchemaMap == null) { - throw new RuntimeException("Could not retrieve schemas for join"); + String missing = leftSchemaMap == null ? leftSchemaId : rightSchemaId; + throw new QueryExecutionException(ErrorCode.INTERNAL, + "Could not retrieve schema '" + missing + "' for join"); } // Create combined schema diff --git a/src/main/java/com/github/jinba1/blazedb/operator/LimitOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/LimitOperator.java index f7739b7..99d5668 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/LimitOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/LimitOperator.java @@ -1,6 +1,8 @@ package com.github.jinba1.blazedb.operator; +import com.github.jinba1.blazedb.ErrorCode; import com.github.jinba1.blazedb.PlanContext; +import com.github.jinba1.blazedb.QueryExecutionException; import com.github.jinba1.blazedb.Tuple; import java.util.HashMap; @@ -11,11 +13,17 @@ * It emits at most {@code limit} tuples from its child, then reports EOF. * The planner places it at the top of the plan, above DISTINCT and ORDER BY, * so it caps the final result. + * + *

When the cap is hit, the operator peeks the child once more so + * {@link #wasTruncated()} can tell a truncated result from one with exactly + * {@code limit} rows — agent callers decide whether to refine based on it. */ public class LimitOperator extends Operator { private final long limit; private long emitted = 0; + private boolean peeked = false; + private boolean truncated = false; /** * Constructs a LimitOperator capping the child's output. @@ -33,6 +41,7 @@ public LimitOperator(PlanContext ctx, Operator child, long limit) { @Override public Tuple getNextTuple() { if (emitted >= limit) { + peekForMore(); return null; } Tuple tuple = child.getNextTuple(); @@ -43,10 +52,39 @@ public Tuple getNextTuple() { return tuple; } + /** Pulls the child once past the cap; a row there means the result was truncated. */ + private void peekForMore() { + if (peeked) { + return; + } + peeked = true; + try { + truncated = child.getNextTuple() != null; + } catch (QueryExecutionException e) { + if (e.code() == ErrorCode.INTERNAL) { + throw e; // engine bug — never mask it as truncation metadata + } + // The capped result is already complete; rows past the cap are not part + // of the answer, so neither budget exhaustion nor a bad row there may + // fail the query. Either reads honestly as "result may be incomplete". + truncated = true; + } + } + + /** + * Whether the child had more rows than the cap. Only meaningful after the drain + * completed (this operator returned null); false before that. + */ + public boolean wasTruncated() { + return truncated; + } + @Override public void reset() { child.reset(); emitted = 0; + peeked = false; + truncated = false; } @Override diff --git a/src/main/java/com/github/jinba1/blazedb/operator/Operator.java b/src/main/java/com/github/jinba1/blazedb/operator/Operator.java index 62ceb92..ee0c73b 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/Operator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/Operator.java @@ -82,7 +82,20 @@ protected final void countTuple() { } /** - * Attaches a query budget to this operator and its entire subtree. + * Checks the time budget without counting a tuple. Blocking operators call this + * once per iteration of their build/drain loops, where {@link #countTuple()} is + * unreachable until the phase completes. + */ + protected final void checkBudgetDeadline() { + if (budget != null) { + budget.checkDeadline(); + } + } + + /** + * Attaches a query budget to this operator and its entire subtree. May be called + * at any point — operators consult the budget on each tuple/check, so attaching + * mid-execution simply starts enforcement from that point on. * @param budget The budget shared by all operators of one query */ public void attachBudget(QueryBudget budget) { @@ -177,7 +190,7 @@ public void updateSchema() { * @param schemaId The schema identifier to resolve against * @param targetList Optional existing list to populate with resolved indices (will be cleared if not null) * @return A list of resolved column indices, either the provided targetList or a new ArrayList - * @throws RuntimeException If any column cannot be resolved in the specified schema + * @throws QueryExecutionException If any column cannot be resolved in the specified schema */ protected List resolveColumnIndices(List columns, String schemaId, List targetList) { @@ -192,8 +205,7 @@ protected List resolveColumnIndices(List columns, String schema Integer index = ctx.resolveColumnWithOrigins(schemaId, tableName, columnName); if (index == null) { - throw new RuntimeException("Column " + tableName + "." + columnName + - " not found in schema " + schemaId); + throw ctx.unknownColumn(tableName, columnName, schemaId); } indices.add(index); diff --git a/src/main/java/com/github/jinba1/blazedb/operator/ScanOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/ScanOperator.java index 8b55792..aecb7dc 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/ScanOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/ScanOperator.java @@ -2,6 +2,7 @@ import com.github.jinba1.blazedb.ColumnType; import com.github.jinba1.blazedb.DBCatalog; +import com.github.jinba1.blazedb.ErrorCode; import com.github.jinba1.blazedb.IntValue; import com.github.jinba1.blazedb.PlanContext; import com.github.jinba1.blazedb.QueryExecutionException; @@ -44,6 +45,9 @@ public ScanOperator(PlanContext ctx, String tableName) { this.tableName = tableName; tablePath = DBCatalog.getInstance().getDBLocation(tableName); this.columnTypes = DBCatalog.getInstance().getColumnTypes(tableName); + if (tablePath == null || columnTypes == null) { + throw DBCatalog.getInstance().unknownTable(tableName); + } child = null; // Scan cannot have child operator this.schemaRegistered = true; @@ -66,7 +70,8 @@ private void openReader() { records.next(); // skip header row } } catch (IOException e) { - throw new QueryExecutionException("Failed to open table '" + tableName + "': " + e.getMessage()); + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Failed to open table '" + tableName + "': " + e.getMessage()); } } @@ -83,7 +88,8 @@ public Tuple getNextTuple() { } CSVRecord record = records.next(); if (record.size() != columnTypes.size()) { - throw new QueryExecutionException("Table '" + tableName + "' row " + record.getRecordNumber() + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Table '" + tableName + "' row " + record.getRecordNumber() + ": expected " + columnTypes.size() + " fields, found " + record.size()); } List attributes = new ArrayList<>(columnTypes.size()); @@ -93,7 +99,8 @@ public Tuple getNextTuple() { try { attributes.add(new IntValue(Integer.parseInt(field))); } catch (NumberFormatException e) { - throw new QueryExecutionException("Table '" + tableName + "' row " + record.getRecordNumber() + throw new QueryExecutionException(ErrorCode.DATA_ERROR, + "Table '" + tableName + "' row " + record.getRecordNumber() + ", column " + i + ": expected int, found '" + field + "' (file changed since catalog init?)"); } diff --git a/src/main/java/com/github/jinba1/blazedb/operator/SortOperator.java b/src/main/java/com/github/jinba1/blazedb/operator/SortOperator.java index ea03c5f..0b0489e 100644 --- a/src/main/java/com/github/jinba1/blazedb/operator/SortOperator.java +++ b/src/main/java/com/github/jinba1/blazedb/operator/SortOperator.java @@ -115,6 +115,7 @@ private void bufferTuple() { Tuple tuple = child.getNextTuple(); while (tuple != null) { + checkBudgetDeadline(); // buffering emits nothing; the timeout must still reach it tupleBuffer.add(tuple); // Only add non-null tuples tuple = child.getNextTuple(); } diff --git a/src/test/java/com/github/jinba1/blazedb/AgentLegibleErrorsTest.java b/src/test/java/com/github/jinba1/blazedb/AgentLegibleErrorsTest.java new file mode 100644 index 0000000..b159c91 --- /dev/null +++ b/src/test/java/com/github/jinba1/blazedb/AgentLegibleErrorsTest.java @@ -0,0 +1,239 @@ +package com.github.jinba1.blazedb; + +import com.github.jinba1.blazedb.operator.Operator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +/** + * Errors triggered by user SQL must be QueryExecutionExceptions with a machine-readable + * ErrorCode and a message an LLM agent can self-correct from: name the offending + * table/column AND list what is actually available. Previously these paths threw bare + * RuntimeException/UnsupportedOperationException (raw stack trace, or swallowed into a + * generic "query could not be planned"). + */ +public class AgentLegibleErrorsTest { + + @TempDir + Path tempDir; + + private Path db; + + @BeforeEach + public void setUp() throws IOException { + db = tempDir.resolve("db"); + Path data = db.resolve("data"); + Files.createDirectories(data); + Files.write(data.resolve("Student.csv"), List.of( + "sid, cid, age, grade", + "1, 101, 20, 75", + "2, 102, 21, 82")); + Files.write(data.resolve("Course.csv"), List.of( + "cid, title", + "101, algebra", + "102, calculus")); + DBCatalog.resetDBCatalog(); + DBCatalog.initDBCatalog(db.toString()); + } + + private Path query(String sql) throws IOException { + Path file = tempDir.resolve("q" + sql.hashCode() + ".sql"); + Files.writeString(file, sql); + return file; + } + + private QueryExecutionException assertPlanFails(String sql) throws IOException { + Path file = query(sql); + return assertThrows(QueryExecutionException.class, + () -> QueryPlanner.planQuery(file.toString())); + } + + private QueryExecutionException assertExecutionFails(String sql) throws IOException { + Path file = query(sql); + PlannedQuery planned = QueryPlanner.planQuery(file.toString()); + Operator root = planned.root(); + return assertThrows(QueryExecutionException.class, () -> { + while (root.getNextTuple() != null) { + // drain + } + }); + } + + // ---- column not found ---- + + @Test + public void selectUnknownColumnFailsAtPlanTimeWithAvailableList() throws IOException { + QueryExecutionException ex = assertPlanFails("SELECT Student.nam FROM Student"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + assertTrue(ex.getMessage().contains("nam"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); + assertTrue(ex.getMessage().contains("sid"), ex.getMessage()); + assertTrue(ex.getMessage().contains("grade"), ex.getMessage()); + } + + @Test + public void whereUnknownColumnInJoinQueryFailsAtPlanTime() throws IOException { + QueryExecutionException ex = assertPlanFails( + "SELECT * FROM Student, Course WHERE Student.nam = Course.cid"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + // same single construction site as every other unknown-column path + assertTrue(ex.getMessage().contains("Column 'Student.nam' not found"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); + assertTrue(ex.getMessage().contains("sid"), ex.getMessage()); + } + + @Test + public void whereUnknownColumnInSingleTableQueryFailsAtExecution() throws IOException { + QueryExecutionException ex = assertExecutionFails( + "SELECT * FROM Student WHERE Student.nam = 5"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + assertTrue(ex.getMessage().contains("nam"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); + assertTrue(ex.getMessage().contains("sid"), ex.getMessage()); + } + + @Test + public void orderByUnknownColumnFailsWithAvailableList() throws IOException { + QueryExecutionException ex = assertExecutionFails( + "SELECT * FROM Student ORDER BY Student.nam"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + assertTrue(ex.getMessage().contains("nam"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); + } + + @Test + public void groupByUnknownColumnFailsAtPlanTime() throws IOException { + QueryExecutionException ex = assertPlanFails( + "SELECT Student.nam, COUNT(*) FROM Student GROUP BY Student.nam"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + assertTrue(ex.getMessage().contains("nam"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); + } + + // ---- table not found ---- + + @Test + public void unknownTableInFromListsAvailableTables() throws IOException { + QueryExecutionException ex = assertPlanFails("SELECT * FROM Studnt"); + assertEquals(ErrorCode.UNKNOWN_TABLE, ex.code()); + assertTrue(ex.getMessage().contains("Studnt"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available tables"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Student"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Course"), ex.getMessage()); + } + + @Test + public void unknownTableInWhereListsAvailableTables() throws IOException { + QueryExecutionException ex = assertPlanFails( + "SELECT * FROM Student, Course WHERE Studnt.sid = Course.cid"); + assertEquals(ErrorCode.UNKNOWN_TABLE, ex.code()); + assertTrue(ex.getMessage().contains("Studnt"), ex.getMessage()); + assertTrue(ex.getMessage().contains("Available tables"), ex.getMessage()); + } + + // ---- parse / unsupported SQL ---- + + @Test + public void syntaxErrorSurfacesParserMessage() throws IOException { + QueryExecutionException ex = assertPlanFails("SELEKT * FROM Student"); + assertEquals(ErrorCode.PARSE_ERROR, ex.code()); + assertTrue(ex.getMessage().contains("SQL syntax error"), ex.getMessage()); + } + + @Test + public void nonSelectStatementIsUnsupported() throws IOException { + QueryExecutionException ex = assertPlanFails("UPDATE Student SET age = 5"); + assertEquals(ErrorCode.UNSUPPORTED_SQL, ex.code()); + assertTrue(ex.getMessage().contains("SELECT"), ex.getMessage()); + } + + @Test + public void unsupportedConditionOperatorIsUnsupportedSql() throws IOException { + // OR is not supported by the engine's condition handling + QueryExecutionException ex = assertExecutionFails( + "SELECT * FROM Student WHERE Student.sid = 1 OR Student.sid = 2"); + assertEquals(ErrorCode.UNSUPPORTED_SQL, ex.code()); + } + + @Test + public void selectArithmeticItemIsUnsupported() throws IOException { + QueryExecutionException ex = assertPlanFails("SELECT Student.sid * 2 FROM Student"); + assertEquals(ErrorCode.UNSUPPORTED_SQL, ex.code()); + assertTrue(ex.getMessage().contains("column references"), ex.getMessage()); + } + + @Test + public void orderByArithmeticItemIsUnsupported() throws IOException { + QueryExecutionException ex = assertPlanFails( + "SELECT * FROM Student ORDER BY Student.sid * 2"); + assertEquals(ErrorCode.UNSUPPORTED_SQL, ex.code()); + assertTrue(ex.getMessage().contains("ORDER BY"), ex.getMessage()); + } + + @Test + public void rowShapeChangedSinceCatalogInitIsDataError() throws IOException { + // Catalog inferred 4 columns at init; a row appended afterwards with the + // wrong width must fail as DATA_ERROR when the scan reaches it + Files.writeString(db.resolve("data").resolve("Student.csv"), + "3, 103", java.nio.file.StandardOpenOption.APPEND); + QueryExecutionException ex = assertExecutionFails("SELECT * FROM Student"); + assertEquals(ErrorCode.DATA_ERROR, ex.code()); + assertTrue(ex.getMessage().contains("expected 4 fields"), ex.getMessage()); + } + + // ---- type errors ---- + + @Test + public void comparingIntColumnToStringIsTypeMismatch() throws IOException { + QueryExecutionException ex = assertExecutionFails( + "SELECT * FROM Student WHERE Student.sid = 'abc'"); + assertEquals(ErrorCode.TYPE_MISMATCH, ex.code()); + } + + // ---- exception API ---- + + @Test + public void singleArgConstructorDefaultsToInternal() { + assertEquals(ErrorCode.INTERNAL, new QueryExecutionException("boom").code()); + } + + @Test + public void budgetExceptionCarriesBudgetExceededCode() { + QueryBudget budget = new QueryBudget(0L, null); + QueryBudgetExceededException ex = + assertThrows(QueryBudgetExceededException.class, budget::charge); + assertEquals(ErrorCode.BUDGET_EXCEEDED, ex.code()); + } + + @Test + public void catalogExposesSortedTableNames() { + assertEquals(List.of("Course", "Student"), DBCatalog.getInstance().getTableNames()); + } + + // ---- CLI behavior ---- + + @Test + public void cliReturnsOneInsteadOfCrashingOnRuntimeColumnMiss() throws IOException { + // Pre-change this path threw a bare RuntimeException straight through run() + Path queryFile = query("SELECT * FROM Student ORDER BY Student.nam"); + Path out = tempDir.resolve("out.csv"); + int code = BlazeDB.run(new String[]{db.toString(), queryFile.toString(), out.toString()}); + assertEquals(1, code); + assertFalse(Files.exists(out), "partial output must be deleted"); + } + + @Test + public void cliReportsUnreadableQueryFile() { + Path out = tempDir.resolve("out.csv"); + int code = BlazeDB.run(new String[]{ + db.toString(), tempDir.resolve("missing.sql").toString(), out.toString()}); + assertEquals(1, code); + assertFalse(Files.exists(out)); + } +} diff --git a/src/test/java/com/github/jinba1/blazedb/ExpressionPreprocessorTest.java b/src/test/java/com/github/jinba1/blazedb/ExpressionPreprocessorTest.java index 1096364..f7e0607 100644 --- a/src/test/java/com/github/jinba1/blazedb/ExpressionPreprocessorTest.java +++ b/src/test/java/com/github/jinba1/blazedb/ExpressionPreprocessorTest.java @@ -20,6 +20,8 @@ public class ExpressionPreprocessorTest { private static final String TEST_DB_DIR = "src/test/resources/testdb_ep"; private static final String DATA_DIR = TEST_DB_DIR + "/data"; + private PlanContext ctx; + @BeforeEach public void setUp() throws IOException { Files.createDirectories(Paths.get(DATA_DIR)); @@ -36,6 +38,7 @@ public void setUp() throws IOException { DBCatalog.resetDBCatalog(); DBCatalog.initDBCatalog(TEST_DB_DIR); + ctx = new PlanContext(QueryConfig.defaults()); } @AfterEach @@ -48,7 +51,7 @@ public void tearDown() throws IOException { @Test public void testSingleTableSelectionGoesToSelectExpressions() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A = 5"); ep.evaluate(expr); @@ -62,7 +65,7 @@ public void testSingleTableSelectionGoesToSelectExpressions() throws Exception { @Test public void testJoinConditionGoesToJoinExpressions() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A = T2.C"); ep.evaluate(expr); @@ -76,7 +79,7 @@ public void testJoinConditionGoesToJoinExpressions() throws Exception { @Test public void testMixedAndExpressionCorrectlySplit() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); // T1.A = 5 AND T1.B = T2.D Expression expr = CCJSqlParserUtil.parseExpression("T1.A = 5 AND T1.B = T2.D"); @@ -91,7 +94,7 @@ public void testMixedAndExpressionCorrectlySplit() throws Exception { @Test public void testCompoundAndAllSelections() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A > 1 AND T1.B < 100"); ep.evaluate(expr); @@ -105,7 +108,7 @@ public void testCompoundAndAllSelections() throws Exception { @Test public void testCompoundAndAllJoins() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A = T2.C AND T1.B = T2.D"); ep.evaluate(expr); @@ -119,7 +122,7 @@ public void testCompoundAndAllJoins() throws Exception { @Test public void testNestedAndWithJoinAndSelection() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); // Three-way AND: two selections + one join Expression expr = CCJSqlParserUtil.parseExpression("T1.A > 1 AND T1.B = T2.D AND T2.C < 50"); @@ -134,7 +137,7 @@ public void testNestedAndWithJoinAndSelection() throws Exception { @Test public void testEvaluateThrowsOnSecondCall() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A = 5"); ep.evaluate(expr); @@ -145,7 +148,7 @@ public void testEvaluateThrowsOnSecondCall() throws Exception { @Test public void testGetJoinExpressionsThrowsBeforeEvaluate() { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); assertThrows(IllegalStateException.class, ep::getJoinExpressions, "Should throw when getJoinExpressions called before evaluate"); @@ -153,7 +156,7 @@ public void testGetJoinExpressionsThrowsBeforeEvaluate() { @Test public void testGetSelectExpressionsThrowsBeforeEvaluate() { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); assertThrows(IllegalStateException.class, ep::getSelectExpressions, "Should throw when getSelectExpressions called before evaluate"); @@ -161,27 +164,31 @@ public void testGetSelectExpressionsThrowsBeforeEvaluate() { @Test public void testUnsupportedColumnThrows() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); // Table NonExistent doesn't exist Expression expr = CCJSqlParserUtil.parseExpression("NonExistent.A = 5"); - assertThrows(UnsupportedOperationException.class, () -> ep.evaluate(expr), - "Should throw for nonexistent table"); + QueryExecutionException ex = assertThrows(QueryExecutionException.class, + () -> ep.evaluate(expr), "Should throw for nonexistent table"); + assertEquals(ErrorCode.UNKNOWN_TABLE, ex.code()); + assertTrue(ex.getMessage().contains("Available tables"), ex.getMessage()); } @Test public void testUnsupportedColumnInTableThrows() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); // Column Z doesn't exist in T1 Expression expr = CCJSqlParserUtil.parseExpression("T1.Z = 5"); - assertThrows(UnsupportedOperationException.class, () -> ep.evaluate(expr), - "Should throw for nonexistent column"); + QueryExecutionException ex = assertThrows(QueryExecutionException.class, + () -> ep.evaluate(expr), "Should throw for nonexistent column"); + assertEquals(ErrorCode.UNKNOWN_COLUMN, ex.code()); + assertTrue(ex.getMessage().contains("Available"), ex.getMessage()); } @Test public void testDifferentComparisonOperators() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); // != join Expression expr = CCJSqlParserUtil.parseExpression("T1.A != T2.C"); @@ -193,7 +200,7 @@ public void testDifferentComparisonOperators() throws Exception { @Test public void testGreaterThanJoin() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A > T2.C"); ep.evaluate(expr); @@ -203,7 +210,7 @@ public void testGreaterThanJoin() throws Exception { @Test public void testLessThanOrEqualSelection() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.A <= 10"); ep.evaluate(expr); @@ -214,7 +221,7 @@ public void testLessThanOrEqualSelection() throws Exception { @Test public void testGreaterThanOrEqualJoin() throws Exception { - ExpressionPreprocessor ep = new ExpressionPreprocessor(); + ExpressionPreprocessor ep = new ExpressionPreprocessor(ctx); Expression expr = CCJSqlParserUtil.parseExpression("T1.B >= T2.D"); ep.evaluate(expr); diff --git a/src/test/java/com/github/jinba1/blazedb/QueryBudgetTest.java b/src/test/java/com/github/jinba1/blazedb/QueryBudgetTest.java index d8afe60..55b4a86 100644 --- a/src/test/java/com/github/jinba1/blazedb/QueryBudgetTest.java +++ b/src/test/java/com/github/jinba1/blazedb/QueryBudgetTest.java @@ -62,4 +62,50 @@ public void nullLimitsMeanUnlimited() { public void exceptionIsAQueryExecutionException() { assertTrue(QueryExecutionException.class.isAssignableFrom(QueryBudgetExceededException.class)); } + + @Test + public void checkDeadlineWithZeroTimeoutTripsImmediately() { + QueryBudget budget = new QueryBudget(null, 0L); + QueryBudgetExceededException ex = + assertThrows(QueryBudgetExceededException.class, budget::checkDeadline); + assertTrue(ex.getMessage().contains("Time budget exceeded"), ex.getMessage()); + } + + @Test + public void checkDeadlineStartsTheClockWhenFirst() throws InterruptedException { + // a stall before the first charge must still be covered: the check starts the clock + QueryBudget budget = new QueryBudget(null, 30L); + Thread.sleep(50); // before the first check; must not count + budget.checkDeadline(); // clock starts here, deadline 30 ms away: no throw + Thread.sleep(50); + assertThrows(QueryBudgetExceededException.class, budget::checkDeadline); + } + + @Test + public void checkDeadlineDoesNotCountTuples() { + QueryBudget budget = new QueryBudget(2L, null); + for (int i = 0; i < 100; i++) { + budget.checkDeadline(); // tuple limit must be untouched + } + budget.charge(); + budget.charge(); // exactly at limit: fine + assertEquals(2, budget.processed()); + } + + @Test + public void checkDeadlineWithNullTimeoutIsANoOp() { + QueryBudget budget = new QueryBudget(null, null); + for (int i = 0; i < 100; i++) { + budget.checkDeadline(); + } + assertEquals(0, budget.processed()); + } + + @Test + public void checkDeadlineSeesClockStartedByCharge() throws InterruptedException { + QueryBudget budget = new QueryBudget(null, 5L); + budget.charge(); // starts the clock + Thread.sleep(50); + assertThrows(QueryBudgetExceededException.class, budget::checkDeadline); + } } diff --git a/src/test/java/com/github/jinba1/blazedb/QueryResultTest.java b/src/test/java/com/github/jinba1/blazedb/QueryResultTest.java new file mode 100644 index 0000000..35e810d --- /dev/null +++ b/src/test/java/com/github/jinba1/blazedb/QueryResultTest.java @@ -0,0 +1,74 @@ +package com.github.jinba1.blazedb; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +/** + * BlazeDB.execute returns token-awareness metadata for the caller (CLI today, + * the REST gateway later): rows written, a trustworthy truncated flag, and a + * refine hint when truncated. The output file itself stays pure data. + */ +public class QueryResultTest { + + @TempDir + Path tempDir; + + private Path db; + + @BeforeEach + public void setUp() throws IOException { + db = tempDir.resolve("db"); + Path data = db.resolve("data"); + Files.createDirectories(data); + Files.write(data.resolve("T.csv"), List.of("a", "1", "2", "3")); + DBCatalog.resetDBCatalog(); + DBCatalog.initDBCatalog(db.toString()); + } + + private QueryResult execute(String sql) throws IOException { + Path file = tempDir.resolve("q.sql"); + Files.writeString(file, sql); + PlannedQuery planned = QueryPlanner.planQuery(file.toString()); + return BlazeDB.execute(planned.root(), tempDir.resolve("out.csv").toString()); + } + + @Test + public void unlimitedQueryReportsRowsAndNoTruncation() throws IOException { + QueryResult result = execute("SELECT * FROM T"); + assertEquals(3, result.rows()); + assertFalse(result.truncated()); + assertNull(result.hint()); + } + + @Test + public void truncatingLimitReportsTruncatedWithHint() throws IOException { + QueryResult result = execute("SELECT * FROM T LIMIT 2"); + assertEquals(2, result.rows()); + assertTrue(result.truncated()); + assertNotNull(result.hint()); + assertTrue(result.hint().contains("truncated"), result.hint()); + assertTrue(result.hint().contains("LIMIT"), result.hint()); + } + + @Test + public void exactLimitIsNotTruncated() throws IOException { + QueryResult result = execute("SELECT * FROM T LIMIT 3"); + assertEquals(3, result.rows()); + assertFalse(result.truncated()); + assertNull(result.hint()); + } + + @Test + public void outputFileContainsOnlyDataNoMetadata() throws IOException { + execute("SELECT * FROM T LIMIT 2"); + List lines = Files.readAllLines(tempDir.resolve("out.csv")); + assertEquals(List.of("a", "1", "2"), lines, "metadata must not pollute the result file"); + } +} diff --git a/src/test/java/com/github/jinba1/blazedb/operator/BlockingDrainTimeoutTest.java b/src/test/java/com/github/jinba1/blazedb/operator/BlockingDrainTimeoutTest.java new file mode 100644 index 0000000..b8f3908 --- /dev/null +++ b/src/test/java/com/github/jinba1/blazedb/operator/BlockingDrainTimeoutTest.java @@ -0,0 +1,161 @@ +package com.github.jinba1.blazedb.operator; + +import com.github.jinba1.blazedb.DBCatalog; +import com.github.jinba1.blazedb.IntValue; +import com.github.jinba1.blazedb.PlanContext; +import com.github.jinba1.blazedb.QueryBudget; +import com.github.jinba1.blazedb.QueryBudgetExceededException; +import com.github.jinba1.blazedb.QueryConfig; +import com.github.jinba1.blazedb.Tuple; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.schema.Column; +import net.sf.jsqlparser.schema.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * The time budget must trip inside blocking build/drain phases, not just on tuple + * emission. Each test uses a stub child that never charges the budget, so with a + * zero timeout the only way the exception can fire is the deadline check inside + * the blocking loop itself; tupleCounter == 0 proves no emission happened first. + */ +public class BlockingDrainTimeoutTest { + + @TempDir + Path tempDb; + + private PlanContext ctx; + + @BeforeEach + public void setUp() throws IOException { + Path data = tempDb.resolve("data"); + Files.createDirectories(data); + Files.write(data.resolve("L.csv"), List.of("a", "1", "2", "3")); + Files.write(data.resolve("R.csv"), List.of("b", "1", "1", "2")); + DBCatalog.resetDBCatalog(); + DBCatalog.initDBCatalog(tempDb.toString()); + ctx = new PlanContext(QueryConfig.defaults()); + } + + /** Emits canned single-int tuples under a base-table schema; never charges the budget. */ + private static final class StubChild extends Operator { + private final String tableId; + private final List rows; + private int pos = 0; + + StubChild(PlanContext ctx, String tableId, int... values) { + super(ctx); + this.tableId = tableId; + this.rows = new ArrayList<>(values.length); + for (int v : values) { + rows.add(new Tuple(List.of(new IntValue(v)))); + } + this.schemaRegistered = true; + this.intermediateSchemaId = tableId; + } + + @Override + public Tuple getNextTuple() { + return pos < rows.size() ? rows.get(pos++) : null; + } + + @Override + public void reset() { + pos = 0; + } + + @Override + public String propagateSchemaId() { + return tableId; + } + + @Override + public String describe() { + return "Stub[" + tableId + "]"; + } + + @Override + protected void registerSchema() { + schemaRegistered = true; + } + } + + private static Column col(String table, String column) { + return new Column(new Table(table), column); + } + + @Test + public void sortBuildPhaseTripsTimeout() { + StubChild child = new StubChild(ctx, "L", 3, 1, 2); + SortOperator sort = new SortOperator(ctx, child, List.of(col("L", "a"))); + sort.attachBudget(new QueryBudget(null, 0L)); + + assertThrows(QueryBudgetExceededException.class, sort::getNextTuple); + assertEquals(0, sort.getTupleCount(), "must trip during buffering, before any emission"); + } + + @Test + public void aggregateBuildPhaseTripsTimeout() { + StubChild child = new StubChild(ctx, "L", 1, 1, 2); + AggregateOperator aggregate = new AggregateOperator( + ctx, child, List.of(col("L", "a")), List.of(), List.of(col("L", "a"))); + aggregate.attachBudget(new QueryBudget(null, 0L)); + + assertThrows(QueryBudgetExceededException.class, aggregate::getNextTuple); + assertEquals(0, aggregate.getTupleCount(), "must trip during accumulation, before any emission"); + } + + @Test + public void distinctBuildPhaseTripsTimeout() { + StubChild child = new StubChild(ctx, "L", 1, 1, 2); + DuplicateEliminationOperator distinct = new DuplicateEliminationOperator(ctx, child); + distinct.attachBudget(new QueryBudget(null, 0L)); + + assertThrows(QueryBudgetExceededException.class, distinct::getNextTuple); + assertEquals(0, distinct.getTupleCount(), "must trip during dedup, before any emission"); + } + + @Test + public void hashJoinBuildPhaseTripsTimeout() throws Exception { + StubChild outer = new StubChild(ctx, "L", 1); + StubChild inner = new StubChild(ctx, "R", 1, 1, 2); + HashJoinOperator join = new HashJoinOperator( + ctx, outer, inner, CCJSqlParserUtil.parseCondExpression("L.a = R.b")); + join.attachBudget(new QueryBudget(null, 0L)); + + assertThrows(QueryBudgetExceededException.class, join::getNextTuple); + assertEquals(0, join.getTupleCount(), "must trip while draining the build side"); + } + + @Test + public void hashJoinProbeBucketScanChecksDeadline() throws Exception { + // Build completes without a budget; the deadline then trips inside the + // bucket-scan loop (a skewed key's bucket can spin long between outer pulls). + StubChild outer = new StubChild(ctx, "L", 1); + StubChild inner = new StubChild(ctx, "R", 1, 1); // two build rows share key 1 + HashJoinOperator join = new HashJoinOperator( + ctx, outer, inner, CCJSqlParserUtil.parseCondExpression("L.a = R.b")); + + assertNotNull(join.getNextTuple()); // prepare() + first match; bucket still has one row + + join.attachBudget(new QueryBudget(null, 0L)); + assertThrows(QueryBudgetExceededException.class, join::getNextTuple); + } + + @Test + public void blockingOperatorsStillCompleteWithoutBudget() throws Exception { + SortOperator sort = new SortOperator( + ctx, new StubChild(ctx, "L", 3, 1, 2), List.of(col("L", "a"))); + int count = 0; + while (sort.getNextTuple() != null) count++; + assertEquals(3, count); + } +} diff --git a/src/test/java/com/github/jinba1/blazedb/operator/LimitTruncationTest.java b/src/test/java/com/github/jinba1/blazedb/operator/LimitTruncationTest.java new file mode 100644 index 0000000..84cb4d2 --- /dev/null +++ b/src/test/java/com/github/jinba1/blazedb/operator/LimitTruncationTest.java @@ -0,0 +1,164 @@ +package com.github.jinba1.blazedb.operator; + +import com.github.jinba1.blazedb.DBCatalog; +import com.github.jinba1.blazedb.ErrorCode; +import com.github.jinba1.blazedb.IntValue; +import com.github.jinba1.blazedb.PlanContext; +import com.github.jinba1.blazedb.QueryBudget; +import com.github.jinba1.blazedb.QueryConfig; +import com.github.jinba1.blazedb.QueryExecutionException; +import com.github.jinba1.blazedb.Tuple; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +/** + * LimitOperator must distinguish "truncated at N" from "exactly N rows existed": + * after emitting its cap it peeks the child once, so agents can trust the flag. + */ +public class LimitTruncationTest { + + @TempDir + Path tempDb; + + private PlanContext ctx; + + @BeforeEach + public void setUp() throws IOException { + Path data = tempDb.resolve("data"); + Files.createDirectories(data); + Files.write(data.resolve("T.csv"), List.of("a", "1", "2", "3")); + DBCatalog.resetDBCatalog(); + DBCatalog.initDBCatalog(tempDb.toString()); + ctx = new PlanContext(QueryConfig.defaults()); + } + + private static long drain(Operator op) { + long count = 0; + while (op.getNextTuple() != null) count++; + return count; + } + + @Test + public void childWithMoreRowsReportsTruncated() { + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 2); + assertEquals(2, drain(limit)); + assertTrue(limit.wasTruncated()); + } + + @Test + public void exactlyLimitRowsIsNotTruncated() { + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 3); + assertEquals(3, drain(limit)); + assertFalse(limit.wasTruncated(), "exactly-N result must not report truncation"); + } + + @Test + public void fewerThanLimitRowsIsNotTruncated() { + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 10); + assertEquals(3, drain(limit)); + assertFalse(limit.wasTruncated()); + } + + @Test + public void truncationUnknownBeforeDrainCompletes() { + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 2); + assertNotNull(limit.getNextTuple()); + assertFalse(limit.wasTruncated(), "no claim before the peek has run"); + } + + @Test + public void resetClearsPeekState() { + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 2); + drain(limit); + assertTrue(limit.wasTruncated()); + + limit.reset(); + assertFalse(limit.wasTruncated()); + assertEquals(2, drain(limit)); + assertTrue(limit.wasTruncated()); + } + + @Test + public void dataErrorDuringPeekMeansTruncatedNotKilled() throws IOException { + // Rows past the cap are not part of the answer; a malformed row there must + // not fail the already-complete capped result. Corrupt row 3 after catalog + // init (column already inferred as int) so only the peek's read trips. + Files.writeString(tempDb.resolve("data").resolve("T.csv"), "a\n1\n2\nnot-an-int\n"); + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 2); + + assertEquals(2, drain(limit)); + assertTrue(limit.wasTruncated()); + } + + /** Emits two tuples, then throws an INTERNAL error — simulates an engine bug. */ + private static final class FailsAfterTwo extends Operator { + private int served = 0; + + FailsAfterTwo(PlanContext ctx) { + super(ctx); + this.schemaRegistered = true; + this.intermediateSchemaId = "T"; + } + + @Override + public Tuple getNextTuple() { + if (served < 2) { + served++; + return new Tuple(List.of(new IntValue(served))); + } + throw new QueryExecutionException(ErrorCode.INTERNAL, "invariant broke"); + } + + @Override + public void reset() { + served = 0; + } + + @Override + public String propagateSchemaId() { + return "T"; + } + + @Override + public String describe() { + return "FailsAfterTwo"; + } + + @Override + protected void registerSchema() { + schemaRegistered = true; + } + } + + @Test + public void internalErrorDuringPeekPropagates() { + // User-facing failures past the cap read as truncated; engine bugs must not + // be masked as truncation metadata. + LimitOperator limit = new LimitOperator(ctx, new FailsAfterTwo(ctx), 2); + assertNotNull(limit.getNextTuple()); + assertNotNull(limit.getNextTuple()); + + QueryExecutionException ex = + assertThrows(QueryExecutionException.class, limit::getNextTuple); + assertEquals(ErrorCode.INTERNAL, ex.code()); + } + + @Test + public void budgetExhaustionDuringPeekMeansTruncatedNotKilled() { + // 3-row scan, LIMIT 2: emitting both rows costs 4 charges (2 scan + 2 limit); + // the peek's third scan row is charge 5 and trips the budget. The query already + // produced its complete capped result — the probe must not kill it. + LimitOperator limit = new LimitOperator(ctx, new ScanOperator(ctx, "T"), 2); + limit.attachBudget(new QueryBudget(4L, null)); + + assertEquals(2, drain(limit)); + assertTrue(limit.wasTruncated()); + } +}