Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5c89658
Files added
SarahAsad23 May 4, 2026
c1a9574
test added
SarahAsad23 May 4, 2026
04c9c2c
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
6d11ab4
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
60257bb
fix comment
SarahAsad23 May 4, 2026
2845f67
remove redundancy
SarahAsad23 May 4, 2026
315b1f7
rename PackageRow
SarahAsad23 May 4, 2026
a65ef35
rename pveWebsocket
SarahAsad23 May 4, 2026
3050238
reduce user package row spacing
SarahAsad23 May 4, 2026
43afe3c
disable installing duplicate and system packages
SarahAsad23 May 4, 2026
f4ed6cc
added helper
SarahAsad23 May 4, 2026
6e78110
code refactor
SarahAsad23 May 4, 2026
e1068a0
formatting
SarahAsad23 May 4, 2026
c948a95
Merge branch 'pve-add-user-packages' of https://github.com/SarahAsad2…
SarahAsad23 May 4, 2026
9a977ad
cleanup
SarahAsad23 May 4, 2026
722cc2b
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
c51f962
revert
SarahAsad23 May 4, 2026
a3a2be8
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 5, 2026
6ed49f1
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 5, 2026
41e9642
rename operator
SarahAsad23 May 5, 2026
acd7f71
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 8, 2026
47c0859
Merge branch 'pve-add-user-packages' of https://github.com/SarahAsad2…
SarahAsad23 May 8, 2026
f915509
remove redundant headers
SarahAsad23 May 8, 2026
b1b1639
require op and version
SarahAsad23 May 8, 2026
2a7db5d
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils
* for each Computing Unit
*
* It supports:
* - Creating and initializing isolated Python environments
* - Creating and initializing isolated Python environments (with system packages)
* - Installing user defined packages
* - Streaming pip output logs back to the caller
*
* Each PVE is stored under:
Expand All @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils

