From 5c8965872146fddbf29ee733a63cbc47fd7f29ba Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Sun, 3 May 2026 23:13:10 -0700 Subject: [PATCH 01/19] Files added --- .../pythonvirtualenvironment/PveManager.scala | 132 ++++++++++++++-- .../PveResource.scala | 9 +- .../PveWebsocketResource.scala | 36 ++++- .../computing-unit-selection.component.html | 84 +++++++++- .../computing-unit-selection.component.ts | 148 ++++++++++++------ .../virtual-environment.service.ts | 9 +- frontend/src/styles.scss | 10 +- 7 files changed, 352 insertions(+), 76 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 0399e386ba7..3231565e0ff 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils * for each Computing Unit * * It supports: - * - Creating and initializing isolated Python environments + * - Creating and initializing isolated Python environments (with system packages) + * - installing user defined packages * - Streaming pip output logs back to the caller * * Each PVE is stored under: @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils object PveManager { + case class PvePackageResponse( + pveName: String, + userPackages: Seq[String] + ) + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") private def cuidDir(cuid: Int, pveName: String): Path = { @@ -121,18 +127,6 @@ object PveManager { return } - if (!Files.exists(requirementsPath)) { - queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}") - return - } - - if (!Files.exists(operatorRequirementsPath)) { - queue.put( - s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}" - ) - return - } - queue.put( s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}" ) @@ -170,7 +164,8 @@ object PveManager { queue.put(s"[PVE] Created new environment for cuid = $cuid") } - def getEnvironments(cuid: Int): List[String] = { + // returns list of PVE names and corresponding user packages for a given CU + def getEnvironments(cuid: Int): List[PvePackageResponse] = { val cuPath = VenvRoot.resolve(cuid.toString) @@ -185,7 +180,27 @@ object PveManager { .iterator() .asScala .filter(path => Files.isDirectory(path)) - .map(path => path.getFileName.toString) + .map { path => + val pveName = path.getFileName.toString + val metadataPath = path.resolve("user-packages.txt") + + val userPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSeq + } else { + Seq() + } + + PvePackageResponse( + pveName = pveName, + userPackages = userPackages + ) + } .toList } finally { stream.close() @@ -212,4 +227,91 @@ object PveManager { stream.close() } } + + /** + * Installs user requested Python packages into the PVE. + * + * 1. Executes pip install for each package + * 2. Updates user metadata file + * 3. Streams logs back via queue + */ + def installUserPackages( + packages: List[String], + cuid: Int, + queue: BlockingQueue[String], + pveName: String + ): Unit = { + + val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString + val envVars = pipEnv + + if (!Files.exists(Paths.get(python))) { + queue.put(s"[PVE][ERR] Python executable not found for PVE: $python") + return + } + + val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt") + Files.createDirectories(metadataPath.getParent) + + var installedPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSet + } else { + Set[String]() + } + + packages.foreach { pkg => + val trimmedPkg = pkg.trim + + if (trimmedPkg.nonEmpty) { + queue.put(s"[PVE] Installing package: $trimmedPkg") + + val code = Process( + Seq( + python, + "-u", + "-m", + "pip", + "install", + "--progress-bar", + "off", + "--no-input", + trimmedPkg + ), + None, + envVars.toSeq: _* + ).!( + ProcessLogger( + out => queue.put(s"[pip] $out"), + err => queue.put(s"[pip][ERR] $err") + ) + ) + + queue.put(s"[pip] install($trimmedPkg) finished with exit code $code") + + if (code != 0) { + queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg") + return + } + + installedPackages = installedPackages + trimmedPkg + + Files.write( + metadataPath, + installedPackages.toSeq.sorted.asJava + ) + } + } + + queue.put("[PVE] Final user package list:") + + installedPackages.toSeq.sorted.foreach { pkg => + queue.put(s"[user-package] $pkg") + } + } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala index 1040fd64ea4..0a058ed6f5c 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala @@ -28,7 +28,7 @@ import java.util @Consumes(Array(MediaType.APPLICATION_JSON)) class PveResource { // -------------------------------------------------- - // Get installed packages + // Get system packages // -------------------------------------------------- @GET @Path("/system") @@ -45,7 +45,7 @@ class PveResource { } // -------------------------------------------------- - // Fetch PVEs + // Fetch PVEs and Installed User Packages // -------------------------------------------------- @GET @Path("/pves") @@ -54,9 +54,10 @@ class PveResource { try { PveManager .getEnvironments(cuid) - .map { pveName => + .map { pve => Map( - "pveName" -> pveName.asInstanceOf[Object] + "pveName" -> pve.pveName.asInstanceOf[Object], + "userPackages" -> pve.userPackages.asJava.asInstanceOf[Object] ).asJava } .asJava diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala index b93d1bfde03..48931bf37c3 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala @@ -26,9 +26,9 @@ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** - * WebSocket endpoint for PVE creation that streams pip installation logs - * to the frontend in real time. The environment setup runs asynchronously, - * and output is pushed to the client until completion. + * WebSocket endpoint for PVE creation and user pacakge installation that streams + * pip installation logs to the frontend in real time. The environment setup runs + * asynchronously, and output is pushed to the client until completion. */ @ServerEndpoint("/wsapi/pve") @@ -42,12 +42,33 @@ class PveWebsocketResource { val cuid = params.get("cuid").get(0).toInt val pveName = params.get("pveName").get(0) val isLocal = params.get("isLocal").get(0).toBoolean + val action = params.getOrDefault("action", java.util.List.of("create")).get(0) val queue = new LinkedBlockingQueue[String]() Future { try { - PveManager.createNewPve(cuid, queue, pveName, isLocal) + action match { + case "create" => + PveManager.createNewPve(cuid, queue, pveName, isLocal) + + case "install" => + val packages = + params + .getOrDefault("packages", java.util.List.of("[]")) + .get(0) + .stripPrefix("[") + .stripSuffix("]") + .split(",") + .toList + .map(_.replace("\"", "").trim) + .filter(_.nonEmpty) + + PveManager.installUserPackages(packages, cuid, queue, pveName) + + case _ => + queue.put(s"[ERR] Unknown action: $action") + } } catch { case e: Exception => queue.put(s"[ERR] ${e.getMessage}") @@ -60,11 +81,10 @@ class PveWebsocketResource { var done = false while (!done && session.isOpen) { - val line = queue.take() - - session.getBasicRemote.sendText(line) + val msg = queue.take() + session.getBasicRemote.sendText(msg) - if (line == "__DONE__") { + if (msg == "__DONE__") { done = true session.close() } diff --git a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html index b742c71581c..1a0dc7ef339 100644 --- a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -480,7 +480,7 @@
-
+
+ + +
+
+
+ +
+ +
+ +
+ +
+ +
+
+
+ + +
+
+
+ + +
+ +
+ + + + + + +
+ +
+ + +
+
+
+ +
+ +
+ -
-
-
- - -
+
+
+
Package
+ +
Version
+
-
- - - - - - -
+
+
+
+ +
-
- - +
+ + + + + +
+ +
+ +
diff --git a/frontend/src/styles.scss b/frontend/src/styles.scss index 7609fa8b955..27b2a5362a2 100644 --- a/frontend/src/styles.scss +++ b/frontend/src/styles.scss @@ -369,6 +369,24 @@ hr { background: transparent; } + .user-package-header-row .package-column-label { + font-weight: 600; + } + + .new-packages-section { + display: flex; + flex-direction: column; + gap: 2px; + } + + .user-package-header-row { + display: grid; + grid-template-columns: 1fr 160px 1fr; + gap: 14px; + margin-bottom: 0; + padding: 0; + } + .system-header { display: flex; flex-direction: column; From b1b16395ecd23aa403788a419a3db06cb6594e0b Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Fri, 8 May 2026 08:43:28 -0700 Subject: [PATCH 16/19] require op and version --- .../computing-unit-selection.component.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts index 6cd9a37e29b..969711863ff 100644 --- a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts +++ b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts @@ -974,6 +974,15 @@ export class ComputingUnitSelectionComponent implements OnInit { private installUserPackages(index: number): void { const env = this.pves[index]; + const missingVersionPackage = env.newPackages?.find( + pkg => pkg.name?.trim() && (!pkg.versionOp?.trim() || !pkg.version?.trim()) + ); + + if (missingVersionPackage) { + this.notificationService.error("Please specify an operator and version for each package."); + return; + } + const systemPackageNames = new Set(this.systemPackages.map(pkg => pkg.name.trim().toLowerCase())); const userPackageNames = new Set(env.userPackages.map(pkg => pkg.name.trim().toLowerCase())); @@ -987,18 +996,18 @@ export class ComputingUnitSelectionComponent implements OnInit { const packageName = pkg.name.trim().toLowerCase(); if (systemPackageNames.has(packageName)) { - this.notificationService.error(`Skipped ${pkg.name}: already installed as a system package.`) + this.notificationService.error(`Skipped ${pkg.name}: already installed as a system package.`); return false; } if (userPackageNames.has(packageName)) { - this.notificationService.error(`Skipped ${pkg.name}: already installed in this environment.`) + this.notificationService.error(`Skipped ${pkg.name}: already installed in this environment.`); return false; } return true; }) - .map(pkg => `${pkg.name.trim()}${pkg.version ? `==${pkg.version.trim()}` : ""}`) ?? []; + .map(pkg => `${pkg.name.trim()}${pkg.versionOp}${(pkg.version ?? "").trim()}`) ?? []; if (skippedMessages.length > 0) { this.pves[index].pipOutput = `${this.pves[index].pipOutput ?? ""}` + skippedMessages.join("\n") + "\n"; From a71c4d4277c1a6a86933f865eb3d007df1aee3f9 Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Fri, 8 May 2026 12:48:12 -0700 Subject: [PATCH 17/19] update comment --- .../web/resource/pythonvirtualenvironment/PveManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 41c728c4591..140cdbd1daf 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -117,7 +117,7 @@ object PveManager { queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName") // NOTE: These paths are derived from computing-unit-master.dockerfile. - // If requirements.txt or operator-requirements.txt locations change, update these paths. + // If requirements.txt location changes, update these paths. val requirementsPath = if (isLocal) Paths.get("amber", "requirements.txt") else Paths.get("/tmp", "requirements.txt") From a09b091739c6d3d64d55fe3440a881488f689c6a Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Fri, 8 May 2026 13:25:11 -0700 Subject: [PATCH 18/19] Added back op reqs --- .../pythonvirtualenvironment/PveManager.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 140cdbd1daf..22ca792a1fe 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -117,12 +117,16 @@ object PveManager { queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName") // NOTE: These paths are derived from computing-unit-master.dockerfile. - // If requirements.txt location changes, update these paths. + // If requirements.txt or operator-requirements.txt locations change, update these paths. val requirementsPath = if (isLocal) Paths.get("amber", "requirements.txt") else Paths.get("/tmp", "requirements.txt") - if (!Files.exists(requirementsPath)) { + val operatorRequirementsPath = + if (isLocal) Paths.get("amber", "operator-requirements.txt") + else Paths.get("/tmp", "operator-requirements.txt") + + if (!Files.exists(requirementsPath) || !Files.exists(operatorRequirementsPath)) { queue.put(s"[PVE][ERR] System requirements not found") return } @@ -156,7 +160,9 @@ object PveManager { python, Seq( "-r", - requirementsPath.toString + requirementsPath.toString, + "-r", + operatorRequirementsPath.toString ), queue ) From ef49f595e8b8fe7eadf900f6e105eb16257073b0 Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Fri, 8 May 2026 13:27:34 -0700 Subject: [PATCH 19/19] Added back op reqs --- .../web/resource/pythonvirtualenvironment/PveManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 22ca792a1fe..bf88fa6112c 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -153,7 +153,7 @@ object PveManager { } queue.put( - s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath}" + s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and operator requirements from ${operatorRequirementsPath.toAbsolutePath}" ) val installReqCode = runPipInstall(