-
Notifications
You must be signed in to change notification settings - Fork 59
feat: implement pluggable taxonomy and policy security plugin #151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ViktorVeselov
wants to merge
14
commits into
google:main
Choose a base branch
from
ViktorVeselov:feature/pluggable-taxonomy-engine
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
eb10205
feat: implement pluggable taxonomy and policy security plugin
ViktorVeselov 419a019
add: add test method model and skills
ViktorVeselov 0459183
feat: wire description shaping, system prompt steering, and priority …
ViktorVeselov 48b8132
test: add AvatarConfig mock
ViktorVeselov fc0a4e7
feat: add steering hook such as descripiton, sys prompts. tool sortin…
ViktorVeselov cbb701c
test: add docstrings
ViktorVeselov 1411e5b
feat: expose pre-built resolver and policy in package API
ViktorVeselov b64813f
feat: add triggers and variables to TaxonomyTerm model
ViktorVeselov 3ec1dbe
feat: implement DefaultKeywordResolver and DefaultSkillPolicy
ViktorVeselov 361fb67
feat: wire Plugin interceptors for skill and prompt tailoring
ViktorVeselov ce5913a
test: add coverage for flat JSON parsing and variable interpolation
ViktorVeselov 63eda7a
refactor: add shape_skill hook, typing imports, and interpolation war…
ViktorVeselov 98b8792
feat: implement robust cross-platform path validation and delegate sk…
ViktorVeselov e89f1fd
test: add unit tests for robust path safety, interpolation warnings, …
ViktorVeselov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # Copyright 2026 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Pluggable Policy & Taxonomy Security Engine for ADK Community.""" | ||
|
|
||
| from .policy import DefaultSkillPolicy | ||
| from .policy import DefaultKeywordResolver | ||
| from .policy import SkillPolicy | ||
| from .policy import TaxonomyPipeline | ||
| from .policy import TaxonomyResolver | ||
| from .taxonomy_config import TaxonomyRegistry | ||
| from .taxonomy_config import TaxonomyTerm | ||
| from .taxonomy_plugin import TaxonomyPlugin | ||
|
|
||
| __all__ = [ | ||
| "DefaultSkillPolicy", | ||
| "DefaultKeywordResolver", | ||
| "SkillPolicy", | ||
| "TaxonomyPipeline", | ||
| "TaxonomyPlugin", | ||
| "TaxonomyRegistry", | ||
| "TaxonomyResolver", | ||
| "TaxonomyTerm", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,321 @@ | ||
| # Copyright 2026 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Abstract interfaces for taxonomy resolution and skill policy enforcement.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from abc import ABC, abstractmethod | ||
| import logging | ||
| from typing import Any, Optional | ||
|
|
||
| from google.adk.agents.readonly_context import ReadonlyContext | ||
| from google.adk.models.llm_request import LlmRequest | ||
| from google.adk.skills.models import Skill | ||
|
|
||
| logger = logging.getLogger("google_adk_community." + __name__) | ||
|
|
||
| class TaxonomyResolver(ABC): | ||
| """Abstract base class for taxonomy resolution. | ||
|
|
||
| Resolvers analyze context and LLM history to determine which taxonomy | ||
| classification domains (e.g. URI strings) are currently active and relevant. | ||
| """ | ||
|
|
||
| @abstractmethod | ||
| async def resolve_taxonomies( | ||
| self, context: ReadonlyContext, llm_request: LlmRequest | ||
| ) -> list[str]: | ||
| """Resolves active taxonomy domain URIs from context and LLM history. | ||
|
|
||
| Args: | ||
| context: The current read-only execution context. | ||
| llm_request: The upcoming LLM request holding prompt configurations. | ||
|
|
||
| Returns: | ||
| A list of resolved active taxonomy strings/URIs. | ||
| """ | ||
| pass | ||
|
|
||
|
|
||
| class TaxonomyPipeline(TaxonomyResolver): | ||
| """Executes a sequence of taxonomy resolvers in order (multi-step pipeline). | ||
|
|
||
| This implements a composite/pipeline pattern to merge active taxonomy domains | ||
| identified by multiple independent heuristics (e.g. lexical, model-based). | ||
| """ | ||
|
|
||
| def __init__(self, resolvers: list[TaxonomyResolver]): | ||
| self.resolvers = resolvers | ||
|
|
||
| async def resolve_taxonomies( | ||
| self, context: ReadonlyContext, llm_request: LlmRequest | ||
| ) -> list[str]: | ||
| # Aggregates unique taxonomy domains across all registered resolvers | ||
| active_domains: set[str] = set() | ||
| for resolver in self.resolvers: | ||
| domains = await resolver.resolve_taxonomies(context, llm_request) | ||
| if domains: | ||
| active_domains.update(domains) | ||
| return list(active_domains) | ||
|
|
||
|
|
||
| class DefaultKeywordResolver(TaxonomyResolver): | ||
| """Declarative, configuration-driven keyword/phrase resolver. | ||
|
|
||
| Scans user prompt history for triggering phrases defined directly inside each | ||
| taxonomy term's triggers list or alt_labels, resolving active domains natively. | ||
| """ | ||
|
|
||
| def __init__(self, registry: Any): | ||
| self.registry = registry | ||
|
|
||
| async def resolve_taxonomies(self, context: ReadonlyContext, llm_request: LlmRequest) -> list[str]: | ||
| active_domains: set[str] = set() | ||
|
|
||
| for term_id in self.registry.list_ids(): | ||
| term = self.registry.get_term(term_id) | ||
| if term: | ||
| triggers = getattr(term, "triggers", []) | ||
| if not triggers and hasattr(term, "model_extra"): | ||
| triggers = (term.model_extra or {}).get("triggers", []) | ||
|
|
||
| # Fall back to alt_labels as secondary keyword triggers | ||
| if not triggers and hasattr(term, "alt_labels"): | ||
| triggers = term.alt_labels | ||
|
|
||
| if triggers: | ||
| for turn in llm_request.contents: | ||
| for part in turn.parts: | ||
| if part.text: | ||
| text_upper = part.text.upper() | ||
| if any(str(phrase).upper() in text_upper for phrase in triggers): | ||
| active_domains.add(term_id) | ||
| break | ||
|
|
||
| return list(active_domains) | ||
|
|
||
|
|
||
| class SkillPolicy(ABC): | ||
| """Abstract policy engine determining skill execution permissions and instruction shaping. | ||
|
|
||
| This class defines the interface for two main responsibilities: | ||
| 1. Access Control (Authorization): Blocking or permitting skills based on active taxonomies. | ||
| 2. Cognitive Steering (Behavioral Shaping): Altering skill instructions, descriptions, | ||
| prioritization, and global system prompts to steer agent execution dynamically. | ||
|
|
||
| Implements the Hook Method pattern, providing concrete default pass-throughs | ||
| for steering while keeping authorization and core shaping abstract. | ||
| """ | ||
|
|
||
| @abstractmethod | ||
| def is_skill_allowed( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| ) -> bool: | ||
| """Determines if a skill can be loaded/used under the active taxonomies and context. | ||
|
|
||
| Args: | ||
| skill: The target Skill model instance. | ||
| context: The read-only interaction context. | ||
| active_taxonomies: The list of currently active taxonomy domains. | ||
|
|
||
| Returns: | ||
| True if the skill is permitted to run, False otherwise. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def shape_instructions( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| original_instructions: str, | ||
| ) -> str: | ||
| """Applies dynamic instruction shaping/guardrails to a skill's instructions. | ||
|
|
||
| Use this to append safety restrictions, enforce compliance constraints, | ||
| or adjust operating parameters of a skill before execution. | ||
| """ | ||
| pass | ||
|
|
||
| def shape_description( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| original_description: str, | ||
| ) -> str: | ||
| """Applies dynamic description shaping before the tool reaches the agent. | ||
|
|
||
| This can be used to emphasize specific features of a skill to the LLM or | ||
| prune redundant information to fit within context limits. | ||
| """ | ||
| return original_description | ||
|
|
||
| def shape_system_instruction( | ||
| self, | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| original_instructions: str, | ||
| ) -> str: | ||
| """Applies dynamic instruction shaping to the global agent system instructions. | ||
|
|
||
| Use this to dynamically inject directives (e.g. telling the LLM to trigger | ||
| certain tools almost by default or prioritize specific workflows) depending | ||
| on the current active taxonomy classification. | ||
| """ | ||
| return original_instructions | ||
|
|
||
| def prioritize_skills( | ||
| self, | ||
| skills: list[Skill], | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| ) -> list[Skill]: | ||
| """Prioritizes, reorders, or accentuates skills under the active taxonomy. | ||
|
|
||
| Allows the policy to sort key tools to the top of the available_skills XML list | ||
| presented in the prompt, encouraging the LLM to select preferred actions. | ||
| """ | ||
| return skills | ||
|
|
||
| def shape_skill( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| shaped_description: Optional[str], | ||
| ) -> Skill: | ||
| """Prepares and shapes a skill representation for presentation to the agent. | ||
|
|
||
| Defaults to a secure manual reconstruction to prevent accidental leakage of | ||
| internal developer/business flags to LLM prompts, but can be overridden by | ||
| custom policies to use `model_copy()` or other strategies. | ||
| """ | ||
| assert skill is not None, "Skill instance cannot be None" | ||
|
|
||
| from google.adk.skills.models import Skill, Frontmatter | ||
| extra = getattr(skill.frontmatter, "model_extra", None) or {} | ||
| return Skill( | ||
| frontmatter=Frontmatter( | ||
| name=skill.frontmatter.name, | ||
| description=shaped_description, | ||
| **extra | ||
| ), | ||
| instructions=skill.instructions | ||
| ) | ||
|
|
||
|
|
||
| def _get_taxonomy_binds(skill: Skill) -> list[str]: | ||
| """Dynamically extracts taxonomy binds, supporting both modified and unmodified core SDKs. | ||
|
|
||
| This utility functions as a robust protocol layer. If the SDK natively supports | ||
| frontmatter taxonomy binds, it reads them directly. Otherwise, it falls back to parsing | ||
| Pydantic extra fields (since core SDK uses `extra="allow"`), handling variations in | ||
| hyphenation/naming conventions. | ||
| """ | ||
| # Direct attribute access check | ||
| if hasattr(skill.frontmatter, "taxonomy_binds"): | ||
| return skill.frontmatter.taxonomy_binds | ||
|
|
||
| # Fallback: Read from Pydantic's model_extra dictionary (natively populated because of extra="allow") | ||
| extra = getattr(skill.frontmatter, "model_extra", None) or {} | ||
| binds = extra.get("taxonomy-binds") or extra.get("taxonomy_binds") or [] | ||
| if isinstance(binds, str): | ||
| return [binds] | ||
| return list(binds) | ||
|
|
||
|
|
||
| def _interpolate_variables(text: str, active_taxonomies: list[str], registry: Optional[Any]) -> str: | ||
| if not text or not registry: | ||
| return text | ||
|
|
||
| import re | ||
| pattern = r"\{taxonomy:([a-zA-Z0-9_-]+)\}" | ||
|
|
||
| def replace(match): | ||
| var_name = match.group(1) | ||
| for tax_id in active_taxonomies: | ||
| term = registry.get_term(tax_id) | ||
| if term: | ||
| variables = getattr(term, "variables", {}) | ||
| if not variables and hasattr(term, "model_extra"): | ||
| variables = (term.model_extra or {}).get("variables", {}) | ||
| if variables and var_name in variables: | ||
| return str(variables[var_name]) | ||
|
|
||
| logger.warning("Taxonomy variable %r not found under active taxonomies: %s", var_name, active_taxonomies) | ||
| return "" | ||
|
|
||
| return re.sub(pattern, replace, text) | ||
|
|
||
|
|
||
| class DefaultSkillPolicy(SkillPolicy): | ||
| """Default skill policy using taxonomy-bind set-intersection matching. | ||
|
|
||
| If a skill has no taxonomy binds defined, it is treated as unrestricted/allowed by default. | ||
| If it has binds, at least one bind must intersect with the active taxonomy set. | ||
| """ | ||
|
|
||
| def __init__(self, registry: Optional[Any] = None): | ||
| self.registry = registry | ||
|
|
||
| def is_skill_allowed( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| ) -> bool: | ||
| binds = _get_taxonomy_binds(skill) | ||
| # Unrestricted skills are always allowed | ||
| if not binds: | ||
| return True | ||
| # Require at least one matching taxonomy between active set and skill binds | ||
| return bool(set(binds) & set(active_taxonomies)) | ||
|
|
||
| def shape_instructions( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| original_instructions: str, | ||
| ) -> str: | ||
| active_taxonomies = context.state.get("_active_taxonomies") or [] | ||
| return _interpolate_variables(original_instructions, active_taxonomies, self.registry) | ||
|
|
||
| def shape_description( | ||
| self, | ||
| skill: Skill, | ||
| context: ReadonlyContext, | ||
| original_description: str, | ||
| ) -> str: | ||
| active_taxonomies = context.state.get("_active_taxonomies") or [] | ||
| return _interpolate_variables(original_description, active_taxonomies, self.registry) | ||
|
|
||
| def shape_system_instruction( | ||
| self, | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| original_instructions: str, | ||
| ) -> str: | ||
| return _interpolate_variables(original_instructions, active_taxonomies, self.registry) | ||
|
|
||
| def prioritize_skills( | ||
| self, | ||
| skills: list[Skill], | ||
| context: ReadonlyContext, | ||
| active_taxonomies: list[str], | ||
| ) -> list[Skill]: | ||
| # No-op pass-through for default behavior | ||
| return skills | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a variable is not found in the registry, it is replaced with an empty string. This might make debugging difficult if there's a typo in the template (e.g.,
{taxonomy:typo}).Consider logging a warning when a variable is not found, or keeping the placeholder text to make it obvious that interpolation failed.