Merge branch 'zustand/io/migration' of personal:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
anovazzi1 2024-02-27 20:50:24 -03:00
commit 0b1336988f
6 changed files with 88 additions and 29 deletions

3
.vscode/launch.json vendored
View file

@ -17,6 +17,9 @@
],
"jinja": true,
"justMyCode": true,
"env": {
"LANGFLOW_LOG_LEVEL": "debug"
},
"envFile": "${workspaceFolder}/.env"
},
{

View file

@ -1,6 +1,6 @@
import time
from typing import Optional
import uuid
from typing import TYPE_CHECKING, Optional
from fastapi import (
APIRouter,
@ -35,6 +35,9 @@ from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service, get_session
from langflow.services.monitor.utils import log_vertex_build
if TYPE_CHECKING:
from langflow.graph.vertex.types import ChatVertex
router = APIRouter(tags=["Chat"])
@ -237,27 +240,40 @@ async def build_vertex_stream(
else:
graph = cache.get("result")
vertex = graph.get_vertex(vertex_id)
vertex: "ChatVertex" = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
raise ValueError(f"Vertex {vertex_id} does not support streaming")
if not vertex.pinned or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": "Building vertex"},
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
number_of_chunks = 0
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
number_of_chunks += 1
yield str(stream_data)
logger.debug(f"Number of chunks: {number_of_chunks}")
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
raise ValueError(f"No result found for vertex {vertex_id}")
except Exception as exc:
logger.error(f"Error building vertex: {exc}")
yield str(StreamData(event="error", data={"error": str(exc)}))
yield str(StreamData(event="close", data={"message": "Stream closed"}))
finally:
logger.debug("Closing stream")
yield str(StreamData(event="close", data={"message": "Stream closed"}))
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
except Exception as exc:

View file

@ -8,7 +8,7 @@ from loguru import logger
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.constants import lazy_load_vertex_dict
from langflow.graph.graph.utils import process_flow
from langflow.graph.schema import InterfaceComponentTypes
from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.types import (
ChatVertex,

View file

@ -424,15 +424,16 @@ class ChatVertex(StatelessVertex):
message = message.text if hasattr(message, "text") else message
yield message
complete_message += message
self._built_object = Record(text=complete_message, data=self.artifacts)
self._built_result = complete_message
# Update artifacts with the message
# and remove the stream_url
self.artifacts = ChatOutputResponse(
message=complete_message,
sender=self.params.get("sender", ""),
sender_name=self.params.get("sender_name", ""),
).model_dump()
self.params[INPUT_FIELD_NAME] = complete_message
self._built_object = Record(text=complete_message, data=self.artifacts)
self._built_result = complete_message
# Update artifacts with the message
# and remove the stream_url
logger.debug(f"Streamed message: {complete_message}")
await log_message(
@ -443,6 +444,9 @@ class ChatVertex(StatelessVertex):
artifacts=self.artifacts,
)
self._validate_built_object()
self._built = True
class RoutingVertex(StatelessVertex):
def __init__(self, data: Dict, graph):

View file

@ -27,14 +27,18 @@ from langflow.utils import validate
from langflow.utils.util import get_base_classes
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: List[str]):
def add_output_types(
frontend_node: CustomComponentFrontendNode, return_types: List[str]
):
"""Add output types to the frontend node"""
for return_type in return_types:
if return_type is None:
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid return type. Please check your code and try again."),
"error": (
"Invalid return type. Please check your code and try again."
),
"traceback": traceback.format_exc(),
},
)
@ -63,14 +67,18 @@ def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: List
frontend_node.template.fields = reordered_fields
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: List[str]):
def add_base_classes(
frontend_node: CustomComponentFrontendNode, return_types: List[str]
):
"""Add base classes to the frontend node"""
for return_type_instance in return_types:
if return_type_instance is None:
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid return type. Please check your code and try again."),
"error": (
"Invalid return type. Please check your code and try again."
),
"traceback": traceback.format_exc(),
},
)
@ -145,10 +153,14 @@ def add_new_custom_field(
# If options is a list, then it's a dropdown
# If options is None, then it's a list of strings
is_list = isinstance(field_config.get("options"), list)
field_config["is_list"] = is_list or field_config.get("is_list", False) or field_contains_list
field_config["is_list"] = (
is_list or field_config.get("is_list", False) or field_contains_list
)
if "name" in field_config:
warnings.warn("The 'name' key in field_config is used to build the object and can't be changed.")
warnings.warn(
"The 'name' key in field_config is used to build the object and can't be changed."
)
required = field_config.pop("required", field_required)
placeholder = field_config.pop("placeholder", "")
@ -179,7 +191,9 @@ def add_extra_fields(frontend_node, field_config, function_args):
if "name" not in extra_field or extra_field["name"] == "self":
continue
field_name, field_type, field_value, field_required = get_field_properties(extra_field)
field_name, field_type, field_value, field_required = get_field_properties(
extra_field
)
config = field_config.get(field_name, {})
frontend_node = add_new_custom_field(
frontend_node,
@ -217,7 +231,9 @@ def run_build_config(
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type convertion. Please check your code and try again."),
"error": (
"Invalid type convertion. Please check your code and try again."
),
"traceback": traceback.format_exc(),
},
) from exc
@ -245,7 +261,9 @@ def run_build_config(
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type convertion. Please check your code and try again."),
"error": (
"Invalid type convertion. Please check your code and try again."
),
"traceback": traceback.format_exc(),
},
) from exc
@ -300,16 +318,24 @@ def build_custom_component_template(
frontend_node = build_frontend_node(custom_component.template_config)
logger.debug("Updated attributes")
field_config, custom_instance = run_build_config(custom_component, user_id=user_id, update_field=update_field)
field_config, custom_instance = run_build_config(
custom_component, user_id=user_id, update_field=update_field
)
logger.debug("Built field config")
entrypoint_args = custom_component.get_function_entrypoint_args
add_extra_fields(frontend_node, field_config, entrypoint_args)
frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {}))
frontend_node = add_code_field(
frontend_node, custom_component.code, field_config.get("code", {})
)
add_base_classes(frontend_node, custom_component.get_function_entrypoint_return_type)
add_output_types(frontend_node, custom_component.get_function_entrypoint_return_type)
add_base_classes(
frontend_node, custom_component.get_function_entrypoint_return_type
)
add_output_types(
frontend_node, custom_component.get_function_entrypoint_return_type
)
logger.debug("Added base classes")
reorder_fields(frontend_node, custom_instance._get_field_order())
@ -321,7 +347,9 @@ def build_custom_component_template(
raise HTTPException(
status_code=400,
detail={
"error": ("Invalid type convertion. Please check your code and try again."),
"error": (
"Invalid type convertion. Please check your code and try again."
),
"traceback": traceback.format_exc(),
},
) from exc
@ -345,7 +373,9 @@ def build_custom_components(settings_service):
if not settings_service.settings.COMPONENTS_PATH:
return {}
logger.info(f"Building custom components from {settings_service.settings.COMPONENTS_PATH}")
logger.info(
f"Building custom components from {settings_service.settings.COMPONENTS_PATH}"
)
custom_components_from_file = {}
processed_paths = set()
for path in settings_service.settings.COMPONENTS_PATH:
@ -356,7 +386,9 @@ def build_custom_components(settings_service):
custom_component_dict = build_custom_component_list_from_path(path_str)
if custom_component_dict:
category = next(iter(custom_component_dict))
logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
logger.info(
f"Loading {len(custom_component_dict[category])} component(s) from category {category}"
)
custom_components_from_file = merge_nested_dicts_with_renaming(
custom_components_from_file, custom_component_dict
)
@ -400,8 +432,9 @@ def sanitize_field_config(field_config: Dict):
def build_component(component):
"""Build a single component."""
component_name = determine_component_name(component)
logger.debug(f"Building component: {component_name}")
component_template = create_component_template(component)
logger.debug(f"Building component: {component_name, component.get('output_types')}")
return component_name, component_template

View file

@ -25,7 +25,10 @@ def patching(record):
def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None):
if os.getenv("LANGFLOW_LOG_LEVEL") in VALID_LOG_LEVELS and log_level is None:
if (
os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS
and log_level is None
):
log_level = os.getenv("LANGFLOW_LOG_LEVEL")
if log_level is None:
log_level = "INFO"