feat: add EventManager to centralize callbacks (#3434)

* refactor: Update MessageBase text attribute based on isinstance check.

* feat: Add update_message function to update a message in the database.

* refactor(chat): Update imports and remove unnecessary config method in ChatComponent.

* refactor: Add stream_message method to ChatComponent.

* refactor: Update method call in ChatOutput component.

* feat: Add callback function to custom component and update build_results signature.

* feat: Add callback parameter to instantiate_class function.

* feat(graph): Add callback functions for sync and async operations.

* feat: Add callback function support to vertex build process.

* feat: Add handling for added message in InterfaceVertex class.

* feat: Add callback support to Graph methods.

* feat(chat): Add callback function to build_vertices function.

* refactor: Simplify update_message function and use session_scope for session management.

* fix: Call set_callback method if available on custom component.

* refactor(chat): Update chat message chunk handling and ID conversion.

* feat: Add null check before setting cache in build_vertex_stream function.

* refactor: Fix send_event_wrapper function and add callback parameter to _build_vertex function.

* refactor: Simplify conditional statement and import order in ChatOutput.

* refactor: move log method to Component class.

* refactor: Simplify CallbackFunction definition.

* feat: Initialize _current_output attribute in Component class.

* feat: store current output name in custom component during processing.

* feat: Add current output and component ID to log data.

* fix: Add condition to check current output before invoking callback.

* refactor: Update callback to log_callback in graph methods.

* feat: Add test for callback graph execution with log messages.

* update projects

* fix(chat.py): fix condition to check if message text is a string before updating message text in the database

* refactor(ChatOutput.py): update ChatOutput class to correctly store and assign the message value to ensure consistency and avoid potential bugs

* refactor(chat.py): update return type of store_message method to return a single Message object instead of a list of Messages
refactor(chat.py): update logic to correctly handle updating and returning a single stored message object instead of a list of messages

* update starter projects

* refactor(component.py): update type hint for name parameter in log method to be more explicit

* feat: Add EventManager class for managing events and event registration

* refactor: Update log_callback to event_manager in custom component classes

* refactor(component.py): rename _log_callback to _event_manager and update method call to on_log for better clarity and consistency

* refactor(chat.py): rename _log_callback method to _event_manager.on_token for clarity and consistency in method naming

* refactor: Rename log_callback to event_manager for clarity and consistency

* refactor: Update Vertex class to use EventManager instead of log_callback for better clarity and consistency

* refactor: update build_flow to use EventManager

* refactor: Update EventManager class to use Protocol for event callbacks

* if event_type is not passed, it uses the default send_event

* Add method to register event functions in EventManager

- Introduced `register_event_function` method to allow passing custom event functions.
- Updated `noop` method to accept `event_type` parameter.
- Adjusted `__getattr__` to return `EventCallback` type.

* update test_callback_graph

* Add unit tests for EventManager in test_event_manager.py

- Added tests for event registration, including default event type, empty string names, and specific event types.
- Added tests for custom event functions and unregistered event access.
- Added tests for event sending, including JSON formatting, empty data, and large payloads.
- Added tests for handling JSON serialization errors and the noop function.

* revert chatOutput change

* Add validation for event function in EventManager

- Introduced `_validate_event_function` method to ensure event functions are callable and have the correct parameters.
- Updated `register_event_function` to use the new validation method.

* Add tests for EventManager's event function validation logic

- Introduce `TestValidateEventFunction` class to test various scenarios for `_validate_event_function`.
- Add tests for valid event functions, non-callable event functions, invalid parameter counts, and parameter type validation.
- Include tests for handling unannotated parameters, flexible arguments, and keyword-only parameters.
- Ensure proper warnings and exceptions are raised for invalid event functions.

* Add type ignore comment to lambda function in test_event_manager.py

* refactor: Update EventManager class to use Protocol for event callbacks

* refactor(event_manager.py): simplify event registration and validation logic to enhance readability and maintainability
feat(event_manager.py): enforce event name conventions and improve callback handling for better error management

* refactor(chat.py): standardize event_manager method calls by using keyword arguments for better clarity and consistency
refactor(chat.py): extract message processing logic into separate methods for improved readability and maintainability
fix(chat.py): ensure proper handling of async iterators in message streaming
refactor(component.py): simplify event logging by removing unnecessary event name parameter in on_log method call

* update event manager tests

* Add callback validation and manager parameter in EventManager

- Introduced `_validate_callback` method to ensure callbacks are callable and have the correct parameters.
- Updated `register_event` to include `manager` parameter in the callback.

* Add support for passing callback through the Graph in test_callback_graph

* fix(event_manager.py): update EventCallback signature to include manager parameter for better context in event handling
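The callback contract that emerged from the commits above can be shown with a minimal sketch. The parameter names mirror what `_validate_callback` in the diff below enforces; `queue_forwarder` itself is a hypothetical handler, not part of the PR:

```python
import inspect

# Hypothetical handler: a callback passed to EventManager.register_event must
# be callable and take exactly three parameters named manager, event_type, data.
def queue_forwarder(*, manager, event_type: str, data) -> None:
    # Illustrative body: push the raw event onto the manager's queue.
    manager.queue.put_nowait((event_type, data))

sig = inspect.signature(queue_forwarder)
assert len(sig.parameters) == 3
assert set(sig.parameters) == {"manager", "event_type", "data"}
```

`register_event` then binds `manager=self` and the `event_type` via `functools.partial`, so callers only ever pass `data=`.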
Commit 3eaad7bc3a by Gabriel Luiz Freitas Almeida, 2024-09-02 10:41:19 -03:00, committed by GitHub (GPG key ID: B5690EEEBB952194).
24 changed files with 534 additions and 106 deletions.


@ -31,6 +31,7 @@ from langflow.api.v1.schemas import (
VertexBuildResponse,
VerticesOrderResponse,
)
from langflow.events.event_manager import EventManager, create_default_event_manager
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.graph.base import Graph
from langflow.graph.utils import log_vertex_build
@ -204,7 +205,7 @@ async def build_flow(
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
async def _build_vertex(vertex_id: str, graph: "Graph") -> VertexBuildResponse:
async def _build_vertex(vertex_id: str, graph: "Graph", event_manager: "EventManager") -> VertexBuildResponse:
flow_id_str = str(flow_id)
next_runnable_vertices = []
@ -222,6 +223,7 @@ async def build_flow(
files=files,
get_cache=chat_service.get_cache,
set_cache=chat_service.set_cache,
event_manager=event_manager,
)
result_dict = vertex_build_result.result_dict
params = vertex_build_result.params
@ -316,17 +318,13 @@ async def build_flow(
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc
def send_event(event_type: str, value: dict, queue: asyncio.Queue) -> None:
json_data = {"event": event_type, "data": value}
event_id = uuid.uuid4()
logger.debug(f"sending event {event_id}: {event_type}")
str_data = json.dumps(json_data) + "\n\n"
queue.put_nowait((event_id, str_data.encode("utf-8"), time.time()))
async def build_vertices(
vertex_id: str, graph: "Graph", queue: asyncio.Queue, client_consumed_queue: asyncio.Queue
vertex_id: str,
graph: "Graph",
client_consumed_queue: asyncio.Queue,
event_manager: "EventManager",
) -> None:
build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph))
build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph, event_manager))
try:
await build_task
except asyncio.CancelledError as exc:
@ -341,13 +339,15 @@ async def build_flow(
build_data = json.loads(vertex_build_response_json)
except Exception as exc:
raise ValueError(f"Error serializing vertex build response: {exc}") from exc
send_event("end_vertex", {"build_data": build_data}, queue)
event_manager.on_end_vertex(data={"build_data": build_data})
await client_consumed_queue.get()
if vertex_build_response.valid:
if vertex_build_response.next_vertices_ids:
tasks = []
for next_vertex_id in vertex_build_response.next_vertices_ids:
task = asyncio.create_task(build_vertices(next_vertex_id, graph, queue, client_consumed_queue))
task = asyncio.create_task(
build_vertices(next_vertex_id, graph, client_consumed_queue, event_manager)
)
tasks.append(task)
try:
await asyncio.gather(*tasks)
@ -356,7 +356,7 @@ async def build_flow(
task.cancel()
return
async def event_generator(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None:
async def event_generator(event_manager: EventManager, client_consumed_queue: asyncio.Queue) -> None:
if not data:
# using another thread since the DB query is I/O bound
vertices_task = asyncio.create_task(await asyncio.to_thread(build_graph_and_get_order))
@ -367,9 +367,9 @@ async def build_flow(
return
except Exception as e:
if isinstance(e, HTTPException):
send_event("error", {"error": str(e.detail), "statusCode": e.status_code}, queue)
event_manager.on_error(data={"error": str(e.detail), "statusCode": e.status_code})
raise e
send_event("error", {"error": str(e)}, queue)
event_manager.on_error(data={"error": str(e)})
raise e
ids, vertices_to_run, graph = vertices_task.result()
@ -378,16 +378,16 @@ async def build_flow(
ids, vertices_to_run, graph = await build_graph_and_get_order()
except Exception as e:
if isinstance(e, HTTPException):
send_event("error", {"error": str(e.detail), "statusCode": e.status_code}, queue)
event_manager.on_error(data={"error": str(e.detail), "statusCode": e.status_code})
raise e
send_event("error", {"error": str(e)}, queue)
event_manager.on_error(data={"error": str(e)})
raise e
send_event("vertices_sorted", {"ids": ids, "to_run": vertices_to_run}, queue)
event_manager.on_vertices_sorted(data={"ids": ids, "to_run": vertices_to_run})
await client_consumed_queue.get()
tasks = []
for vertex_id in ids:
task = asyncio.create_task(build_vertices(vertex_id, graph, queue, client_consumed_queue))
task = asyncio.create_task(build_vertices(vertex_id, graph, client_consumed_queue, event_manager))
tasks.append(task)
try:
await asyncio.gather(*tasks)
@ -396,8 +396,8 @@ async def build_flow(
for task in tasks:
task.cancel()
return
send_event("end", {}, queue)
await queue.put((None, None, time.time))
event_manager.on_end(data={})
await event_manager.queue.put((None, None, time.time))
async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> typing.AsyncGenerator:
while True:
@ -414,7 +414,8 @@ async def build_flow(
asyncio_queue: asyncio.Queue = asyncio.Queue()
asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue()
main_task = asyncio.create_task(event_generator(asyncio_queue, asyncio_queue_client_consumed))
event_manager = create_default_event_manager(queue=asyncio_queue)
main_task = asyncio.create_task(event_generator(event_manager, asyncio_queue_client_consumed))
def on_disconnect():
logger.debug("Client disconnected, closing tasks")
@ -640,6 +641,7 @@ async def build_vertex_stream(
flow_id_str = str(flow_id)
async def stream_vertex():
graph = None
try:
cache = await chat_service.get_cache(flow_id_str)
if not cache:
@ -693,7 +695,8 @@ async def build_vertex_stream(
yield str(StreamData(event="error", data={"error": exc_message}))
finally:
logger.debug("Closing stream")
await chat_service.set_cache(flow_id_str, graph)
if graph:
await chat_service.set_cache(flow_id_str, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
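The wire framing that the router previously built inline in `send_event` (and that `EventManager.send_event` now produces) can be reproduced in isolation — each event is a JSON object followed by a blank line, queued as an `(event_id, bytes, timestamp)` tuple:

```python
import asyncio
import json
import time
import uuid

# Standalone reproduction of the payload framing from the removed send_event
# helper; the queue here stands in for the SSE consumer queue.
queue: asyncio.Queue = asyncio.Queue()

def send_event(event_type: str, value: dict) -> None:
    json_data = {"event": event_type, "data": value}
    str_data = json.dumps(json_data) + "\n\n"
    queue.put_nowait((uuid.uuid4(), str_data.encode("utf-8"), time.time()))

send_event("end", {})
_event_id, payload, _ts = queue.get_nowait()
assert payload == b'{"event": "end", "data": {}}\n\n'
```

The trailing blank line is what lets the `text/event-stream` consumer split events on the client side.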


@ -1,67 +1,70 @@
from typing import Optional, Union
from typing import AsyncIterator, Iterator, Optional, Union
from langflow.base.data.utils import IMG_FILE_TYPES, TEXT_FILE_TYPES
from langflow.custom import Component
from langflow.memory import store_message
from langflow.schema import Data
from langflow.schema.message import Message
from langflow.utils.constants import MESSAGE_SENDER_USER, MESSAGE_SENDER_AI
from langflow.services.database.models.message.crud import update_message
from langflow.utils.async_helpers import run_until_complete
class ChatComponent(Component):
display_name = "Chat Component"
description = "Use as base for chat components."
def build_config(self):
return {
"input_value": {
"input_types": ["Text"],
"display_name": "Text",
"multiline": True,
},
"sender": {
"options": [MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],
"display_name": "Sender Type",
"advanced": True,
},
"sender_name": {"display_name": "Sender Name", "advanced": True},
"session_id": {
"display_name": "Session ID",
"info": "If provided, the message will be stored in the memory.",
"advanced": True,
},
"return_message": {
"display_name": "Return Message",
"info": "Return the message as a Message containing the sender, sender_name, and session_id.",
"advanced": True,
},
"data_template": {
"display_name": "Data Template",
"multiline": True,
"info": "In case of Message being a Data, this template will be used to convert it to text.",
"advanced": True,
},
"files": {
"field_type": "file",
"display_name": "Files",
"file_types": TEXT_FILE_TYPES + IMG_FILE_TYPES,
"info": "Files to be sent with the message.",
"advanced": True,
},
}
# Keep this method for backward compatibility
def store_message(
self,
message: Message,
) -> list[Message]:
) -> Message:
messages = store_message(
message,
flow_id=self.graph.flow_id,
)
if len(messages) > 1:
raise ValueError("Only one message can be stored at a time.")
stored_message = messages[0]
if hasattr(self, "_event_manager") and self._event_manager and stored_message.id:
if not isinstance(message.text, str):
complete_message = self._stream_message(message, stored_message.id)
message_table = update_message(message_id=stored_message.id, message=dict(text=complete_message))
stored_message = Message(**message_table.model_dump())
self.vertex._added_message = stored_message
self.status = stored_message
return stored_message
self.status = messages
return messages
def _process_chunk(self, chunk: str, complete_message: str, message: Message, message_id: str) -> str:
complete_message += chunk
data = {
"text": complete_message,
"chunk": chunk,
"sender": message.sender,
"sender_name": message.sender_name,
"id": str(message_id),
}
if self._event_manager:
self._event_manager.on_token(data=data)
return complete_message
async def _handle_async_iterator(self, iterator: AsyncIterator, message: Message, message_id: str) -> str:
complete_message = ""
async for chunk in iterator:
complete_message = self._process_chunk(chunk.content, complete_message, message, message_id)
return complete_message
def _stream_message(self, message: Message, message_id: str) -> str:
iterator = message.text
if not isinstance(iterator, (AsyncIterator, Iterator)):
raise ValueError("The message must be an iterator or an async iterator.")
if isinstance(iterator, AsyncIterator):
return run_until_complete(self._handle_async_iterator(iterator, message, message_id))
complete_message = ""
for chunk in iterator:
complete_message = self._process_chunk(chunk.content, complete_message, message, message_id)
return complete_message
def build_with_data(
self,


@ -3,7 +3,7 @@ from langflow.inputs import BoolInput
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.memory import store_message
from langflow.schema.message import Message
from langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER
class ChatOutput(ChatComponent):


@ -1,17 +1,19 @@
import inspect
from collections.abc import Callable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar, get_type_hints
from collections.abc import Callable
from uuid import UUID
import nanoid # type: ignore
import yaml
from pydantic import BaseModel
from langflow.events.event_manager import EventManager
from langflow.graph.state.model import create_state_model
from langflow.helpers.custom import format_type
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.schema.data import Data
from langflow.schema.log import LoggableType
from langflow.schema.message import Message
from langflow.services.tracing.schema import Log
from langflow.template.field.base import UNDEFINED, Input, Output
@ -35,6 +37,7 @@ class Component(CustomComponent):
outputs: list[Output] = []
code_class_base_inheritance: ClassVar[str] = "Component"
_output_logs: dict[str, Log] = {}
_current_output: str = ""
def __init__(self, **kwargs):
# if key starts with _ it is a config
@ -56,6 +59,8 @@ class Component(CustomComponent):
self._parameters = inputs or {}
self._edges: list[EdgeData] = []
self._components: list[Component] = []
self._current_output = ""
self._event_manager: EventManager | None = None
self._state_model = None
self.set_attributes(self._parameters)
self._output_logs = {}
@ -77,6 +82,9 @@ class Component(CustomComponent):
self._set_output_types()
self.set_class_code()
def set_event_manager(self, event_manager: EventManager | None = None):
self._event_manager = event_manager
def _reset_all_output_values(self):
for output in self.outputs:
setattr(output, "value", UNDEFINED)
@ -601,7 +609,9 @@ class Component(CustomComponent):
async def _build_without_tracing(self):
return await self._build_results()
async def build_results(self):
async def build_results(
self,
):
if self._tracing_service:
return await self._build_with_tracing()
return await self._build_without_tracing()
@ -620,6 +630,7 @@ class Component(CustomComponent):
):
if output.method is None:
raise ValueError(f"Output {output.name} does not have a method defined.")
self._current_output = output.name
method: Callable = getattr(self, output.method)
if output.cache and output.value != UNDEFINED:
_results[output.name] = output.value
@ -638,6 +649,7 @@ class Component(CustomComponent):
result.set_flow_id(self._vertex.graph.flow_id)
_results[output.name] = result
output.value = result
custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, (dict, Data, str)):
custom_repr = result
@ -665,6 +677,7 @@ class Component(CustomComponent):
_artifacts[output.name] = artifact
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""
self._artifacts = _artifacts
self._results = _results
if self._tracing_service:
@ -720,6 +733,25 @@ class Component(CustomComponent):
return ComponentTool(component=self)
def get_project_name(self):
if hasattr(self, "_tracing_service"):
if hasattr(self, "_tracing_service") and self._tracing_service:
return self._tracing_service.project_name
return "Langflow"
def log(self, message: LoggableType | list[LoggableType], name: str | None = None):
"""
Logs a message.
Args:
message (LoggableType | list[LoggableType]): The message to log.
"""
if name is None:
name = f"Log {len(self._logs) + 1}"
log = Log(message=message, type=get_artifact_type(message), name=name)
self._logs.append(log)
if self._tracing_service and self._vertex:
self._tracing_service.add_log(trace_name=self.trace_name, log=log)
if self._event_manager is not None and self._current_output:
data = log.model_dump()
data["output"] = self._current_output
data["component_id"] = self._id
self._event_manager.on_log(data=data)
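The enrichment that the relocated `Component.log` performs before emitting `on_log` can be sketched in isolation. The `Log` dataclass below is a stub standing in for langflow's Pydantic model (where `asdict` corresponds to `model_dump()`); the helper function name is illustrative:

```python
from dataclasses import dataclass, asdict

@dataclass
class Log:
    # Stub of langflow's Log model: message, artifact type, and display name.
    message: str
    type: str
    name: str

def build_log_event(log: Log, current_output: str, component_id: str) -> dict:
    # Mirrors the diff: dump the log, then attach the output being built and
    # the component ID so the frontend can route the event.
    data = asdict(log)  # log.model_dump() in the real code
    data["output"] = current_output
    data["component_id"] = component_id
    return data

event = build_log_event(Log("hello", "text", "Log 1"), "message", "ChatOutput-abc")
assert event["output"] == "message"
assert event["component_id"] == "ChatOutput-abc"
```

Note that the real `log` method only emits the event while `_current_output` is non-empty, i.e. while an output method is actively running.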


@ -1,6 +1,6 @@
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Optional
from collections.abc import Callable, Sequence
import yaml
from cachetools import TTLCache
@ -10,9 +10,7 @@ from pydantic import BaseModel
from langflow.custom.custom_component.base_component import BaseComponent
from langflow.helpers.flow import list_flows, load_flow, run_flow
from langflow.schema import Data
from langflow.schema.artifact import get_artifact_type
from langflow.schema.dotdict import dotdict
from langflow.schema.log import LoggableType
from langflow.schema.schema import OutputValue
from langflow.services.deps import get_storage_service, get_variable_service, session_scope
from langflow.services.storage.service import StorageService
@ -509,20 +507,6 @@ class CustomComponent(BaseComponent):
"""
raise NotImplementedError
def log(self, message: LoggableType | list[LoggableType], name: str | None = None):
"""
Logs a message.
Args:
message (LoggableType | list[LoggableType]): The message to log.
"""
if name is None:
name = f"Log {len(self._logs) + 1}"
log = Log(message=message, type=get_artifact_type(message), name=name)
self._logs.append(log)
if self._tracing_service and self._vertex:
self._tracing_service.add_log(trace_name=self.trace_name, log=log)
def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):
"""
This function is called after the code validation is done.


@ -0,0 +1,69 @@
import asyncio
import inspect
import json
import time
import uuid
from functools import partial
from typing_extensions import Protocol
from langflow.schema.log import LoggableType
class EventCallback(Protocol):
def __call__(self, *, manager: "EventManager", event_type: str, data: LoggableType): ...
class PartialEventCallback(Protocol):
def __call__(self, *, data: LoggableType): ...
class EventManager:
def __init__(self, queue: asyncio.Queue):
self.queue = queue
self.events: dict[str, PartialEventCallback] = {}
@staticmethod
def _validate_callback(callback: EventCallback):
if not callable(callback):
raise ValueError("Callback must be callable")
# Check if it has `self, event_type and data`
sig = inspect.signature(callback)
if len(sig.parameters) != 3:
raise ValueError("Callback must have exactly 3 parameters")
if not all(param.name in ["manager", "event_type", "data"] for param in sig.parameters.values()):
raise ValueError("Callback must have exactly 3 parameters: manager, event_type, and data")
def register_event(self, name: str, event_type: str, callback: EventCallback | None = None):
if not name:
raise ValueError("Event name cannot be empty")
if not name.startswith("on_"):
raise ValueError("Event name must start with 'on_'")
if callback is None:
_callback = partial(self.send_event, event_type=event_type)
else:
_callback = partial(callback, manager=self, event_type=event_type)
self.events[name] = _callback
def send_event(self, *, event_type: str, data: LoggableType):
json_data = {"event": event_type, "data": data}
event_id = uuid.uuid4()
str_data = json.dumps(json_data) + "\n\n"
self.queue.put_nowait((event_id, str_data.encode("utf-8"), time.time()))
def noop(self, *, data: LoggableType):
pass
def __getattr__(self, name: str) -> PartialEventCallback:
return self.events.get(name, self.noop)
def create_default_event_manager(queue):
manager = EventManager(queue)
manager.register_event("on_token", "token")
manager.register_event("on_vertices_sorted", "vertices_sorted")
manager.register_event("on_error", "error")
manager.register_event("on_end", "end")
manager.register_event("on_message", "message")
manager.register_event("on_end_vertex", "end_vertex")
return manager
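To show the registration/dispatch flow end to end, here is a condensed, self-contained copy of the class above (validation and the full default-event set are omitted; only the pieces needed to run the example are kept):

```python
import asyncio
import json
import time
import uuid
from functools import partial

class EventManager:
    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.events = {}

    def register_event(self, name: str, event_type: str) -> None:
        # Same naming convention the real class enforces.
        if not name.startswith("on_"):
            raise ValueError("Event name must start with 'on_'")
        # Default behavior: bind event_type to send_event.
        self.events[name] = partial(self.send_event, event_type=event_type)

    def send_event(self, *, event_type: str, data) -> None:
        payload = json.dumps({"event": event_type, "data": data}) + "\n\n"
        self.queue.put_nowait((uuid.uuid4(), payload.encode("utf-8"), time.time()))

    def __getattr__(self, name):
        # Unregistered events resolve to a no-op, as in the real class.
        return self.events.get(name, lambda *, data: None)

queue: asyncio.Queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_end", "end")
manager.on_end(data={})
_event_id, payload, _ts = queue.get_nowait()
assert payload == b'{"event": "end", "data": {}}\n\n'
```

The `__getattr__` fallback is what lets call sites write `event_manager.on_token(...)` unconditionally: if nothing was registered under that name, the call silently does nothing.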


@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Optional, cast
import nest_asyncio
from loguru import logger
from langflow.events.event_manager import EventManager
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.edge.base import CycleEdge, Edge
from langflow.graph.edge.schema import EdgeData
@ -277,7 +278,12 @@ class Graph:
}
self._add_edge(edge_data)
async def async_start(self, inputs: list[dict] | None = None, max_iterations: int | None = None):
async def async_start(
self,
inputs: list[dict] | None = None,
max_iterations: int | None = None,
event_manager: EventManager | None = None,
):
if not self._prepared:
raise ValueError("Graph not prepared. Call prepare() first.")
# The idea is for this to return a generator that yields the result of
@ -291,7 +297,7 @@ class Graph:
yielded_counts: dict[str, int] = defaultdict(int)
while should_continue(yielded_counts, max_iterations):
result = await self.astep()
result = await self.astep(event_manager=event_manager)
yield result
if hasattr(result, "vertex"):
yielded_counts[result.vertex.id] += 1
@ -322,13 +328,14 @@ class Graph:
inputs: list[dict] | None = None,
max_iterations: int | None = None,
config: StartConfigDict | None = None,
event_manager: EventManager | None = None,
) -> Generator:
if config is not None:
self.__apply_config(config)
#! Change this ASAP
nest_asyncio.apply()
loop = asyncio.get_event_loop()
async_gen = self.async_start(inputs, max_iterations)
async_gen = self.async_start(inputs, max_iterations, event_manager)
async_gen_task = asyncio.ensure_future(async_gen.__anext__())
while True:
@ -1200,6 +1207,7 @@ class Graph:
inputs: Optional["InputValueRequest"] = None,
files: list[str] | None = None,
user_id: str | None = None,
event_manager: EventManager | None = None,
):
if not self._prepared:
raise ValueError("Graph not prepared. Call prepare() first.")
@ -1215,6 +1223,7 @@ class Graph:
files=files,
get_cache=chat_service.get_cache,
set_cache=chat_service.set_cache,
event_manager=event_manager,
)
next_runnable_vertices = await self.get_next_runnable_vertices(
@ -1266,6 +1275,7 @@ class Graph:
files: list[str] | None = None,
user_id: str | None = None,
fallback_to_env_vars: bool = False,
event_manager: EventManager | None = None,
) -> VertexBuildResult:
"""
Builds a vertex in the graph.
@ -1320,7 +1330,11 @@ class Graph:
if should_build:
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
user_id=user_id,
inputs=inputs_dict,
fallback_to_env_vars=fallback_to_env_vars,
files=files,
event_manager=event_manager,
)
if set_cache is not None:
vertex_dict = {


@ -1,9 +1,10 @@
from typing import TYPE_CHECKING, NamedTuple
from typing import TYPE_CHECKING, NamedTuple, Protocol
from typing_extensions import NotRequired, TypedDict
from langflow.graph.edge.schema import EdgeData
from langflow.graph.vertex.schema import NodeData
from langflow.schema.log import LoggableType
if TYPE_CHECKING:
from langflow.graph.schema import ResultData
@ -44,3 +45,7 @@ class OutputConfigDict(TypedDict):
class StartConfigDict(TypedDict):
output: OutputConfigDict
class LogCallbackFunction(Protocol):
def __call__(self, event_name: str, log: LoggableType) -> None: ...
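Because `LogCallbackFunction` is a `Protocol`, any callable with a matching signature satisfies it structurally; no subclassing is needed. A minimal sketch (with `LoggableType` stubbed as `object`, and `record_log` a hypothetical consumer):

```python
from typing import Protocol

class LogCallbackFunction(Protocol):
    # As added in graph/schema.py; LoggableType is stubbed as object here.
    def __call__(self, event_name: str, log: object) -> None: ...

events: list[tuple[str, object]] = []

def record_log(event_name: str, log: object) -> None:
    # Hypothetical consumer: just record what was emitted.
    events.append((event_name, log))

# Assignable without inheriting from the protocol (structural typing).
callback: LogCallbackFunction = record_log
callback("on_log", {"message": "hello"})
assert events == [("on_log", {"message": "hello"})]
```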


@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any, Optional
import pandas as pd
from loguru import logger
from langflow.events.event_manager import EventManager
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction
@ -451,6 +452,7 @@ class Vertex:
self,
fallback_to_env_vars,
user_id=None,
event_manager: EventManager | None = None,
):
"""
Initiate the build process.
@ -462,12 +464,19 @@ class Vertex:
raise ValueError(f"Base type for vertex {self.display_name} not found")
if not self._custom_component:
custom_component, custom_params = await initialize.loading.instantiate_class(user_id=user_id, vertex=self)
custom_component, custom_params = await initialize.loading.instantiate_class(
user_id=user_id, vertex=self, event_manager=event_manager
)
else:
custom_component = self._custom_component
self._custom_component.set_event_manager(event_manager)
custom_params = initialize.loading.get_params(self.params)
await self._build_results(custom_component, custom_params, fallback_to_env_vars)
await self._build_results(
custom_component=custom_component,
custom_params=custom_params,
fallback_to_env_vars=fallback_to_env_vars,
)
self._validate_built_object()
@ -755,6 +764,7 @@ class Vertex:
inputs: dict[str, Any] | None = None,
files: list[str] | None = None,
requester: Optional["Vertex"] = None,
event_manager: EventManager | None = None,
**kwargs,
) -> Any:
async with self._lock:
@ -784,9 +794,9 @@ class Vertex:
for step in self.steps:
if step not in self.steps_ran:
if inspect.iscoroutinefunction(step):
await step(user_id=user_id, **kwargs)
await step(user_id=user_id, event_manager=event_manager, **kwargs)
else:
step(user_id=user_id, **kwargs)
step(user_id=user_id, event_manager=event_manager, **kwargs)
self.steps_ran.append(step)
self._finalize_build()


@ -199,6 +199,7 @@ class ComponentVertex(Vertex):
class InterfaceVertex(ComponentVertex):
def __init__(self, data: NodeData, graph):
super().__init__(data, graph=graph)
self._added_message = None
self.steps = [self._build, self._run]
def build_stream_url(self):


@ -476,7 +476,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -664,7 +664,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -1197,7 +1197,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -554,7 +554,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -901,7 +901,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -825,7 +825,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -911,7 +911,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -1289,7 +1289,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER, MESSAGE_SENDER_AI\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
"value": "from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\nfrom langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = \"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n"
},
"data_template": {
"advanced": true,


@@ -8,6 +8,7 @@ from loguru import logger
from pydantic import PydanticDeprecatedSince20
from langflow.custom.eval import eval_custom_component_code
from langflow.events.event_manager import EventManager
from langflow.schema import Data
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.services.deps import get_tracing_service
@@ -20,6 +21,7 @@ if TYPE_CHECKING:
async def instantiate_class(
vertex: "Vertex",
user_id=None,
event_manager: EventManager | None = None,
) -> Any:
"""Instantiate class from module type and key, and params"""
@@ -39,6 +41,8 @@ async def instantiate_class(
_vertex=vertex,
_tracing_service=get_tracing_service(),
)
if hasattr(custom_component, "set_event_manager"):
custom_component.set_event_manager(event_manager)
return custom_component, custom_params
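Because the manager is injected only when the component exposes `set_event_manager`, components that predate the event manager keep working unchanged. A dependency-free sketch of this duck-typed wiring (all class names here are illustrative stand-ins, not langflow API):

```python
class RecordingEventManager:
    """Illustrative stand-in for langflow's EventManager; records sent events."""

    def __init__(self):
        self.sent = []

    def send_event(self, *, event_type, data):
        self.sent.append((event_type, data))


class LegacyComponent:
    """No set_event_manager hook: the loader leaves it untouched."""


class EventAwareComponent:
    """Opts in to event delivery by exposing set_event_manager."""

    def set_event_manager(self, event_manager):
        self._event_manager = event_manager


def wire(component, event_manager):
    # Same guard as the diff above: only inject where the hook exists.
    if hasattr(component, "set_event_manager"):
        component.set_event_manager(event_manager)


manager = RecordingEventManager()
aware = EventAwareComponent()
wire(LegacyComponent(), manager)  # silently skipped, no AttributeError
wire(aware, manager)
```

The `hasattr` check trades static guarantees for backward compatibility: a misspelled hook name fails silently, which is the usual cost of duck typing.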


@@ -0,0 +1,19 @@
from uuid import UUID
from langflow.services.database.models.message.model import MessageTable, MessageUpdate
from langflow.services.deps import session_scope
def update_message(message_id: UUID, message: MessageUpdate | dict):
if not isinstance(message, MessageUpdate):
message = MessageUpdate(**message)
with session_scope() as session:
db_message = session.get(MessageTable, message_id)
if not db_message:
raise ValueError("Message not found")
message_dict = message.model_dump(exclude_unset=True, exclude_none=True)
db_message.sqlmodel_update(message_dict)
session.add(db_message)
session.commit()
session.refresh(db_message)
return db_message
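The normalize-then-apply shape of `update_message` (accept either the typed model or a plain dict, then write back only the fields that carry values) can be sketched without SQLModel or a database. This is a minimal illustration, not langflow code; it mirrors only the `exclude_none` half of the `model_dump` filter, since plain dataclasses cannot track "unset":

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class MessageUpdateSketch:
    # Hypothetical stand-in for langflow's MessageUpdate model
    text: Optional[str] = None
    sender_name: Optional[str] = None


def update_record(record: dict, update) -> dict:
    # Normalize: accept the typed model or a plain dict, as update_message does
    if not isinstance(update, MessageUpdateSketch):
        update = MessageUpdateSketch(**update)
    # Apply only fields that were actually provided
    changes = {k: v for k, v in asdict(update).items() if v is not None}
    record.update(changes)
    return record


row = {"text": "old", "sender_name": "AI"}
update_record(row, {"text": "new"})  # sender_name untouched
```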


@@ -36,10 +36,16 @@ class MessageBase(SQLModel):
timestamp = message.timestamp
if not flow_id and message.flow_id:
flow_id = message.flow_id
if not isinstance(message.text, str):
# If the text is not a string, it may be an
# async iterator, so we store it as an empty string for now
message_text = ""
else:
message_text = message.text
return cls(
sender=message.sender,
sender_name=message.sender_name,
text=message.text,
text=message_text,
session_id=message.session_id,
files=message.files or [],
timestamp=timestamp,

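The guard above exists because a streaming message may carry an async iterator of tokens instead of a `str`, which cannot be persisted directly; the placeholder is later overwritten via `update_message` once the stream completes. A tiny illustration of the check (`token_stream` is hypothetical, not langflow code):

```python
async def token_stream():
    # Stands in for a streaming LLM response
    yield "partial "
    yield "tokens"


def persistable_text(text):
    # Mirror the isinstance check from the diff: only plain strings go to the DB
    return text if isinstance(text, str) else ""
```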

@@ -0,0 +1,230 @@
import asyncio
import json
import time
import uuid
import pytest
from langflow.events.event_manager import EventManager
from langflow.schema.log import LoggableType
@pytest.fixture
def client():
pass
class TestEventManager:
# Registering an event with a valid name and callback using a mock callback function
def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", mock_callback)
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func == mock_callback
# Registering an event with an empty name
def test_register_event_with_empty_name(self):
queue = asyncio.Queue()
manager = EventManager(queue)
with pytest.raises(ValueError, match="Event name cannot be empty"):
manager.register_event("", "test_type")
# Registering an event with a valid name and no callback
def test_register_event_with_valid_name_and_no_callback(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func == manager.send_event
# Sending an event with valid event_type and data using pytest-asyncio plugin
@pytest.mark.asyncio
async def test_sending_event_with_valid_type_and_data_asyncio_plugin(self):
async def mock_queue_put_nowait(item):
await queue.put(item)
queue = asyncio.Queue()
queue.put_nowait = mock_queue_put_nowait
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", manager.noop)
event_type = "test_type"
data = "test_data"
manager.send_event(event_type=event_type, data=data)
await queue.join()
assert queue.empty()
# Accessing a non-registered event callback via __getattr__ with the recommended fix
def test_accessing_non_registered_event_callback_with_recommended_fix(self):
queue = asyncio.Queue()
manager = EventManager(queue)
result = manager.__getattr__("non_registered_event")
assert result == manager.noop
# Accessing a registered event callback via __getattr__
def test_accessing_registered_event_callback(self):
def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", mock_callback)
assert manager.on_test_event.func == mock_callback
# Handling a large number of events in the queue
def test_handling_large_number_of_events(self):
async def mock_queue_put_nowait(item):
pass
queue = asyncio.Queue()
queue.put_nowait = mock_queue_put_nowait
manager = EventManager(queue)
for i in range(1000):
manager.register_event(f"on_test_event_{i}", "test_type", manager.noop)
assert len(manager.events) == 1000
# Testing registration of an event with an invalid name with the recommended fix
def test_register_event_with_invalid_name_fixed(self):
def mock_callback(event_type, data):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
with pytest.raises(ValueError):
manager.register_event("", "test_type", mock_callback)
with pytest.raises(ValueError):
manager.register_event("invalid_name", "test_type", mock_callback)
# Sending an event with complex data and verifying successful event transmission
@pytest.mark.asyncio
async def test_sending_event_with_complex_data(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", manager.noop)
data = {"key": "value", "nested": [1, 2, 3]}
manager.send_event(event_type="test_type", data=data)
event_id, str_data, event_time = await queue.get()
assert event_id is not None
assert str_data is not None
assert event_time <= time.time()
# Sending an event with None data
def test_sending_event_with_none_data(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func.__name__ == "send_event"
# Ensuring thread-safety when accessing the events dictionary
def test_thread_safety_accessing_events_dictionary(self):
def mock_callback(event_type: str, data: LoggableType):
pass
async def register_events(manager):
manager.register_event("on_test_event_1", "test_type_1", mock_callback)
manager.register_event("on_test_event_2", "test_type_2", mock_callback)
async def access_events(manager):
assert "on_test_event_1" in manager.events
assert "on_test_event_2" in manager.events
queue = asyncio.Queue()
manager = EventManager(queue)
tasks = [register_events(manager), access_events(manager)]
asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks))
# Checking the performance impact of frequent event registrations
def test_performance_impact_frequent_registrations(self):
async def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
for i in range(1000):
manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)
assert len(manager.events) == 1000
# Verifying the uniqueness of event IDs for each event triggered using await with asyncio decorator
@pytest.mark.asyncio
async def test_event_id_uniqueness_with_await(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
manager.on_test_event(data={"data_1": "value_1"})
manager.on_test_event(data={"data_2": "value_2"})
try:
event_id_1, _, _ = await queue.get()
event_id_2, _, _ = await queue.get()
except asyncio.TimeoutError:
pytest.fail("Test timed out while waiting for queue items")
assert event_id_1 != event_id_2
# Ensuring the queue receives the correct event data format
@pytest.mark.asyncio
async def test_queue_receives_correct_event_data_format(self):
async def mock_queue_put_nowait(data):
pass
async def mock_queue_get():
return (uuid.uuid4(), b'{"event": "test_type", "data": "test_data"}\n\n', time.time())
queue = asyncio.Queue()
queue.put_nowait = mock_queue_put_nowait
queue.get = mock_queue_get
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", manager.noop)
event_data = "test_data"
manager.send_event(event_type="test_type", data=event_data)
event_id, str_data, _ = await queue.get()
assert isinstance(event_id, uuid.UUID)
assert isinstance(str_data, bytes)
assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}
# Registering an event with an explicit event_type and verifying the queued payload shape
def test_register_event_without_event_type_argument_fixed(self):
class MockQueue:
def __init__(self):
self.data = []
def put_nowait(self, item):
self.data.append(item)
queue = MockQueue()
event_manager = EventManager(queue)
event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)
event_manager.send_event(event_type="test_type", data={"key": "value"})
assert len(queue.data) == 1
event_id, str_data, timestamp = queue.data[0]
assert isinstance(event_id, uuid.UUID)
assert isinstance(str_data, bytes)
assert isinstance(timestamp, float)
# Accessing a non-registered event callback via __getattr__
def test_accessing_non_registered_callback(self):
class MockQueue:
def __init__(self):
pass
def put_nowait(self, item):
pass
queue = MockQueue()
event_manager = EventManager(queue)
# Accessing a non-registered event callback should return the 'noop' function
callback = event_manager.on_non_existing_event
assert callback.__name__ == "noop"
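Taken together, these tests pin down the `EventManager` surface: `register_event` rejects empty or non-`on_`-prefixed names and wraps the callback (or `send_event` by default) in a `functools.partial`, `send_event` enqueues an `(event_id, bytes, timestamp)` tuple whose bytes are the JSON event payload, and `__getattr__` falls back to `noop` for unregistered names. A hedged reconstruction of that surface, inferred from the tests above rather than copied from langflow:

```python
import asyncio
import json
import time
import uuid
from functools import partial


class EventManagerSketch:
    """Behavior inferred from the test suite; not the langflow implementation."""

    def __init__(self, queue):
        self.queue = queue
        self.events = {}

    def noop(self, *args, **kwargs):
        pass

    def register_event(self, name, event_type, callback=None):
        if not name:
            raise ValueError("Event name cannot be empty")
        if not name.startswith("on_"):
            raise ValueError("Event name must start with 'on_'")
        func = callback or self.send_event
        # partial preserves .func, which the tests compare against the callback
        self.events[name] = partial(func, event_type=event_type)

    def send_event(self, *, event_type, data):
        payload = json.dumps({"event": event_type, "data": data}) + "\n\n"
        self.queue.put_nowait((uuid.uuid4(), payload.encode("utf-8"), time.time()))

    def __getattr__(self, name):
        # Unregistered event names resolve to a no-op instead of raising
        return self.events.get(name, self.noop)
```

The `__getattr__` fallback is what lets call sites fire `manager.on_anything(...)` unconditionally; unregistered events simply vanish instead of crashing the build.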


@@ -0,0 +1,48 @@
import asyncio
from langflow.components.outputs.ChatOutput import ChatOutput
from langflow.custom.custom_component.component import Component
from langflow.events.event_manager import EventManager
from langflow.graph.graph.base import Graph
from langflow.inputs.inputs import IntInput
from langflow.schema.message import Message
from langflow.template.field.base import Output
class LogComponent(Component):
name = "LogComponent"
inputs = [IntInput(name="times", value=1)]
outputs = [Output(name="call_log", method="call_log_method")]
def call_log_method(self) -> Message:
for i in range(self.times):
self.log(f"This is log message {i}", name=f"Log {i}")
return Message(text="Log called")
def test_callback_graph():
logs: list[tuple[str, dict]] = []
def mock_callback(manager, event_type: str, data: dict):
logs.append((event_type, data))
event_manager = EventManager(queue=asyncio.Queue())
event_manager.register_event("on_log", "log", callback=mock_callback)
log_component = LogComponent(_id="log_component")
log_component.set(times=3)
chat_output = ChatOutput(_id="chat_output")
chat_output.set(sender_name=log_component.call_log_method)
graph = Graph(start=log_component, end=chat_output)
results = list(graph.start(event_manager=event_manager))
assert len(results) == 3
assert len(logs) == 3
assert all(isinstance(log, tuple) for log in logs)
assert all(isinstance(log[1], dict) for log in logs)
assert logs[0][0] == "log"
assert logs[0][1]["name"] == "Log 0"
assert logs[1][0] == "log"
assert logs[1][1]["name"] == "Log 1"
assert logs[2][0] == "log"
assert logs[2][1]["name"] == "Log 2"