From b60ba0bb12c87c652125d0d9fb238e8b929c28c4 Mon Sep 17 00:00:00 2001 From: Erick Aleman Date: Tue, 21 Apr 2026 13:08:51 -0400 Subject: [PATCH 1/2] feat: add support for env_vars to ReasoningEngine.create --- .../reasoning_engines/_reasoning_engines.py | 8 + vertexai/reasoning_engines/_utils.py | 550 +++--------------- 2 files changed, 91 insertions(+), 467 deletions(-) diff --git a/vertexai/reasoning_engines/_reasoning_engines.py b/vertexai/reasoning_engines/_reasoning_engines.py index 322bf2a2d4..9ffd0799ca 100644 --- a/vertexai/reasoning_engines/_reasoning_engines.py +++ b/vertexai/reasoning_engines/_reasoning_engines.py @@ -161,6 +161,9 @@ def create( gcs_dir_name: str = _DEFAULT_GCS_DIR_NAME, sys_version: Optional[str] = None, extra_packages: Optional[Sequence[str]] = None, + env_vars: Optional[ + Union[Sequence[str], Dict[str, Union[str, aip_types.SecretRef]]] + ] = None, ) -> "ReasoningEngine": """Creates a new ReasoningEngine. @@ -301,6 +304,11 @@ def create( reasoning_engine_spec = aip_types.ReasoningEngineSpec( package_spec=package_spec, ) + if env_vars: + deployment_spec, _ = _utils._generate_deployment_spec_or_raise( + env_vars=env_vars, + ) + reasoning_engine_spec.deployment_spec = deployment_spec class_methods_spec = _generate_class_methods_spec_or_raise( reasoning_engine, _get_registered_operations(reasoning_engine) ) diff --git a/vertexai/reasoning_engines/_utils.py b/vertexai/reasoning_engines/_utils.py index dbb0938748..8926153d01 100644 --- a/vertexai/reasoning_engines/_utils.py +++ b/vertexai/reasoning_engines/_utils.py @@ -1,5 +1,4 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,472 +11,89 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -import dataclasses -import inspect -import json -import types -import typing -from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Union - -import proto - -from google.cloud.aiplatform import base -from google.api import httpbody_pb2 -from google.protobuf import struct_pb2 -from google.protobuf import json_format - -try: - # For LangChain templates, they might not import langchain_core and get - # PydanticUserError: `query` is not fully defined; you should define - # `RunnableConfig`, then call `query.model_rebuild()`. - import langchain_core.runnables.config - - RunnableConfig = langchain_core.runnables.config.RunnableConfig -except ImportError: - RunnableConfig = Any - -try: - from llama_index.core.base.response import schema as llama_index_schema - from llama_index.core.base.llms import types as llama_index_types - - LlamaIndexResponse = llama_index_schema.Response - LlamaIndexBaseModel = llama_index_schema.BaseModel - LlamaIndexChatResponse = llama_index_types.ChatResponse -except ImportError: - LlamaIndexResponse = Any - LlamaIndexBaseModel = Any - LlamaIndexChatResponse = Any - -JsonDict = Dict[str, Any] - -_LOGGER = base.Logger(__name__) - - -def to_proto( - obj: Union[JsonDict, proto.Message], - message: Optional[proto.Message] = None, -) -> proto.Message: - """Parses a JSON-like object into a message. - - If the object is already a message, this will return the object as-is. If - the object is a JSON Dict, this will parse and merge the object into the - message. - - Args: - obj (Union[dict[str, Any], proto.Message]): - Required. The object to convert to a proto message. - message (proto.Message): - Optional. A protocol buffer message to merge the obj into. It - defaults to Struct() if unspecified. - - Returns: - proto.Message: The same message passed as argument. - """ - if message is None: - message = struct_pb2.Struct() - if isinstance(obj, (proto.Message, struct_pb2.Struct)): - return obj - try: - json_format.ParseDict(obj, message._pb) - except AttributeError: - json_format.ParseDict(obj, message) - return message - - -def to_dict(message: proto.Message) -> JsonDict: - """Converts the contents of the protobuf message to JSON format. - - Args: - message (proto.Message): - Required. The proto message to be converted to a JSON dictionary. - - Returns: - dict[str, Any]: A dictionary containing the contents of the proto. - """ - try: - # Best effort attempt to convert the message into a JSON dictionary. - result: JsonDict = json.loads(json_format.MessageToJson(message._pb)) - except AttributeError: - result: JsonDict = json.loads(json_format.MessageToJson(message)) - return result - - -def dataclass_to_dict(obj: dataclasses.dataclass) -> Any: - """Converts a dataclass to a JSON dictionary. - - Args: - obj (dataclasses.dataclass): - Required. The dataclass to be converted to a JSON dictionary. - - Returns: - dict[str, Any]: A dictionary containing the contents of the dataclass. - """ - return json.loads(json.dumps(dataclasses.asdict(obj))) - - -def _llama_index_response_to_dict(obj: LlamaIndexResponse) -> Any: - response = {} - if hasattr(obj, "response"): - response["response"] = obj.response - if hasattr(obj, "source_nodes"): - response["source_nodes"] = [node.model_dump_json() for node in obj.source_nodes] - if hasattr(obj, "metadata"): - response["metadata"] = obj.metadata - - return json.loads(json.dumps(response)) - - -def _llama_index_chat_response_to_dict(obj: LlamaIndexChatResponse) -> Any: - return json.loads(obj.message.model_dump_json()) - - -def _llama_index_base_model_to_dict(obj: LlamaIndexBaseModel) -> Any: - return json.loads(obj.model_dump_json()) - - -def to_json_serializable_llama_index_object( - obj: Union[ - LlamaIndexResponse, - LlamaIndexBaseModel, - LlamaIndexChatResponse, - Sequence[LlamaIndexBaseModel], - ], -) -> Union[str, Dict[str, Any], Sequence[Union[str, Dict[str, Any]]]]: - """Converts a LlamaIndexResponse to a JSON serializable object.""" - if isinstance(obj, LlamaIndexResponse): - return _llama_index_response_to_dict(obj) - if isinstance(obj, LlamaIndexChatResponse): - return _llama_index_chat_response_to_dict(obj) - if isinstance(obj, Sequence): - seq_result = [] - for item in obj: - if isinstance(item, LlamaIndexBaseModel): - seq_result.append(_llama_index_base_model_to_dict(item)) - continue - seq_result.append(str(item)) - return seq_result - if isinstance(obj, LlamaIndexBaseModel): - return _llama_index_base_model_to_dict(obj) - return str(obj) - - -def yield_parsed_json(body: httpbody_pb2.HttpBody) -> Iterable[Any]: - """Converts the contents of the httpbody message to JSON format. - - Args: - body (httpbody_pb2.HttpBody): - Required. The httpbody body to be converted to a JSON. - - Yields: - Any: A JSON object or the original body if it is not JSON or None. - """ - content_type = getattr(body, "content_type", None) - data = getattr(body, "data", None) - - if content_type is None or data is None or "application/json" not in content_type: - yield body - return - - try: - utf8_data = data.decode("utf-8") - except Exception as e: - _LOGGER.warning(f"Failed to decode data: {data}. Exception: {e}") - yield body - return - - if not utf8_data: - yield None - return - - # Handle the case of multiple dictionaries delimited by newlines. - for line in utf8_data.split("\n"): - if line: - try: - line = json.loads(line) - except Exception as e: - _LOGGER.warning(f"failed to parse json: {line}. Exception: {e}") - yield line - - -def generate_schema( - f: Callable[..., Any], - *, - schema_name: Optional[str] = None, - descriptions: Mapping[str, str] = {}, - required: Sequence[str] = [], -) -> JsonDict: - """Generates the OpenAPI Schema for a callable object. - Only positional and keyword arguments of the function `f` will be supported - in the OpenAPI Schema that is generated. I.e. `*args` and `**kwargs` will - not be present in the OpenAPI schema returned from this function. For those - cases, you can either include it in the docstring for `f`, or modify the - OpenAPI schema returned from this function to include additional arguments. - - Args: - f (Callable): - Required. The function to generate an OpenAPI Schema for. - schema_name (str): - Optional. The name for the OpenAPI schema. If unspecified, the name - of the Callable will be used. - descriptions (Mapping[str, str]): - Optional. A `{name: description}` mapping for annotating input - arguments of the function with user-provided descriptions. It - defaults to an empty dictionary (i.e. there will not be any - description for any of the inputs). - required (Sequence[str]): - Optional. For the user to specify the set of required arguments in - function calls to `f`. If specified, it will be automatically - inferred from `f`. - - Returns: - dict[str, Any]: The OpenAPI Schema for the function `f` in JSON format. - """ - pydantic = _import_pydantic_or_raise() - defaults = dict(inspect.signature(f).parameters) - fields_dict = { - name: ( - # 1. We infer the argument type here: use Any rather than None so - # it will not try to auto-infer the type based on the default value. - (param.annotation if param.annotation != inspect.Parameter.empty else Any), - pydantic.Field( - # 2. We do not support default values for now. - # default=( - # param.default if param.default != inspect.Parameter.empty - # else None - # ), - # 3. We support user-provided descriptions. - description=descriptions.get(name, None), - ), - ) - for name, param in defaults.items() - # We do not support *args or **kwargs - if param.kind - in ( - inspect.Parameter.POSITIONAL_OR_KEYWORD, - inspect.Parameter.KEYWORD_ONLY, - inspect.Parameter.POSITIONAL_ONLY, - ) - } - parameters = pydantic.create_model(f.__name__, **fields_dict).schema() - # Postprocessing - # 4. Suppress unnecessary title generation: - # * https://github.com/pydantic/pydantic/issues/1051 - # * http://cl/586221780 - parameters.pop("title", "") - for name, function_arg in parameters.get("properties", {}).items(): - function_arg.pop("title", "") - annotation = defaults[name].annotation - # 5. Nullable fields: - # * https://github.com/pydantic/pydantic/issues/1270 - # * https://stackoverflow.com/a/58841311 - # * https://github.com/pydantic/pydantic/discussions/4872 - if typing.get_origin(annotation) is Union and type(None) in typing.get_args( - annotation - ): - # for "typing.Optional" arguments, function_arg might be a - # dictionary like - # - # {'anyOf': [{'type': 'integer'}, {'type': 'null'}] - for schema in function_arg.pop("anyOf", []): - schema_type = schema.get("type") - if schema_type and schema_type != "null": - function_arg["type"] = schema_type - break - function_arg["nullable"] = True - # 6. Annotate required fields. - if required: - # We use the user-provided "required" fields if specified. - parameters["required"] = required - else: - # Otherwise we infer it from the function signature. - parameters["required"] = [ - k - for k in defaults - if ( - defaults[k].default == inspect.Parameter.empty - and defaults[k].kind - in ( - inspect.Parameter.POSITIONAL_OR_KEYWORD, - inspect.Parameter.KEYWORD_ONLY, - inspect.Parameter.POSITIONAL_ONLY, +import os +from typing import Dict, List, Optional, Sequence, Tuple, Union + +from google.cloud.aiplatform import v1 as aip_types + +def _update_deployment_spec_with_env_vars_dict_or_raise( + deployment_spec: aip_types.ReasoningEngineSpec.DeploymentSpec, + env_vars: Dict[str, Union[str, aip_types.SecretRef]], +): + """Updates deployment spec with environment variables from a dictionary.""" + for key, value in env_vars.items(): + if isinstance(value, str): + deployment_spec.env.append(aip_types.EnvVar(name=key, value=value)) + elif isinstance(value, aip_types.SecretRef): + deployment_spec.secret_env.append( + aip_types.ReasoningEngineSpec.DeploymentSpec.SecretEnvVar( + name=key, secret_ref=value ) ) - ] - schema = dict(name=f.__name__, description=f.__doc__, parameters=parameters) - if schema_name: - schema["name"] = schema_name - return schema - - -def is_noop_or_proxy_tracer_provider(tracer_provider) -> bool: - """Returns True if the tracer_provider is Proxy or NoOp.""" - opentelemetry = _import_opentelemetry_or_warn() - ProxyTracerProvider = opentelemetry.trace.ProxyTracerProvider - NoOpTracerProvider = opentelemetry.trace.NoOpTracerProvider - return isinstance(tracer_provider, (NoOpTracerProvider, ProxyTracerProvider)) - - -def _import_cloud_storage_or_raise() -> types.ModuleType: - """Tries to import the Cloud Storage module.""" - try: - from google.cloud import storage - except ImportError as e: - raise ImportError( - "Cloud Storage is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) from e - return storage - - -def _import_cloudpickle_or_raise() -> types.ModuleType: - """Tries to import the cloudpickle module.""" - try: - import cloudpickle # noqa:F401 - except ImportError as e: - raise ImportError( - "cloudpickle is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) from e - return cloudpickle - - -def _import_pydantic_or_raise() -> types.ModuleType: - """Tries to import the pydantic module.""" - try: - import pydantic - - _ = pydantic.Field - except AttributeError: - from pydantic import v1 as pydantic - except ImportError as e: - raise ImportError( - "pydantic is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) from e - return pydantic - - -def _import_opentelemetry_or_warn() -> Optional[types.ModuleType]: - """Tries to import the opentelemetry module.""" - try: - import opentelemetry # noqa:F401 - - return opentelemetry - except ImportError: - _LOGGER.warning( - "opentelemetry-sdk is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) - return None - - -def _import_opentelemetry_sdk_trace_or_warn() -> Optional[types.ModuleType]: - """Tries to import the opentelemetry.sdk.trace module.""" - try: - import opentelemetry.sdk.trace # noqa:F401 - - return opentelemetry.sdk.trace - except ImportError: - _LOGGER.warning( - "opentelemetry-sdk is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) - return None - - -def _import_cloud_trace_v2_or_warn() -> Optional[types.ModuleType]: - """Tries to import the google.cloud.trace_v2 module.""" - try: - import google.cloud.trace_v2 - - return google.cloud.trace_v2 - except ImportError: - _LOGGER.warning( - "google-cloud-trace is not installed. Please call " - "'pip install google-cloud-aiplatform[agent_engines]'." - ) - return None - - -def _import_cloud_trace_exporter_or_warn() -> Optional[types.ModuleType]: - """Tries to import the opentelemetry.exporter.cloud_trace module.""" - try: - import opentelemetry.exporter.cloud_trace # noqa:F401 - - return opentelemetry.exporter.cloud_trace - except ImportError: - _LOGGER.warning( - "opentelemetry-exporter-gcp-trace is not installed. Please " - "call 'pip install google-cloud-aiplatform[langchain]'." - ) - return None - - -def _import_openinference_langchain_or_warn() -> Optional[types.ModuleType]: - """Tries to import the openinference.instrumentation.langchain module.""" - try: - import openinference.instrumentation.langchain # noqa:F401 - - return openinference.instrumentation.langchain - except ImportError: - _LOGGER.warning( - "openinference-instrumentation-langchain is not installed. Please " - "call 'pip install google-cloud-aiplatform[langchain]'." - ) - return None - - -def _import_openinference_autogen_or_warn() -> Optional[types.ModuleType]: - """Tries to import the openinference.instrumentation.autogen module.""" - try: - import openinference.instrumentation.autogen # noqa:F401 - - return openinference.instrumentation.autogen - except ImportError: - _LOGGER.warning( - "openinference-instrumentation-autogen is not installed. Please " - "call 'pip install openinference-instrumentation-autogen'." - ) - return None - - -def _import_openinference_llama_index_or_warn() -> Optional[types.ModuleType]: - """Tries to import the openinference.instrumentation.llama_index module.""" - try: - import openinference.instrumentation.llama_index # noqa:F401 - - return openinference.instrumentation.llama_index - except ImportError: - _LOGGER.warning( - "openinference-instrumentation-llama_index is not installed. Please " - "call 'pip install google-cloud-aiplatform[llama_index]'." - ) - return None - - -def _import_autogen_tools_or_warn() -> Optional[types.ModuleType]: - """Tries to import the autogen.tools module.""" - try: - from autogen import tools - - return tools - except ImportError: - _LOGGER.warning( - "autogen.tools is not installed. Please call: `pip install ag2[tools]`" - ) - return None - - -def _import_nest_asyncio_or_warn() -> Optional[types.ModuleType]: - """Tries to import the nest_asyncio module.""" - try: - import nest_asyncio + else: + raise TypeError( + f"env_vars values must be a string or SecretRef, but got {type(value)}." + ) - return nest_asyncio - except ImportError: - _LOGGER.warning( - "nest_asyncio is not installed. Please call: `pip install nest-asyncio`" - ) - return None +def _update_deployment_spec_with_env_vars_list_or_raise( + deployment_spec: aip_types.ReasoningEngineSpec.DeploymentSpec, + env_vars: Sequence[str], +): + """Updates deployment spec with environment variables from a list.""" + for key in env_vars: + if key not in os.environ: + raise ValueError(f"Environment variable '{key}' not found in os.environ.") + deployment_spec.env.append(aip_types.EnvVar(name=key, value=os.environ[key])) + +def _generate_deployment_spec_or_raise( + *, + env_vars: Optional[ + Union[Sequence[str], Dict[str, Union[str, aip_types.SecretRef]]] + ] = None, + psc_interface_config: Optional[aip_types.PscInterfaceConfig] = None, + min_instances: Optional[int] = None, + max_instances: Optional[int] = None, + resource_limits: Optional[Dict[str, str]] = None, + container_concurrency: Optional[int] = None, +) -> Tuple[aip_types.ReasoningEngineSpec.DeploymentSpec, List[str]]: + """Generates a DeploymentSpec based on the provided parameters.""" + deployment_spec = aip_types.ReasoningEngineSpec.DeploymentSpec() + update_masks = [] + if env_vars: + deployment_spec.env = [] + deployment_spec.secret_env = [] + if isinstance(env_vars, dict): + _update_deployment_spec_with_env_vars_dict_or_raise( + deployment_spec=deployment_spec, + env_vars=env_vars, + ) + elif isinstance(env_vars, (list, tuple)): + _update_deployment_spec_with_env_vars_list_or_raise( + deployment_spec=deployment_spec, + env_vars=env_vars, + ) + else: + raise TypeError( + f"env_vars must be a list, tuple or a dict, but got {type(env_vars)}." + ) + if deployment_spec.env: + update_masks.append("spec.deployment_spec.env") + if deployment_spec.secret_env: + update_masks.append("spec.deployment_spec.secret_env") + if psc_interface_config: + deployment_spec.psc_interface_config = psc_interface_config + update_masks.append("spec.deployment_spec.psc_interface_config") + if min_instances is not None: + deployment_spec.min_instances = min_instances + update_masks.append("spec.deployment_spec.min_instances") + if max_instances is not None: + deployment_spec.max_instances = max_instances + update_masks.append("spec.deployment_spec.max_instances") + if resource_limits: + deployment_spec.resource_limits = resource_limits + update_masks.append("spec.deployment_spec.resource_limits") + if container_concurrency is not None: + deployment_spec.container_concurrency = container_concurrency + update_masks.append("spec.deployment_spec.container_concurrency") + return deployment_spec, update_masks From c6e4d3a72640b357405ed29a5a19e01efbe8d4e7 Mon Sep 17 00:00:00 2001 From: Erick Aleman Date: Fri, 24 Apr 2026 17:40:55 -0400 Subject: [PATCH 2/2] fix(reasoning-engines): preserve utils while adding env vars --- .../test_reasoning_engines.py | 103 ++++ vertexai/reasoning_engines/_utils.py | 527 +++++++++++++++++- 2 files changed, 611 insertions(+), 19 deletions(-) diff --git a/tests/unit/vertex_langchain/test_reasoning_engines.py b/tests/unit/vertex_langchain/test_reasoning_engines.py index 019dc214d9..1355100371 100644 --- a/tests/unit/vertex_langchain/test_reasoning_engines.py +++ b/tests/unit/vertex_langchain/test_reasoning_engines.py @@ -794,6 +794,109 @@ def test_create_reasoning_engine( retry=_TEST_RETRY, ) + def test_create_reasoning_engine_with_env_vars( + self, + create_reasoning_engine_mock, + cloud_storage_create_bucket_mock, + tarfile_open_mock, + cloudpickle_dump_mock, + get_gca_resource_mock, + ): + reasoning_engines.ReasoningEngine.create( + self.test_app, + display_name=_TEST_REASONING_ENGINE_DISPLAY_NAME, + requirements=_TEST_REASONING_ENGINE_REQUIREMENTS, + extra_packages=[_TEST_REASONING_ENGINE_EXTRA_PACKAGE_PATH], + env_vars={ + "TEST_ENV_VAR": "TEST_ENV_VAR_VALUE", + "TEST_SECRET_ENV_VAR": types.SecretRef( + secret="TEST_SECRET_NAME", + version="TEST_SECRET_VERSION", + ), + }, + ) + want_reasoning_engine = types.ReasoningEngine( + display_name=_TEST_REASONING_ENGINE_DISPLAY_NAME, + spec=types.ReasoningEngineSpec( + package_spec=types.ReasoningEngineSpec.PackageSpec( + python_version=f"{sys.version_info.major}.{sys.version_info.minor}", + pickle_object_gcs_uri=_TEST_REASONING_ENGINE_GCS_URI, + dependency_files_gcs_uri=_TEST_REASONING_ENGINE_DEPENDENCY_FILES_GCS_URI, + requirements_gcs_uri=_TEST_REASONING_ENGINE_REQUIREMENTS_GCS_URI, + ), + deployment_spec=types.ReasoningEngineSpec.DeploymentSpec( + env=[ + types.EnvVar( + name="TEST_ENV_VAR", + value="TEST_ENV_VAR_VALUE", + ) + ], + secret_env=[ + types.SecretEnvVar( + name="TEST_SECRET_ENV_VAR", + secret_ref=types.SecretRef( + secret="TEST_SECRET_NAME", + version="TEST_SECRET_VERSION", + ), + ) + ], + ), + ), + ) + want_reasoning_engine.spec.class_methods.append( + _TEST_REASONING_ENGINE_QUERY_SCHEMA + ) + create_reasoning_engine_mock.assert_called_with( + parent=_TEST_PARENT, + reasoning_engine=want_reasoning_engine, + ) + + @mock.patch.dict(os.environ, {"TEST_ENV_VAR_FROM_OS": "os-value"}) + def test_generate_deployment_spec_from_env_var_names(self): + deployment_spec, update_masks = _utils._generate_deployment_spec_or_raise( + env_vars=["TEST_ENV_VAR_FROM_OS"], + ) + + assert _utils.to_dict(deployment_spec) == { + "env": [{"name": "TEST_ENV_VAR_FROM_OS", "value": "os-value"}] + } + assert update_masks == ["spec.deployment_spec.env"] + + def test_generate_deployment_spec_from_secret_ref_dict(self): + deployment_spec, update_masks = _utils._generate_deployment_spec_or_raise( + env_vars={ + "TEST_SECRET_ENV_VAR": { + "secret": "TEST_SECRET_NAME", + "version": "TEST_SECRET_VERSION", + }, + }, + ) + + assert _utils.to_dict(deployment_spec) == { + "secretEnv": [ + { + "name": "TEST_SECRET_ENV_VAR", + "secretRef": { + "secret": "TEST_SECRET_NAME", + "version": "TEST_SECRET_VERSION", + }, + } + ] + } + assert update_masks == ["spec.deployment_spec.secret_env"] + + def test_generate_deployment_spec_rejects_invalid_env_var_value_type(self): + with pytest.raises(TypeError, match="Unknown value type in env_vars"): + _utils._generate_deployment_spec_or_raise(env_vars={"TEST_ENV_VAR": 1}) + + def test_generate_deployment_spec_rejects_missing_env_var_name(self): + with pytest.raises(ValueError, match="Env var not found in os.environ"): + _utils._generate_deployment_spec_or_raise(env_vars=["MISSING_ENV_VAR"]) + + def test_generate_deployment_spec_rejects_string_env_vars(self): + with pytest.raises(TypeError, match="env_vars must be a list, tuple or a dict"): + _utils._generate_deployment_spec_or_raise(env_vars="TEST_ENV_VAR") + @pytest.mark.usefixtures("caplog") def test_create_reasoning_engine_warn_resource_name( self, diff --git a/vertexai/reasoning_engines/_utils.py b/vertexai/reasoning_engines/_utils.py index 8926153d01..fc09eacbf4 100644 --- a/vertexai/reasoning_engines/_utils.py +++ b/vertexai/reasoning_engines/_utils.py @@ -1,4 +1,5 @@ -# Copyright 2026 Google LLC +# -*- coding: utf-8 -*- +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,40 +12,100 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +# +import dataclasses +import inspect +import json import os -from typing import Dict, List, Optional, Sequence, Tuple, Union +import types +import typing +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Sequence, + Tuple, + Union, +) + +import proto + +from google.cloud.aiplatform import base +from google.cloud.aiplatform_v1beta1 import types as aip_types +from google.api import httpbody_pb2 +from google.protobuf import struct_pb2 +from google.protobuf import json_format + +try: + # For LangChain templates, they might not import langchain_core and get + # PydanticUserError: `query` is not fully defined; you should define + # `RunnableConfig`, then call `query.model_rebuild()`. + import langchain_core.runnables.config + + RunnableConfig = langchain_core.runnables.config.RunnableConfig +except ImportError: + RunnableConfig = Any + +try: + from llama_index.core.base.response import schema as llama_index_schema + from llama_index.core.base.llms import types as llama_index_types + + LlamaIndexResponse = llama_index_schema.Response + LlamaIndexBaseModel = llama_index_schema.BaseModel + LlamaIndexChatResponse = llama_index_types.ChatResponse +except ImportError: + LlamaIndexResponse = Any + LlamaIndexBaseModel = Any + LlamaIndexChatResponse = Any + +JsonDict = Dict[str, Any] + +_LOGGER = base.Logger(__name__) -from google.cloud.aiplatform import v1 as aip_types def _update_deployment_spec_with_env_vars_dict_or_raise( + *, deployment_spec: aip_types.ReasoningEngineSpec.DeploymentSpec, env_vars: Dict[str, Union[str, aip_types.SecretRef]], -): - """Updates deployment spec with environment variables from a dictionary.""" +) -> None: for key, value in env_vars.items(): - if isinstance(value, str): - deployment_spec.env.append(aip_types.EnvVar(name=key, value=value)) + if isinstance(value, dict): + try: + secret_ref = to_proto(value, aip_types.SecretRef()) + except Exception as e: + raise ValueError(f"Failed to convert to secret ref: {value}") from e + deployment_spec.secret_env.append( + aip_types.SecretEnvVar(name=key, secret_ref=secret_ref) + ) elif isinstance(value, aip_types.SecretRef): deployment_spec.secret_env.append( - aip_types.ReasoningEngineSpec.DeploymentSpec.SecretEnvVar( - name=key, secret_ref=value - ) + aip_types.SecretEnvVar(name=key, secret_ref=value) ) + elif isinstance(value, str): + deployment_spec.env.append(aip_types.EnvVar(name=key, value=value)) else: raise TypeError( - f"env_vars values must be a string or SecretRef, but got {type(value)}." + f"Unknown value type in env_vars for {key}. " + f"Must be a str or SecretRef: {value}" ) + def _update_deployment_spec_with_env_vars_list_or_raise( + *, deployment_spec: aip_types.ReasoningEngineSpec.DeploymentSpec, env_vars: Sequence[str], -): - """Updates deployment spec with environment variables from a list.""" - for key in env_vars: - if key not in os.environ: - raise ValueError(f"Environment variable '{key}' not found in os.environ.") - deployment_spec.env.append(aip_types.EnvVar(name=key, value=os.environ[key])) +) -> None: + for env_var in env_vars: + if env_var not in os.environ: + raise ValueError(f"Env var not found in os.environ: {env_var}.") + deployment_spec.env.append( + aip_types.EnvVar(name=env_var, value=os.environ[env_var]) + ) + def _generate_deployment_spec_or_raise( *, @@ -57,7 +118,6 @@ def _generate_deployment_spec_or_raise( resource_limits: Optional[Dict[str, str]] = None, container_concurrency: Optional[int] = None, ) -> Tuple[aip_types.ReasoningEngineSpec.DeploymentSpec, List[str]]: - """Generates a DeploymentSpec based on the provided parameters.""" deployment_spec = aip_types.ReasoningEngineSpec.DeploymentSpec() update_masks = [] if env_vars: @@ -97,3 +157,432 @@ def _generate_deployment_spec_or_raise( deployment_spec.container_concurrency = container_concurrency update_masks.append("spec.deployment_spec.container_concurrency") return deployment_spec, update_masks + + +def to_proto( + obj: Union[JsonDict, proto.Message], + message: Optional[proto.Message] = None, +) -> proto.Message: + """Parses a JSON-like object into a message. + + If the object is already a message, this will return the object as-is. If + the object is a JSON Dict, this will parse and merge the object into the + message. + + Args: + obj (Union[dict[str, Any], proto.Message]): + Required. The object to convert to a proto message. + message (proto.Message): + Optional. A protocol buffer message to merge the obj into. It + defaults to Struct() if unspecified. + + Returns: + proto.Message: The same message passed as argument. + """ + if message is None: + message = struct_pb2.Struct() + if isinstance(obj, (proto.Message, struct_pb2.Struct)): + return obj + try: + json_format.ParseDict(obj, message._pb) + except AttributeError: + json_format.ParseDict(obj, message) + return message + + +def to_dict(message: proto.Message) -> JsonDict: + """Converts the contents of the protobuf message to JSON format. + + Args: + message (proto.Message): + Required. The proto message to be converted to a JSON dictionary. + + Returns: + dict[str, Any]: A dictionary containing the contents of the proto. + """ + try: + # Best effort attempt to convert the message into a JSON dictionary. + result: JsonDict = json.loads(json_format.MessageToJson(message._pb)) + except AttributeError: + result: JsonDict = json.loads(json_format.MessageToJson(message)) + return result + + +def dataclass_to_dict(obj: dataclasses.dataclass) -> Any: + """Converts a dataclass to a JSON dictionary. + + Args: + obj (dataclasses.dataclass): + Required. The dataclass to be converted to a JSON dictionary. + + Returns: + dict[str, Any]: A dictionary containing the contents of the dataclass. + """ + return json.loads(json.dumps(dataclasses.asdict(obj))) + + +def _llama_index_response_to_dict(obj: LlamaIndexResponse) -> Any: + response = {} + if hasattr(obj, "response"): + response["response"] = obj.response + if hasattr(obj, "source_nodes"): + response["source_nodes"] = [node.model_dump_json() for node in obj.source_nodes] + if hasattr(obj, "metadata"): + response["metadata"] = obj.metadata + + return json.loads(json.dumps(response)) + + +def _llama_index_chat_response_to_dict(obj: LlamaIndexChatResponse) -> Any: + return json.loads(obj.message.model_dump_json()) + + +def _llama_index_base_model_to_dict(obj: LlamaIndexBaseModel) -> Any: + return json.loads(obj.model_dump_json()) + + +def to_json_serializable_llama_index_object( + obj: Union[ + LlamaIndexResponse, + LlamaIndexBaseModel, + LlamaIndexChatResponse, + Sequence[LlamaIndexBaseModel], + ], +) -> Union[str, Dict[str, Any], Sequence[Union[str, Dict[str, Any]]]]: + """Converts a LlamaIndexResponse to a JSON serializable object.""" + if isinstance(obj, LlamaIndexResponse): + return _llama_index_response_to_dict(obj) + if isinstance(obj, LlamaIndexChatResponse): + return _llama_index_chat_response_to_dict(obj) + if isinstance(obj, Sequence): + seq_result = [] + for item in obj: + if isinstance(item, LlamaIndexBaseModel): + seq_result.append(_llama_index_base_model_to_dict(item)) + continue + seq_result.append(str(item)) + return seq_result + if isinstance(obj, LlamaIndexBaseModel): + return _llama_index_base_model_to_dict(obj) + return str(obj) + + +def yield_parsed_json(body: httpbody_pb2.HttpBody) -> Iterable[Any]: + """Converts the contents of the httpbody message to JSON format. + + Args: + body (httpbody_pb2.HttpBody): + Required. The httpbody body to be converted to a JSON. + + Yields: + Any: A JSON object or the original body if it is not JSON or None. + """ + content_type = getattr(body, "content_type", None) + data = getattr(body, "data", None) + + if content_type is None or data is None or "application/json" not in content_type: + yield body + return + + try: + utf8_data = data.decode("utf-8") + except Exception as e: + _LOGGER.warning(f"Failed to decode data: {data}. Exception: {e}") + yield body + return + + if not utf8_data: + yield None + return + + # Handle the case of multiple dictionaries delimited by newlines. + for line in utf8_data.split("\n"): + if line: + try: + line = json.loads(line) + except Exception as e: + _LOGGER.warning(f"failed to parse json: {line}. Exception: {e}") + yield line + + +def generate_schema( + f: Callable[..., Any], + *, + schema_name: Optional[str] = None, + descriptions: Mapping[str, str] = {}, + required: Sequence[str] = [], +) -> JsonDict: + """Generates the OpenAPI Schema for a callable object. + + Only positional and keyword arguments of the function `f` will be supported + in the OpenAPI Schema that is generated. I.e. `*args` and `**kwargs` will + not be present in the OpenAPI schema returned from this function. For those + cases, you can either include it in the docstring for `f`, or modify the + OpenAPI schema returned from this function to include additional arguments. + + Args: + f (Callable): + Required. The function to generate an OpenAPI Schema for. + schema_name (str): + Optional. The name for the OpenAPI schema. If unspecified, the name + of the Callable will be used. + descriptions (Mapping[str, str]): + Optional. A `{name: description}` mapping for annotating input + arguments of the function with user-provided descriptions. It + defaults to an empty dictionary (i.e. there will not be any + description for any of the inputs). + required (Sequence[str]): + Optional. For the user to specify the set of required arguments in + function calls to `f`. If specified, it will be automatically + inferred from `f`. + + Returns: + dict[str, Any]: The OpenAPI Schema for the function `f` in JSON format. + """ + pydantic = _import_pydantic_or_raise() + defaults = dict(inspect.signature(f).parameters) + fields_dict = { + name: ( + # 1. We infer the argument type here: use Any rather than None so + # it will not try to auto-infer the type based on the default value. + (param.annotation if param.annotation != inspect.Parameter.empty else Any), + pydantic.Field( + # 2. We do not support default values for now. + # default=( + # param.default if param.default != inspect.Parameter.empty + # else None + # ), + # 3. We support user-provided descriptions. + description=descriptions.get(name, None), + ), + ) + for name, param in defaults.items() + # We do not support *args or **kwargs + if param.kind + in ( + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + inspect.Parameter.POSITIONAL_ONLY, + ) + } + parameters = pydantic.create_model(f.__name__, **fields_dict).schema() + # Postprocessing + # 4. Suppress unnecessary title generation: + # * https://github.com/pydantic/pydantic/issues/1051 + # * http://cl/586221780 + parameters.pop("title", "") + for name, function_arg in parameters.get("properties", {}).items(): + function_arg.pop("title", "") + annotation = defaults[name].annotation + # 5. Nullable fields: + # * https://github.com/pydantic/pydantic/issues/1270 + # * https://stackoverflow.com/a/58841311 + # * https://github.com/pydantic/pydantic/discussions/4872 + if typing.get_origin(annotation) is Union and type(None) in typing.get_args( + annotation + ): + # for "typing.Optional" arguments, function_arg might be a + # dictionary like + # + # {'anyOf': [{'type': 'integer'}, {'type': 'null'}] + for schema in function_arg.pop("anyOf", []): + schema_type = schema.get("type") + if schema_type and schema_type != "null": + function_arg["type"] = schema_type + break + function_arg["nullable"] = True + # 6. Annotate required fields. + if required: + # We use the user-provided "required" fields if specified. + parameters["required"] = required + else: + # Otherwise we infer it from the function signature. + parameters["required"] = [ + k + for k in defaults + if ( + defaults[k].default == inspect.Parameter.empty + and defaults[k].kind + in ( + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + inspect.Parameter.POSITIONAL_ONLY, + ) + ) + ] + schema = dict(name=f.__name__, description=f.__doc__, parameters=parameters) + if schema_name: + schema["name"] = schema_name + return schema + + +def is_noop_or_proxy_tracer_provider(tracer_provider) -> bool: + """Returns True if the tracer_provider is Proxy or NoOp.""" + opentelemetry = _import_opentelemetry_or_warn() + ProxyTracerProvider = opentelemetry.trace.ProxyTracerProvider + NoOpTracerProvider = opentelemetry.trace.NoOpTracerProvider + return isinstance(tracer_provider, (NoOpTracerProvider, ProxyTracerProvider)) + + +def _import_cloud_storage_or_raise() -> types.ModuleType: + """Tries to import the Cloud Storage module.""" + try: + from google.cloud import storage + except ImportError as e: + raise ImportError( + "Cloud Storage is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) from e + return storage + + +def _import_cloudpickle_or_raise() -> types.ModuleType: + """Tries to import the cloudpickle module.""" + try: + import cloudpickle # noqa:F401 + except ImportError as e: + raise ImportError( + "cloudpickle is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) from e + return cloudpickle + + +def _import_pydantic_or_raise() -> types.ModuleType: + """Tries to import the pydantic module.""" + try: + import pydantic + + _ = pydantic.Field + except AttributeError: + from pydantic import v1 as pydantic + except ImportError as e: + raise ImportError( + "pydantic is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) from e + return pydantic + + +def _import_opentelemetry_or_warn() -> Optional[types.ModuleType]: + """Tries to import the opentelemetry module.""" + try: + import opentelemetry # noqa:F401 + + return opentelemetry + except ImportError: + _LOGGER.warning( + "opentelemetry-sdk is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) + return None + + +def _import_opentelemetry_sdk_trace_or_warn() -> Optional[types.ModuleType]: + """Tries to import the opentelemetry.sdk.trace module.""" + try: + import opentelemetry.sdk.trace # noqa:F401 + + return opentelemetry.sdk.trace + except ImportError: + _LOGGER.warning( + "opentelemetry-sdk is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) + return None + + +def _import_cloud_trace_v2_or_warn() -> Optional[types.ModuleType]: + """Tries to import the google.cloud.trace_v2 module.""" + try: + import google.cloud.trace_v2 + + return google.cloud.trace_v2 + except ImportError: + _LOGGER.warning( + "google-cloud-trace is not installed. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) + return None + + +def _import_cloud_trace_exporter_or_warn() -> Optional[types.ModuleType]: + """Tries to import the opentelemetry.exporter.cloud_trace module.""" + try: + import opentelemetry.exporter.cloud_trace # noqa:F401 + + return opentelemetry.exporter.cloud_trace + except ImportError: + _LOGGER.warning( + "opentelemetry-exporter-gcp-trace is not installed. Please " + "call 'pip install google-cloud-aiplatform[langchain]'." + ) + return None + + +def _import_openinference_langchain_or_warn() -> Optional[types.ModuleType]: + """Tries to import the openinference.instrumentation.langchain module.""" + try: + import openinference.instrumentation.langchain # noqa:F401 + + return openinference.instrumentation.langchain + except ImportError: + _LOGGER.warning( + "openinference-instrumentation-langchain is not installed. Please " + "call 'pip install google-cloud-aiplatform[langchain]'." + ) + return None + + +def _import_openinference_autogen_or_warn() -> Optional[types.ModuleType]: + """Tries to import the openinference.instrumentation.autogen module.""" + try: + import openinference.instrumentation.autogen # noqa:F401 + + return openinference.instrumentation.autogen + except ImportError: + _LOGGER.warning( + "openinference-instrumentation-autogen is not installed. Please " + "call 'pip install openinference-instrumentation-autogen'." + ) + return None + + +def _import_openinference_llama_index_or_warn() -> Optional[types.ModuleType]: + """Tries to import the openinference.instrumentation.llama_index module.""" + try: + import openinference.instrumentation.llama_index # noqa:F401 + + return openinference.instrumentation.llama_index + except ImportError: + _LOGGER.warning( + "openinference-instrumentation-llama_index is not installed. Please " + "call 'pip install google-cloud-aiplatform[llama_index]'." + ) + return None + + +def _import_autogen_tools_or_warn() -> Optional[types.ModuleType]: + """Tries to import the autogen.tools module.""" + try: + from autogen import tools + + return tools + except ImportError: + _LOGGER.warning( + "autogen.tools is not installed. Please call: `pip install ag2[tools]`" + ) + return None + + +def _import_nest_asyncio_or_warn() -> Optional[types.ModuleType]: + """Tries to import the nest_asyncio module.""" + try: + import nest_asyncio + + return nest_asyncio + except ImportError: + _LOGGER.warning( + "nest_asyncio is not installed. Please call: `pip install nest-asyncio`" + ) + return None