object PveManager {

case class PvePackageResponse(
pveName: String,
userPackages: Seq[String]
)

private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")

private def cuidDir(cuid: Int, pveName: String): Path = {
Expand All @@ -66,6 +72,32 @@ object PveManager {
Process(Seq(python, "-m", "pip", "freeze")).!!.split("\n").map(_.trim).filter(_.nonEmpty).toSeq
}

private def runPipInstall(
python: String,
args: Seq[String],
queue: BlockingQueue[String]
): Int = {
Process(
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"--no-input"
) ++ args,
None,
pipEnv.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
)
}

/**
* Creates a new PVE for a CU.
*
Expand All @@ -90,18 +122,13 @@ object PveManager {
if (isLocal) Paths.get("amber", "requirements.txt")
else Paths.get("/tmp", "requirements.txt")

val operatorRequirementsPath =
if (isLocal) Paths.get("amber", "operator-requirements.txt")
else Paths.get("/tmp", "operator-requirements.txt")

if (!Files.exists(requirementsPath) || !Files.exists(operatorRequirementsPath)) {
if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] System requirements not found")
return
}

val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val envVars = pipEnv

val createVenvPython = PythonUtils.getPythonExecutable

Expand All @@ -121,43 +148,17 @@ object PveManager {
return
}

if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}")
return
}

if (!Files.exists(operatorRequirementsPath)) {
queue.put(
s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}"
)
return
}

Comment thread
kunwp1 marked this conversation as resolved.
queue.put(
s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}"
s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath}"
)

val installReqCode = Process(
val installReqCode = runPipInstall(
python,
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"-r",
requirementsPath.toString,
"-r",
operatorRequirementsPath.toString
requirementsPath.toString
),
None,
envVars.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
queue
)

queue.put(s"[PVE] requirements install finished with exit code $installReqCode")
Expand All @@ -170,7 +171,8 @@ object PveManager {
queue.put(s"[PVE] Created new environment for cuid = $cuid")
}

def getEnvironments(cuid: Int): List[String] = {
// returns list of PVE names and corresponding user packages for a given CU
def getEnvironments(cuid: Int): List[PvePackageResponse] = {

val cuPath = VenvRoot.resolve(cuid.toString)

Expand All @@ -185,7 +187,27 @@ object PveManager {
.iterator()
.asScala
.filter(path => Files.isDirectory(path))
.map(path => path.getFileName.toString)
.map { path =>
val pveName = path.getFileName.toString
val metadataPath = path.resolve("user-packages.txt")

val userPackages =
if (Files.exists(metadataPath)) {
Files
.readAllLines(metadataPath)
.asScala
.map(_.trim)
.filter(_.nonEmpty)
.toSeq
} else {
Seq()
}

PvePackageResponse(
pveName = pveName,
userPackages = userPackages
)
}
.toList
} finally {
stream.close()
Expand All @@ -212,4 +234,75 @@ object PveManager {
stream.close()
}
}

/**
* Installs user requested Python packages into the PVE.
*
* 1. Executes pip install for each package
* 2. Updates user metadata file
* 3. Streams logs back via queue
*/
def installUserPackages(
packages: List[String],
cuid: Int,
queue: BlockingQueue[String],
pveName: String
): Unit = {

val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val envVars = pipEnv

if (!Files.exists(Paths.get(python))) {
queue.put(s"[PVE][ERR] Python executable not found for PVE: $python")
return
}

val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt")

var installedPackages =
if (Files.exists(metadataPath)) {
Files
.readAllLines(metadataPath)
.asScala
.map(_.trim)
.filter(_.nonEmpty)
.toSet
} else {
Set[String]()
}

packages.foreach { pkg =>
val trimmedPkg = pkg.trim

if (trimmedPkg.nonEmpty) {
queue.put(s"[PVE] Installing package: $trimmedPkg")

val code = runPipInstall(
python,
Seq(trimmedPkg),
queue
)

queue.put(s"[pip] install($trimmedPkg) finished with exit code $code")

if (code != 0) {
queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg")
return
}

installedPackages = installedPackages + trimmedPkg
Comment thread
SarahAsad23 marked this conversation as resolved.

Files.write(
metadataPath,
installedPackages.toSeq.sorted.asJava
)
}
}

queue.put("[PVE] Final user package list:")

installedPackages.toSeq.sorted.foreach { pkg =>
queue.put(s"[user-package] $pkg")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.util
@Consumes(Array(MediaType.APPLICATION_JSON))
class PveResource {
// --------------------------------------------------
// Get installed packages
// Get system packages
// --------------------------------------------------
@GET
@Path("/system")
Expand All @@ -45,7 +45,7 @@ class PveResource {
}

// --------------------------------------------------
// Fetch PVEs
// Fetch PVEs and Installed User Packages
// --------------------------------------------------
@GET
@Path("/pves")
Expand All @@ -54,9 +54,10 @@ class PveResource {
try {
PveManager
.getEnvironments(cuid)
.map { pveName =>
.map { pve =>
Map(
"pveName" -> pveName.asInstanceOf[Object]
"pveName" -> pve.pveName.asInstanceOf[Object],
"userPackages" -> pve.userPackages.asJava.asInstanceOf[Object]
).asJava
}
.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/**
* WebSocket endpoint for PVE creation that streams pip installation logs
* to the frontend in real time. The environment setup runs asynchronously,
* and output is pushed to the client until completion.
* WebSocket endpoint for PVE creation and user pacakge installation that streams
* pip installation logs to the frontend in real time. The environment setup runs
* asynchronously, and output is pushed to the client until completion.
*/

@ServerEndpoint("/wsapi/pve")
Expand All @@ -42,12 +42,33 @@ class PveWebsocketResource {
val cuid = params.get("cuid").get(0).toInt
val pveName = params.get("pveName").get(0)
val isLocal = params.get("isLocal").get(0).toBoolean
val action = params.getOrDefault("action", java.util.List.of("create")).get(0)

val queue = new LinkedBlockingQueue[String]()

Future {
try {
PveManager.createNewPve(cuid, queue, pveName, isLocal)
action match {
case "create" =>
PveManager.createNewPve(cuid, queue, pveName, isLocal)

case "install" =>
val packages =
params
.getOrDefault("packages", java.util.List.of("[]"))
.get(0)
.stripPrefix("[")
.stripSuffix("]")
.split(",")
.toList
.map(_.replace("\"", "").trim)
.filter(_.nonEmpty)

PveManager.installUserPackages(packages, cuid, queue, pveName)

case _ =>
queue.put(s"[ERR] Unknown action: $action")
}
} catch {
case e: Exception =>
queue.put(s"[ERR] ${e.getMessage}")
Expand All @@ -61,7 +82,6 @@ class PveWebsocketResource {

while (!done && session.isOpen) {
val line = queue.take()

session.getBasicRemote.sendText(line)

if (line == "__DONE__") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,37 @@ class PveResourceSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach
Files.exists(pythonPath) shouldBe true
Files.exists(pipPath) shouldBe true

PveManager.getEnvironments(testCuid) should contain(testPveName)
PveManager.getEnvironments(testCuid).map(_.pveName) should contain(testPveName)
}

"PveManager" should "install a user package and list it for the PVE" in {
PveManager.createNewPve(testCuid, queue, testPveName, isLocal = true)

val packageName = "colorama"
val packageVersion = "0.4.6"
val packageSpec = s"$packageName==$packageVersion"

queue.clear()

PveManager.installUserPackages(
List(packageSpec),
testCuid,
queue,
testPveName
)

val logs = queueText()

logs should not include "[PVE][ERR]"
logs should include(s"[PVE] Installing package: $packageSpec")
logs should include(s"[user-package] $packageSpec")

val pve = PveManager
.getEnvironments(testCuid)
.find(_.pveName == testPveName)

pve should not be empty
pve.get.userPackages should contain(packageSpec)
}

"PveManager" should "delete all PVEs for a computing unit" in {
Expand Down
Loading