Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,29 @@ import {
postToSiteApi,
} from "./reporters/site-api.ts";
import { formatCost, queryPreview } from "./reporters/github/github.ts";
import { DEFAULT_CONFIG, type AnalyzerConfig } from "./config.ts";
import { ApiClient, hookUpApiReporter } from "./remote/api-client.ts";
import { DEFAULT_CONFIG } from "./config.ts";
import { ApiClient } from "./remote/api-client.ts";
import { Remote } from "./remote/remote.ts";
import { ConnectionManager } from "./sync/connection-manager.ts";
import type { RpcStub } from "capnweb";
import type { ServerApi } from "@query-doctor/core";

const INVALID_TOKEN_ERROR = "Unauthorized"
import { PgbadgerSource } from "./sql/pgbadger.ts";
import type { RecentQuerySource } from "./sql/recent-query.ts";

async function runInCI(
targetPostgresUrl: Connectable,
sourcePostgresUrl: Connectable,
logPath: string,
logPath: string | undefined,
maxCost?: number,
) {
const siteApiEndpoint = env.SITE_API_ENDPOINT;
const repo = env.GITHUB_REPOSITORY;
const branch =
process.env.GITHUB_HEAD_REF || process.env.GITHUB_REF_NAME || "";

const remoteDbManager = ConnectionManager.forRemoteDatabase()
const remote = new Remote(
targetPostgresUrl,
ConnectionManager.forLocalDatabase(),
ConnectionManager.forRemoteDatabase(),
remoteDbManager,
{ disableQueryLoader: true },
);

Expand All @@ -55,10 +54,14 @@ async function runInCI(
)
: DEFAULT_CONFIG;

const source: RecentQuerySource = logPath
? new PgbadgerSource(logPath)
: remoteDbManager.getConnectorFor(sourcePostgresUrl);

const runner = await Runner.build({
targetPostgresUrl,
sourcePostgresUrl,
logPath,
source,
maxCost,
ignoredQueryHashes: config.ignoredQueryHashes,
remote,
Expand Down Expand Up @@ -202,10 +205,6 @@ async function main() {
core.setFailed("POSTGRES_URL environment variable is not set");
process.exit(1);
}
if (!env.LOG_PATH) {
core.setFailed("LOG_PATH environment variable is not set");
process.exit(1);
}
await runInCI(
Connectable.fromString(env.POSTGRES_URL),
Connectable.fromString(env.SOURCE_DATABASE_URL),
Expand Down
67 changes: 67 additions & 0 deletions src/runner.ci.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { test, expect } from "vitest";
import { PostgreSqlContainer } from "@testcontainers/postgresql";
import { testSpawnTarget } from "./remote/remote.test.ts";
import { Connectable } from "./sync/connectable.ts";
import { ConnectionManager } from "./sync/connection-manager.ts";
import { Remote } from "./remote/remote.ts";
import { Runner } from "./runner.ts";
import { DEFAULT_CONFIG } from "./config.ts";

test("CI mode runs end-to-end against a source db with pg_stat_statements", async () => {
const [sourceDb, targetDb] = await Promise.all([
new PostgreSqlContainer("postgres:17")
.withCopyContentToContainer([
{
content: `
create extension pg_stat_statements;
create table testing(a int, b text);
insert into testing (a, b) values (1, 'hello');
create index testing_b_idx on testing(b);
select * from testing where a = 10;
select * from testing where b = 'c';
`,
target: "/docker-entrypoint-initdb.d/init.sql",
},
])
.withCommand(["-c", "shared_preload_libraries=pg_stat_statements"])
.start(),
testSpawnTarget(),
]);

try {
const sourcePostgresUrl = Connectable.fromString(sourceDb.getConnectionUri());
const targetPostgresUrl = Connectable.fromString(targetDb.getConnectionUri());

const remote = new Remote(
targetPostgresUrl,
ConnectionManager.forLocalDatabase(),
ConnectionManager.forRemoteDatabase(),
{ disableQueryLoader: true },
);

const sourceManager = ConnectionManager.forRemoteDatabase();
const source = sourceManager.getConnectorFor(sourcePostgresUrl);

const runner = await Runner.build({
targetPostgresUrl,
sourcePostgresUrl,
source,
remote,
});

try {
const { reportContext, allResults } = await runner.run(DEFAULT_CONFIG);

expect(reportContext.queryStats.matched).toBeGreaterThan(0);
expect(allResults.some((q) => q.query.toLowerCase().includes("testing")))
.toBe(true);
expect(reportContext.metadata.logSize).toBe(-1);
expect(reportContext.error).toBeUndefined();
} finally {
await runner.close();
await sourceManager.closeAll();
}
} finally {
await Promise.all([sourceDb.stop(), targetDb.stop()]);
}
});
105 changes: 17 additions & 88 deletions src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import * as core from "@actions/core";
import csv from "fast-csv";
import { statSync } from "node:fs";
import { spawn } from "node:child_process";
import { fingerprint } from "@libpg-query/parser";
import { preprocessEncodedJson } from "./sql/json.ts";
import { ExplainedLog } from "./sql/pg_log.ts";
import { PgbadgerSource } from "./sql/pgbadger.ts";
import type { RecentQuerySource } from "./sql/recent-query.ts";
import { GithubReporter } from "./reporters/github/github.ts";
import {
deriveIndexStatistics,
Expand All @@ -17,8 +13,6 @@ import { env } from "./env.ts";
import { Connectable } from "./sync/connectable.ts";
import { Remote, StatisticsStrategy } from "./remote/remote.ts";
import { ConnectionManager } from "./sync/connection-manager.ts";
import { RecentQuery } from "./sql/recent-query.ts";
import { QueryHash } from "./sql/recent-query.ts";
import type { OptimizedQuery } from "./sql/recent-query.ts";
import { ExportedStats } from "@query-doctor/core";
import { readFile } from "node:fs/promises";
Expand All @@ -27,7 +21,7 @@ import { buildQueries } from "./reporters/site-api.ts";
export class Runner {
constructor(
private readonly remote: Remote,
private readonly logPath: string,
private readonly source: RecentQuerySource,
private readonly maxCost?: number,
private readonly ignoredQueryHashes: Set<string> = new Set(),
) { }
Expand All @@ -37,15 +31,14 @@ export class Runner {
sourcePostgresUrl: Connectable;
statisticsPath?: string;
maxCost?: number;
logPath: string;
source: RecentQuerySource;
ignoredQueryHashes?: string[];
remote?: Remote;
}) {
const remote = options.remote ?? new Remote(
options.targetPostgresUrl,
ConnectionManager.forLocalDatabase(),
ConnectionManager.forRemoteDatabase(),
// queries are already sourced from logs
{ disableQueryLoader: true }
);
await remote.syncFrom(options.sourcePostgresUrl,
Expand All @@ -54,7 +47,7 @@ export class Runner {
await remote.optimizer.finish;
return new Runner(
remote,
options.logPath,
options.source,
options.maxCost,
new Set(options.ignoredQueryHashes ?? []),
);
Expand Down Expand Up @@ -92,82 +85,15 @@ export class Runner {

async run(config: AnalyzerConfig = DEFAULT_CONFIG) {
const startDate = new Date();
const logSize = statSync(this.logPath).size;
console.log(`logPath=${this.logPath},fileSize=${logSize}`);
const args = [
"--dump-raw-csv",
"--no-progressbar",
"-f",
"stderr",
this.logPath,
];
console.log(`pgbadger ${args.join(" ")}`);
const child = spawn("pgbadger", args, {
stdio: ["ignore", "pipe", "pipe"],
});
child.stderr!.pipe(process.stderr);
let error: Error | undefined;
const stream = csv
.parseStream(child.stdout!, {
headers: false,
})
.on("error", (err) => {
console.error("Got a pgbadger error", err);
error = err;
});

console.time("total");
const recentQueries: RecentQuery[] = [];
for await (const chunk of stream) {
const [
_timestamp,
_username,
_dbname,
_pid,
_client,
_sessionid,
loglevel,
_sqlstate,
_duration,
queryString,
_parameters,
_appname,
_backendtype,
_queryid,
] = chunk as string[];
if (loglevel !== "LOG" || !queryString.startsWith("plan:")) {
continue;
}
const planString: string = queryString.split("plan:")[1].trim();
const json = preprocessEncodedJson(planString);
if (!json) {
console.log("Skipping LOG that is not JSON", queryString);
continue;
}
let parsed: ExplainedLog;
try {
parsed = ExplainedLog.fromLog(json);
} catch (e) {
console.log(e);
console.log(
"Log line that looked like valid auto_explain was not valid json?",
);
continue;
}

const query = parsed.query;
const hash = QueryHash.parse(await fingerprint(query));
if (this.ignoredQueryHashes.has(hash)) {
continue;
}
if (parsed.isIntrospection) {
continue;
}

const recentQuery = await RecentQuery.fromLogEntry(query, hash);
recentQueries.push(recentQuery)
}
console.log("Finished pgbadger stream");
const recentQueries = await this.source.getRecentQueries();
const error = this.source instanceof PgbadgerSource
? this.source.streamError
: undefined;
const totalRows = this.source instanceof PgbadgerSource
? this.source.totalRows
: recentQueries.length;
await this.remote.optimizer.addQueries(recentQueries);

await this.remote.optimizer.finish;
Expand All @@ -184,7 +110,7 @@ export class Runner {
});

console.log(
`Matched ${this.remote.optimizer.validQueriesProcessed} unique queries out of ${recentQueries.length} log entries`,
`Matched ${this.remote.optimizer.validQueriesProcessed} unique queries out of ${totalRows} entries`,
);

const recommendations: ReportIndexRecommendation[] = [];
Expand Down Expand Up @@ -266,7 +192,10 @@ export class Runner {
}),
statistics,
error,
metadata: { logSize, timeElapsed },
metadata: {
logSize: this.source instanceof PgbadgerSource ? this.source.logSize : -1,
timeElapsed,
},
};
console.timeEnd("total");
return { reportContext, allResults };
Expand Down
84 changes: 0 additions & 84 deletions src/sql/pg_log.ts

This file was deleted.

Loading
Loading