diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 77cf84615..72bfa774e 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -242,6 +242,7 @@ class Graph: outputs: list[str], stream: bool, session_id: str, + fallback_to_env_vars: bool, ) -> List[Optional["ResultData"]]: """ Runs the graph with the given inputs. @@ -289,7 +290,7 @@ class Graph: start_component_id = next( (vertex_id for vertex_id in self._is_input_vertices if "chat" in vertex_id.lower()), None ) - await self.process(start_component_id=start_component_id) + await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars) self.increment_run_count() except Exception as exc: logger.exception(exc) @@ -315,6 +316,7 @@ class Graph: outputs: Optional[list[str]] = None, session_id: Optional[str] = None, stream: bool = False, + fallback_to_env_vars: bool = False, ) -> List[RunOutputs]: """ Run the graph with the given inputs and return the outputs. @@ -340,6 +342,7 @@ class Graph: outputs=outputs, session_id=session_id, stream=stream, + fallback_to_env_vars=fallback_to_env_vars, ) try: @@ -362,6 +365,7 @@ class Graph: outputs: Optional[list[str]] = None, session_id: Optional[str] = None, stream: bool = False, + fallback_to_env_vars: bool = False, ) -> List[RunOutputs]: """ Runs the graph with the given inputs. @@ -403,6 +407,7 @@ class Graph: outputs=outputs or [], stream=stream, session_id=session_id or "", + fallback_to_env_vars=fallback_to_env_vars, ) run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs) logger.debug(f"Run outputs: {run_output_object}") @@ -710,6 +715,7 @@ class Graph: lock: asyncio.Lock, set_cache_coro: Callable[["Graph", asyncio.Lock], Coroutine], vertex_id: str, + fallback_to_env_vars: bool, inputs_dict: Optional[Dict[str, str]] = None, user_id: Optional[str] = None, ): @@ -733,7 +739,7 @@ class Graph: vertex = self.get_vertex(vertex_id) try: if not vertex.frozen or not vertex._built: - await vertex.build(user_id=user_id, inputs=inputs_dict) + await vertex.build(user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars) if vertex.result is not None: params = vertex._built_object_repr() @@ -796,7 +802,7 @@ class Graph: vertices.append(vertex) return vertices - async def process(self, start_component_id: Optional[str] = None) -> "Graph": + async def process(self, fallback_to_env_vars: bool, start_component_id: Optional[str] = None) -> "Graph": """Processes the graph with vertices in each layer run in parallel.""" first_layer = self.sort_vertices(start_component_id=start_component_id) @@ -821,6 +827,7 @@ class Graph: vertex_id=vertex_id, user_id=self.user_id, inputs_dict={}, + fallback_to_env_vars=fallback_to_env_vars, ), name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}", ) diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index e250e9419..1256b0711 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -390,13 +390,17 @@ class Vertex: self.params = self._raw_params.copy() self.updated_raw_params = True - async def _build(self, user_id=None): + async def _build( + self, + fallback_to_env_vars, + user_id=None, + ): """ Initiate the build process. """ logger.debug(f"Building {self.display_name}") await self._build_each_vertex_in_params_dict(user_id) - await self._get_and_instantiate_class(user_id) + await self._get_and_instantiate_class(user_id, fallback_to_env_vars) self._validate_built_object() self._built = True @@ -606,7 +610,7 @@ class Vertex: if isinstance(self.params[key], list): self.params[key].extend(result) - async def _get_and_instantiate_class(self, user_id=None): + async def _get_and_instantiate_class(self, user_id=None, fallback_to_env_vars=False): """ Gets the class from a dictionary and instantiates it with the params. """ @@ -615,6 +619,7 @@ class Vertex: try: result = await loading.instantiate_class( user_id=user_id, + fallback_to_env_vars=fallback_to_env_vars, vertex=self, ) self._update_built_object_and_artifacts(result) diff --git a/src/backend/base/langflow/interface/initialize/loading.py b/src/backend/base/langflow/interface/initialize/loading.py index c1014ef90..2a02dabec 100644 --- a/src/backend/base/langflow/interface/initialize/loading.py +++ b/src/backend/base/langflow/interface/initialize/loading.py @@ -1,8 +1,8 @@ import inspect import json +import os from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence, Type - import orjson from langchain.agents import agent as agent_module from langchain.agents.agent import AgentExecutor @@ -35,6 +35,7 @@ if TYPE_CHECKING: async def instantiate_class( vertex: "Vertex", + fallback_to_env_vars, user_id=None, ) -> Any: """Instantiate class from module type and key, and params""" @@ -143,7 +144,9 @@ async def instantiate_based_on_type( return class_object(**params) -def update_params_with_load_from_db_fields(custom_component: "CustomComponent", params, load_from_db_fields): +def update_params_with_load_from_db_fields( + custom_component: "CustomComponent", params, load_from_db_fields, fallback_to_env_vars=False +): # For each field in load_from_db_fields, we will check if it's in the params # and if it is, we will get the value from the custom_component.keys(name) # and update the params with the value @@ -151,14 +154,24 @@ def update_params_with_load_from_db_fields(custom_component: "CustomComponent", if field in params: try: key = custom_component.variables(params[field]) - params[field] = key if key else params[field] + if fallback_to_env_vars and key is None: + var = os.getenv(params[field]) + if var is None: + raise ValueError(f"Environment variable {params[field]} is not set.") + key = var + params[field] = key + logger.warning( + f"It was not possible to get value for field {field}. Setting value to None." + " If you want to fallback to an environment variable with the same name, " + "set LANGFLOW_FALLBACK_TO_ENV_VAR=True in your environment." + ) except Exception as exc: logger.error(f"Failed to get value for {field} from custom component. Error: {exc}") pass return params -async def instantiate_custom_component(params, user_id, vertex): +async def instantiate_custom_component(params, user_id, vertex, fallback_to_env_vars): params_copy = params.copy() class_object: Type["CustomComponent"] = eval_custom_component_code(params_copy.pop("code")) custom_component: "CustomComponent" = class_object( @@ -167,7 +180,9 @@ async def instantiate_custom_component(params, user_id, vertex): vertex=vertex, selected_output_type=vertex.selected_output_type, ) - params_copy = update_params_with_load_from_db_fields(custom_component, params_copy, vertex.load_from_db_fields) + params_copy = update_params_with_load_from_db_fields( + custom_component, params_copy, vertex.load_from_db_fields, fallback_to_env_vars + ) if "retriever" in params_copy and hasattr(params_copy["retriever"], "as_retriever"): params_copy["retriever"] = params_copy["retriever"].as_retriever() diff --git a/src/backend/base/langflow/processing/load.py b/src/backend/base/langflow/processing/load.py index cf354c3b3..20ec6572d 100644 --- a/src/backend/base/langflow/processing/load.py +++ b/src/backend/base/langflow/processing/load.py @@ -82,6 +82,7 @@ def run_flow_from_json( env_file: Optional[str] = None, cache: Optional[str] = None, disable_logs: Optional[bool] = True, + fallback_to_env_vars: Optional[bool] = False, ) -> List[RunOutputs]: """ Run a flow from a JSON file or dictionary. @@ -127,5 +128,6 @@ def run_flow_from_json( input_type=input_type, output_type=output_type, output_component=output_component, + fallback_to_env_vars=fallback_to_env_vars, ) return result diff --git a/src/backend/base/langflow/processing/process.py b/src/backend/base/langflow/processing/process.py index 2d8356159..d46274b4c 100644 --- a/src/backend/base/langflow/processing/process.py +++ b/src/backend/base/langflow/processing/process.py @@ -175,6 +175,7 @@ def run_graph( input_value: str, input_type: str, output_type: str, + fallback_to_env_vars: bool = False, output_component: Optional[str] = None, ) -> List[RunOutputs]: """ @@ -218,6 +219,7 @@ def run_graph( outputs or [], stream=False, session_id="", + fallback_to_env_vars=fallback_to_env_vars, ) return run_outputs diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 19bfb3e12..97abf2d2b 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -111,6 +111,10 @@ class Settings(BaseSettings): CELERY_ENABLED: bool = False + fallback_to_env_var: bool = True + """If set to True, Global Variables set in the UI will fallback to a environment variable + with the same name in case Langflow fails to retrieve the variable value.""" + store_environment_variables: bool = True """Whether to store environment variables as Global Variables in the database.""" variables_to_get_from_environment: list[str] = VARIABLES_TO_GET_FROM_ENVIRONMENT