Merge branch 'zustand/io/migration' of github.com:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
igorrCarvalho 2024-02-18 13:38:17 -03:00
commit ba4d51a77f
18 changed files with 433 additions and 150 deletions

View file

@ -1,6 +1,15 @@
import time
from typing import Optional
from fastapi import APIRouter, Body, Depends, HTTPException, WebSocket, WebSocketException, status
from fastapi import (
APIRouter,
Body,
Depends,
HTTPException,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import StreamingResponse
from langflow.api.utils import build_input_keys_response, format_elapsed_time
from langflow.api.v1.schemas import (
@ -15,7 +24,10 @@ from langflow.api.v1.schemas import (
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import StatelessVertex
from langflow.processing.process import process_tweaks_on_graph
from langflow.services.auth.utils import get_current_active_user, get_current_user_for_websocket
from langflow.services.auth.utils import (
get_current_active_user,
get_current_user_for_websocket,
)
from langflow.services.cache.service import BaseCacheService
from langflow.services.cache.utils import update_build_status
from langflow.services.chat.service import ChatService
@ -40,9 +52,13 @@ async def chat(
user = await get_current_user_for_websocket(websocket, db)
await websocket.accept()
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
)
elif not user.is_active:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
)
if client_id in chat_service.cache_service:
await chat_service.handle_websocket(client_id, websocket)
@ -58,7 +74,9 @@ async def chat(
logger.error(f"Error in chat websocket: {exc}")
messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)
if "Could not validate credentials" in str(exc):
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
await websocket.close(
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
)
else:
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
@ -100,10 +118,15 @@ async def init_build(
@router.get("/build/{flow_id}/status", response_model=BuiltResponse)
async def build_status(flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)):
async def build_status(
flow_id: str, cache_service: "BaseCacheService" = Depends(get_cache_service)
):
"""Check the flow_id is in the cache_service."""
try:
built = flow_id in cache_service and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
built = (
flow_id in cache_service
and cache_service[flow_id]["status"] == BuildStatus.SUCCESS
)
return BuiltResponse(
built=built,
@ -174,7 +197,9 @@ async def stream_build(
valid = True
logger.debug(f"Building node {str(vertex.vertex_type)}")
logger.debug(f"Output: {params[:100]}{'...' if len(params) > 100 else ''}")
logger.debug(
f"Output: {params[:100]}{'...' if len(params) > 100 else ''}"
)
if vertex.artifacts:
# The artifacts will be prompt variables
# passed to build_input_keys_response
@ -187,7 +212,9 @@ async def stream_build(
time_elapsed = format_elapsed_time(time.perf_counter() - start_time)
update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
vertex_id = vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
vertex_id = (
vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
)
if vertex_id in graph.top_level_vertices:
response = {
"valid": valid,
@ -202,7 +229,9 @@ async def stream_build(
langchain_object = await graph.build()
# Now we need to check the input_keys to send them to the client
if hasattr(langchain_object, "input_keys"):
input_keys_response = build_input_keys_response(langchain_object, artifacts)
input_keys_response = build_input_keys_response(
langchain_object, artifacts
)
else:
input_keys_response = {
"input_keys": None,
@ -249,17 +278,31 @@ async def try_running_celery_task(vertex, user_id):
@router.get("/build/{flow_id}/vertices", response_model=VerticesOrderResponse)
async def get_vertices(
flow_id: str,
component_id: Optional[str] = None,
chat_service: "ChatService" = Depends(get_chat_service),
session=Depends(get_session),
):
"""Check the flow_id is in the flow_data_store."""
try:
# First, we need to check if the flow_id is in the cache
graph = None
if cache := chat_service.get_cache(flow_id):
graph: Graph = cache.get("result")
flow: Flow = session.get(Flow, flow_id)
if not flow or not flow.data:
raise ValueError("Invalid flow ID")
graph = Graph.from_payload(flow.data)
other_graph = Graph.from_payload(flow.data)
if graph is None:
graph = other_graph
else:
graph = graph.update(other_graph)
chat_service.set_cache(flow_id, graph)
vertices = graph.layered_topological_sort()
if component_id:
vertices = graph.sort_up_to_vertex(component_id)
else:
vertices = graph.layered_topological_sort()
# Now vertices is a list of lists
# We need to get the id of each vertex
# and return the same structure but only with the ids
@ -276,7 +319,7 @@ async def build_vertex(
flow_id: str,
vertex_id: str,
chat_service: "ChatService" = Depends(get_chat_service),
# current_user=Depends(get_current_active_user),
current_user=Depends(get_current_active_user),
tweaks: dict = Body(None),
inputs: dict = Body(None),
):
@ -295,7 +338,7 @@ async def build_vertex(
raise ValueError("Invalid vertex")
try:
if isinstance(vertex, StatelessVertex) or not vertex._built:
await vertex.build(user_id=None)
await vertex.build(user_id=current_user.id)
params = vertex._built_object_repr()
valid = True
result_dict = vertex.get_built_result()
@ -305,13 +348,21 @@ async def build_vertex(
artifacts = vertex.artifacts
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict = ResultDict(results=result_dict, artifacts=artifacts, duration=duration, timedelta=timedelta)
result_dict = ResultDict(
results=result_dict,
artifacts=artifacts,
duration=duration,
timedelta=timedelta,
)
chat_service.set_cache(flow_id, graph)
except Exception as exc:
params = str(exc)
valid = False
result_dict = ResultDict(results={})
artifacts = {}
chat_service.set_cache(flow_id, graph)
# If there's an error building the vertex
# we need to clear the cache
chat_service.clear_cache(flow_id)
await log_vertex_build(
flow_id=flow_id,
vertex_id=vertex_id,

View file

@ -161,7 +161,9 @@ class StreamData(BaseModel):
data: dict
def __str__(self) -> str:
return f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
return (
f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
)
class CustomComponentCode(BaseModel):
@ -246,4 +248,3 @@ class VertexBuildResponse(BaseModel):
class VerticesBuiltResponse(BaseModel):
vertices: List[VertexBuildResponse]
vertices: List[VertexBuildResponse]

View file

@ -24,6 +24,8 @@ class CharacterTextSplitterComponent(CustomComponent):
chunk_size: int = 1000,
separator: str = "\n",
) -> List[Document]:
# separator may come escaped from the frontend
separator = separator.encode().decode("unicode_escape")
docs = CharacterTextSplitter(
chunk_overlap=chunk_overlap,
chunk_size=chunk_size,

View file

@ -17,6 +17,7 @@ class ChromaComponent(CustomComponent):
description: str = "Implementation of Vector Store using Chroma"
documentation = "https://python.langchain.com/docs/integrations/vectorstores/chroma"
beta: bool = True
icon = "Chroma"
def build_config(self):
"""
@ -28,7 +29,7 @@ class ChromaComponent(CustomComponent):
return {
"collection_name": {"display_name": "Collection Name", "value": "langflow"},
"persist": {"display_name": "Persist"},
"persist_directory": {"display_name": "Persist Directory"},
"index_directory": {"display_name": "Persist Directory"},
"code": {"advanced": True, "display_name": "Code"},
"documents": {"display_name": "Documents", "is_list": True},
"embedding": {"display_name": "Embedding"},
@ -54,7 +55,7 @@ class ChromaComponent(CustomComponent):
persist: bool,
embedding: Embeddings,
chroma_server_ssl_enabled: bool,
persist_directory: Optional[str] = None,
index_directory: Optional[str] = None,
documents: Optional[List[Document]] = None,
chroma_server_cors_allow_origins: Optional[str] = None,
chroma_server_host: Optional[str] = None,
@ -66,7 +67,7 @@ class ChromaComponent(CustomComponent):
Args:
- collection_name (str): The name of the collection.
- persist_directory (Optional[str]): The directory to persist the Vector Store to.
- index_directory (Optional[str]): The directory to persist the Vector Store to.
- chroma_server_ssl_enabled (bool): Whether to enable SSL for the Chroma server.
- persist (bool): Whether to persist the Vector Store or not.
- embedding (Optional[Embeddings]): The embeddings to use for the Vector Store.
@ -85,7 +86,8 @@ class ChromaComponent(CustomComponent):
if chroma_server_host is not None:
chroma_settings = chromadb.config.Settings(
chroma_server_cors_allow_origins=chroma_server_cors_allow_origins or None,
chroma_server_cors_allow_origins=chroma_server_cors_allow_origins
or None,
chroma_server_host=chroma_server_host,
chroma_server_port=chroma_server_port or None,
chroma_server_grpc_port=chroma_server_grpc_port or None,
@ -93,15 +95,25 @@ class ChromaComponent(CustomComponent):
)
# If documents, then we need to create a Chroma instance using .from_documents
# Check index_directory and expand it if it is a relative path
index_directory = self.resolve_path(index_directory)
if documents is not None and embedding is not None:
if len(documents) == 0:
raise ValueError("If documents are provided, there must be at least one document.")
return Chroma.from_documents(
raise ValueError(
"If documents are provided, there must be at least one document."
)
chroma = Chroma.from_documents(
documents=documents, # type: ignore
persist_directory=persist_directory if persist else None,
persist_directory=index_directory if persist else None,
collection_name=collection_name,
embedding=embedding,
client_settings=chroma_settings,
)
return Chroma(persist_directory=persist_directory, client_settings=chroma_settings)
else:
chroma = Chroma(
persist_directory=index_directory, client_settings=chroma_settings
)
return chroma

View file

@ -99,7 +99,7 @@ class ChromaSearchComponent(CustomComponent):
chroma_server_grpc_port=chroma_server_grpc_port or None,
chroma_server_ssl_enabled=chroma_server_ssl_enabled,
)
index_directory = self.resolve_path(index_directory)
chroma = Chroma(
embedding_function=embedding,
collection_name=collection_name,

View file

@ -1,12 +1,17 @@
from collections import defaultdict, deque
from typing import Dict, Generator, List, Type, Union
from typing import Dict, Generator, List, Optional, Type, Union
from langchain.chains.base import Chain
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.constants import lazy_load_vertex_dict
from langflow.graph.graph.utils import process_flow
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.types import ChatVertex, FileToolVertex, LLMVertex, ToolkitVertex
from langflow.graph.vertex.types import (
ChatVertex,
FileToolVertex,
LLMVertex,
ToolkitVertex,
)
from langflow.interface.tools.constants import FILE_TOOLS
from langflow.utils import payload
from loguru import logger
@ -71,6 +76,25 @@ class Graph:
return False
return self.__repr__() == other.__repr__()
# update this graph with another graph by comparing the __repr__ of each vertex
# and if the __repr__ of a vertex is not the same as the other
# then update the .data of the vertex to the self
# both graphs have the same vertices and edges
# but the data of the vertices might be different
def update(self, other: "Graph", different_vertices: List[str] = None) -> None:
if different_vertices is None:
different_vertices = []
for vertex in self.vertices:
if (
vertex.id in different_vertices
or vertex.__repr__() != other.get_vertex(vertex.id).__repr__()
):
vertex.data = other.get_vertex(vertex.id).data
vertex._build_params()
vertex.graph = self
vertex._built = False
return self
def _build_graph(self) -> None:
"""Builds the graph from the vertices and edges."""
self.vertices = self._build_vertices()
@ -127,7 +151,9 @@ class Graph:
return
for vertex in self.vertices:
if not self._validate_vertex(vertex):
raise ValueError(f"{vertex.vertex_type} is not connected to any other components")
raise ValueError(
f"{vertex.vertex_type} is not connected to any other components"
)
def _validate_vertex(self, vertex: Vertex) -> bool:
"""Validates a vertex."""
@ -140,7 +166,11 @@ class Graph:
def get_vertex_edges(self, vertex_id: str) -> List[ContractEdge]:
"""Returns a list of edges for a given vertex."""
return [edge for edge in self.edges if edge.source_id == vertex_id or edge.target_id == vertex_id]
return [
edge
for edge in self.edges
if edge.source_id == vertex_id or edge.target_id == vertex_id
]
def get_vertices_with_target(self, vertex_id: str) -> List[Vertex]:
"""Returns the vertices connected to a vertex."""
@ -178,7 +208,9 @@ class Graph:
def dfs(vertex):
if state[vertex] == 1:
# We have a cycle
raise ValueError("Graph contains a cycle, cannot perform topological sort")
raise ValueError(
"Graph contains a cycle, cannot perform topological sort"
)
if state[vertex] == 0:
state[vertex] = 1
for edge in vertex.edges:
@ -237,7 +269,9 @@ class Graph:
edges.append(ContractEdge(source, target, edge))
return edges
def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type[Vertex]:
def _get_vertex_class(
self, node_type: str, node_base_type: str, node_id: str
) -> Type[Vertex]:
"""Returns the node class based on the node type."""
# First we check for the node_base_type
node_name = node_id.split("-")[0]
@ -267,14 +301,18 @@ class Graph:
vertex_type: str = vertex_data["type"] # type: ignore
vertex_base_type: str = vertex_data["node"]["template"]["_type"] # type: ignore
VertexClass = self._get_vertex_class(vertex_type, vertex_base_type, vertex_data["id"])
VertexClass = self._get_vertex_class(
vertex_type, vertex_base_type, vertex_data["id"]
)
vertex_instance = VertexClass(vertex, graph=self)
vertex_instance.set_top_level(self.top_level_vertices)
vertices.append(vertex_instance)
return vertices
def get_children_by_vertex_type(self, vertex: Vertex, vertex_type: str) -> List[Vertex]:
def get_children_by_vertex_type(
self, vertex: Vertex, vertex_type: str
) -> List[Vertex]:
"""Returns the children of a vertex based on the vertex type."""
children = []
vertex_types = [vertex.data["type"]]
@ -286,20 +324,44 @@ class Graph:
def __repr__(self):
vertex_ids = [vertex.id for vertex in self.vertices]
edges_repr = "\n".join([f"{edge.source_id} --> {edge.target_id}" for edge in self.edges])
edges_repr = "\n".join(
[f"{edge.source_id} --> {edge.target_id}" for edge in self.edges]
)
return f"Graph:\nNodes: {vertex_ids}\nConnections:\n{edges_repr}"
def layered_topological_sort(self):
in_degree = {vertex.id: 0 for vertex in self.vertices} # Initialize in-degrees
def sort_up_to_vertex(self, vertex_id: str) -> "Graph":
"""Cuts the graph up to a given vertex."""
# Get the vertices that are connected to the vertex
# and the vertex itself
vertices = [self.get_vertex(vertex_id)]
for edge in self.get_vertex(edge.target_id).edges:
vertices.append(self.get_vertex(edge.target_id))
edges = [edge for vertex in vertices for edge in vertex.edges]
return self.layered_topological_sort(vertices, edges)
def layered_topological_sort(
self,
vertices: Optional[List[Vertex]] = None,
edges: Optional[List[ContractEdge]] = None,
) -> List[List[str]]:
"""Performs a layered topological sort of the vertices in the graph."""
if vertices is None:
vertices = self.vertices
if edges is None:
edges = self.edges
in_degree = {vertex.id: 0 for vertex in vertices} # Initialize in-degrees
graph = defaultdict(list) # Adjacency list representation
# Build graph and compute in-degrees
for edge in self.edges:
for edge in edges:
graph[edge.source_id].append(edge.target_id)
in_degree[edge.target_id] += 1
# Queue for vertices with no incoming edges
queue = deque(vertex.id for vertex in self.vertices if in_degree[vertex.id] == 0)
queue = deque(vertex.id for vertex in vertices if in_degree[vertex.id] == 0)
layers = []
current_layer = 0
@ -314,9 +376,40 @@ class Graph:
if in_degree[neighbor] == 0:
queue.append(neighbor)
current_layer += 1 # Next layer
new_layers = self.refine_layers(graph, layers)
return new_layers
return layers
return layers
return layers
return layers
return layers
def refine_layers(self, graph, initial_layers):
# Map each vertex to its current layer
vertex_to_layer = {}
for layer_index, layer in enumerate(initial_layers):
for vertex in layer:
vertex_to_layer[vertex] = layer_index
# Build the adjacency list for reverse lookup (dependencies)
refined_layers = [[] for _ in initial_layers] # Start with empty layers
new_layer_index_map = defaultdict(
int
) # Map each vertex to its highest dependency layer
for vertex_id, deps in graph.items():
for dep in deps:
new_layer_index_map[vertex_id] = (
max(new_layer_index_map[vertex_id], vertex_to_layer[dep]) - 1
)
for layer_index, layer in enumerate(initial_layers):
for vertex_id in layer:
# Place the vertex in the highest possible layer where its dependencies are met
new_layer_index = new_layer_index_map[vertex_id]
if new_layer_index > layer_index:
refined_layers[new_layer_index].append(vertex_id)
vertex_to_layer[vertex_id] = new_layer_index
else:
refined_layers[layer_index].append(vertex_id)
# Remove empty layers if any
refined_layers = [layer for layer in refined_layers if layer]
return refined_layers

View file

@ -78,14 +78,18 @@ class Vertex:
):
if edge.target_id not in edge_results:
edge_results[edge.target_id] = {}
edge_results[edge.target_id][edge.target_param] = await edge.get_result(source=self, target=target)
edge_results[edge.target_id][edge.target_param] = await edge.get_result(
source=self, target=target
)
return edge_results
def get_built_result(self):
# If the Vertex.type is a power component
# then we need to return the built object
# instead of the result dict
if self.is_interface_component and not isinstance(self._built_object, UnbuiltObject):
if self.is_interface_component and not isinstance(
self._built_object, UnbuiltObject
):
result = self._built_object
# if it is not a dict or a string and hasattr model_dump then
# return the model_dump
@ -95,7 +99,11 @@ class Vertex:
if isinstance(self._built_result, UnbuiltResult):
return {}
return self._built_result if isinstance(self._built_result, dict) else {"result": self._built_result}
return (
self._built_result
if isinstance(self._built_result, dict)
else {"result": self._built_result}
)
def set_artifacts(self) -> None:
pass
@ -149,18 +157,29 @@ class Vertex:
self.data = self._data["data"]
self.output = self.data["node"]["base_classes"]
self.pinned = self.data["node"].get("pinned", False)
template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
template_dicts = {
key: value
for key, value in self.data["node"]["template"].items()
if isinstance(value, dict)
}
self.required_inputs = [
template_dicts[key]["type"] for key, value in template_dicts.items() if value["required"]
template_dicts[key]["type"]
for key, value in template_dicts.items()
if value["required"]
]
self.optional_inputs = [
template_dicts[key]["type"] for key, value in template_dicts.items() if not value["required"]
template_dicts[key]["type"]
for key, value in template_dicts.items()
if not value["required"]
]
# Add the template_dicts[key]["input_types"] to the optional_inputs
self.optional_inputs.extend(
[input_type for value in template_dicts.values() for input_type in value.get("input_types", [])]
[
input_type
for value in template_dicts.values()
for input_type in value.get("input_types", [])
]
)
template_dict = self.data["node"]["template"]
@ -203,7 +222,11 @@ class Vertex:
if self.graph is None:
raise ValueError("Graph not found")
template_dict = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
template_dict = {
key: value
for key, value in self.data["node"]["template"].items()
if isinstance(value, dict)
}
params = self.params.copy() if self.params else {}
for edge in self.edges:
@ -255,7 +278,11 @@ class Vertex:
# list of dicts, so we need to convert it to a dict
# before passing it to the build method
if isinstance(val, list):
params[key] = {k: v for item in value.get("value", []) for k, v in item.items()}
params[key] = {
k: v
for item in value.get("value", [])
for k, v in item.items()
}
elif isinstance(val, dict):
params[key] = val
elif value.get("type") == "int" and val is not None:
@ -292,7 +319,12 @@ class Vertex:
self._built = True
async def _run(self, user_id: str, inputs: Optional[dict] = None, session_id: Optional[str] = None):
async def _run(
self,
user_id: str,
inputs: Optional[dict] = None,
session_id: Optional[str] = None,
):
# user_id is just for compatibility with the other build methods
inputs = inputs or {}
# inputs = {key: value or "" for key, value in inputs.items()}
@ -307,7 +339,9 @@ class Vertex:
if isinstance(self._built_object, str):
self._built_result = self._built_object
result = await generate_result(self._built_object, inputs, self.has_external_output, session_id)
result = await generate_result(
self._built_object, inputs, self.has_external_output, session_id
)
self._built_result = result
async def _build_each_node_in_params_dict(self, user_id=None):
@ -335,7 +369,9 @@ class Vertex:
"""
return all(self._is_node(node) for node in value)
async def get_result(self, requester: Optional["Vertex"] = None, user_id=None, timeout=None) -> Any:
async def get_result(
self, requester: Optional["Vertex"] = None, user_id=None, timeout=None
) -> Any:
# PLEASE REVIEW THIS IF STATEMENT
# Check if the Vertex was built already
if self._built:
@ -369,7 +405,9 @@ class Vertex:
self._extend_params_list_with_result(key, result)
self.params[key] = result
async def _build_list_of_nodes_and_update_params(self, key, nodes: List["Vertex"], user_id=None):
async def _build_list_of_nodes_and_update_params(
self, key, nodes: List["Vertex"], user_id=None
):
"""
Iterates over a list of nodes, builds each and updates the params dictionary.
"""
@ -421,7 +459,9 @@ class Vertex:
self._update_built_object_and_artifacts(result)
except Exception as exc:
logger.exception(exc)
raise ValueError(f"Error building node {self.vertex_type}(ID:{self.id}): {str(exc)}") from exc
raise ValueError(
f"Error building node {self.vertex_type}(ID:{self.id}): {str(exc)}"
) from exc
def _update_built_object_and_artifacts(self, result):
"""
@ -458,7 +498,7 @@ class Vertex:
requester: Optional["Vertex"] = None,
**kwargs,
) -> Any:
if self.pinned:
if self.pinned and self._built:
return self.get_requester_result(requester)
self._reset()
@ -480,9 +520,15 @@ class Vertex:
return self._built_object
# Get the requester edge
requester_edge = next((edge for edge in self.edges if edge.target_id == requester.id), None)
requester_edge = next(
(edge for edge in self.edges if edge.target_id == requester.id), None
)
# Return the result of the requester edge
return None if requester_edge is None else await requester_edge.get_result(source=self, target=requester)
return (
None
if requester_edge is None
else await requester_edge.get_result(source=self, target=requester)
)
def add_edge(self, edge: "ContractEdge") -> None:
if edge not in self.edges:
@ -502,7 +548,11 @@ class Vertex:
def _built_object_repr(self):
# Add a message with an emoji, stars for sucess,
return "Built sucessfully ✨" if self._built_object is not None else "Failed to build 😵‍💫"
return (
"Built sucessfully ✨"
if self._built_object is not None
else "Failed to build 😵‍💫"
)
class StatefulVertex(Vertex):

View file

@ -5,7 +5,6 @@ from typing import Any, ClassVar, Optional
import emoji
from cachetools import TTLCache, cachedmethod
from fastapi import HTTPException
from langflow.interface.custom.code_parser import CodeParser
from langflow.interface.custom.eval import eval_custom_component_code
from langflow.utils import validate
@ -21,7 +20,9 @@ class ComponentFunctionEntrypointNameNullError(HTTPException):
class Component:
ERROR_CODE_NULL: ClassVar[str] = "Python code must be provided."
ERROR_FUNCTION_ENTRYPOINT_NAME_NULL: ClassVar[str] = "The name of the entrypoint function must be provided."
ERROR_FUNCTION_ENTRYPOINT_NAME_NULL: ClassVar[str] = (
"The name of the entrypoint function must be provided."
)
code: Optional[str] = None
_function_entrypoint_name: str = "build"
@ -36,10 +37,6 @@ class Component:
else:
setattr(self, key, value)
# Validate the emoji at the icon field
if hasattr(self, "icon") and self.icon:
self.icon = self.validate_icon(self.icon)
def __setattr__(self, key, value):
if key == "_user_id" and hasattr(self, "_user_id"):
warnings.warn("user_id is immutable and cannot be changed.")
@ -68,8 +65,8 @@ class Component:
return validate.create_function(self.code, self._function_entrypoint_name)
def getattr_return_str(self, component, value):
value = getattr(component, value)
def getattr_return_str(self, value):
return str(value) if value else ""
def build_template_config(self) -> dict:
@ -89,19 +86,22 @@ class Component:
for attribute, func in attributes_func_mapping.items():
if hasattr(component_instance, attribute):
template_config[attribute] = func(component=component_instance, value=attribute)
value = getattr(component_instance, attribute)
if value is not None:
template_config[attribute] = func(value=value)
return template_config
return template_config
def validate_icon(self, value: str, *args, **kwargs):
# we are going to use the emoji library to validate the emoji
# emojis can be defined using the :emoji_name: syntax
if not value.startswith(":") or not value.endswith(":"):
raise ValueError("Invalid emoji. Please use the :emoji_name: syntax.")
warnings.warn("Invalid emoji. Please use the :emoji_name: syntax.")
return value
emoji_value = emoji.emojize(value, variant="emoji_type")
if value == emoji_value:
raise ValueError(f"Invalid emoji. {value} is not a valid emoji.")
warnings.warn(f"Invalid emoji. {value} is not a valid emoji.")
return value
return emoji_value
def build(self, *args: Any, **kwargs: Any) -> Any:

View file

@ -1,11 +1,11 @@
import operator
from pathlib import Path
from typing import Any, Callable, ClassVar, List, Optional, Union
from uuid import UUID
import yaml
from cachetools import TTLCache, cachedmethod
from fastapi import HTTPException
from langflow.interface.custom.code_parser.utils import (
extract_inner_type_from_generic_alias,
extract_union_types_from_generic_alias,
@ -13,7 +13,11 @@ from langflow.interface.custom.code_parser.utils import (
from langflow.interface.custom.custom_component.component import Component
from langflow.services.database.models.flow import Flow
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_credential_service, get_db_service, get_storage_service
from langflow.services.deps import (
get_credential_service,
get_db_service,
get_storage_service,
)
from langflow.services.storage.service import StorageService
from langflow.utils import validate
@ -42,6 +46,16 @@ class CustomComponent(Component):
self.cache = TTLCache(maxsize=1024, ttl=60)
super().__init__(**data)
@staticmethod
def resolve_path(path: str) -> str:
"""Resolves the path to an absolute path."""
path_object = Path(path)
if path_object.parts[0] == "~":
path_object = path_object.expanduser()
elif path_object.is_relative_to("."):
path_object = path_object.resolve()
return str(path_object)
def get_full_path(self, path: str) -> str:
storage_svc: "StorageService" = get_storage_service()
@ -78,7 +92,8 @@ class CustomComponent(Component):
detail={
"error": "Type hint Error",
"traceback": (
"Prompt type is not supported in the build method." " Try using PromptTemplate instead."
"Prompt type is not supported in the build method."
" Try using PromptTemplate instead."
),
},
)
@ -92,14 +107,20 @@ class CustomComponent(Component):
if not self.code:
return {}
component_classes = [cls for cls in self.tree["classes"] if self.code_class_base_inheritance in cls["bases"]]
component_classes = [
cls
for cls in self.tree["classes"]
if self.code_class_base_inheritance in cls["bases"]
]
if not component_classes:
return {}
# Assume the first Component class is the one we're interested in
component_class = component_classes[0]
build_methods = [
method for method in component_class["methods"] if method["name"] == self.function_entrypoint_name
method
for method in component_class["methods"]
if method["name"] == self.function_entrypoint_name
]
return build_methods[0] if build_methods else {}
@ -112,7 +133,10 @@ class CustomComponent(Component):
return_type = build_method["return_type"]
# If list or List is in the return type, then we remove it and return the inner type
if hasattr(return_type, "__origin__") and return_type.__origin__ in [list, List]:
if hasattr(return_type, "__origin__") and return_type.__origin__ in [
list,
List,
]:
return_type = extract_inner_type_from_generic_alias(return_type)
# If the return type is not a Union, then we just return it as a list
@ -153,7 +177,9 @@ class CustomComponent(Component):
# Retrieve and decrypt the credential by name for the current user
db_service = get_db_service()
with session_getter(db_service) as session:
return credential_service.get_credential(user_id=self._user_id or "", name=name, session=session)
return credential_service.get_credential(
user_id=self._user_id or "", name=name, session=session
)
return get_credential
@ -163,7 +189,9 @@ class CustomComponent(Component):
credential_service = get_credential_service()
db_service = get_db_service()
with session_getter(db_service) as session:
return credential_service.list_credentials(user_id=self._user_id, session=session)
return credential_service.list_credentials(
user_id=self._user_id, session=session
)
def index(self, value: int = 0):
"""Returns a function that returns the value at the given index in the iterable."""
@ -214,7 +242,11 @@ class CustomComponent(Component):
if flow_id:
flow = session.query(Flow).get(flow_id)
elif flow_name:
flow = (session.query(Flow).filter(Flow.name == flow_name).filter(Flow.user_id == self.user_id)).first()
flow = (
session.query(Flow)
.filter(Flow.name == flow_name)
.filter(Flow.user_id == self.user_id)
).first()
else:
raise ValueError("Either flow_name or flow_id must be provided")

View file

@ -59,7 +59,9 @@ class ChatService(Service):
"""Send the last chat message to the client."""
client_id = self.chat_cache.current_client_id
if client_id in self.active_connections:
chat_response = self.chat_history.get_history(client_id, filter_messages=False)[-1]
chat_response = self.chat_history.get_history(
client_id, filter_messages=False
)[-1]
if chat_response.is_bot:
# Process FileResponse
if isinstance(chat_response, FileResponse):
@ -86,7 +88,9 @@ class ChatService(Service):
data_type=self.last_cached_object_dict["type"],
)
self.chat_history.add_message(self.chat_cache.current_client_id, chat_response)
self.chat_history.add_message(
self.chat_cache.current_client_id, chat_response
)
async def connect(self, client_id: str, websocket: WebSocket):
self.active_connections[client_id] = websocket
@ -209,7 +213,9 @@ class ChatService(Service):
await self.process_message(client_id, payload, build_result)
else:
raise RuntimeError(f"Could not find a build result for client_id {client_id}")
raise RuntimeError(
f"Could not find a build result for client_id {client_id}"
)
except Exception as exc:
# Handle any exceptions that might occur
logger.exception(f"Error handling websocket: {exc}")
@ -241,6 +247,12 @@ class ChatService(Service):
"""
return self.cache_service.get(client_id)
def clear_cache(self, client_id: str):
"""
Clear the cache for a client.
"""
self.cache_service.clear(client_id)
def dict_to_markdown_table(my_dict):
markdown_table = "| Key | Value |\n|---|---|\n"

View file

@ -7,7 +7,9 @@ from pydantic import BaseModel, Field, field_serializer, validator
class TransactionModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
timestamp: Optional[datetime] = Field(
default_factory=datetime.now, alias="timestamp"
)
source: str
target: str
target_args: dict
@ -89,18 +91,13 @@ class VertexBuildModel(BaseModel):
return v
# create a function that turns dicts into a
# dict like this:
# my_map_dict = {
# "key": [
# 1, 2, 3
# ],
# "value": [
# "one", "two", "three"
# ]
# }
# so map has a "key" and a "value" list
# containing the keys and values of the dict
class VertexBuildResponseModel(VertexBuildModel):
@field_serializer("data", "artifacts")
def serialize_dict(v):
return v
def to_map(value: dict):
keys = list(value.keys())
values = list(value.values())
@ -108,13 +105,13 @@ def to_map(value: dict):
class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildModel]]
vertex_builds: dict[str, list[VertexBuildResponseModel]]
@classmethod
def from_list_of_dicts(cls, vertex_build_dicts):
vertex_build_map = {}
for vertex_build_dict in vertex_build_dicts:
vertex_build = VertexBuildModel(**vertex_build_dict)
vertex_build = VertexBuildResponseModel(**vertex_build_dict)
if vertex_build.id not in vertex_build_map:
vertex_build_map[vertex_build.id] = []
vertex_build_map[vertex_build.id].append(vertex_build)

View file

@ -1,13 +1,12 @@
from pathlib import Path
import aiofiles
from loguru import logger
from .service import StorageService
class LocalStorageService(StorageService):
"""A service class for handling local storage operations."""
"""A service class for handling local storage operations without aiofiles."""
def __init__(self, session_service, settings_service):
"""Initialize the local storage service with session and settings services."""
@ -35,8 +34,8 @@ class LocalStorageService(StorageService):
file_path = folder_path / file_name
try:
async with aiofiles.open(file_path, "wb") as f:
await f.write(data)
with open(file_path, "wb") as f:
f.write(data)
logger.info(f"File {file_name} saved successfully in flow {flow_id}.")
except Exception as e:
logger.error(f"Error saving file {file_name} in flow {flow_id}: {e}")
@ -56,9 +55,9 @@ class LocalStorageService(StorageService):
logger.warning(f"File {file_name} not found in flow {flow_id}.")
raise FileNotFoundError(f"File {file_name} not found in flow {flow_id}")
async with aiofiles.open(file_path, "rb") as f:
with open(file_path, "rb") as f:
logger.info(f"File {file_name} retrieved successfully from flow {flow_id}.")
return await f.read()
return f.read()
async def list_files(self, flow_id: str):
"""
@ -89,8 +88,10 @@ class LocalStorageService(StorageService):
file_path.unlink()
logger.info(f"File {file_name} deleted successfully from flow {flow_id}.")
else:
logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.")
logger.warning(
f"Attempted to delete non-existent file {file_name} in flow {flow_id}."
)
def teardown(self):
"""Perform any cleanup operations when the service is being torn down."""
pass # No specific teardown actions required for local storage at the moment.
pass # No specific teardown actions required for local

View file

@ -406,13 +406,27 @@ export default function ParameterComponent({
data-testid={"textarea-" + data.node.template[name].name}
/>
) : (
<InputComponent
id={"input-" + index}
disabled={disabled}
password={data.node?.template[name].password ?? false}
value={data.node?.template[name].value ?? ""}
onChange={handleOnNewValue}
/>
<div className="mt-2 flex w-full items-center">
<div className="w-5/6 flex-grow">
<InputComponent
id={"input-" + index}
disabled={disabled}
password={data.node?.template[name].password ?? false}
value={data.node?.template[name].value ?? ""}
onChange={handleOnNewValue}
/>
</div>
{data.node?.template[name].refresh && (
<button
className="extra-side-bar-buttons ml-2 mt-1 w-1/6"
onClick={() => {
handleUpdateValues(name, data);
}}
>
<IconComponent name="RefreshCcw" />
</button>
)}
</div>
)}
</div>
) : left === true && type === "bool" ? (

View file

@ -1,4 +1,4 @@
import { useEffect, useState } from "react";
import { useCallback, useEffect, useState } from "react";
import { NodeToolbar } from "reactflow";
import ShadTooltip from "../../components/ShadTooltipComponent";
import Tooltip from "../../components/TooltipComponent";
@ -112,6 +112,39 @@ export default function GenericNode({
const nameEditable = data.node?.flow || data.type === "CustomComponent";
const emojiRegex = /\p{Emoji}/u;
const isEmoji = emojiRegex.test(data?.node?.icon!);
const iconNodeRender = useCallback(() => {
const iconElement = data?.node?.icon;
const iconColor = nodeColors[types[data.type]];
const iconName =
iconElement || (data.node?.flow ? "group_components" : name);
const iconClassName = `generic-node-icon ${
!showNode ? "absolute inset-x-6 h-12 w-12" : ""
}`;
if (iconElement && isEmoji) {
return nodeIconFragment(iconElement);
} else {
return checkNodeIconFragment(iconColor, iconName, iconClassName);
}
}, [data, isEmoji, name, showNode]);
const nodeIconFragment = (icon) => {
return <span className="text-lg">{icon}</span>;
};
const checkNodeIconFragment = (iconColor, iconName, iconClassName) => {
return (
<IconComponent
name={iconName}
className={iconClassName}
iconColor={iconColor}
/>
);
};
return (
<>
<NodeToolbar>
@ -164,19 +197,7 @@ export default function GenericNode({
(!showNode && "justify-center")
}
>
{data?.node?.icon ? (
<span className="text-lg">{data?.node?.icon}</span>
) : (
<IconComponent
name={data.node?.flow ? "group_components" : name}
className={
"generic-node-icon " +
(!showNode ? "absolute inset-x-6 h-12 w-12" : "")
}
iconColor={`${nodeColors[types[data.type]]}`}
/>
)}
{iconNodeRender()}
{showNode && (
<div className="generic-node-tooltip-div">
{nameEditable && inputName ? (

View file

@ -90,7 +90,7 @@ export default function ChatInput({
: "bg-chat-send text-background"
)}
disabled={lockChat}
onClick={(): void => sendMessage()}
onClick={(): void => sendMessage(repeat)}
>
{lockChat ? (
<IconComponent

View file

@ -680,5 +680,5 @@ export const LANGFLOW_SUPPORTED_TYPES = new Set([
export const priorityFields = new Set(["code", "template"]);
export const INPUT_TYPES = new Set(["ChatInput", "TextInput", "FileLoader"]);
export const INPUT_TYPES = new Set(["ChatInput", "TextInput"]);
export const OUTPUT_TYPES = new Set(["ChatOutput"]);

View file

@ -26,10 +26,11 @@ export async function buildVertices({
for (let i = 0; i < verticesOrder.length; i += 1) {
const innerArray = verticesOrder[i];
const idIndex = innerArray.indexOf(nodeId);
if (idIndex !== -1) {
// If the targetId is found in the inner array, cut the array before the id
vertices.push(innerArray.slice(0, idIndex + 1));
// If there's a nodeId, we want to run just that component and not the entire layer
// because a layer contains dependencies for the next layer
// and we are stopping at the layer that contains the nodeId
vertices.push([innerArray[idIndex]]);
break; // Stop searching after finding the first occurrence
}
// If the targetId is not found, include the entire inner array
@ -39,11 +40,14 @@ export async function buildVertices({
vertices = verticesOrder;
}
// Set each vertex state to building
const buildResults: Array<boolean> = [];
for (let i = 0; i < vertices.length; i += 1) {
await Promise.all(
vertices[i].map(async (id) => {
try {
// Set vertex state to building
const buildRes = await postBuildVertex(flowId, id);
const buildData: VertexBuildTypeAPI = buildRes.data;
if (onBuildUpdate) {

View file

@ -119,13 +119,7 @@ export function groupByFamily(
display_name?: string;
}> = [];
let checkedNodes = new Map();
const excludeTypes = new Set([
"bool",
"float",
"code",
"file",
"int",
]);
const excludeTypes = new Set(["bool", "float", "code", "file", "int"]);
const checkBaseClass = (template: TemplateVariableType) => {
return (
@ -134,12 +128,11 @@ export function groupByFamily(
((!excludeTypes.has(template.type) &&
baseClassesSet.has(template.type)) ||
(template.input_types &&
template.input_types.some((inputType) => {
baseClassesSet.has(inputType);
})))
template.input_types.some((inputType) =>
baseClassesSet.has(inputType)
)))
);
};
console.log(flow);
if (flow) {
// se existir o flow