diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py
index 80eafb1b9..9cda7628e 100644
--- a/src/backend/langflow/api/v1/chat.py
+++ b/src/backend/langflow/api/v1/chat.py
@@ -1,6 +1,15 @@
import time
+from typing import Optional
-from fastapi import APIRouter, Body, Depends, HTTPException, WebSocket, WebSocketException, status
+from fastapi import (
+ APIRouter,
+ Body,
+ Depends,
+ HTTPException,
+ WebSocket,
+ WebSocketException,
+ status,
+)
from fastapi.responses import StreamingResponse
from langflow.api.utils import build_input_keys_response, format_elapsed_time
from langflow.api.v1.schemas import (
@@ -15,7 +24,10 @@ from langflow.api.v1.schemas import (
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import StatelessVertex
from langflow.processing.process import process_tweaks_on_graph
-from langflow.services.auth.utils import get_current_active_user, get_current_user_for_websocket
+from langflow.services.auth.utils import (
+ get_current_active_user,
+ get_current_user_for_websocket,
+)
from langflow.services.cache.service import BaseCacheService
from langflow.services.cache.utils import update_build_status
from langflow.services.chat.service import ChatService
@@ -40,9 +52,13 @@ async def chat(
user = await get_current_user_for_websocket(websocket, db)
await websocket.accept()
if not user:
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
+ await websocket.close(
+ code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
+ )
elif not user.is_active:
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
+ await websocket.close(
+ code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
+ )
if client_id in chat_service.cache_service:
await chat_service.handle_websocket(client_id, websocket)
@@ -58,7 +74,9 @@ async def chat(
logger.error(f"Error in chat websocket: {exc}")
messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)
if "Could not validate credentials" in str(exc):
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
+ await websocket.close(
+ code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
+ )
else:
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
@@ -100,10 +118,15 @@ async def init_build(
@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
-async def build_status(flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)):
+async def build_status(
+ flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)
+):
"""Check the flow_id is in the cache_service."""
try:
- built = flow_id in cache_service and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
+ built = (
+ flow_id in cache_service
+ and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
+ )
return BuiltResponse(
built=built,
@@ -174,7 +197,9 @@ async def stream_build(
valid = True
logger.debug(f"Building node {str(vertex.vertex_type)}")
- logger.debug(f"Output: {params[:100]}{'...' if len(params) > 100 else ''}")
+ logger.debug(
+ f"Output: {params[:100]}{'...' if len(params) > 100 else ''}"
+ )
if vertex.artifacts:
# The artifacts will be prompt variables
# passed to build_input_keys_response
@@ -187,7 +212,9 @@ async def stream_build(
time_elapsed = format_elapsed_time(time.perf_counter() - start_time)
update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
- vertex_id = vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
+ vertex_id = (
+ vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
+ )
if vertex_id in graph.top_level_vertices:
response = {
"valid": valid,
@@ -202,7 +229,9 @@ async def stream_build(
langchain_object = await graph.build()
# Now we need to check the input_keys to send them to the client
if hasattr(langchain_object, "input_keys"):
- input_keys_response = build_input_keys_response(langchain_object, artifacts)
+ input_keys_response = build_input_keys_response(
+ langchain_object, artifacts
+ )
else:
input_keys_response = {
"input_keys": None,
@@ -249,17 +278,31 @@ async def try_running_celery_task(vertex, user_id):
@router.get("/build/{flow_id}/vertices", response_model=VerticesOrderResponse)
async def get_vertices(
flow_id: str,
+ component_id: Optional[str] = None,
chat_service: "ChatService" = Depends(get_chat_service),
session=Depends(get_session),
):
"""Check the flow_id is in the flow_data_store."""
try:
+ # First, we need to check if the flow_id is in the cache
+ graph = None
+ if cache := chat_service.get_cache(flow_id):
+ graph: Graph = cache.get("result")
+
flow: Flow = session.get(Flow, flow_id)
if not flow or not flow.data:
raise ValueError("Invalid flow ID")
- graph = Graph.from_payload(flow.data)
+ other_graph = Graph.from_payload(flow.data)
+ if graph is None:
+ graph = other_graph
+ else:
+ graph = graph.update(other_graph)
chat_service.set_cache(flow_id, graph)
- vertices = graph.layered_topological_sort()
+
+ if component_id:
+ vertices = graph.sort_up_to_vertex(component_id)
+ else:
+ vertices = graph.layered_topological_sort()
# Now vertices is a list of lists
# We need to get the id of each vertex
# and return the same structure but only with the ids
@@ -276,7 +319,7 @@ async def build_vertex(
flow_id: str,
vertex_id: str,
chat_service: "ChatService" = Depends(get_chat_service),
- # current_user=Depends(get_current_active_user),
+ current_user=Depends(get_current_active_user),
tweaks: dict = Body(None),
inputs: dict = Body(None),
):
@@ -295,7 +338,7 @@ async def build_vertex(
raise ValueError("Invalid vertex")
try:
if isinstance(vertex, StatelessVertex) or not vertex._built:
- await vertex.build(user_id=None)
+ await vertex.build(user_id=current_user.id)
params = vertex._built_object_repr()
valid = True
result_dict = vertex.get_built_result()
@@ -305,13 +348,21 @@ async def build_vertex(
artifacts = vertex.artifacts
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
- result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta)
+ result_dict = ResultDict(
+ results=result_dict,
+ artifacts=artifacts,
+ duration=duration,
+ timedelta=timedelta,
+ )
+ chat_service.set_cache(flow_id, graph)
except Exception as exc:
params = str(exc)
valid = False
result_dict = ResultDict(results={})
artifacts = {}
- chat_service.set_cache(flow_id, graph)
+ # If there's an error building the vertex
+ # we need to clear the cache
+ chat_service.clear_cache(flow_id)
await log_vertex_build(
flow_id=flow_id,
vertex_id=vertex_id,
diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py
index 066934024..e84e65b51 100644
--- a/src/backend/langflow/api/v1/schemas.py
+++ b/src/backend/langflow/api/v1/schemas.py
@@ -161,7 +161,9 @@ class StreamData(BaseModel):
data: dict
def __str__(self) -> str:
- return f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
+ return (
+ f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
+ )
class CustomComponentCode(BaseModel):
@@ -246,4 +248,3 @@ class VertexBuildResponse(BaseModel):
class VerticesBuiltResponse(BaseModel):
vertices: List[VertexBuildResponse]
- vertices: List[VertexBuildResponse]
diff --git a/src/backend/langflow/components/textsplitters/CharacterTextSplitter.py b/src/backend/langflow/components/textsplitters/CharacterTextSplitter.py
index 1bfd2d2e3..d165f47fd 100644
--- a/src/backend/langflow/components/textsplitters/CharacterTextSplitter.py
+++ b/src/backend/langflow/components/textsplitters/CharacterTextSplitter.py
@@ -24,6 +24,8 @@ class CharacterTextSplitterComponent(CustomComponent):
chunk_size: int = 1000,
separator: str = "\n",
) -> List[Document]:
+ # separator may come escaped from the frontend
+ separator = separator.encode().decode("unicode_escape")
docs = CharacterTextSplitter(
chunk_overlap=chunk_overlap,
chunk_size=chunk_size,
diff --git a/src/backend/langflow/components/vectorstores/Chroma.py b/src/backend/langflow/components/vectorstores/Chroma.py
index 6eb8fedd6..a031f090f 100644
--- a/src/backend/langflow/components/vectorstores/Chroma.py
+++ b/src/backend/langflow/components/vectorstores/Chroma.py
@@ -17,6 +17,7 @@ class ChromaComponent(CustomComponent):
description: str = "Implementation of Vector Store using Chroma"
documentation = "https://python.langchain.com/docs/integrations/vectorstores/chroma"
beta: bool = True
+ icon = "Chroma"
def build_config(self):
"""
@@ -28,7 +29,7 @@ class ChromaComponent(CustomComponent):
return {
"collection_name": {"display_name": "Collection Name", "value": "langflow"},
"persist": {"display_name": "Persist"},
- "persist_directory": {"display_name": "Persist Directory"},
+ "index_directory": {"display_name": "Persist Directory"},
"code": {"advanced": True, "display_name": "Code"},
"documents": {"display_name": "Documents", "is_list": True},
"embedding": {"display_name": "Embedding"},
@@ -54,7 +55,7 @@ class ChromaComponent(CustomComponent):
persist: bool,
embedding: Embeddings,
chroma_server_ssl_enabled: bool,
- persist_directory: Optional[str] = None,
+ index_directory: Optional[str] = None,
documents: Optional[List[Document]] = None,
chroma_server_cors_allow_origins: Optional[str] = None,
chroma_server_host: Optional[str] = None,
@@ -66,7 +67,7 @@ class ChromaComponent(CustomComponent):
Args:
- collection_name (str): The name of the collection.
- - persist_directory (Optional[str]): The directory to persist the Vector Store to.
+ - index_directory (Optional[str]): The directory to persist the Vector Store to.
- chroma_server_ssl_enabled (bool): Whether to enable SSL for the Chroma server.
- persist (bool): Whether to persist the Vector Store or not.
- embedding (Optional[Embeddings]): The embeddings to use for the Vector Store.
@@ -85,7 +86,8 @@ class ChromaComponent(CustomComponent):
if chroma_server_host is not None:
chroma_settings = chromadb.config.Settings(
- chroma_server_cors_allow_origins=chroma_server_cors_allow_origins or None,
+ chroma_server_cors_allow_origins=chroma_server_cors_allow_origins
+ or None,
chroma_server_host=chroma_server_host,
chroma_server_port=chroma_server_port or None,
chroma_server_grpc_port=chroma_server_grpc_port or None,
@@ -93,15 +95,25 @@ class ChromaComponent(CustomComponent):
)
# If documents, then we need to create a Chroma instance using .from_documents
+
+ # Check index_directory and expand it if it is a relative path
+
+ index_directory = self.resolve_path(index_directory)
+
if documents is not None and embedding is not None:
if len(documents) == 0:
- raise ValueError("If documents are provided, there must be at least one document.")
- return Chroma.from_documents(
+ raise ValueError(
+ "If documents are provided, there must be at least one document."
+ )
+ chroma = Chroma.from_documents(
documents=documents, # type: ignore
- persist_directory=persist_directory if persist else None,
+ persist_directory=index_directory if persist else None,
collection_name=collection_name,
embedding=embedding,
client_settings=chroma_settings,
)
-
- return Chroma(persist_directory=persist_directory, client_settings=chroma_settings)
+ else:
+ chroma = Chroma(
+ persist_directory=index_directory, client_settings=chroma_settings
+ )
+ return chroma
diff --git a/src/backend/langflow/components/vectorstores/ChromaSearch.py b/src/backend/langflow/components/vectorstores/ChromaSearch.py
index 512c53a8c..1ca91c4ad 100644
--- a/src/backend/langflow/components/vectorstores/ChromaSearch.py
+++ b/src/backend/langflow/components/vectorstores/ChromaSearch.py
@@ -99,7 +99,7 @@ class ChromaSearchComponent(CustomComponent):
chroma_server_grpc_port=chroma_server_grpc_port or None,
chroma_server_ssl_enabled=chroma_server_ssl_enabled,
)
-
+ index_directory = self.resolve_path(index_directory)
chroma = Chroma(
embedding_function=embedding,
collection_name=collection_name,
diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py
index f57f01f7d..9fec3a7ba 100644
--- a/src/backend/langflow/graph/graph/base.py
+++ b/src/backend/langflow/graph/graph/base.py
@@ -1,12 +1,17 @@
from collections import defaultdict, deque
-from typing import Dict, Generator, List, Type, Union
+from typing import Dict, Generator, List, Optional, Type, Union
from langchain.chains.base import Chain
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.constants import lazy_load_vertex_dict
from langflow.graph.graph.utils import process_flow
from langflow.graph.vertex.base import Vertex
-from langflow.graph.vertex.types import ChatVertex, FileToolVertex, LLMVertex, ToolkitVertex
+from langflow.graph.vertex.types import (
+ ChatVertex,
+ FileToolVertex,
+ LLMVertex,
+ ToolkitVertex,
+)
from langflow.interface.tools.constants import FILE_TOOLS
from langflow.utils import payload
from loguru import logger
@@ -71,6 +76,25 @@ class Graph:
return False
return self.__repr__() == other.__repr__()
+ # update this graph with another graph by comparing the __repr__ of each vertex
+ # and if the __repr__ of a vertex is not the same as the other
+ # then update the .data of the vertex to the self
+ # both graphs have the same vertices and edges
+ # but the data of the vertices might be different
+ def update(self, other: "Graph", different_vertices: List[str] = None) -> None:
+ if different_vertices is None:
+ different_vertices = []
+ for vertex in self.vertices:
+ if (
+ vertex.id in different_vertices
+ or vertex.__repr__() != other.get_vertex(vertex.id).__repr__()
+ ):
+ vertex.data = other.get_vertex(vertex.id).data
+ vertex._build_params()
+ vertex.graph = self
+ vertex._built = False
+ return self
+
def _build_graph(self) -> None:
"""Builds the graph from the vertices and edges."""
self.vertices = self._build_vertices()
@@ -127,7 +151,9 @@ class Graph:
return
for vertex in self.vertices:
if not self._validate_vertex(vertex):
- raise ValueError(f"{vertex.vertex_type} is not connected to any other components")
+ raise ValueError(
+ f"{vertex.vertex_type} is not connected to any other components"
+ )
def _validate_vertex(self, vertex: Vertex) -> bool:
"""Validates a vertex."""
@@ -140,7 +166,11 @@ class Graph:
def get_vertex_edges(self, vertex_id: str) -> List[ContractEdge]:
"""Returns a list of edges for a given vertex."""
- return [edge for edge in self.edges if edge.source_id == vertex_id or edge.target_id == vertex_id]
+ return [
+ edge
+ for edge in self.edges
+ if edge.source_id == vertex_id or edge.target_id == vertex_id
+ ]
def get_vertices_with_target(self, vertex_id: str) -> List[Vertex]:
"""Returns the vertices connected to a vertex."""
@@ -178,7 +208,9 @@ class Graph:
def dfs(vertex):
if state[vertex] == 1:
# We have a cycle
- raise ValueError("Graph contains a cycle, cannot perform topological sort")
+ raise ValueError(
+ "Graph contains a cycle, cannot perform topological sort"
+ )
if state[vertex] == 0:
state[vertex] = 1
for edge in vertex.edges:
@@ -237,7 +269,9 @@ class Graph:
edges.append(ContractEdge(source, target, edge))
return edges
- def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type[Vertex]:
+ def _get_vertex_class(
+ self, node_type: str, node_base_type: str, node_id: str
+ ) -> Type[Vertex]:
"""Returns the node class based on the node type."""
# First we check for the node_base_type
node_name = node_id.split("-")[0]
@@ -267,14 +301,18 @@ class Graph:
vertex_type: str = vertex_data["type"] # type: ignore
vertex_base_type: str = vertex_data["node"]["template"]["_type"] # type: ignore
- VertexClass = self._get_vertex_class(vertex_type, vertex_base_type, vertex_data["id"])
+ VertexClass = self._get_vertex_class(
+ vertex_type, vertex_base_type, vertex_data["id"]
+ )
vertex_instance = VertexClass(vertex, graph=self)
vertex_instance.set_top_level(self.top_level_vertices)
vertices.append(vertex_instance)
return vertices
- def get_children_by_vertex_type(self, vertex: Vertex, vertex_type: str) -> List[Vertex]:
+ def get_children_by_vertex_type(
+ self, vertex: Vertex, vertex_type: str
+ ) -> List[Vertex]:
"""Returns the children of a vertex based on the vertex type."""
children = []
vertex_types = [vertex.data["type"]]
@@ -286,20 +324,44 @@ class Graph:
def __repr__(self):
vertex_ids = [vertex.id for vertex in self.vertices]
- edges_repr = "\n".join([f"{edge.source_id} --> {edge.target_id}" for edge in self.edges])
+ edges_repr = "\n".join(
+ [f"{edge.source_id} --> {edge.target_id}" for edge in self.edges]
+ )
return f"Graph:\nNodes: {vertex_ids}\nConnections:\n{edges_repr}"
- def layered_topological_sort(self):
- in_degree = {vertex.id: 0 for vertex in self.vertices} # Initialize in-degrees
+    def sort_up_to_vertex(self, vertex_id: str) -> List[List[str]]:
+        """Sorts the graph up to a given vertex and returns the layers."""
+        # Get the vertices that are connected to the vertex
+        # and the vertex itself
+        vertices = [self.get_vertex(vertex_id)]
+        for edge in self.get_vertex(vertex_id).edges:
+            vertices.append(self.get_vertex(edge.target_id))
+
+        edges = [edge for vertex in vertices for edge in vertex.edges]
+
+        return self.layered_topological_sort(vertices, edges)
+
+ def layered_topological_sort(
+ self,
+ vertices: Optional[List[Vertex]] = None,
+ edges: Optional[List[ContractEdge]] = None,
+ ) -> List[List[str]]:
+ """Performs a layered topological sort of the vertices in the graph."""
+ if vertices is None:
+ vertices = self.vertices
+ if edges is None:
+ edges = self.edges
+
+ in_degree = {vertex.id: 0 for vertex in vertices} # Initialize in-degrees
graph = defaultdict(list) # Adjacency list representation
# Build graph and compute in-degrees
- for edge in self.edges:
+ for edge in edges:
graph[edge.source_id].append(edge.target_id)
in_degree[edge.target_id] += 1
# Queue for vertices with no incoming edges
- queue = deque(vertex.id for vertex in self.vertices if in_degree[vertex.id] == 0)
+ queue = deque(vertex.id for vertex in vertices if in_degree[vertex.id] == 0)
layers = []
current_layer = 0
@@ -314,9 +376,40 @@ class Graph:
if in_degree[neighbor] == 0:
queue.append(neighbor)
current_layer += 1 # Next layer
+ new_layers = self.refine_layers(graph, layers)
+ return new_layers
- return layers
- return layers
- return layers
- return layers
- return layers
+ def refine_layers(self, graph, initial_layers):
+ # Map each vertex to its current layer
+ vertex_to_layer = {}
+ for layer_index, layer in enumerate(initial_layers):
+ for vertex in layer:
+ vertex_to_layer[vertex] = layer_index
+
+ # Build the adjacency list for reverse lookup (dependencies)
+
+ refined_layers = [[] for _ in initial_layers] # Start with empty layers
+ new_layer_index_map = defaultdict(
+ int
+ ) # Map each vertex to its highest dependency layer
+
+ for vertex_id, deps in graph.items():
+ for dep in deps:
+                new_layer_index_map[vertex_id] = max(
+                    new_layer_index_map[vertex_id], vertex_to_layer[dep] - 1
+                )
+
+ for layer_index, layer in enumerate(initial_layers):
+ for vertex_id in layer:
+ # Place the vertex in the highest possible layer where its dependencies are met
+ new_layer_index = new_layer_index_map[vertex_id]
+ if new_layer_index > layer_index:
+ refined_layers[new_layer_index].append(vertex_id)
+ vertex_to_layer[vertex_id] = new_layer_index
+ else:
+ refined_layers[layer_index].append(vertex_id)
+
+ # Remove empty layers if any
+ refined_layers = [layer for layer in refined_layers if layer]
+
+ return refined_layers
diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py
index 3a2005a63..503855e6e 100644
--- a/src/backend/langflow/graph/vertex/base.py
+++ b/src/backend/langflow/graph/vertex/base.py
@@ -78,14 +78,18 @@ class Vertex:
):
if edge.target_id not in edge_results:
edge_results[edge.target_id] = {}
- edge_results[edge.target_id][edge.target_param] = await edge.get_result(source=self, target=target)
+ edge_results[edge.target_id][edge.target_param] = await edge.get_result(
+ source=self, target=target
+ )
return edge_results
def get_built_result(self):
# If the Vertex.type is a power component
# then we need to return the built object
# instead of the result dict
- if self.is_interface_component and not isinstance(self._built_object, UnbuiltObject):
+ if self.is_interface_component and not isinstance(
+ self._built_object, UnbuiltObject
+ ):
result = self._built_object
# if it is not a dict or a string and hasattr model_dump then
# return the model_dump
@@ -95,7 +99,11 @@ class Vertex:
if isinstance(self._built_result, UnbuiltResult):
return {}
- return self._built_result if isinstance(self._built_result, dict) else {"result": self._built_result}
+ return (
+ self._built_result
+ if isinstance(self._built_result, dict)
+ else {"result": self._built_result}
+ )
def set_artifacts(self) -> None:
pass
@@ -149,18 +157,29 @@ class Vertex:
self.data = self._data["data"]
self.output = self.data["node"]["base_classes"]
self.pinned = self.data["node"].get("pinned", False)
- template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
- template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
+ template_dicts = {
+ key: value
+ for key, value in self.data["node"]["template"].items()
+ if isinstance(value, dict)
+ }
self.required_inputs = [
- template_dicts[key]["type"] for key, value in template_dicts.items() if value["required"]
+ template_dicts[key]["type"]
+ for key, value in template_dicts.items()
+ if value["required"]
]
self.optional_inputs = [
- template_dicts[key]["type"] for key, value in template_dicts.items() if not value["required"]
+ template_dicts[key]["type"]
+ for key, value in template_dicts.items()
+ if not value["required"]
]
# Add the template_dicts[key]["input_types"] to the optional_inputs
self.optional_inputs.extend(
- [input_type for value in template_dicts.values() for input_type in value.get("input_types", [])]
+ [
+ input_type
+ for value in template_dicts.values()
+ for input_type in value.get("input_types", [])
+ ]
)
template_dict = self.data["node"]["template"]
@@ -203,7 +222,11 @@ class Vertex:
if self.graph is None:
raise ValueError("Graph not found")
- template_dict = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
+ template_dict = {
+ key: value
+ for key, value in self.data["node"]["template"].items()
+ if isinstance(value, dict)
+ }
params = self.params.copy() if self.params else {}
for edge in self.edges:
@@ -255,7 +278,11 @@ class Vertex:
# list of dicts, so we need to convert it to a dict
# before passing it to the build method
if isinstance(val, list):
- params[key] = {k: v for item in value.get("value", []) for k, v in item.items()}
+ params[key] = {
+ k: v
+ for item in value.get("value", [])
+ for k, v in item.items()
+ }
elif isinstance(val, dict):
params[key] = val
elif value.get("type") == "int" and val is not None:
@@ -292,7 +319,12 @@ class Vertex:
self._built = True
- async def _run(self, user_id: str, inputs: Optional[dict] = None, session_id: Optional[str] = None):
+ async def _run(
+ self,
+ user_id: str,
+ inputs: Optional[dict] = None,
+ session_id: Optional[str] = None,
+ ):
# user_id is just for compatibility with the other build methods
inputs = inputs or {}
# inputs = {key: value or "" for key, value in inputs.items()}
@@ -307,7 +339,9 @@ class Vertex:
if isinstance(self._built_object, str):
self._built_result = self._built_object
- result = await generate_result(self._built_object, inputs, self.has_external_output, session_id)
+ result = await generate_result(
+ self._built_object, inputs, self.has_external_output, session_id
+ )
self._built_result = result
async def _build_each_node_in_params_dict(self, user_id=None):
@@ -335,7 +369,9 @@ class Vertex:
"""
return all(self._is_node(node) for node in value)
- async def get_result(self, requester: Optional["Vertex"] = None, user_id=None, timeout=None) -> Any:
+ async def get_result(
+ self, requester: Optional["Vertex"] = None, user_id=None, timeout=None
+ ) -> Any:
# PLEASE REVIEW THIS IF STATEMENT
# Check if the Vertex was built already
if self._built:
@@ -369,7 +405,9 @@ class Vertex:
self._extend_params_list_with_result(key, result)
self.params[key] = result
- async def _build_list_of_nodes_and_update_params(self, key, nodes: List["Vertex"], user_id=None):
+ async def _build_list_of_nodes_and_update_params(
+ self, key, nodes: List["Vertex"], user_id=None
+ ):
"""
Iterates over a list of nodes, builds each and updates the params dictionary.
"""
@@ -421,7 +459,9 @@ class Vertex:
self._update_built_object_and_artifacts(result)
except Exception as exc:
logger.exception(exc)
- raise ValueError(f"Error building node {self.vertex_type}(ID:{self.id}): {str(exc)}") from exc
+ raise ValueError(
+ f"Error building node {self.vertex_type}(ID:{self.id}): {str(exc)}"
+ ) from exc
def _update_built_object_and_artifacts(self, result):
"""
@@ -458,7 +498,7 @@ class Vertex:
requester: Optional["Vertex"] = None,
**kwargs,
) -> Any:
- if self.pinned:
+ if self.pinned and self._built:
return self.get_requester_result(requester)
self._reset()
@@ -480,9 +520,15 @@ class Vertex:
return self._built_object
# Get the requester edge
- requester_edge = next((edge for edge in self.edges if edge.target_id == requester.id), None)
+ requester_edge = next(
+ (edge for edge in self.edges if edge.target_id == requester.id), None
+ )
# Return the result of the requester edge
- return None if requester_edge is None else await requester_edge.get_result(source=self, target=requester)
+ return (
+ None
+ if requester_edge is None
+ else await requester_edge.get_result(source=self, target=requester)
+ )
def add_edge(self, edge: "ContractEdge") -> None:
if edge not in self.edges:
@@ -502,7 +548,11 @@ class Vertex:
def _built_object_repr(self):
# Add a message with an emoji, stars for sucess,
- return "Built sucessfully ✨" if self._built_object is not None else "Failed to build 😵💫"
+ return (
+ "Built sucessfully ✨"
+ if self._built_object is not None
+ else "Failed to build 😵💫"
+ )
class StatefulVertex(Vertex):
diff --git a/src/backend/langflow/interface/custom/custom_component/component.py b/src/backend/langflow/interface/custom/custom_component/component.py
index b65cb6542..3af7acea5 100644
--- a/src/backend/langflow/interface/custom/custom_component/component.py
+++ b/src/backend/langflow/interface/custom/custom_component/component.py
@@ -5,7 +5,6 @@ from typing import Any, ClassVar, Optional
import emoji
from cachetools import TTLCache, cachedmethod
from fastapi import HTTPException
-
from langflow.interface.custom.code_parser import CodeParser
from langflow.interface.custom.eval import eval_custom_component_code
from langflow.utils import validate
@@ -21,7 +20,9 @@ class ComponentFunctionEntrypointNameNullError(HTTPException):
class Component:
ERROR_CODE_NULL: ClassVar[str] = "Python code must be provided."
- ERROR_FUNCTION_ENTRYPOINT_NAME_NULL: ClassVar[str] = "The name of the entrypoint function must be provided."
+ ERROR_FUNCTION_ENTRYPOINT_NAME_NULL: ClassVar[str] = (
+ "The name of the entrypoint function must be provided."
+ )
code: Optional[str] = None
_function_entrypoint_name: str = "build"
@@ -36,10 +37,6 @@ class Component:
else:
setattr(self, key, value)
- # Validate the emoji at the icon field
- if hasattr(self, "icon") and self.icon:
- self.icon = self.validate_icon(self.icon)
-
def __setattr__(self, key, value):
if key == "_user_id" and hasattr(self, "_user_id"):
warnings.warn("user_id is immutable and cannot be changed.")
@@ -68,8 +65,8 @@ class Component:
return validate.create_function(self.code, self._function_entrypoint_name)
- def getattr_return_str(self, component, value):
- value = getattr(component, value)
+ def getattr_return_str(self, value):
+        # value has already been fetched from the component by the caller
return str(value) if value else ""
def build_template_config(self) -> dict:
@@ -89,19 +86,22 @@ class Component:
for attribute, func in attributes_func_mapping.items():
if hasattr(component_instance, attribute):
- template_config[attribute] = func(component=component_instance, value=attribute)
+ value = getattr(component_instance, attribute)
+ if value is not None:
+ template_config[attribute] = func(value=value)
- return template_config
+ return template_config
def validate_icon(self, value: str, *args, **kwargs):
# we are going to use the emoji library to validate the emoji
# emojis can be defined using the :emoji_name: syntax
if not value.startswith(":") or not value.endswith(":"):
- raise ValueError("Invalid emoji. Please use the :emoji_name: syntax.")
-
+ warnings.warn("Invalid emoji. Please use the :emoji_name: syntax.")
+ return value
emoji_value = emoji.emojize(value, variant="emoji_type")
if value == emoji_value:
- raise ValueError(f"Invalid emoji. {value} is not a valid emoji.")
+ warnings.warn(f"Invalid emoji. {value} is not a valid emoji.")
+ return value
return emoji_value
def build(self, *args: Any, **kwargs: Any) -> Any:
diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py
index 945d7db2c..999782ffc 100644
--- a/src/backend/langflow/interface/custom/custom_component/custom_component.py
+++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py
@@ -1,11 +1,11 @@
import operator
+from pathlib import Path
from typing import Any, Callable, ClassVar, List, Optional, Union
from uuid import UUID
import yaml
from cachetools import TTLCache, cachedmethod
from fastapi import HTTPException
-
from langflow.interface.custom.code_parser.utils import (
extract_inner_type_from_generic_alias,
extract_union_types_from_generic_alias,
@@ -13,7 +13,11 @@ from langflow.interface.custom.code_parser.utils import (
from langflow.interface.custom.custom_component.component import Component
from langflow.services.database.models.flow import Flow
from langflow.services.database.utils import session_getter
-from langflow.services.deps import get_credential_service, get_db_service, get_storage_service
+from langflow.services.deps import (
+ get_credential_service,
+ get_db_service,
+ get_storage_service,
+)
from langflow.services.storage.service import StorageService
from langflow.utils import validate
@@ -42,6 +46,16 @@ class CustomComponent(Component):
self.cache = TTLCache(maxsize=1024, ttl=60)
super().__init__(**data)
+    @staticmethod
+    def resolve_path(path: str) -> str:
+        """Resolves the path to an absolute path (callers may pass None)."""
+        if not path:
+            return path
+        path_object = Path(path).expanduser()
+        if not path_object.is_absolute():
+            path_object = path_object.resolve()
+        return str(path_object)
+
def get_full_path(self, path: str) -> str:
storage_svc: "StorageService" = get_storage_service()
@@ -78,7 +92,8 @@ class CustomComponent(Component):
detail={
"error": "Type hint Error",
"traceback": (
- "Prompt type is not supported in the build method." " Try using PromptTemplate instead."
+ "Prompt type is not supported in the build method."
+ " Try using PromptTemplate instead."
),
},
)
@@ -92,14 +107,20 @@ class CustomComponent(Component):
if not self.code:
return {}
- component_classes = [cls for cls in self.tree["classes"] if self.code_class_base_inheritance in cls["bases"]]
+ component_classes = [
+ cls
+ for cls in self.tree["classes"]
+ if self.code_class_base_inheritance in cls["bases"]
+ ]
if not component_classes:
return {}
# Assume the first Component class is the one we're interested in
component_class = component_classes[0]
build_methods = [
- method for method in component_class["methods"] if method["name"] == self.function_entrypoint_name
+ method
+ for method in component_class["methods"]
+ if method["name"] == self.function_entrypoint_name
]
return build_methods[0] if build_methods else {}
@@ -112,7 +133,10 @@ class CustomComponent(Component):
return_type = build_method["return_type"]
# If list or List is in the return type, then we remove it and return the inner type
- if hasattr(return_type, "__origin__") and return_type.__origin__ in [list, List]:
+ if hasattr(return_type, "__origin__") and return_type.__origin__ in [
+ list,
+ List,
+ ]:
return_type = extract_inner_type_from_generic_alias(return_type)
# If the return type is not a Union, then we just return it as a list
@@ -153,7 +177,9 @@ class CustomComponent(Component):
# Retrieve and decrypt the credential by name for the current user
db_service = get_db_service()
with session_getter(db_service) as session:
- return credential_service.get_credential(user_id=self._user_id or "", name=name, session=session)
+ return credential_service.get_credential(
+ user_id=self._user_id or "", name=name, session=session
+ )
return get_credential
@@ -163,7 +189,9 @@ class CustomComponent(Component):
credential_service = get_credential_service()
db_service = get_db_service()
with session_getter(db_service) as session:
- return credential_service.list_credentials(user_id=self._user_id, session=session)
+ return credential_service.list_credentials(
+ user_id=self._user_id, session=session
+ )
def index(self, value: int = 0):
"""Returns a function that returns the value at the given index in the iterable."""
@@ -214,7 +242,11 @@ class CustomComponent(Component):
if flow_id:
flow = session.query(Flow).get(flow_id)
elif flow_name:
- flow = (session.query(Flow).filter(Flow.name == flow_name).filter(Flow.user_id == self.user_id)).first()
+ flow = (
+ session.query(Flow)
+ .filter(Flow.name == flow_name)
+ .filter(Flow.user_id == self.user_id)
+ ).first()
else:
raise ValueError("Either flow_name or flow_id must be provided")
diff --git a/src/backend/langflow/services/chat/service.py b/src/backend/langflow/services/chat/service.py
index 0d7f9ca6c..ab4f7c9a8 100644
--- a/src/backend/langflow/services/chat/service.py
+++ b/src/backend/langflow/services/chat/service.py
@@ -59,7 +59,9 @@ class ChatService(Service):
"""Send the last chat message to the client."""
client_id = self.chat_cache.current_client_id
if client_id in self.active_connections:
- chat_response = self.chat_history.get_history(client_id, filter_messages=False)[-1]
+ chat_response = self.chat_history.get_history(
+ client_id, filter_messages=False
+ )[-1]
if chat_response.is_bot:
# Process FileResponse
if isinstance(chat_response, FileResponse):
@@ -86,7 +88,9 @@ class ChatService(Service):
data_type=self.last_cached_object_dict["type"],
)
- self.chat_history.add_message(self.chat_cache.current_client_id, chat_response)
+ self.chat_history.add_message(
+ self.chat_cache.current_client_id, chat_response
+ )
async def connect(self, client_id: str, websocket: WebSocket):
self.active_connections[client_id] = websocket
@@ -209,7 +213,9 @@ class ChatService(Service):
await self.process_message(client_id, payload, build_result)
else:
- raise RuntimeError(f"Could not find a build result for client_id {client_id}")
+ raise RuntimeError(
+ f"Could not find a build result for client_id {client_id}"
+ )
except Exception as exc:
# Handle any exceptions that might occur
logger.exception(f"Error handling websocket: {exc}")
@@ -241,6 +247,12 @@ class ChatService(Service):
"""
return self.cache_service.get(client_id)
+ def clear_cache(self, client_id: str):
+ """
+ Clear the cache for a client.
+ """
+ self.cache_service.clear(client_id)
+
def dict_to_markdown_table(my_dict):
markdown_table = "| Key | Value |\n|---|---|\n"
diff --git a/src/backend/langflow/services/monitor/schema.py b/src/backend/langflow/services/monitor/schema.py
index 1cb6cecff..0cca430f6 100644
--- a/src/backend/langflow/services/monitor/schema.py
+++ b/src/backend/langflow/services/monitor/schema.py
@@ -7,7 +7,9 @@ from pydantic import BaseModel, Field, field_serializer, validator
class TransactionModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
- timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
+ timestamp: Optional[datetime] = Field(
+ default_factory=datetime.now, alias="timestamp"
+ )
source: str
target: str
target_args: dict
@@ -89,18 +91,13 @@ class VertexBuildModel(BaseModel):
return v
-# create a function that turns dicts into a
-# dict like this:
-# my_map_dict = {
-# "key": [
-# 1, 2, 3
-# ],
-# "value": [
-# "one", "two", "three"
-# ]
-# }
-# so map has a "key" and a "value" list
-# containing the keys and values of the dict
+class VertexBuildResponseModel(VertexBuildModel):
+
+ @field_serializer("data", "artifacts")
+ def serialize_dict(v):
+ return v
+
+
def to_map(value: dict):
keys = list(value.keys())
values = list(value.values())
@@ -108,13 +105,13 @@ def to_map(value: dict):
class VertexBuildMapModel(BaseModel):
- vertex_builds: dict[str, list[VertexBuildModel]]
+ vertex_builds: dict[str, list[VertexBuildResponseModel]]
@classmethod
def from_list_of_dicts(cls, vertex_build_dicts):
vertex_build_map = {}
for vertex_build_dict in vertex_build_dicts:
- vertex_build = VertexBuildModel(**vertex_build_dict)
+ vertex_build = VertexBuildResponseModel(**vertex_build_dict)
if vertex_build.id not in vertex_build_map:
vertex_build_map[vertex_build.id] = []
vertex_build_map[vertex_build.id].append(vertex_build)
diff --git a/src/backend/langflow/services/storage/local.py b/src/backend/langflow/services/storage/local.py
index a3fa6887b..dcc564441 100644
--- a/src/backend/langflow/services/storage/local.py
+++ b/src/backend/langflow/services/storage/local.py
@@ -1,13 +1,12 @@
from pathlib import Path
-import aiofiles
from loguru import logger
from .service import StorageService
class LocalStorageService(StorageService):
- """A service class for handling local storage operations."""
+ """A service class for handling local storage operations using synchronous file I/O."""
def __init__(self, session_service, settings_service):
"""Initialize the local storage service with session and settings services."""
@@ -35,8 +34,8 @@ class LocalStorageService(StorageService):
file_path = folder_path / file_name
try:
- async with aiofiles.open(file_path, "wb") as f:
- await f.write(data)
+ with open(file_path, "wb") as f:
+ f.write(data)
logger.info(f"File {file_name} saved successfully in flow {flow_id}.")
except Exception as e:
logger.error(f"Error saving file {file_name} in flow {flow_id}: {e}")
@@ -56,9 +55,9 @@ class LocalStorageService(StorageService):
logger.warning(f"File {file_name} not found in flow {flow_id}.")
raise FileNotFoundError(f"File {file_name} not found in flow {flow_id}")
- async with aiofiles.open(file_path, "rb") as f:
+ with open(file_path, "rb") as f:
logger.info(f"File {file_name} retrieved successfully from flow {flow_id}.")
- return await f.read()
+ return f.read()
async def list_files(self, flow_id: str):
"""
@@ -89,8 +88,10 @@ class LocalStorageService(StorageService):
file_path.unlink()
logger.info(f"File {file_name} deleted successfully from flow {flow_id}.")
else:
- logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.")
+ logger.warning(
+ f"Attempted to delete non-existent file {file_name} in flow {flow_id}."
+ )
def teardown(self):
"""Perform any cleanup operations when the service is being torn down."""
- pass # No specific teardown actions required for local storage at the moment.
+ pass  # No specific teardown actions required for local storage at the moment.
diff --git a/src/frontend/src/CustomNodes/GenericNode/components/parameterComponent/index.tsx b/src/frontend/src/CustomNodes/GenericNode/components/parameterComponent/index.tsx
index 936e65893..8b45a40ef 100644
--- a/src/frontend/src/CustomNodes/GenericNode/components/parameterComponent/index.tsx
+++ b/src/frontend/src/CustomNodes/GenericNode/components/parameterComponent/index.tsx
@@ -406,13 +406,27 @@ export default function ParameterComponent({
data-testid={"textarea-" + data.node.template[name].name}
/>
) : (
-