From 915de79cf24e3dd946c0b5014e98a93228674353 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:39:07 -0300 Subject: [PATCH 1/6] Refactor logger configuration logic --- src/backend/langflow/utils/logger.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/utils/logger.py b/src/backend/langflow/utils/logger.py index 060ad9731..6755e53b1 100644 --- a/src/backend/langflow/utils/logger.py +++ b/src/backend/langflow/utils/logger.py @@ -25,7 +25,10 @@ def patching(record): def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None): - if os.getenv("LANGFLOW_LOG_LEVEL") in VALID_LOG_LEVELS and log_level is None: + if ( + os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS + and log_level is None + ): log_level = os.getenv("LANGFLOW_LOG_LEVEL") if log_level is None: log_level = "INFO" From b2549d92fc6232ec26495322c381d26cea7fe1c9 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:39:39 -0300 Subject: [PATCH 2/6] Refactor custom component building functions --- .../langflow/interface/custom/utils.py | 67 ++++++++++++++----- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/src/backend/langflow/interface/custom/utils.py b/src/backend/langflow/interface/custom/utils.py index d2be47333..381193116 100644 --- a/src/backend/langflow/interface/custom/utils.py +++ b/src/backend/langflow/interface/custom/utils.py @@ -27,14 +27,18 @@ from langflow.utils import validate from langflow.utils.util import get_base_classes -def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: List[str]): +def add_output_types( + frontend_node: CustomComponentFrontendNode, return_types: List[str] +): """Add output types to the frontend node""" for return_type in return_types: if return_type is None: raise HTTPException( status_code=400, detail={ - "error": ("Invalid return type. Please check your code and try again."), + "error": ( + "Invalid return type. Please check your code and try again." + ), "traceback": traceback.format_exc(), }, ) @@ -63,14 +67,18 @@ def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: List frontend_node.template.fields = reordered_fields -def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: List[str]): +def add_base_classes( + frontend_node: CustomComponentFrontendNode, return_types: List[str] +): """Add base classes to the frontend node""" for return_type_instance in return_types: if return_type_instance is None: raise HTTPException( status_code=400, detail={ - "error": ("Invalid return type. Please check your code and try again."), + "error": ( + "Invalid return type. Please check your code and try again." + ), "traceback": traceback.format_exc(), }, ) @@ -145,10 +153,14 @@ def add_new_custom_field( # If options is a list, then it's a dropdown # If options is None, then it's a list of strings is_list = isinstance(field_config.get("options"), list) - field_config["is_list"] = is_list or field_config.get("is_list", False) or field_contains_list + field_config["is_list"] = ( + is_list or field_config.get("is_list", False) or field_contains_list + ) if "name" in field_config: - warnings.warn("The 'name' key in field_config is used to build the object and can't be changed.") + warnings.warn( + "The 'name' key in field_config is used to build the object and can't be changed." + ) required = field_config.pop("required", field_required) placeholder = field_config.pop("placeholder", "") @@ -179,7 +191,9 @@ def add_extra_fields(frontend_node, field_config, function_args): if "name" not in extra_field or extra_field["name"] == "self": continue - field_name, field_type, field_value, field_required = get_field_properties(extra_field) + field_name, field_type, field_value, field_required = get_field_properties( + extra_field + ) config = field_config.get(field_name, {}) frontend_node = add_new_custom_field( frontend_node, @@ -217,7 +231,9 @@ def run_build_config( raise HTTPException( status_code=400, detail={ - "error": ("Invalid type convertion. Please check your code and try again."), + "error": ( + "Invalid type convertion. Please check your code and try again." + ), "traceback": traceback.format_exc(), }, ) from exc @@ -245,7 +261,9 @@ def run_build_config( raise HTTPException( status_code=400, detail={ - "error": ("Invalid type convertion. Please check your code and try again."), + "error": ( + "Invalid type convertion. Please check your code and try again." + ), "traceback": traceback.format_exc(), }, ) from exc @@ -300,16 +318,24 @@ def build_custom_component_template( frontend_node = build_frontend_node(custom_component.template_config) logger.debug("Updated attributes") - field_config, custom_instance = run_build_config(custom_component, user_id=user_id, update_field=update_field) + field_config, custom_instance = run_build_config( + custom_component, user_id=user_id, update_field=update_field + ) logger.debug("Built field config") entrypoint_args = custom_component.get_function_entrypoint_args add_extra_fields(frontend_node, field_config, entrypoint_args) - frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {})) + frontend_node = add_code_field( + frontend_node, custom_component.code, field_config.get("code", {}) + ) - add_base_classes(frontend_node, custom_component.get_function_entrypoint_return_type) - add_output_types(frontend_node, custom_component.get_function_entrypoint_return_type) + add_base_classes( + frontend_node, custom_component.get_function_entrypoint_return_type + ) + add_output_types( + frontend_node, custom_component.get_function_entrypoint_return_type + ) logger.debug("Added base classes") reorder_fields(frontend_node, custom_instance._get_field_order()) @@ -321,7 +347,9 @@ def build_custom_component_template( raise HTTPException( status_code=400, detail={ - "error": ("Invalid type convertion. Please check your code and try again."), + "error": ( + "Invalid type convertion. Please check your code and try again." + ), "traceback": traceback.format_exc(), }, ) from exc @@ -345,7 +373,9 @@ def build_custom_components(settings_service): if not settings_service.settings.COMPONENTS_PATH: return {} - logger.info(f"Building custom components from {settings_service.settings.COMPONENTS_PATH}") + logger.info( + f"Building custom components from {settings_service.settings.COMPONENTS_PATH}" + ) custom_components_from_file = {} processed_paths = set() for path in settings_service.settings.COMPONENTS_PATH: @@ -356,7 +386,9 @@ def build_custom_components(settings_service): custom_component_dict = build_custom_component_list_from_path(path_str) if custom_component_dict: category = next(iter(custom_component_dict)) - logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") + logger.info( + f"Loading {len(custom_component_dict[category])} component(s) from category {category}" + ) custom_components_from_file = merge_nested_dicts_with_renaming( custom_components_from_file, custom_component_dict ) @@ -400,8 +432,9 @@ def sanitize_field_config(field_config: Dict): def build_component(component): """Build a single component.""" component_name = determine_component_name(component) + logger.debug(f"Building component: {component_name}") component_template = create_component_template(component) - logger.debug(f"Building component: {component_name, component.get('output_types')}") + return component_name, component_template From 8b7ee7225206aeda99c54f0bec6527c8d4574f6c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:39:59 -0300 Subject: [PATCH 3/6] Refactor ChatVertex class to update artifacts and params --- src/backend/langflow/graph/vertex/types.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index d60b7412c..100390d11 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -424,15 +424,16 @@ class ChatVertex(StatelessVertex): message = message.text if hasattr(message, "text") else message yield message complete_message += message - self._built_object = Record(text=complete_message, data=self.artifacts) - self._built_result = complete_message - # Update artifacts with the message - # and remove the stream_url self.artifacts = ChatOutputResponse( message=complete_message, sender=self.params.get("sender", ""), sender_name=self.params.get("sender_name", ""), ).model_dump() + self.params[INPUT_FIELD_NAME] = complete_message + self._built_object = Record(text=complete_message, data=self.artifacts) + self._built_result = complete_message + # Update artifacts with the message + # and remove the stream_url logger.debug(f"Streamed message: {complete_message}") await log_message( @@ -443,6 +444,9 @@ class ChatVertex(StatelessVertex): artifacts=self.artifacts, ) + self._validate_built_object() + self._built = True + class RoutingVertex(StatelessVertex): def __init__(self, data: Dict, graph): From 12fe2ffbe6c8e3fb33b7e47ed1fbf9eeb662ea1b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:40:11 -0300 Subject: [PATCH 4/6] Update langflow.graph.schema import --- src/backend/langflow/graph/graph/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 0ffc9825b..3f5e376a5 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -8,7 +8,7 @@ from loguru import logger from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.constants import lazy_load_vertex_dict from langflow.graph.graph.utils import process_flow -from langflow.graph.schema import InterfaceComponentTypes +from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes from langflow.graph.vertex.base import Vertex from langflow.graph.vertex.types import ( ChatVertex, From 34d48992d576f685d9a5c5c5a9cb5239b529c54c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:40:24 -0300 Subject: [PATCH 5/6] Refactor chat.py to support vertex streaming --- src/backend/langflow/api/v1/chat.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 7f85bb211..e1b52aeb5 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,6 +1,6 @@ import time -from typing import Optional import uuid +from typing import TYPE_CHECKING, Optional from fastapi import ( APIRouter, @@ -35,6 +35,9 @@ from langflow.services.chat.service import ChatService from langflow.services.deps import get_chat_service, get_session from langflow.services.monitor.utils import log_vertex_build +if TYPE_CHECKING: + from langflow.graph.vertex.types import ChatVertex + router = APIRouter(tags=["Chat"]) @@ -237,27 +240,40 @@ async def build_vertex_stream( else: graph = cache.get("result") - vertex = graph.get_vertex(vertex_id) + vertex: "ChatVertex" = graph.get_vertex(vertex_id) + if not hasattr(vertex, "stream"): + raise ValueError(f"Vertex {vertex_id} does not support streaming") if not vertex.pinned or not vertex._built: + logger.debug(f"Streaming vertex {vertex_id}") stream_data = StreamData( event="message", - data={"message": "Building vertex"}, + data={"message": f"Streaming vertex {vertex_id}"}, ) yield str(stream_data) - + number_of_chunks = 0 async for chunk in vertex.stream(): stream_data = StreamData( event="message", data={"chunk": chunk}, ) + number_of_chunks += 1 yield str(stream_data) + logger.debug(f"Number of chunks: {number_of_chunks}") + elif vertex.result is not None: + stream_data = StreamData( + event="message", + data={"chunk": vertex._built_result}, + ) + yield str(stream_data) else: raise ValueError(f"No result found for vertex {vertex_id}") except Exception as exc: + logger.error(f"Error building vertex: {exc}") yield str(StreamData(event="error", data={"error": str(exc)})) - - yield str(StreamData(event="close", data={"message": "Stream closed"})) + finally: + logger.debug("Closing stream") + yield str(StreamData(event="close", data={"message": "Stream closed"})) return StreamingResponse(stream_vertex(), media_type="text/event-stream") except Exception as exc: From b42caa27ad17f4a56e4246fbfd92883d660653c0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 27 Feb 2024 20:40:49 -0300 Subject: [PATCH 6/6] Add LANGFLOW_LOG_LEVEL debug to launch.json --- .vscode/launch.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.vscode/launch.json b/.vscode/launch.json index a8229b155..3332b67e9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -17,6 +17,9 @@ ], "jinja": true, "justMyCode": true, + "env": { + "LANGFLOW_LOG_LEVEL": "debug" + }, "envFile": "${workspaceFolder}/.env" }, {