refactor: rename ContractEdge to CycleEdge (#3334)

* refactor: Add CycleEdge class and make necessary adjustments in base.py files.

* refactor: Change ContractEdge to CycleEdge for consistency in vertex class properties.

* [autofix.ci] apply automated fixes

* refactor: Update type hints in types.py and base.py.

* [autofix.ci] apply automated fixes

* refactor(base.py): Update imports in base.py.

* [autofix.ci] apply automated fixes

* refactor: Changed ContractEdge to CycleEdge for building and getting edges.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-08-14 14:39:16 -03:00 committed by GitHub
commit 1d8a24eb86
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 32 additions and 23 deletions

View file

@ -17,6 +17,7 @@ class Edge:
self.target_param: str | None = None
self._target_handle: TargetHandleDict | str | None = None
self._data = edge.copy()
self.is_cycle = False
if data := edge.get("data", {}):
self._source_handle = data.get("sourceHandle", {})
self._target_handle = cast(TargetHandleDict, data.get("targetHandle", {}))
@ -177,12 +178,16 @@ class Edge:
and self.target_param == __o.target_param
)
def __str__(self) -> str:
return self.__repr__()
class ContractEdge(Edge):
class CycleEdge(Edge):
def __init__(self, source: "Vertex", target: "Vertex", raw_edge: EdgeData):
super().__init__(source, target, raw_edge)
self.is_fulfilled = False # Whether the contract has been fulfilled.
self.result: Any = None
self.is_cycle = True
async def honor(self, source: "Vertex", target: "Vertex") -> None:
"""

View file

@ -13,7 +13,7 @@ import nest_asyncio
from loguru import logger
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge
from langflow.graph.edge.schema import EdgeData
from langflow.graph.graph.constants import Finish, lazy_load_vertex_dict
from langflow.graph.graph.runnable_vertices_manager import RunnableVerticesManager
@ -84,7 +84,7 @@ class Graph:
self.vertices_to_run: set[str] = set()
self.stop_vertex: Optional[str] = None
self.inactive_vertices: set = set()
self.edges: List[ContractEdge] = []
self.edges: List[CycleEdge] = []
self.vertices: List[Vertex] = []
self.run_manager = RunnableVerticesManager()
self.state_manager = GraphStateManager()
@ -100,6 +100,8 @@ class Graph:
self._first_layer: List[str] = []
self._lock = asyncio.Lock()
self.raw_graph_data: GraphData = {"nodes": [], "edges": []}
self._is_cyclic: Optional[bool] = None
self._cycles: Optional[List[tuple[str, str]]] = None
try:
self.tracing_service: "TracingService" | None = get_tracing_service()
except Exception as exc:
@ -107,7 +109,7 @@ class Graph:
self.tracing_service = None
if start is not None and end is not None:
self._set_start_and_end(start, end)
self.prepare()
self.prepare(start_component_id=start._id)
if (start is not None and end is None) or (start is None and end is not None):
raise ValueError("You must provide both input and output components")
@ -247,7 +249,7 @@ class Graph:
}
self._add_edge(edge_data)
async def async_start(self, inputs: Optional[List[dict]] = None):
async def async_start(self, inputs: Optional[List[dict]] = None, max_iterations: Optional[int] = None):
if not self._prepared:
raise ValueError("Graph not prepared. Call prepare() first.")
# The idea is for this to return a generator that yields the result of
@ -721,7 +723,7 @@ class Graph:
"flow_name": self.flow_name,
}
def build_graph_maps(self, edges: Optional[List[ContractEdge]] = None, vertices: Optional[List["Vertex"]] = None):
def build_graph_maps(self, edges: Optional[List[CycleEdge]] = None, vertices: Optional[List["Vertex"]] = None):
"""
Builds the adjacency maps for the graph.
"""
@ -776,7 +778,7 @@ class Graph:
continue
self.mark_branch(child_id, state)
def get_edge(self, source_id: str, target_id: str) -> Optional[ContractEdge]:
def get_edge(self, source_id: str, target_id: str) -> Optional[CycleEdge]:
"""Returns the edge between two vertices."""
for edge in self.edges:
if edge.source_id == source_id and edge.target_id == target_id:
@ -1242,7 +1244,7 @@ class Graph:
vertex_id: str,
is_target: Optional[bool] = None,
is_source: Optional[bool] = None,
) -> List[ContractEdge]:
) -> List[CycleEdge]:
"""Returns a list of edges for a given vertex."""
# The idea here is to return the edges that have the vertex_id as source or target
# or both
@ -1469,13 +1471,13 @@ class Graph:
neighbors[neighbor] += 1
return neighbors
def _build_edges(self) -> List[ContractEdge]:
def _build_edges(self) -> List[CycleEdge]:
"""Builds the edges of the graph."""
# Edge takes two vertices as arguments, so we need to build the vertices first
# and then build the edges
# if we can't find a vertex, we raise an error
edges: set[ContractEdge] = set()
edges: set[CycleEdge] = set()
for edge in self._edges:
new_edge = self.build_edge(edge)
edges.add(new_edge)
@ -1483,7 +1485,7 @@ class Graph:
warnings.warn("Graph has vertices but no edges")
return list(edges)
def build_edge(self, edge: EdgeData) -> ContractEdge:
def build_edge(self, edge: EdgeData) -> CycleEdge:
source = self.get_vertex(edge["source"])
target = self.get_vertex(edge["target"])
@ -1491,7 +1493,7 @@ class Graph:
raise ValueError(f"Source vertex {edge['source']} not found")
if target is None:
raise ValueError(f"Target vertex {edge['target']} not found")
new_edge = ContractEdge(source, target, edge)
new_edge = CycleEdge(source, target, edge)
return new_edge
def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type["Vertex"]:
@ -1869,13 +1871,16 @@ class Graph:
top_level_vertices.append(vertex_id)
return top_level_vertices
def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
def build_in_degree(self, edges: List[CycleEdge]) -> Dict[str, int]:
in_degree: Dict[str, int] = defaultdict(int)
for edge in edges:
in_degree[edge.target_id] += 1
for vertex in self.vertices:
if vertex.id not in in_degree:
in_degree[vertex.id] = 0
return in_degree
def build_adjacency_maps(self, edges: List[ContractEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
def build_adjacency_maps(self, edges: List[CycleEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
"""Returns the adjacency maps for the graph."""
predecessor_map: dict[str, list[str]] = defaultdict(list)
successor_map: dict[str, list[str]] = defaultdict(list)

View file

@ -28,7 +28,7 @@ from langflow.utils.util import sync_to_async, unescape_string
if TYPE_CHECKING:
from langflow.custom import Component
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge
from langflow.graph.graph.base import Graph
@ -162,15 +162,15 @@ class Vertex:
pass
@property
def edges(self) -> List["ContractEdge"]:
def edges(self) -> List["CycleEdge"]:
return self.graph.get_vertex_edges(self.id)
@property
def outgoing_edges(self) -> List["ContractEdge"]:
def outgoing_edges(self) -> List["CycleEdge"]:
return [edge for edge in self.edges if edge.source_id == self.id]
@property
def incoming_edges(self) -> List["ContractEdge"]:
def incoming_edges(self) -> List["CycleEdge"]:
return [edge for edge in self.edges if edge.target_id == self.id]
@property
@ -798,7 +798,7 @@ class Vertex:
else await requester_edge.get_result_from_source(source=self, target=requester)
)
def add_edge(self, edge: "ContractEdge") -> None:
def add_edge(self, edge: "CycleEdge") -> None:
if edge not in self.edges:
self.edges.append(edge)

View file

@ -7,7 +7,7 @@ from langchain_core.messages import AIMessage, AIMessageChunk
from loguru import logger
from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, log_transaction, serialize_field
from langflow.graph.utils import UnbuiltObject, log_transaction, log_vertex_build, serialize_field
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.schema import NodeData
from langflow.inputs.inputs import InputTypes
@ -15,13 +15,12 @@ from langflow.schema import Data
from langflow.schema.artifact import ArtifactType
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.graph.utils import log_vertex_build
from langflow.template.field.base import UNDEFINED, Output
from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse
from langflow.utils.util import unescape_string
if TYPE_CHECKING:
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge
class CustomComponentVertex(Vertex):
@ -70,7 +69,7 @@ class ComponentVertex(Vertex):
for key, value in self._built_object.items():
self.add_result(key, value)
def get_edge_with_target(self, target_id: str) -> Generator["ContractEdge", None, None]:
def get_edge_with_target(self, target_id: str) -> Generator["CycleEdge", None, None]:
"""
Get the edge with the target id.