From 3eaad7bc3a9c748c87a0dc701efaf7f48ab27554 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 2 Sep 2024 10:41:19 -0300 Subject: [PATCH] feat: add EventManager to centralize callbacks (#3434) * refactor: Update MessageBase text attribute based on isinstance check. * feat: Add update_message function to update a message in the database. * refactor(chat): Update imports and remove unnecessary config method in ChatComponent. * refactor: Add stream_message method to ChatComponent. * refactor: Update method call in ChatOutput component. * feat: Add callback function to custom component and update build_results signature. * feat: Add callback parameter to instantiate_class function. * feat(graph): Add callback functions for sync and async operations. * feat: Add callback function support to vertex build process. * feat: Add handling for added message in InterfaceVertex class. * feat: Add callback support to Graph methods. * feat(chat): Add callback function to build_vertices function. * refactor: Simplify update_message function and use session_scope for session management. * fix: Call set_callback method if available on custom component. * refactor(chat): Update chat message chunk handling and ID conversion. * feat: Add null check before setting cache in build_vertex_stream function. * refactor: Fix send_event_wrapper function and add callback parameter to _build_vertex function. * refactor: Simplify conditional statement and import order in ChatOutput. * refactor: move log method to Component class. * refactor: Simplify CallbackFunction definition. * feat: Initialize _current_output attribute in Component class. * feat: store current output name in custom component during processing. * feat: Add current output and component ID to log data. * fix: Add condition to check current output before invoking callback. * refactor: Update callback to log_callback in graph methods. * feat: Add test for callback graph execution with log messages. * update projects * fix(chat.py): fix condition to check if message text is a string before updating message text in the database * refactor(ChatOutput.py): update ChatOutput class to correctly store and assign the message value to ensure consistency and avoid potential bugs * refactor(chat.py): update return type of store_message method to return a single Message object instead of a list of Messages refactor(chat.py): update logic to correctly handle updating and returning a single stored message object instead of a list of messages * update starter projects * refactor(component.py): update type hint for name parameter in log method to be more explicit * feat: Add EventManager class for managing events and event registration * refactor: Update log_callback to event_manager in custom component classes * refactor(component.py): rename _log_callback to _event_manager and update method call to on_log for better clarity and consistency * refactor(chat.py): rename _log_callback method to _event_manager.on_token for clarity and consistency in method naming * refactor: Rename log_callback to event_manager for clarity and consistency * refactor: Update Vertex class to use EventManager instead of log_callback for better clarity and consistency * refactor: update build_flow to use EventManager * refactor: Update EventManager class to use Protocol for event callbacks * if event_type is not passed, it uses the default send_event * Add method to register event functions in EventManager - Introduced `register_event_function` method to allow passing custom event functions. - Updated `noop` method to accept `event_type` parameter. - Adjusted `__getattr__` to return `EventCallback` type. * update test_callback_graph * Add unit tests for EventManager in test_event_manager.py - Added tests for event registration, including default event type, empty string names, and specific event types. - Added tests for custom event functions and unregistered event access. - Added tests for event sending, including JSON formatting, empty data, and large payloads. - Added tests for handling JSON serialization errors and the noop function. * revert chatOutput change * Add validation for event function in EventManager - Introduced `_validate_event_function` method to ensure event functions are callable and have the correct parameters. - Updated `register_event_function` to use the new validation method. * Add tests for EventManager's event function validation logic - Introduce `TestValidateEventFunction` class to test various scenarios for `_validate_event_function`. - Add tests for valid event functions, non-callable event functions, invalid parameter counts, and parameter type validation. - Include tests for handling unannotated parameters, flexible arguments, and keyword-only parameters. - Ensure proper warnings and exceptions are raised for invalid event functions. * Add type ignore comment to lambda function in test_event_manager.py * refactor: Update EventManager class to use Protocol for event callbacks * refactor(event_manager.py): simplify event registration and validation logic to enhance readability and maintainability feat(event_manager.py): enforce event name conventions and improve callback handling for better error management * refactor(chat.py): standardize event_manager method calls by using keyword arguments for better clarity and consistency refactor(chat.py): extract message processing logic into separate methods for improved readability and maintainability fix(chat.py): ensure proper handling of async iterators in message streaming refactor(component.py): simplify event logging by removing unnecessary event name parameter in on_log method call * update event manager tests * Add callback validation and manager parameter in EventManager - Introduced `_validate_callback` method to ensure callbacks are callable and have the correct parameters. - Updated `register_event` to include `manager` parameter in the callback. * Add support for passing callback through the Graph in test_callback_graph * fix(event_manager.py): update EventCallback signature to include manager parameter for better context in event handling --- src/backend/base/langflow/api/v1/chat.py | 49 ++-- src/backend/base/langflow/base/io/chat.py | 91 +++---- .../langflow/components/outputs/ChatOutput.py | 2 +- .../custom/custom_component/component.py | 38 ++- .../custom_component/custom_component.py | 18 +- src/backend/base/langflow/events/__init__.py | 0 .../base/langflow/events/event_manager.py | 69 ++++++ src/backend/base/langflow/graph/graph/base.py | 22 +- .../base/langflow/graph/graph/schema.py | 7 +- .../base/langflow/graph/vertex/base.py | 18 +- .../base/langflow/graph/vertex/types.py | 1 + .../Basic Prompting (Hello, World).json | 2 +- .../starter_projects/Blog Writer.json | 2 +- .../starter_projects/Complex Agent.json | 2 +- .../starter_projects/Document QA.json | 2 +- .../starter_projects/Hierarchical Agent.json | 2 +- .../starter_projects/Memory Chatbot.json | 2 +- .../starter_projects/Sequential Agent.json | 2 +- .../starter_projects/Vector Store RAG.json | 2 +- .../langflow/interface/initialize/loading.py | 4 + .../services/database/models/message/crud.py | 19 ++ .../services/database/models/message/model.py | 8 +- .../tests/unit/events/test_event_manager.py | 230 ++++++++++++++++++ .../unit/graph/graph/test_callback_graph.py | 48 ++++ 24 files changed, 534 insertions(+), 106 deletions(-) create mode 100644 src/backend/base/langflow/events/__init__.py create mode 100644 src/backend/base/langflow/events/event_manager.py create mode 100644 src/backend/base/langflow/services/database/models/message/crud.py create mode 100644 src/backend/tests/unit/events/test_event_manager.py create mode 100644 src/backend/tests/unit/graph/graph/test_callback_graph.py diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index a500eb8ce..5990807f3 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -31,6 +31,7 @@ from langflow.api.v1.schemas import ( VertexBuildResponse, VerticesOrderResponse, ) +from langflow.events.event_manager import EventManager, create_default_event_manager from langflow.exceptions.component import ComponentBuildException from langflow.graph.graph.base import Graph from langflow.graph.utils import log_vertex_build @@ -204,7 +205,7 @@ async def build_flow( logger.exception(exc) raise HTTPException(status_code=500, detail=str(exc)) from exc - async def _build_vertex(vertex_id: str, graph: "Graph") -> VertexBuildResponse: + async def _build_vertex(vertex_id: str, graph: "Graph", event_manager: "EventManager") -> VertexBuildResponse: flow_id_str = str(flow_id) next_runnable_vertices = [] @@ -222,6 +223,7 @@ async def build_flow( files=files, get_cache=chat_service.get_cache, set_cache=chat_service.set_cache, + event_manager=event_manager, ) result_dict = vertex_build_result.result_dict params = vertex_build_result.params @@ -316,17 +318,13 @@ async def build_flow( message = parse_exception(exc) raise HTTPException(status_code=500, detail=message) from exc - def send_event(event_type: str, value: dict, queue: asyncio.Queue) -> None: - json_data = {"event": event_type, "data": value} - event_id = uuid.uuid4() - logger.debug(f"sending event {event_id}: {event_type}") - str_data = json.dumps(json_data) + "\n\n" - queue.put_nowait((event_id, str_data.encode("utf-8"), time.time())) - async def build_vertices( - vertex_id: str, graph: "Graph", queue: asyncio.Queue, client_consumed_queue: asyncio.Queue + vertex_id: str, + graph: "Graph", + client_consumed_queue: asyncio.Queue, + event_manager: "EventManager", ) -> None: - build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph)) + build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph, event_manager)) try: await build_task except asyncio.CancelledError as exc: @@ -341,13 +339,15 @@ async def build_flow( build_data = json.loads(vertex_build_response_json) except Exception as exc: raise ValueError(f"Error serializing vertex build response: {exc}") from exc - send_event("end_vertex", {"build_data": build_data}, queue) + event_manager.on_end_vertex(data={"build_data": build_data}) await client_consumed_queue.get() if vertex_build_response.valid: if vertex_build_response.next_vertices_ids: tasks = [] for next_vertex_id in vertex_build_response.next_vertices_ids: - task = asyncio.create_task(build_vertices(next_vertex_id, graph, queue, client_consumed_queue)) + task = asyncio.create_task( + build_vertices(next_vertex_id, graph, client_consumed_queue, event_manager) + ) tasks.append(task) try: await asyncio.gather(*tasks) @@ -356,7 +356,7 @@ async def build_flow( task.cancel() return - async def event_generator(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None: + async def event_generator(event_manager: EventManager, client_consumed_queue: asyncio.Queue) -> None: if not data: # using another thread since the DB query is I/O bound vertices_task = asyncio.create_task(await asyncio.to_thread(build_graph_and_get_order)) @@ -367,9 +367,9 @@ async def build_flow( return except Exception as e: if isinstance(e, HTTPException): - send_event("error", {"error": str(e.detail), "statusCode": e.status_code}, queue) + event_manager.on_error(data={"error": str(e.detail), "statusCode": e.status_code}) raise e - send_event("error", {"error": str(e)}, queue) + event_manager.on_error(data={"error": str(e)}) raise e ids, vertices_to_run, graph = vertices_task.result() @@ -378,16 +378,16 @@ async def build_flow( ids, vertices_to_run, graph = await build_graph_and_get_order() except Exception as e: if isinstance(e, HTTPException): - send_event("error", {"error": str(e.detail), "statusCode": e.status_code}, queue) + event_manager.on_error(data={"error": str(e.detail), "statusCode": e.status_code}) raise e - send_event("error", {"error": str(e)}, queue) + event_manager.on_error(data={"error": str(e)}) raise e - send_event("vertices_sorted", {"ids": ids, "to_run": vertices_to_run}, queue) + event_manager.on_vertices_sorted(data={"ids": ids, "to_run": vertices_to_run}) await client_consumed_queue.get() tasks = [] for vertex_id in ids: - task = asyncio.create_task(build_vertices(vertex_id, graph, queue, client_consumed_queue)) + task = asyncio.create_task(build_vertices(vertex_id, graph, client_consumed_queue, event_manager)) tasks.append(task) try: await asyncio.gather(*tasks) @@ -396,8 +396,8 @@ async def build_flow( for task in tasks: task.cancel() return - send_event("end", {}, queue) - await queue.put((None, None, time.time)) + event_manager.on_end(data={}) + await event_manager.queue.put((None, None, time.time)) async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> typing.AsyncGenerator: while True: @@ -414,7 +414,8 @@ async def build_flow( asyncio_queue: asyncio.Queue = asyncio.Queue() asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue() - main_task = asyncio.create_task(event_generator(asyncio_queue, asyncio_queue_client_consumed)) + event_manager = create_default_event_manager(queue=asyncio_queue) + main_task = asyncio.create_task(event_generator(event_manager, asyncio_queue_client_consumed)) def on_disconnect(): logger.debug("Client disconnected, closing tasks") @@ -640,6 +641,7 @@ async def build_vertex_stream( flow_id_str = str(flow_id) async def stream_vertex(): + graph = None try: cache = await chat_service.get_cache(flow_id_str) if not cache: @@ -693,7 +695,8 @@ async def build_vertex_stream( yield str(StreamData(event="error", data={"error": exc_message})) finally: logger.debug("Closing stream") - await chat_service.set_cache(flow_id_str, graph) + if graph: + await chat_service.set_cache(flow_id_str, graph) yield str(StreamData(event="close", data={"message": "Stream closed"})) return StreamingResponse(stream_vertex(), media_type="text/event-stream") diff --git a/src/backend/base/langflow/base/io/chat.py b/src/backend/base/langflow/base/io/chat.py index b9ff4f3d1..e0390c91a 100644 --- a/src/backend/base/langflow/base/io/chat.py +++ b/src/backend/base/langflow/base/io/chat.py @@ -1,67 +1,70 @@ -from typing import Optional, Union +from typing import AsyncIterator, Iterator, Optional, Union -from langflow.base.data.utils import IMG_FILE_TYPES, TEXT_FILE_TYPES from langflow.custom import Component from langflow.memory import store_message from langflow.schema import Data from langflow.schema.message import Message -from langflow.utils.constants import MESSAGE_SENDER_USER, MESSAGE_SENDER_AI +from langflow.services.database.models.message.crud import update_message +from langflow.utils.async_helpers import run_until_complete class ChatComponent(Component): display_name = "Chat Component" description = "Use as base for chat components." - def build_config(self): - return { - "input_value": { - "input_types": ["Text"], - "display_name": "Text", - "multiline": True, - }, - "sender": { - "options": [MESSAGE_SENDER_AI, MESSAGE_SENDER_USER], - "display_name": "Sender Type", - "advanced": True, - }, - "sender_name": {"display_name": "Sender Name", "advanced": True}, - "session_id": { - "display_name": "Session ID", - "info": "If provided, the message will be stored in the memory.", - "advanced": True, - }, - "return_message": { - "display_name": "Return Message", - "info": "Return the message as a Message containing the sender, sender_name, and session_id.", - "advanced": True, - }, - "data_template": { - "display_name": "Data Template", - "multiline": True, - "info": "In case of Message being a Data, this template will be used to convert it to text.", - "advanced": True, - }, - "files": { - "field_type": "file", - "display_name": "Files", - "file_types": TEXT_FILE_TYPES + IMG_FILE_TYPES, - "info": "Files to be sent with the message.", - "advanced": True, - }, - } - # Keep this method for backward compatibility def store_message( self, message: Message, - ) -> list[Message]: + ) -> Message: messages = store_message( message, flow_id=self.graph.flow_id, ) + if len(messages) > 1: + raise ValueError("Only one message can be stored at a time.") + stored_message = messages[0] + if hasattr(self, "_event_manager") and self._event_manager and stored_message.id: + if not isinstance(message.text, str): + complete_message = self._stream_message(message, stored_message.id) + message_table = update_message(message_id=stored_message.id, message=dict(text=complete_message)) + stored_message = Message(**message_table.model_dump()) + self.vertex._added_message = stored_message + self.status = stored_message + return stored_message - self.status = messages - return messages + def _process_chunk(self, chunk: str, complete_message: str, message: Message, message_id: str) -> str: + complete_message += chunk + data = { + "text": complete_message, + "chunk": chunk, + "sender": message.sender, + "sender_name": message.sender_name, + "id": str(message_id), + } + if self._event_manager: + self._event_manager.on_token(data=data) + return complete_message + + async def _handle_async_iterator(self, iterator: AsyncIterator, message: Message, message_id: str) -> str: + complete_message = "" + async for chunk in iterator: + complete_message = self._process_chunk(chunk.content, complete_message, message, message_id) + return complete_message + + def _stream_message(self, message: Message, message_id: str) -> str: + iterator = message.text + if not isinstance(iterator, (AsyncIterator, Iterator)): + raise ValueError("The message must be an iterator or an async iterator.") + + if isinstance(iterator, AsyncIterator): + return run_until_complete(self._handle_async_iterator(iterator, message, message_id)) + + complete_message = "" + for chunk in iterator: + complete_message = self._process_chunk(chunk.content, complete_message, message, message_id) + + return complete_message def build_with_data( self, diff --git a/src/backend/base/langflow/components/outputs/ChatOutput.py b/src/backend/base/langflow/components/outputs/ChatOutput.py index 45972712c..a0a9b8eeb 100644 --- a/src/backend/base/langflow/components/outputs/ChatOutput.py +++ b/src/backend/base/langflow/components/outputs/ChatOutput.py @@ -3,7 +3,7 @@ from langflow.inputs import BoolInput from langflow.io import DropdownInput, MessageTextInput, Output from langflow.memory import store_message from langflow.schema.message import Message -from langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI +from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER class ChatOutput(ChatComponent): diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index c7bd61216..354fcffb4 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -1,17 +1,19 @@ import inspect +from collections.abc import Callable from copy import deepcopy from typing import TYPE_CHECKING, Any, ClassVar, get_type_hints -from collections.abc import Callable from uuid import UUID import nanoid # type: ignore import yaml from pydantic import BaseModel +from langflow.events.event_manager import EventManager from langflow.graph.state.model import create_state_model from langflow.helpers.custom import format_type from langflow.schema.artifact import get_artifact_type, post_process_raw from langflow.schema.data import Data +from langflow.schema.log import LoggableType from langflow.schema.message import Message from langflow.services.tracing.schema import Log from langflow.template.field.base import UNDEFINED, Input, Output @@ -35,6 +37,7 @@ class Component(CustomComponent): outputs: list[Output] = [] code_class_base_inheritance: ClassVar[str] = "Component" _output_logs: dict[str, Log] = {} + _current_output: str = "" def __init__(self, **kwargs): # if key starts with _ it is a config @@ -56,6 +59,8 @@ class Component(CustomComponent): self._parameters = inputs or {} self._edges: list[EdgeData] = [] self._components: list[Component] = [] + self._current_output = "" + self._event_manager: EventManager | None = None self._state_model = None self.set_attributes(self._parameters) self._output_logs = {} @@ -77,6 +82,9 @@ class Component(CustomComponent): self._set_output_types() self.set_class_code() + def set_event_manager(self, event_manager: EventManager | None = None): + self._event_manager = event_manager + def _reset_all_output_values(self): for output in self.outputs: setattr(output, "value", UNDEFINED) @@ -601,7 +609,9 @@ class Component(CustomComponent): async def _build_without_tracing(self): return await self._build_results() - async def build_results(self): + async def build_results( + self, + ): if self._tracing_service: return await self._build_with_tracing() return await self._build_without_tracing() @@ -620,6 +630,7 @@ class Component(CustomComponent): ): if output.method is None: raise ValueError(f"Output {output.name} does not have a method defined.") + self._current_output = output.name method: Callable = getattr(self, output.method) if output.cache and output.value != UNDEFINED: _results[output.name] = output.value @@ -638,6 +649,7 @@ class Component(CustomComponent): result.set_flow_id(self._vertex.graph.flow_id) _results[output.name] = result output.value = result + custom_repr = self.custom_repr() if custom_repr is None and isinstance(result, (dict, Data, str)): custom_repr = result @@ -665,6 +677,7 @@ class Component(CustomComponent): _artifacts[output.name] = artifact self._output_logs[output.name] = self._logs self._logs = [] + self._current_output = "" self._artifacts = _artifacts self._results = _results if self._tracing_service: @@ -720,6 +733,25 @@ class Component(CustomComponent): return ComponentTool(component=self) def get_project_name(self): - if hasattr(self, "_tracing_service"): + if hasattr(self, "_tracing_service") and self._tracing_service: return self._tracing_service.project_name return "Langflow" + + def log(self, message: LoggableType | list[LoggableType], name: str | None = None): + """ + Logs a message. + + Args: + message (LoggableType | list[LoggableType]): The message to log. + """ + if name is None: + name = f"Log {len(self._logs) + 1}" + log = Log(message=message, type=get_artifact_type(message), name=name) + self._logs.append(log) + if self._tracing_service and self._vertex: + self._tracing_service.add_log(trace_name=self.trace_name, log=log) + if self._event_manager is not None and self._current_output: + data = log.model_dump() + data["output"] = self._current_output + data["component_id"] = self._id + self._event_manager.on_log(data=data) diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py b/src/backend/base/langflow/custom/custom_component/custom_component.py index e3357c22d..a6c5836b1 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -1,6 +1,6 @@ +from collections.abc import Callable, Sequence from pathlib import Path from typing import TYPE_CHECKING, Any, ClassVar, Optional -from collections.abc import Callable, Sequence import yaml from cachetools import TTLCache @@ -10,9 +10,7 @@ from pydantic import BaseModel from langflow.custom.custom_component.base_component import BaseComponent from langflow.helpers.flow import list_flows, load_flow, run_flow from langflow.schema import Data -from langflow.schema.artifact import get_artifact_type from langflow.schema.dotdict import dotdict -from langflow.schema.log import LoggableType from langflow.schema.schema import OutputValue from langflow.services.deps import get_storage_service, get_variable_service, session_scope from langflow.services.storage.service import StorageService @@ -509,20 +507,6 @@ class CustomComponent(BaseComponent): """ raise NotImplementedError - def log(self, message: LoggableType | list[LoggableType], name: str | None = None): - """ - Logs a message. - - Args: - message (LoggableType | list[LoggableType]): The message to log. - """ - if name is None: - name = f"Log {len(self._logs) + 1}" - log = Log(message=message, type=get_artifact_type(message), name=name) - self._logs.append(log) - if self._tracing_service and self._vertex: - self._tracing_service.add_log(trace_name=self.trace_name, log=log) - def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict): """ This function is called after the code validation is done. diff --git a/src/backend/base/langflow/events/__init__.py b/src/backend/base/langflow/events/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/base/langflow/events/event_manager.py b/src/backend/base/langflow/events/event_manager.py new file mode 100644 index 000000000..121d46544 --- /dev/null +++ b/src/backend/base/langflow/events/event_manager.py @@ -0,0 +1,69 @@ +import asyncio +import inspect +import json +import time +import uuid +from functools import partial + +from typing_extensions import Protocol + +from langflow.schema.log import LoggableType + + +class EventCallback(Protocol): + def __call__(self, *, manager: "EventManager", event_type: str, data: LoggableType): ... + + +class PartialEventCallback(Protocol): + def __call__(self, *, data: LoggableType): ... + + +class EventManager: + def __init__(self, queue: asyncio.Queue): + self.queue = queue + self.events: dict[str, PartialEventCallback] = {} + + @staticmethod + def _validate_callback(callback: EventCallback): + if not callable(callback): + raise ValueError("Callback must be callable") + # Check if it has `self, event_type and data` + sig = inspect.signature(callback) + if len(sig.parameters) != 3: + raise ValueError("Callback must have exactly 3 parameters") + if not all(param.name in ["manager", "event_type", "data"] for param in sig.parameters.values()): + raise ValueError("Callback must have exactly 3 parameters: manager, event_type, and data") + + def register_event(self, name: str, event_type: str, callback: EventCallback | None = None): + if not name: + raise ValueError("Event name cannot be empty") + if not name.startswith("on_"): + raise ValueError("Event name must start with 'on_'") + if callback is None: + _callback = partial(self.send_event, event_type=event_type) + else: + _callback = partial(callback, manager=self, event_type=event_type) + self.events[name] = _callback + + def send_event(self, *, event_type: str, data: LoggableType): + json_data = {"event": event_type, "data": data} + event_id = uuid.uuid4() + str_data = json.dumps(json_data) + "\n\n" + self.queue.put_nowait((event_id, str_data.encode("utf-8"), time.time())) + + def noop(self, *, data: LoggableType): + pass + + def __getattr__(self, name: str) -> PartialEventCallback: + return self.events.get(name, self.noop) + + +def create_default_event_manager(queue): + manager = EventManager(queue) + manager.register_event("on_token", "token") + manager.register_event("on_vertices_sorted", "vertices_sorted") + manager.register_event("on_error", "error") + manager.register_event("on_end", "end") + manager.register_event("on_message", "message") + manager.register_event("on_end_vertex", "end_vertex") + return manager diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 069fb6d17..546a6f82f 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Optional, cast import nest_asyncio from loguru import logger +from langflow.events.event_manager import EventManager from langflow.exceptions.component import ComponentBuildException from langflow.graph.edge.base import CycleEdge, Edge from langflow.graph.edge.schema import EdgeData @@ -277,7 +278,12 @@ class Graph: } self._add_edge(edge_data) - async def async_start(self, inputs: list[dict] | None = None, max_iterations: int | None = None): + async def async_start( + self, + inputs: list[dict] | None = None, + max_iterations: int | None = None, + event_manager: EventManager | None = None, + ): if not self._prepared: raise ValueError("Graph not prepared. Call prepare() first.") # The idea is for this to return a generator that yields the result of @@ -291,7 +297,7 @@ class Graph: yielded_counts: dict[str, int] = defaultdict(int) while should_continue(yielded_counts, max_iterations): - result = await self.astep() + result = await self.astep(event_manager=event_manager) yield result if hasattr(result, "vertex"): yielded_counts[result.vertex.id] += 1 @@ -322,13 +328,14 @@ class Graph: inputs: list[dict] | None = None, max_iterations: int | None = None, config: StartConfigDict | None = None, + event_manager: EventManager | None = None, ) -> Generator: if config is not None: self.__apply_config(config) #! Change this ASAP nest_asyncio.apply() loop = asyncio.get_event_loop() - async_gen = self.async_start(inputs, max_iterations) + async_gen = self.async_start(inputs, max_iterations, event_manager) async_gen_task = asyncio.ensure_future(async_gen.__anext__()) while True: @@ -1200,6 +1207,7 @@ class Graph: inputs: Optional["InputValueRequest"] = None, files: list[str] | None = None, user_id: str | None = None, + event_manager: EventManager | None = None, ): if not self._prepared: raise ValueError("Graph not prepared. Call prepare() first.") @@ -1215,6 +1223,7 @@ class Graph: files=files, get_cache=chat_service.get_cache, set_cache=chat_service.set_cache, + event_manager=event_manager, ) next_runnable_vertices = await self.get_next_runnable_vertices( @@ -1266,6 +1275,7 @@ class Graph: files: list[str] | None = None, user_id: str | None = None, fallback_to_env_vars: bool = False, + event_manager: EventManager | None = None, ) -> VertexBuildResult: """ Builds a vertex in the graph. @@ -1320,7 +1330,11 @@ class Graph: if should_build: await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files + user_id=user_id, + inputs=inputs_dict, + fallback_to_env_vars=fallback_to_env_vars, + files=files, + event_manager=event_manager, ) if set_cache is not None: vertex_dict = { diff --git a/src/backend/base/langflow/graph/graph/schema.py b/src/backend/base/langflow/graph/graph/schema.py index 4a1dcc3a3..62ad99a27 100644 --- a/src/backend/base/langflow/graph/graph/schema.py +++ b/src/backend/base/langflow/graph/graph/schema.py @@ -1,9 +1,10 @@ -from typing import TYPE_CHECKING, NamedTuple +from typing import TYPE_CHECKING, NamedTuple, Protocol from typing_extensions import NotRequired, TypedDict from langflow.graph.edge.schema import EdgeData from langflow.graph.vertex.schema import NodeData +from langflow.schema.log import LoggableType if TYPE_CHECKING: from langflow.graph.schema import ResultData @@ -44,3 +45,7 @@ class OutputConfigDict(TypedDict): class StartConfigDict(TypedDict): output: OutputConfigDict + + +class LogCallbackFunction(Protocol): + def __call__(self, event_name: str, log: LoggableType) -> None: ... diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index cc5a0ae21..761262d0e 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any, Optional import pandas as pd from loguru import logger +from langflow.events.event_manager import EventManager from langflow.exceptions.component import ComponentBuildException from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction @@ -451,6 +452,7 @@ class Vertex: self, fallback_to_env_vars, user_id=None, + event_manager: EventManager | None = None, ): """ Initiate the build process. @@ -462,12 +464,19 @@ class Vertex: raise ValueError(f"Base type for vertex {self.display_name} not found") if not self._custom_component: - custom_component, custom_params = await initialize.loading.instantiate_class(user_id=user_id, vertex=self) + custom_component, custom_params = await initialize.loading.instantiate_class( + user_id=user_id, vertex=self, event_manager=event_manager + ) else: custom_component = self._custom_component + self._custom_component.set_event_manager(event_manager) custom_params = initialize.loading.get_params(self.params) - await self._build_results(custom_component, custom_params, fallback_to_env_vars) + await self._build_results( + custom_component=custom_component, + custom_params=custom_params, + fallback_to_env_vars=fallback_to_env_vars, + ) self._validate_built_object() @@ -755,6 +764,7 @@ class Vertex: inputs: dict[str, Any] | None = None, files: list[str] | None = None, requester: Optional["Vertex"] = None, + event_manager: EventManager | None = None, **kwargs, ) -> Any: async with self._lock: @@ -784,9 +794,9 @@ class Vertex: for step in self.steps: if step not in self.steps_ran: if inspect.iscoroutinefunction(step): - await step(user_id=user_id, **kwargs) + await step(user_id=user_id, event_manager=event_manager, **kwargs) else: - step(user_id=user_id, **kwargs) + step(user_id=user_id, event_manager=event_manager, **kwargs) self.steps_ran.append(step) self._finalize_build() diff --git a/src/backend/base/langflow/graph/vertex/types.py b/src/backend/base/langflow/graph/vertex/types.py index 0f0e0cb1a..a44c4ef12 100644 --- a/src/backend/base/langflow/graph/vertex/types.py +++ b/src/backend/base/langflow/graph/vertex/types.py @@ -199,6 +199,7 @@ class ComponentVertex(Vertex): class InterfaceVertex(ComponentVertex): def __init__(self, data: NodeData, graph): super().__init__(data, graph=graph) + self._added_message = None self.steps = [self._build, self._run] def build_stream_url(self): diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json b/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json index c79bdb107..7550dacb0 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json @@ -476,7 +476,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json b/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json index 24651b620..a0c2635c3 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json @@ -664,7 +664,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json index db2db6158..2a2ec1465 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json @@ -1197,7 +1197,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json index 5a8fe5dab..fc32aeb6d 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json @@ -554,7 +554,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json index ce0a3442a..0b92a7813 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json @@ -901,7 +901,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json index 089b84c53..ac3a1f92b 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json @@ -825,7 +825,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json index 258af3ae7..24fafa050 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json @@ -911,7 +911,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index c62b2492c..48f3f6495 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -1289,7 +1289,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" + "value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n" }, "data_template": { "advanced": true, diff --git a/src/backend/base/langflow/interface/initialize/loading.py b/src/backend/base/langflow/interface/initialize/loading.py index dfdd7c9b2..35a6e23b4 100644 --- a/src/backend/base/langflow/interface/initialize/loading.py +++ b/src/backend/base/langflow/interface/initialize/loading.py @@ -8,6 +8,7 @@ from loguru import logger from pydantic import PydanticDeprecatedSince20 from langflow.custom.eval import eval_custom_component_code +from langflow.events.event_manager import EventManager from langflow.schema import Data from langflow.schema.artifact import get_artifact_type, post_process_raw from langflow.services.deps import get_tracing_service @@ -20,6 +21,7 @@ if TYPE_CHECKING: async def instantiate_class( vertex: "Vertex", user_id=None, + event_manager: EventManager | None = None, ) -> Any: """Instantiate class from module type and key, and params""" @@ -39,6 +41,8 @@ async def instantiate_class( _vertex=vertex, _tracing_service=get_tracing_service(), ) + if hasattr(custom_component, "set_event_manager"): + custom_component.set_event_manager(event_manager) return custom_component, custom_params diff --git a/src/backend/base/langflow/services/database/models/message/crud.py b/src/backend/base/langflow/services/database/models/message/crud.py new file mode 100644 index 000000000..05c566fe9 --- /dev/null +++ b/src/backend/base/langflow/services/database/models/message/crud.py @@ -0,0 +1,19 @@ +from uuid import UUID + +from langflow.services.database.models.message.model import MessageTable, MessageUpdate +from langflow.services.deps import session_scope + + +def update_message(message_id: UUID, message: MessageUpdate | dict): + if not isinstance(message, MessageUpdate): + message = MessageUpdate(**message) + with session_scope() as session: + db_message = session.get(MessageTable, message_id) + if not db_message: + raise ValueError("Message not found") + message_dict = message.model_dump(exclude_unset=True, exclude_none=True) + db_message.sqlmodel_update(message_dict) + session.add(db_message) + session.commit() + session.refresh(db_message) + return db_message diff --git a/src/backend/base/langflow/services/database/models/message/model.py b/src/backend/base/langflow/services/database/models/message/model.py index 0f2f675be..69e7458c1 100644 --- a/src/backend/base/langflow/services/database/models/message/model.py +++ b/src/backend/base/langflow/services/database/models/message/model.py @@ -36,10 +36,16 @@ class MessageBase(SQLModel): timestamp = message.timestamp if not flow_id and message.flow_id: flow_id = message.flow_id + if not isinstance(message.text, str): + # If the text is not a string, it means it could be + # async iterator so we simply add it as an empty string + message_text = "" + else: + message_text = message.text return cls( sender=message.sender, sender_name=message.sender_name, - text=message.text, + text=message_text, session_id=message.session_id, files=message.files or [], timestamp=timestamp, diff --git a/src/backend/tests/unit/events/test_event_manager.py b/src/backend/tests/unit/events/test_event_manager.py new file mode 100644 index 000000000..98d19a15e --- /dev/null +++ b/src/backend/tests/unit/events/test_event_manager.py @@ -0,0 +1,230 @@ +import asyncio +import json +import time +import uuid + +import pytest + +from langflow.events.event_manager import EventManager +from langflow.schema.log import LoggableType + + +@pytest.fixture +def client(): + pass + + +class TestEventManager: + # Registering an event with a valid name and callback using a mock callback function + def test_register_event_with_valid_name_and_callback_with_mock_callback(self): + def mock_callback(event_type: str, data: LoggableType): + pass + + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type", mock_callback) + assert "on_test_event" in manager.events + assert manager.events["on_test_event"].func == mock_callback + + # Registering an event with an empty name + + def test_register_event_with_empty_name(self): + queue = asyncio.Queue() + manager = EventManager(queue) + with pytest.raises(ValueError, match="Event name cannot be empty"): + manager.register_event("", "test_type") + + # Registering an event with a valid name and no callback + def test_register_event_with_valid_name_and_no_callback(self): + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type") + assert "on_test_event" in manager.events + assert manager.events["on_test_event"].func == manager.send_event + + # Sending an event with valid event_type and data using pytest-asyncio plugin + @pytest.mark.asyncio + async def test_sending_event_with_valid_type_and_data_asyncio_plugin(self): + async def mock_queue_put_nowait(item): + await queue.put(item) + + queue = asyncio.Queue() + queue.put_nowait = mock_queue_put_nowait + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type", manager.noop) + event_type = "test_type" + data = "test_data" + manager.send_event(event_type=event_type, data=data) + await queue.join() + assert queue.empty() + + # Accessing a non-registered event callback via __getattr__ with the recommended fix + def test_accessing_non_registered_event_callback_with_recommended_fix(self): + queue = asyncio.Queue() + manager = EventManager(queue) + result = manager.__getattr__("non_registered_event") + assert result == manager.noop + + # Accessing a registered event callback via __getattr__ + def test_accessing_registered_event_callback(self): + def mock_callback(event_type: str, data: LoggableType): + pass + + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type", mock_callback) + assert manager.on_test_event.func == mock_callback + + # Handling a large number of events in the queue + def test_handling_large_number_of_events(self): + async def mock_queue_put_nowait(item): + pass + + queue = asyncio.Queue() + queue.put_nowait = mock_queue_put_nowait + manager = EventManager(queue) + + for i in range(1000): + manager.register_event(f"on_test_event_{i}", "test_type", manager.noop) + + assert len(manager.events) == 1000 + + # Testing registration of an event with an invalid name with the recommended fix + def test_register_event_with_invalid_name_fixed(self): + def mock_callback(event_type, data): + pass + + queue = asyncio.Queue() + manager = EventManager(queue) + with pytest.raises(ValueError): + manager.register_event("", "test_type", mock_callback) + with pytest.raises(ValueError): + manager.register_event("invalid_name", "test_type", mock_callback) + + # Sending an event with complex data and verifying successful event transmission + @pytest.mark.asyncio + async def test_sending_event_with_complex_data(self): + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type", manager.noop) + data = {"key": "value", "nested": [1, 2, 3]} + manager.send_event(event_type="test_type", data=data) + event_id, str_data, event_time = await queue.get() + assert event_id is not None + assert str_data is not None + assert event_time <= time.time() + + # Sending an event with None data + def test_sending_event_with_none_data(self): + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type") + assert "on_test_event" in manager.events + assert manager.events["on_test_event"].func.__name__ == "send_event" + + # Ensuring thread-safety when accessing the events dictionary + def test_thread_safety_accessing_events_dictionary(self): + def mock_callback(event_type: str, data: LoggableType): + pass + + async def register_events(manager): + manager.register_event("on_test_event_1", "test_type_1", mock_callback) + manager.register_event("on_test_event_2", "test_type_2", mock_callback) + + async def access_events(manager): + assert "on_test_event_1" in manager.events + assert "on_test_event_2" in manager.events + + queue = asyncio.Queue() + manager = EventManager(queue) + + tasks = [register_events(manager), access_events(manager)] + asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks)) + + # Checking the performance impact of frequent event registrations + def test_performance_impact_frequent_registrations(self): + async def mock_callback(event_type: str, data: LoggableType): + pass + + queue = asyncio.Queue() + manager = EventManager(queue) + for i in range(1000): + manager.register_event(f"on_test_event_{i}", "test_type", mock_callback) + assert len(manager.events) == 1000 + + # Verifying the uniqueness of event IDs for each event triggered using await with asyncio decorator + import pytest + + @pytest.mark.asyncio + async def test_event_id_uniqueness_with_await(self): + queue = asyncio.Queue() + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type") + manager.on_test_event(data={"data_1": "value_1"}) + manager.on_test_event(data={"data_2": "value_2"}) + try: + event_id_1, _, _ = await queue.get() + event_id_2, _, _ = await queue.get() + except asyncio.TimeoutError: + pytest.fail("Test timed out while waiting for queue items") + + assert event_id_1 != event_id_2 + + # Ensuring the queue receives the correct event data format + @pytest.mark.asyncio + async def test_queue_receives_correct_event_data_format(self): + async def mock_queue_put_nowait(data): + pass + + async def mock_queue_get(): + return (uuid.uuid4(), b'{"event": "test_type", "data": "test_data"}\n\n', time.time()) + + queue = asyncio.Queue() + queue.put_nowait = mock_queue_put_nowait + queue.get = mock_queue_get + + manager = EventManager(queue) + manager.register_event("on_test_event", "test_type", manager.noop) + event_data = "test_data" + manager.send_event(event_type="test_type", data=event_data) + + event_id, str_data, _ = await queue.get() + assert isinstance(event_id, uuid.UUID) + assert isinstance(str_data, bytes) + assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data} + + # Registering an event without specifying the event_type argument and providing the event_type argument + def test_register_event_without_event_type_argument_fixed(self): + class MockQueue: + def __init__(self): + self.data = [] + + def put_nowait(self, item): + self.data.append(item) + + queue = MockQueue() + event_manager = EventManager(queue) + event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop) + event_manager.send_event(event_type="test_type", data={"key": "value"}) + + assert len(queue.data) == 1 + event_id, str_data, timestamp = queue.data[0] + assert isinstance(event_id, uuid.UUID) + assert isinstance(str_data, bytes) + assert isinstance(timestamp, float) + + # Accessing a non-registered event callback via __getattr__ + def test_accessing_non_registered_callback(self): + class MockQueue: + def __init__(self): + pass + + def put_nowait(self, item): + pass + + queue = MockQueue() + event_manager = EventManager(queue) + + # Accessing a non-registered event callback should return the 'noop' function + callback = event_manager.on_non_existing_event + assert callback.__name__ == "noop" diff --git a/src/backend/tests/unit/graph/graph/test_callback_graph.py b/src/backend/tests/unit/graph/graph/test_callback_graph.py new file mode 100644 index 000000000..89a63bfda --- /dev/null +++ b/src/backend/tests/unit/graph/graph/test_callback_graph.py @@ -0,0 +1,48 @@ +import asyncio + +from langflow.components.outputs.ChatOutput import ChatOutput +from langflow.custom.custom_component.component import Component +from langflow.events.event_manager import EventManager +from langflow.graph.graph.base import Graph +from langflow.inputs.inputs import IntInput +from langflow.schema.message import Message +from langflow.template.field.base import Output + + +class LogComponent(Component): + name = "LogComponent" + inputs = [IntInput(name="times", value=1)] + outputs = [Output(name="call_log", method="call_log_method")] + + def call_log_method(self) -> Message: + for i in range(self.times): + self.log(f"This is log message {i}", name=f"Log {i}") + return Message(text="Log called") + + +def test_callback_graph(): + logs: list[tuple[str, dict]] = [] + + def mock_callback(manager, event_type: str, data: dict): + logs.append((event_type, data)) + + event_manager = EventManager(queue=asyncio.Queue()) + event_manager.register_event("on_log", "log", callback=mock_callback) + + log_component = LogComponent(_id="log_component") + log_component.set(times=3) + chat_output = ChatOutput(_id="chat_output") + chat_output.set(sender_name=log_component.call_log_method) + graph = Graph(start=log_component, end=chat_output) + + results = list(graph.start(event_manager=event_manager)) + assert len(results) == 3 + assert len(logs) == 3 + assert all(isinstance(log, tuple) for log in logs) + assert all(isinstance(log[1], dict) for log in logs) + assert logs[0][0] == "log" + assert logs[0][1]["name"] == "Log 0" + assert logs[1][0] == "log" + assert logs[1][1]["name"] == "Log 1" + assert logs[2][0] == "log" + assert logs[2][1]["name"] == "Log 2"