Fix fallback to environment variables in loading.py (#1921)
* 🐛 (loading.py): Fix issue where environment variables were not being properly fallbacked to if not found in custom component keys 📝 (loading.py): Update documentation to inform users about the new feature of falling back to environment variables if variable value is not found in custom component keys * add fallback_to_env_vars to build vertex
This commit is contained in:
parent
c115f5b67d
commit
ec910f3bd7
6 changed files with 46 additions and 11 deletions
|
|
@ -242,6 +242,7 @@ class Graph:
|
|||
outputs: list[str],
|
||||
stream: bool,
|
||||
session_id: str,
|
||||
fallback_to_env_vars: bool,
|
||||
) -> List[Optional["ResultData"]]:
|
||||
"""
|
||||
Runs the graph with the given inputs.
|
||||
|
|
@ -289,7 +290,7 @@ class Graph:
|
|||
start_component_id = next(
|
||||
(vertex_id for vertex_id in self._is_input_vertices if "chat" in vertex_id.lower()), None
|
||||
)
|
||||
await self.process(start_component_id=start_component_id)
|
||||
await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars)
|
||||
self.increment_run_count()
|
||||
except Exception as exc:
|
||||
logger.exception(exc)
|
||||
|
|
@ -315,6 +316,7 @@ class Graph:
|
|||
outputs: Optional[list[str]] = None,
|
||||
session_id: Optional[str] = None,
|
||||
stream: bool = False,
|
||||
fallback_to_env_vars: bool = False,
|
||||
) -> List[RunOutputs]:
|
||||
"""
|
||||
Run the graph with the given inputs and return the outputs.
|
||||
|
|
@ -340,6 +342,7 @@ class Graph:
|
|||
outputs=outputs,
|
||||
session_id=session_id,
|
||||
stream=stream,
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
)
|
||||
|
||||
try:
|
||||
|
|
@ -362,6 +365,7 @@ class Graph:
|
|||
outputs: Optional[list[str]] = None,
|
||||
session_id: Optional[str] = None,
|
||||
stream: bool = False,
|
||||
fallback_to_env_vars: bool = False,
|
||||
) -> List[RunOutputs]:
|
||||
"""
|
||||
Runs the graph with the given inputs.
|
||||
|
|
@ -403,6 +407,7 @@ class Graph:
|
|||
outputs=outputs or [],
|
||||
stream=stream,
|
||||
session_id=session_id or "",
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
)
|
||||
run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs)
|
||||
logger.debug(f"Run outputs: {run_output_object}")
|
||||
|
|
@ -710,6 +715,7 @@ class Graph:
|
|||
lock: asyncio.Lock,
|
||||
set_cache_coro: Callable[["Graph", asyncio.Lock], Coroutine],
|
||||
vertex_id: str,
|
||||
fallback_to_env_vars: bool,
|
||||
inputs_dict: Optional[Dict[str, str]] = None,
|
||||
user_id: Optional[str] = None,
|
||||
):
|
||||
|
|
@ -733,7 +739,7 @@ class Graph:
|
|||
vertex = self.get_vertex(vertex_id)
|
||||
try:
|
||||
if not vertex.frozen or not vertex._built:
|
||||
await vertex.build(user_id=user_id, inputs=inputs_dict)
|
||||
await vertex.build(user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars)
|
||||
|
||||
if vertex.result is not None:
|
||||
params = vertex._built_object_repr()
|
||||
|
|
@ -796,7 +802,7 @@ class Graph:
|
|||
vertices.append(vertex)
|
||||
return vertices
|
||||
|
||||
async def process(self, start_component_id: Optional[str] = None) -> "Graph":
|
||||
async def process(self, fallback_to_env_vars: bool, start_component_id: Optional[str] = None) -> "Graph":
|
||||
"""Processes the graph with vertices in each layer run in parallel."""
|
||||
|
||||
first_layer = self.sort_vertices(start_component_id=start_component_id)
|
||||
|
|
@ -821,6 +827,7 @@ class Graph:
|
|||
vertex_id=vertex_id,
|
||||
user_id=self.user_id,
|
||||
inputs_dict={},
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
),
|
||||
name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -390,13 +390,17 @@ class Vertex:
|
|||
self.params = self._raw_params.copy()
|
||||
self.updated_raw_params = True
|
||||
|
||||
async def _build(self, user_id=None):
|
||||
async def _build(
|
||||
self,
|
||||
fallback_to_env_vars,
|
||||
user_id=None,
|
||||
):
|
||||
"""
|
||||
Initiate the build process.
|
||||
"""
|
||||
logger.debug(f"Building {self.display_name}")
|
||||
await self._build_each_vertex_in_params_dict(user_id)
|
||||
await self._get_and_instantiate_class(user_id)
|
||||
await self._get_and_instantiate_class(user_id, fallback_to_env_vars)
|
||||
self._validate_built_object()
|
||||
|
||||
self._built = True
|
||||
|
|
@ -606,7 +610,7 @@ class Vertex:
|
|||
if isinstance(self.params[key], list):
|
||||
self.params[key].extend(result)
|
||||
|
||||
async def _get_and_instantiate_class(self, user_id=None):
|
||||
async def _get_and_instantiate_class(self, user_id=None, fallback_to_env_vars=False):
|
||||
"""
|
||||
Gets the class from a dictionary and instantiates it with the params.
|
||||
"""
|
||||
|
|
@ -615,6 +619,7 @@ class Vertex:
|
|||
try:
|
||||
result = await loading.instantiate_class(
|
||||
user_id=user_id,
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
vertex=self,
|
||||
)
|
||||
self._update_built_object_and_artifacts(result)
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import inspect
|
||||
import json
|
||||
import os
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence, Type
|
||||
|
||||
|
||||
import orjson
|
||||
from langchain.agents import agent as agent_module
|
||||
from langchain.agents.agent import AgentExecutor
|
||||
|
|
@ -35,6 +35,7 @@ if TYPE_CHECKING:
|
|||
|
||||
async def instantiate_class(
|
||||
vertex: "Vertex",
|
||||
fallback_to_env_vars,
|
||||
user_id=None,
|
||||
) -> Any:
|
||||
"""Instantiate class from module type and key, and params"""
|
||||
|
|
@ -143,7 +144,9 @@ async def instantiate_based_on_type(
|
|||
return class_object(**params)
|
||||
|
||||
|
||||
def update_params_with_load_from_db_fields(custom_component: "CustomComponent", params, load_from_db_fields):
|
||||
def update_params_with_load_from_db_fields(
|
||||
custom_component: "CustomComponent", params, load_from_db_fields, fallback_to_env_vars=False
|
||||
):
|
||||
# For each field in load_from_db_fields, we will check if it's in the params
|
||||
# and if it is, we will get the value from the custom_component.keys(name)
|
||||
# and update the params with the value
|
||||
|
|
@ -151,14 +154,24 @@ def update_params_with_load_from_db_fields(custom_component: "CustomComponent",
|
|||
if field in params:
|
||||
try:
|
||||
key = custom_component.variables(params[field])
|
||||
params[field] = key if key else params[field]
|
||||
if fallback_to_env_vars and key is None:
|
||||
var = os.getenv(params[field])
|
||||
if var is None:
|
||||
raise ValueError(f"Environment variable {params[field]} is not set.")
|
||||
key = var
|
||||
params[field] = key
|
||||
logger.warning(
|
||||
f"It was not possible to get value for field {field}. Setting value to None."
|
||||
" If you want to fallback to an environment variable with the same name, "
|
||||
"set LANGFLOW_FALLBACK_TO_ENV_VAR=True in your environment."
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(f"Failed to get value for {field} from custom component. Error: {exc}")
|
||||
pass
|
||||
return params
|
||||
|
||||
|
||||
async def instantiate_custom_component(params, user_id, vertex):
|
||||
async def instantiate_custom_component(params, user_id, vertex, fallback_to_env_vars):
|
||||
params_copy = params.copy()
|
||||
class_object: Type["CustomComponent"] = eval_custom_component_code(params_copy.pop("code"))
|
||||
custom_component: "CustomComponent" = class_object(
|
||||
|
|
@ -167,7 +180,9 @@ async def instantiate_custom_component(params, user_id, vertex):
|
|||
vertex=vertex,
|
||||
selected_output_type=vertex.selected_output_type,
|
||||
)
|
||||
params_copy = update_params_with_load_from_db_fields(custom_component, params_copy, vertex.load_from_db_fields)
|
||||
params_copy = update_params_with_load_from_db_fields(
|
||||
custom_component, params_copy, vertex.load_from_db_fields, fallback_to_env_vars
|
||||
)
|
||||
|
||||
if "retriever" in params_copy and hasattr(params_copy["retriever"], "as_retriever"):
|
||||
params_copy["retriever"] = params_copy["retriever"].as_retriever()
|
||||
|
|
|
|||
|
|
@ -82,6 +82,7 @@ def run_flow_from_json(
|
|||
env_file: Optional[str] = None,
|
||||
cache: Optional[str] = None,
|
||||
disable_logs: Optional[bool] = True,
|
||||
fallback_to_env_vars: Optional[bool] = False,
|
||||
) -> List[RunOutputs]:
|
||||
"""
|
||||
Run a flow from a JSON file or dictionary.
|
||||
|
|
@ -127,5 +128,6 @@ def run_flow_from_json(
|
|||
input_type=input_type,
|
||||
output_type=output_type,
|
||||
output_component=output_component,
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
)
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -175,6 +175,7 @@ def run_graph(
|
|||
input_value: str,
|
||||
input_type: str,
|
||||
output_type: str,
|
||||
fallback_to_env_vars: bool = False,
|
||||
output_component: Optional[str] = None,
|
||||
) -> List[RunOutputs]:
|
||||
"""
|
||||
|
|
@ -218,6 +219,7 @@ def run_graph(
|
|||
outputs or [],
|
||||
stream=False,
|
||||
session_id="",
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
)
|
||||
return run_outputs
|
||||
|
||||
|
|
|
|||
|
|
@ -111,6 +111,10 @@ class Settings(BaseSettings):
|
|||
|
||||
CELERY_ENABLED: bool = False
|
||||
|
||||
fallback_to_env_var: bool = True
|
||||
"""If set to True, Global Variables set in the UI will fallback to a environment variable
|
||||
with the same name in case Langflow fails to retrieve the variable value."""
|
||||
|
||||
store_environment_variables: bool = True
|
||||
"""Whether to store environment variables as Global Variables in the database."""
|
||||
variables_to_get_from_environment: list[str] = VARIABLES_TO_GET_FROM_ENVIRONMENT
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue