From cda65c07ded943ba197d68eefc737970dc404ba1 Mon Sep 17 00:00:00 2001 From: ogabrielluiz Date: Tue, 11 Jun 2024 15:44:05 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20(chat.py):=20Fix=20issue=20with?= =?UTF-8?q?=20logs=20assignment=20in=20build=5Fvertex=20function=20?= =?UTF-8?q?=F0=9F=90=9B=20(schemas.py):=20Fix=20issue=20with=20logs=20fiel?= =?UTF-8?q?d=20type=20in=20ResultDataResponse=20class=20=F0=9F=90=9B=20(Pr?= =?UTF-8?q?ompt.py):=20Fix=20issue=20with=20kwargs=20assignment=20in=20bui?= =?UTF-8?q?ld=5Fprompt=20method=20=F0=9F=90=9B=20(component.py):=20Fix=20i?= =?UTF-8?q?ssue=20with=20results=20and=20arguments=20assignment=20in=20Com?= =?UTF-8?q?ponent=20class=20=F0=9F=90=9B=20(schema.py):=20Fix=20issue=20wi?= =?UTF-8?q?th=20logs=20field=20type=20in=20ResultData=20class?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 📝 (types.py): Add ResultData import and build_logs_from_artifacts function for better logging and message extraction 📝 (setup.py): Set DEFAULT_PROMPT_INTUT_TYPES for missing template keys to provide default input types 📝 (loading.py): Update build_component function to return artifacts along with build_results 📝 (schema.py): Add build_logs_from_artifacts function to generate logs from artifacts 📝 (utils.py): Update log_transaction function to use source instead of vertex and handle source result for outputs 📝 (prompt.py): Define DEFAULT_PROMPT_INTUT_TYPES constant and use it for missing template input types --- src/backend/base/langflow/api/v1/chat.py | 13 +++- src/backend/base/langflow/api/v1/schemas.py | 2 +- .../langflow/components/inputs/ChatInput.py | 4 +- .../base/langflow/components/inputs/Prompt.py | 3 +- .../custom/custom_component/component.py | 26 +++++++- src/backend/base/langflow/graph/schema.py | 22 ++++--- .../base/langflow/graph/vertex/base.py | 8 +-- .../base/langflow/graph/vertex/types.py | 61 ++++++++++++++++++- .../base/langflow/initial_setup/setup.py | 7 ++- .../langflow/interface/initialize/loading.py | 5 +- src/backend/base/langflow/schema/schema.py | 20 ++++++ .../base/langflow/services/monitor/utils.py | 8 +-- .../base/langflow/template/field/prompt.py | 4 +- 13 files changed, 148 insertions(+), 35 deletions(-) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index f6a2efbae..35d1f4406 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -180,14 +180,21 @@ async def build_vertex( inputs_dict=inputs.model_dump() if inputs else {}, files=files, ) - log_obj = Log(message=vertex.artifacts_raw, type=vertex.artifacts_type) + if isinstance(vertex.artifacts_raw, dict): + logs = {} + for key in vertex.artifacts_raw: + log_obj = Log(message=vertex.artifacts_raw[key], type=vertex.artifacts_type[key]) + logs[key] = log_obj + else: + logs = [Log(message=vertex.artifacts_raw, type=vertex.artifacts_type)] + result_data_response = ResultDataResponse(**result_dict.model_dump()) except Exception as exc: logger.exception(f"Error building vertex: {exc}") params = format_exception_message(exc) valid = False - log_obj = Log(message=params, type="error") + logs = [Log(message=params, type="error")] result_data_response = ResultDataResponse(results={}) artifacts = {} # If there's an error building the vertex @@ -195,7 +202,7 @@ async def build_vertex( await chat_service.clear_cache(flow_id_str) result_data_response.message = artifacts - result_data_response.logs.append(log_obj) + result_data_response.logs = logs # Log the vertex build if not vertex.will_stream: diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index 1e0308bd5..e8d40bcf6 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -245,7 +245,7 @@ class VerticesOrderResponse(BaseModel): class ResultDataResponse(BaseModel): results: Optional[Any] = Field(default_factory=dict) - logs: List[Log | None] = Field(default_factory=list) + logs: dict[str, list[Log]] = Field(default_factory=dict) message: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) timedelta: Optional[float] = None diff --git a/src/backend/base/langflow/components/inputs/ChatInput.py b/src/backend/base/langflow/components/inputs/ChatInput.py index 406de75b6..4c0c17959 100644 --- a/src/backend/base/langflow/components/inputs/ChatInput.py +++ b/src/backend/base/langflow/components/inputs/ChatInput.py @@ -33,8 +33,8 @@ class ChatInput(ChatComponent): ), ] outputs = [ - Output(display_name="Message", name="message", method="text_response"), - Output(display_name="Record", name="record", method="message_response"), + Output(display_name="Text", name="text", method="text_response"), + Output(display_name="Message", name="message", method="message_response"), ] def text_response(self) -> Text: diff --git a/src/backend/base/langflow/components/inputs/Prompt.py b/src/backend/base/langflow/components/inputs/Prompt.py index efb227c5e..afc1e2a5b 100644 --- a/src/backend/base/langflow/components/inputs/Prompt.py +++ b/src/backend/base/langflow/components/inputs/Prompt.py @@ -31,6 +31,7 @@ class PromptComponent(Component): async def build_prompt( self, ) -> Prompt: - prompt = await Prompt.from_template_and_variables(self.template, self.kwargs) + kwargs = {k: v for k, v in self._arguments.items() if k != "template"} + prompt = await Prompt.from_template_and_variables(self.template, kwargs) self.status = prompt.format_text() return prompt diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index c25a34d93..e6308da04 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -17,6 +17,7 @@ import yaml from loguru import logger from pydantic import BaseModel +from langflow.schema.artifact import get_artifact_type, post_process_raw from langflow.schema.record import Record from langflow.template.field.base import UNDEFINED, Input, Output @@ -51,12 +52,15 @@ class Component(CustomComponent): inputs: Optional[List[Input]] = None outputs: Optional[List[Output]] = None code_class_base_inheritance: ClassVar[str] = "Component" + _results: dict = {} + _arguments: dict = {} def set_attributes(self, params: dict): for key, value in params.items(): if key in self.__dict__: raise ValueError(f"Key {key} already exists in {self.__class__.__name__}") setattr(self, key, value) + self._arguments = params def _set_outputs(self, outputs: List[dict]): self.outputs = [Output(**output) for output in outputs] @@ -65,7 +69,7 @@ class Component(CustomComponent): async def build_results(self, vertex: "Vertex"): _results = {} - + _artifacts = {} if hasattr(self, "outputs"): self._set_outputs(vertex.outputs) for output in self.outputs: @@ -73,7 +77,7 @@ class Component(CustomComponent): # or if it's not connected to any vertex if not vertex.outgoing_edges or output.name in vertex.edges_source_names: method: Callable | Awaitable = getattr(self, output.method) - if output.cache and not isinstance(output.value, UNDEFINED): + if output.cache and output.value != UNDEFINED: _results[output.name] = output.value else: result = method() @@ -82,8 +86,24 @@ class Component(CustomComponent): result = await result _results[output.name] = result output.value = result + custom_repr = self.custom_repr() + if custom_repr is None and isinstance(result, (dict, Record, str)): + custom_repr = result + if not isinstance(custom_repr, str): + custom_repr = str(custom_repr) + raw = self.status + if hasattr(raw, "data") and raw is not None: + raw = raw.data - return _results + elif hasattr(raw, "model_dump") and raw is not None: + raw = raw.model_dump() + artifact_type = get_artifact_type(self.status, result) + raw = post_process_raw(raw, artifact_type) + artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} + _artifacts[output.name] = artifact + self._artifacts = _artifacts + self._results = _results + return _results, _artifacts def custom_repr(self): # ! Temporary REPR diff --git a/src/backend/base/langflow/graph/schema.py b/src/backend/base/langflow/graph/schema.py index 766575364..21050aaaa 100644 --- a/src/backend/base/langflow/graph/schema.py +++ b/src/backend/base/langflow/graph/schema.py @@ -11,7 +11,7 @@ from langflow.utils.schemas import ChatOutputResponse, ContainsEnumMeta class ResultData(BaseModel): results: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) - logs: Optional[List[dict]] = Field(default_factory=list) + logs: Optional[dict] = Field(default_factory=dict) messages: Optional[list[ChatOutputResponse]] = Field(default_factory=list) timedelta: Optional[float] = None duration: Optional[str] = None @@ -30,17 +30,19 @@ class ResultData(BaseModel): def validate_model(cls, values): if not values.get("logs") and values.get("artifacts"): # Build the log from the artifacts - message = values["artifacts"] - # ! Temporary fix - if not isinstance(message, dict): - message = {"message": message} + for key in values["artifacts"]: + message = values["artifacts"][key] - if "stream_url" in message and "type" in message: - stream_url = StreamURL(location=message["stream_url"]) - values["logs"] = [Log(message=stream_url, type=message["type"])] - elif "type" in message: - values["logs"] = [Log(message=message, type=message["type"])] + # ! Temporary fix + if not isinstance(message, dict): + message = {"message": message} + + if "stream_url" in message and "type" in message: + stream_url = StreamURL(location=message["stream_url"]) + values["logs"].update({key: Log(message=stream_url, type=message["type"])}) + elif "type" in message: + values["logs"].update({Log(message=message, type=message["type"])}) return values diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 541990426..4a1b412d4 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -64,8 +64,8 @@ class Vertex: self._built_result = None self._built = False self.artifacts: Dict[str, Any] = {} - self.artifacts_raw: Any = None - self.artifacts_type: Optional[str] = None + self.artifacts_raw: Dict[str, Any] = {} + self.artifacts_type: Dict[str, str] = {} self.steps: List[Callable] = [self._build] self.steps_ran: List[Callable] = [] self.task_id: Optional[str] = None @@ -555,11 +555,11 @@ class Vertex: """ flow_id = self.graph.flow_id if not self._built: - log_transaction(flow_id, vertex=self, target=requester, status="error") + log_transaction(flow_id, source=self, target=requester, status="error") raise ValueError(f"Component {self.display_name} has not been built yet") result = self._built_result if self.use_result else self._built_object - log_transaction(flow_id, vertex=self, target=requester, status="success") + log_transaction(flow_id, source=self, target=requester, status="success") return result async def _build_vertex_and_update_params(self, key, vertex: "Vertex"): diff --git a/src/backend/base/langflow/graph/vertex/types.py b/src/backend/base/langflow/graph/vertex/types.py index f3bb1d58f..048125f85 100644 --- a/src/backend/base/langflow/graph/vertex/types.py +++ b/src/backend/base/langflow/graph/vertex/types.py @@ -5,12 +5,12 @@ import yaml from langchain_core.messages import AIMessage, AIMessageChunk from loguru import logger -from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes +from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData from langflow.graph.utils import UnbuiltObject, serialize_field from langflow.graph.vertex.base import Vertex from langflow.schema import Record from langflow.schema.artifact import ArtifactType -from langflow.schema.schema import INPUT_FIELD_NAME +from langflow.schema.schema import INPUT_FIELD_NAME, Log, build_logs_from_artifacts from langflow.services.monitor.utils import log_transaction, log_vertex_build from langflow.utils.schemas import ChatOutputResponse, RecordOutputResponse from langflow.utils.util import unescape_string @@ -22,6 +22,7 @@ if TYPE_CHECKING: class CustomComponentVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") + self.logs: Dict[str, Log] = {} def _built_object_repr(self): if self.artifacts and "repr" in self.artifacts: @@ -45,6 +46,10 @@ class ComponentVertex(Vertex): self._built_object, self.artifacts = result elif len(result) == 3: self._custom_component, self._built_object, self.artifacts = result + for key in self.artifacts: + self.artifacts_raw[key] = self.artifacts[key].get("raw", None) + self.artifacts_type[key] = self.artifacts[key].get("type", None) or ArtifactType.UNKNOWN.value + self.logs = build_logs_from_artifacts(self.artifacts) else: self._built_object = result @@ -92,6 +97,58 @@ class ComponentVertex(Vertex): log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="success") return result + def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[dict]: + """ + Extracts messages from the artifacts. + + Args: + artifacts (Dict[str, Any]): The artifacts to extract messages from. + + Returns: + List[str]: The extracted messages. + """ + messages = [] + for key in artifacts: + artifact = artifacts[key] + if any(key not in artifact for key in ["text", "sender", "sender_name", "session_id", "stream_url"]): + continue + try: + messages.append( + ChatOutputResponse( + message=artifacts[key]["text"], + sender=artifacts[key].get("sender"), + sender_name=artifacts[key].get("sender_name"), + session_id=artifacts[key].get("session_id"), + stream_url=artifacts[key].get("stream_url"), + files=[ + {"path": file} if isinstance(file, str) else file + for file in artifacts[key].get("files", []) + ], + component_id=self.id, + type=self.artifacts_type[key], + ).model_dump(exclude_none=True) + ) + except KeyError: + pass + return messages + + def _finalize_build(self): + result_dict = self.get_built_result() + # We need to set the artifacts to pass information + # to the frontend + self.set_artifacts() + artifacts = self.artifacts_raw + messages = self.extract_messages_from_artifacts(artifacts) + result_dict = ResultData( + results=result_dict, + artifacts=artifacts, + logs=self.logs, + messages=messages, + component_display_name=self.display_name, + component_id=self.id, + ) + self.set_result(result_dict) + class InterfaceVertex(ComponentVertex): def __init__(self, data: Dict, graph): diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index d10a6ea28..359bd2f21 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -23,6 +23,7 @@ from langflow.services.database.models.folder.model import Folder, FolderCreate from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.deps import get_settings_service, get_storage_service, get_variable_service, session_scope +from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES STARTER_FOLDER_NAME = "Starter Projects" STARTER_FOLDER_DESCRIPTION = "Starter projects to help you get started in Langflow." @@ -72,6 +73,9 @@ def update_projects_components_with_latest_component_versions(project_data, all_ } ) node_data["template"][key]["value"] = value["value"] + for key, value in node_data["template"].items(): + if key not in latest_template: + node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES node_changes_log[node_data["display_name"]].append( { "attr": "_type", @@ -173,6 +177,7 @@ def update_new_output(data): "types": types, "selected": selected, "name": " | ".join(types), + "display_name": " | ".join(types), } ) deduplicated_outputs = [] @@ -557,7 +562,7 @@ async def create_or_update_starter_projects(): try: Graph.from_payload(updated_project_data) except Exception as e: - raise ValueError(f"Error loading graph from project {project_name}: {e}") + logger.error(e) if updated_project_data != project_data: project_data = updated_project_data # We also need to update the project data in the file diff --git a/src/backend/base/langflow/interface/initialize/loading.py b/src/backend/base/langflow/interface/initialize/loading.py index 132703b88..d3d52d8b2 100644 --- a/src/backend/base/langflow/interface/initialize/loading.py +++ b/src/backend/base/langflow/interface/initialize/loading.py @@ -122,10 +122,9 @@ async def build_component( # Now set the params as attributes of the custom_component custom_component.set_attributes(params) - build_results = await custom_component.build_results(vertex) - custom_repr = custom_component.custom_repr() + build_results, artifacts = await custom_component.build_results(vertex) - return custom_component, build_results, {"repr": custom_repr} + return custom_component, build_results, artifacts async def build_custom_component(params: dict, custom_component: "CustomComponent"): diff --git a/src/backend/base/langflow/schema/schema.py b/src/backend/base/langflow/schema/schema.py index 5153941a5..d82ceca83 100644 --- a/src/backend/base/langflow/schema/schema.py +++ b/src/backend/base/langflow/schema/schema.py @@ -1,3 +1,4 @@ +from collections import defaultdict from typing import Literal from typing_extensions import TypedDict @@ -15,3 +16,22 @@ class StreamURL(TypedDict): class Log(TypedDict): message: str | dict | StreamURL type: str + + +def build_logs_from_artifacts(artifacts: dict) -> dict: + logs = defaultdict(list) + for key in artifacts: + message = artifacts[key] + + if not isinstance(message, dict): + message = {"message": message} + + if "stream_url" in message and "type" in message: + stream_url = StreamURL(location=message["stream_url"]) + log = Log(message=stream_url, type=message["type"]) + elif "type" in message: + log = Log(message=message, type=message["type"]) + + logs[key].append(log) + + return logs diff --git a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py index 706d62348..b3daf6a47 100644 --- a/src/backend/base/langflow/services/monitor/utils.py +++ b/src/backend/base/langflow/services/monitor/utils.py @@ -178,15 +178,15 @@ def build_clean_params(target: "Vertex") -> dict: return params -def log_transaction(flow_id, vertex: "Vertex", status, target: Optional["Vertex"] = None, error=None): +def log_transaction(flow_id, source: "Vertex", status, target: Optional["Vertex"] = None, error=None): try: monitor_service = get_monitor_service() - clean_params = build_clean_params(vertex) + clean_params = build_clean_params(source) data = { - "vertex_id": str(vertex.id), + "vertex_id": str(source.id), "target_id": str(target.id) if target else None, "inputs": clean_params, - "outputs": vertex.result.model_dump_json() if vertex.result else None, + "outputs": source.result.model_dump_json() if source.result else None, "timestamp": monitor_service.get_timestamp(), "status": status, "error": error, diff --git a/src/backend/base/langflow/template/field/prompt.py b/src/backend/base/langflow/template/field/prompt.py index c57b40e36..f8f3b51bd 100644 --- a/src/backend/base/langflow/template/field/prompt.py +++ b/src/backend/base/langflow/template/field/prompt.py @@ -2,6 +2,8 @@ from typing import Optional from langflow.template.field.base import Input +DEFAULT_PROMPT_INTUT_TYPES = ["Document", "Message", "Record", "Text"] + class DefaultPromptField(Input): name: str @@ -10,5 +12,5 @@ class DefaultPromptField(Input): advanced: bool = False multiline: bool = True - input_types: list[str] = ["Document", "Message", "Record", "Text"] + input_types: list[str] = DEFAULT_PROMPT_INTUT_TYPES value: str = "" # Set the value to empty string