[api][runtime] Extend agent skill loading repository.#655
Conversation
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
f448700 to
f2d4239
Compare
|
Thanks for the PR. Left some comments as below. |
| * <p>The zip is downloaded to a temp file and extracted into a process-local temp directory | ||
| * (cleaned up at JVM exit). The downloaded zip itself is removed once extraction completes. | ||
| */ | ||
| public class URLSkillRepository extends FileSystemSkillRepository { |
There was a problem hiding this comment.
Inheritance is being used as composition.
A URL repo is not logically a FileSystem repo — it fetches over the network, then delegates. Side effects: the constructor performs network I/O before super(...), the original url is lost for diagnostics, there's no close() lifecycle, and the fetch step can't be mocked alone.
Suggest a SkillSource { Path materialize() } interface composed into one MaterializingSkillRepository. Same applies to ClasspathSkillRepository and PackageSkillRepository.
| private List<String> paths; | ||
| private final List<String> paths; | ||
| private final List<String> urls; | ||
| private final List<String> classpathResources; |
There was a problem hiding this comment.
Adding a new source today forces 5 parallel changes (Skills field + factory + XxxSkillRepository + SkillManager for-loop + AgentPlan merge branch).
Suggest a discriminated union — single sources: List<SkillSourceSpec(scheme, location)> field + a scheme → handler registry. New source = one registration, no other file touched. Bonus: cross-language ser/de becomes uniform.
Wire-format-affecting; much cheaper to fix before 0.3.0 ships.
|
|
||
| # http(s) URLs. Each URL must point to a ``.zip`` whose top level is | ||
| # the baseDir. | ||
| urls: List[str] = Field(default_factory=list) |
There was a problem hiding this comment.
Cross-language deserialization silently drops fields. Python writes packages → Java drops it → 0 skills loaded, no warning. Mirror for Java classpathResources → Python.
Short term: emit a WARN on unrecognized source fields during deserialization. Long term: unify under one cross-language scheme (see the sources discriminator suggested on Skills.java).
| if ("jar".equals(protocol)) { | ||
| return SkillMaterializer.extractClasspathFromJar(url, resource); | ||
| } | ||
| throw new IllegalArgumentException( |
There was a problem hiding this comment.
Flink user JARs typically use Maven Shade or jar-in-jar; getResource("skills") returns protocols like jar:nested: that throw right here. findInUrlClassLoader also assumes URLClassLoader, not Flink's ChildFirstClassLoader.
Issue #593 explicitly targets this scenario. Before merge: (1) an end-to-end test on a Flink mini-cluster, (2) use ClassLoader.getResources() + getResourceAsStream instead of dispatching on URL protocol.
There was a problem hiding this comment.
What is the significance of highlighting URLClassLoader and ChildFirstClassLoader here? I understand that ChildFirstClassLoader is also a subclass of URLClassLoader.
Since we set the contextClassLoader to Flink's UserCodeClassloader before executing a JavaAction, it is used in most scenarios. However, there was an issue with the initialization of the skill-managed path in Python Actions. I have modified the code to explicitly pass the UserCodeClassloader.
Typically, I understand that the Flink user JAR does not use the jar-in-jar approach.
getResourceAsStream requires a file path as input, but the resource here is typically a directory name, so getResourceAsStream cannot be used.
| @@ -44,7 +47,7 @@ public class SkillManager { | |||
|
|
|||
| public SkillManager(Skills config) { | |||
There was a problem hiding this comment.
Constructor is synchronous, blocking, and not recoverable. loadAll() fetches sources serially; one slow URL stalls startup, one failure kills the whole SkillManager, and repos is final so there's no hot reload.
Follow-up: parallel materialization (CompletableFuture), fail-soft mode with a failure list, lazy load on first getSkill(), optional reloadSource() API.
There was a problem hiding this comment.
This behavior will be optimized in subsequent iterations.
| } | ||
| } | ||
|
|
||
| private void registerRepo(SkillRepository repo) { |
There was a problem hiding this comment.
Duplicate skill names overwrite silently with no log. Multi-source name collisions go through skills.put() and the later registration wins — no WARN, no error. Users debugging "why doesn't my SKILL.md take effect" get no diagnostic.
Minimum: WARN with both origins. Better: document the paths → urls → classpath precedence. (Naming the source concretely needs the SkillOrigin change on AgentSkill.)
| // Fallback for URLClassLoader: JARs without explicit directory entries will not return | ||
| // a URL for a prefix via getResource(). Scan the classloader's URLs directly to find | ||
| // any JAR entry that starts with the resource prefix. | ||
| url = findInUrlClassLoader(resource, classLoader); |
There was a problem hiding this comment.
findInUrlClassLoader silently uses only the first matching jar. Multiple plugin jars with the same prefix → every jar after the first is dropped, no log. Minimum: log the choice. Ideally merge across all matches, or fail loudly when more than one matches.
Also assumes URLClassLoader, not Flink's ChildFirstClassLoader — see the nested-jar thread on materialize.
There was a problem hiding this comment.
I think merge across all matches makes sense.
|
|
||
| @JsonCreator | ||
| public Skills(@JsonProperty("paths") List<String> paths) { | ||
| public Skills( |
There was a problem hiding this comment.
Source-incompatible constructor change. Wire compat is fine; but new Skills(List.of("/data/skills")) no longer compiles. Either keep a @Deprecated 1-arg constructor that forwards, or call it out in the 0.3.0 release notes.
If Skills later migrates to a single sources field (see the discriminator thread), this signature changes again — better to break once.
| org.apache.flink.agents.api.resource.ResourceContext | ||
| .fromGetResource((n, t) -> null)); | ||
| assertEquals(java.util.List.of("/tmp/skill-d"), merged.getPaths()); | ||
| assertEquals(List.of("/tmp/skill-d"), merged.getPaths()); |
There was a problem hiding this comment.
Merge-coverage gap. AgentPlan.addSkills now merges three LinkedHashSets but the test only asserts on getPaths(). Add cases for overlapping urls, overlapping classpathResources, and a mixed three-source case asserting all three end up in the merged result.
| return cls(urls=list(urls)) | ||
|
|
||
| @classmethod | ||
| def from_package(cls, package: str, resource: str) -> Skills: |
There was a problem hiding this comment.
API asymmetry. Java fromClasspath(String...) is varargs; Python from_package(package, resource) is single-pair, so two packages require two @skills functions.
Make it symmetric: def from_package(cls, *pairs: Tuple[str, str]). See also the cross-language drop thread on the packages field.
…RL prefix. Merge the two-pass zip-slip-validate-then-extract into one pass, and extract the magic 4 in extractClasspathFromJar as JAR_URL_PREFIX_LEN. Addresses review comments on PR apache#655. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
#1. Replaces Skills's parallel paths/urls/classpathResources/packages fields with a single sources: List<SkillSourceSpec> + a scheme-keyed registry. Reduces "add a new source" from 5 touchpoints to 1, also closes review apache#3, apache#13, apache#16, apache#17 along the way. Java/Python in lockstep; pre-0.3.0 wire-format break, no deprecation shims. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…inator + registry.
Skills now carries a single sources: List<SkillSourceSpec> field instead
of paths / urls / classpathResources (Java) and paths / urls / packages
(Python). Each spec is {scheme, params}; loading dispatches through a
scheme-keyed SkillSourceRegistry. Adding a new source = one register
call plus one SkillRepository, rather than the previous five parallel
touchpoints (Skills field + factory + repo class + SkillManager loop +
AgentPlan merge branch).
Java registers local / url / classpath; Python registers local / url /
package. Cross-language unsupported schemes fail loud at load time
(replaces the previous silent field drop).
Also picks up:
* from_package on Python becomes varargs (closes review apache#17).
* AgentPlanDeclareSkillsTest covers url / classpath / mixed-scheme dedup
(closes review apache#16).
Addresses PR apache#655 review #1; one-shot wire-format break before 0.3.0
ships, no deprecation shims (decision per design spec).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…emp-dir leak. Per PR apache#655 review apache#10: every URL / classpath / zip materialization registered a JVM shutdown hook that survived past the owning SkillManager, so Flink failover (task redeployed but JVM not exiting) leaked hooks and temp dirs monotonically. Fix is a close-on-operator-close chain that mirrors the pattern already used by other Resources: * SkillMaterializer.extractZipSafely / extractClasspathFromJar now return a Materialized AutoCloseable handle holding the temp dir + hook; close() is idempotent and tolerates a JVM already in shutdown. * SkillRepository extends AutoCloseable (default no-op); the repo impls that own a Materialized close it. * SkillManager implements AutoCloseable, closing every owned repo (identity-dedup since multiple skill names can share a repo). * ResourceContextImpl implements AutoCloseable, closing its lazily-cached SkillManager; ensureSkillManager() now closes the previous instance on config swap. * Java aligned with Python: ResourceCache gains setResourceContext + reuses the injected ctx in getResource (replacing the per-call new ResourceContextImpl), and close() cascades to ctx -> SkillManager. ActionExecutionOperator injects one ctx; the two ctx instances at lines 683 / 759 (and the one in PythonMCPResourceDiscovery) now share the cache's ctx. * Python mirrors the same shape: Materialized class with atexit.unregister-able cleanup; SkillRepository.close default no-op; SkillManager close + context-manager; ResourceContextImpl.close; ResourceCache.close cascades to ctx. * PackageSkillRepository also restructures its as_file atexit lambda into a bound method so close() can unregister it. Tests: SkillMaterializerTest gains closeRemovesTempDirAndDeregistersHook and borrowedMaterializedDoesNotRemoveDir; SkillManagerTest gains closeReleasesUrlRepoTempDir; ResourceCacheTest gains testGetResourceWithoutContextThrows + the existing two cases wire up the context via a new helper. Python: test_materialize.py rewritten around Materialized; test_manager.py adds test_close_releases_url_repo_temp_dir. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously ClasspathSkillRepository.findInUrlClassLoader returned the first matching JAR and silently dropped the rest. When a deployment had multiple plugin JARs each contributing skills under the same prefix (plugin-a.jar -> skills/github, plugin-b.jar -> skills/email, ...), only one plugin's skills loaded with no log. Fix: * findAllMatches replaces findInUrlClassLoader. Returns the full list of distinct URLs from ClassLoader.getResources(...) plus the URLClassLoader-getURLs() fallback scan (kept for JARs without explicit directory entries). * SkillMaterializer gains extractClasspathFromJars(List<URL>, prefix): merges entries from every JAR into a single temp dir + single shutdown hook. Same-path collisions log WARN and last-write-wins. extractClasspathFromJar(URL, prefix) becomes a thin wrapper. * ClasspathSkillRepository.materialize dispatches by URL protocol: - all jar: -> extractClasspathFromJars (merged) - file: only -> existing single-source path (borrowed dir / zip) - mixed -> WARN, file URL wins (rare; multi-classpath-root case) Multi-jar merge logs INFO; mixed/file-multi logs WARN. Tests: * SkillMaterializerTest: extractClasspathFromJarsMergesEntries, extractClasspathFromJarsLastWriteWinsOnCollision. * ClasspathSkillRepositoryTest: loadFromMultipleJarsMergesSkills, loadFromMultipleJarsWithCollisionLastWins. * Existing single-jar / directory / missing tests unchanged and pass. Addresses PR apache#655 review apache#12. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ame override. Addresses PR apache#655 review apache#6 and apache#11 together (their fixes are coupled — apache#11's "WARN with both origins" depends on apache#6 making origin queryable). * New SkillOrigin (Java + Python) carries the scheme + location of the source that produced a skill. scheme mirrors the SkillSourceSpec scheme; location is a human-readable identifier (path / URL / classpath resource name / package+resource). * AgentSkill gains a nullable `origin` field + setter. SkillManager attaches it during registerRepo (origin built from the SkillSourceSpec the repo came from). * registerRepo logs WARN when a put replaces an existing skill of the same name, naming both the new origin and the previous one — closing the silent-override path from review apache#11. * SkillManager's parallel `repos` map is left in place; it exists for getSkillDir(s) / resolveResourcePath (filesystem-backed paths), not for origin, and is orthogonal to this change. Review #2's composition refactor would clean it up. * loadedAt (suggested in review apache#6) intentionally omitted — current loadAll constructs all skills in one synchronous batch so timestamps are uninformative; revisit when hot-reload (review apache#5) lands. Tests: * SkillOriginTest: toString format + equality. * SkillManagerTest: origin attached after load; duplicate name — last-write-wins and the surviving skill carries the second origin (the WARN log itself isn't asserted; verified via origin check). * Python mirrors. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Hi, @wzhero1, Ty for your thorough review. I fixed some design issues and bugs, including
The remaining issues related to robustness and security will be iterated on in subsequent phases. |
…RL prefix. Merge the two-pass zip-slip-validate-then-extract into one pass, and extract the magic 4 in extractClasspathFromJar as JAR_URL_PREFIX_LEN. Addresses review comments on PR apache#655. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…inator + registry.
Skills now carries a single sources: List<SkillSourceSpec> field instead
of paths / urls / classpathResources (Java) and paths / urls / packages
(Python). Each spec is {scheme, params}; loading dispatches through a
scheme-keyed SkillSourceRegistry. Adding a new source = one register
call plus one SkillRepository, rather than the previous five parallel
touchpoints (Skills field + factory + repo class + SkillManager loop +
AgentPlan merge branch).
Java registers local / url / classpath; Python registers local / url /
package. Cross-language unsupported schemes fail loud at load time
(replaces the previous silent field drop).
Also picks up:
* from_package on Python becomes varargs (closes review apache#17).
* AgentPlanDeclareSkillsTest covers url / classpath / mixed-scheme dedup
(closes review apache#16).
Addresses PR apache#655 review #1; one-shot wire-format break before 0.3.0
ships, no deprecation shims (decision per design spec).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…emp-dir leak. Per PR apache#655 review apache#10: every URL / classpath / zip materialization registered a JVM shutdown hook that survived past the owning SkillManager, so Flink failover (task redeployed but JVM not exiting) leaked hooks and temp dirs monotonically. Fix is a close-on-operator-close chain that mirrors the pattern already used by other Resources: * SkillMaterializer.extractZipSafely / extractClasspathFromJar now return a Materialized AutoCloseable handle holding the temp dir + hook; close() is idempotent and tolerates a JVM already in shutdown. * SkillRepository extends AutoCloseable (default no-op); the repo impls that own a Materialized close it. * SkillManager implements AutoCloseable, closing every owned repo (identity-dedup since multiple skill names can share a repo). * ResourceContextImpl implements AutoCloseable, closing its lazily-cached SkillManager; ensureSkillManager() now closes the previous instance on config swap. * Java aligned with Python: ResourceCache gains setResourceContext + reuses the injected ctx in getResource (replacing the per-call new ResourceContextImpl), and close() cascades to ctx -> SkillManager. ActionExecutionOperator injects one ctx; the two ctx instances at lines 683 / 759 (and the one in PythonMCPResourceDiscovery) now share the cache's ctx. * Python mirrors the same shape: Materialized class with atexit.unregister-able cleanup; SkillRepository.close default no-op; SkillManager close + context-manager; ResourceContextImpl.close; ResourceCache.close cascades to ctx. * PackageSkillRepository also restructures its as_file atexit lambda into a bound method so close() can unregister it. Tests: SkillMaterializerTest gains closeRemovesTempDirAndDeregistersHook and borrowedMaterializedDoesNotRemoveDir; SkillManagerTest gains closeReleasesUrlRepoTempDir; ResourceCacheTest gains testGetResourceWithoutContextThrows + the existing two cases wire up the context via a new helper. Python: test_materialize.py rewritten around Materialized; test_manager.py adds test_close_releases_url_repo_temp_dir. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> fix rebase conflict
…tory. Previously ClasspathSkillRepository fell back to Thread.currentThread().getContextClassLoader() to resolve classpath resources. That's stable on the Java-action path (JavaActionTask explicitly sets the context CL to the user-code CL), but SkillManager is constructed lazily inside BaseChatModelSetup.open() — and "first caller wins" means a Python action triggering the chat-model resolve before any Java action gets to it leaves the context CL undefined (Python interpreter thread / pemja callback / async pool), and the classpath: source can silently miss the user JAR. Fix: pass the user-code CL down explicitly. SkillSourceHandler now takes (params, ClassLoader); SkillManager / ResourceContextImpl / ResourceCache each get a primary constructor taking ClassLoader and a convenience overload that uses the thread context (kept for tests and standalone use). ActionExecutionOperator wires getRuntimeContext().getUserCodeClassLoader() into ResourceCache, which threads it through to ClasspathSkillRepository regardless of which thread later triggers the lazy SkillManager construction. ClasspathSkillRepository scan logic is unchanged — the fat-jar / multi-jar concerns in review apache#4 / apache#12 are out of scope for this fix. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously ClasspathSkillRepository.findInUrlClassLoader returned the first matching JAR and silently dropped the rest. When a deployment had multiple plugin JARs each contributing skills under the same prefix (plugin-a.jar -> skills/github, plugin-b.jar -> skills/email, ...), only one plugin's skills loaded with no log. Fix: * findAllMatches replaces findInUrlClassLoader. Returns the full list of distinct URLs from ClassLoader.getResources(...) plus the URLClassLoader-getURLs() fallback scan (kept for JARs without explicit directory entries). * SkillMaterializer gains extractClasspathFromJars(List<URL>, prefix): merges entries from every JAR into a single temp dir + single shutdown hook. Same-path collisions log WARN and last-write-wins. extractClasspathFromJar(URL, prefix) becomes a thin wrapper. * ClasspathSkillRepository.materialize dispatches by URL protocol: - all jar: -> extractClasspathFromJars (merged) - file: only -> existing single-source path (borrowed dir / zip) - mixed -> WARN, file URL wins (rare; multi-classpath-root case) Multi-jar merge logs INFO; mixed/file-multi logs WARN. Tests: * SkillMaterializerTest: extractClasspathFromJarsMergesEntries, extractClasspathFromJarsLastWriteWinsOnCollision. * ClasspathSkillRepositoryTest: loadFromMultipleJarsMergesSkills, loadFromMultipleJarsWithCollisionLastWins. * Existing single-jar / directory / missing tests unchanged and pass. Addresses PR apache#655 review apache#12. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ame override. Addresses PR apache#655 review apache#6 and apache#11 together (their fixes are coupled — apache#11's "WARN with both origins" depends on apache#6 making origin queryable). * New SkillOrigin (Java + Python) carries the scheme + location of the source that produced a skill. scheme mirrors the SkillSourceSpec scheme; location is a human-readable identifier (path / URL / classpath resource name / package+resource). * AgentSkill gains a nullable `origin` field + setter. SkillManager attaches it during registerRepo (origin built from the SkillSourceSpec the repo came from). * registerRepo logs WARN when a put replaces an existing skill of the same name, naming both the new origin and the previous one — closing the silent-override path from review apache#11. * SkillManager's parallel `repos` map is left in place; it exists for getSkillDir(s) / resolveResourcePath (filesystem-backed paths), not for origin, and is orthogonal to this change. Review #2's composition refactor would clean it up. * loadedAt (suggested in review apache#6) intentionally omitted — current loadAll constructs all skills in one synchronous batch so timestamps are uninformative; revisit when hot-reload (review apache#5) lands. Tests: * SkillOriginTest: toString format + equality. * SkillManagerTest: origin attached after load; duplicate name — last-write-wins and the surviving skill carries the second origin (the WARN log itself isn't asserted; verified via origin check). * Python mirrors. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
URL / classpath / package repos no longer extend FileSystemSkillRepository; they implement SkillRepository directly and compose a new SkillDirectoryReader that owns the "parse SKILL.md under baseDir" logic. Each repo manages its own materialization and close lifecycle, and may carry its own source identity (url / resource / package) as a private diagnostic field. SkillRepository gains a default getSkillDir(name) so SkillManager drops its instanceof hack. Java: 310 tests pass (8 new in SkillDirectoryReaderTest). Python: 62 tests pass (8 new in test_skill_directory_reader.py). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Linked issue: #592, #593, #594
Purpose of change
Extend the way Flink-Agents loads external agent skills:
Tests
ut
API
Yes, add
from_xxxtoSkills.Documentation
doc-neededdoc-not-neededdoc-included