From b455284e974dfaf7735d05b6e2087dc79816d740 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 19:00:16 -0300 Subject: [PATCH] Refactor vertex base class and add error handling for streaming components --- src/backend/langflow/graph/vertex/base.py | 33 ++++++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index e16bc683f..4e711329f 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -2,13 +2,24 @@ import ast import inspect import types from enum import Enum -from typing import (TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, - Optional) +from typing import ( + TYPE_CHECKING, + Any, + AsyncIterator, + Callable, + Coroutine, + Dict, + Iterator, + List, + Optional, +) -from loguru import logger - -from langflow.graph.schema import (INPUT_COMPONENTS, OUTPUT_COMPONENTS, - InterfaceComponentTypes, ResultData) +from langflow.graph.schema import ( + INPUT_COMPONENTS, + OUTPUT_COMPONENTS, + InterfaceComponentTypes, + ResultData, +) from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.graph.vertex.utils import generate_result from langflow.interface.initialize import loading @@ -16,6 +27,7 @@ from langflow.interface.listing import lazy_load_dict from langflow.services.deps import get_storage_service from langflow.utils.constants import DIRECT_TYPES from langflow.utils.util import sync_to_async +from loguru import logger if TYPE_CHECKING: from langflow.graph.edge.base import ContractEdge @@ -519,7 +531,6 @@ class Vertex: f"Error building node {self.display_name}: {str(e)}" ) from e - def _handle_func(self, key, result): """ Handles 'func' key by checking if the result is a function and setting it as coroutine. @@ -588,6 +599,14 @@ class Vertex: message += " Make sure your build method returns a component." logger.warning(message) + elif isinstance(self._built_object, (Iterator, AsyncIterator)): + if self.display_name in ["Text Output"]: + raise ValueError( + f"You are trying to stream to a {self.display_name}. Try using a Chat Output instead." + ) + raise ValueError( + f"{self.display_name}: You are trying to stream to a non-streamable component." + ) def _reset(self, params_update: Optional[Dict[str, Any]] = None): self._built = False