diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index ffb5f22c9..cc38b474a 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -286,7 +286,7 @@ async def get_next_runnable_vertices( for v_id in set(next_runnable_vertices): # Use set to avoid duplicates graph.vertices_to_run.remove(v_id) graph.remove_from_predecessors(v_id) - await chat_service.set_cache(flow_id=flow_id, data=graph, lock=lock) + await chat_service.set_cache(key=flow_id, data=graph, lock=lock) return next_runnable_vertices diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 23dea900b..fbb763e8d 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -1,6 +1,5 @@ import time import uuid -from functools import partial from typing import TYPE_CHECKING, Annotated, Optional from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException @@ -162,7 +161,6 @@ async def build_vertex( vertex = graph.get_vertex(vertex_id) try: lock = chat_service._cache_locks[flow_id_str] - set_cache_coro = partial(chat_service.set_cache, flow_id=flow_id_str) ( next_runnable_vertices, top_level_vertices, @@ -173,7 +171,7 @@ async def build_vertex( vertex, ) = await graph.build_vertex( lock=lock, - set_cache_coro=set_cache_coro, + chat_service=chat_service, vertex_id=vertex_id, user_id=current_user.id, inputs_dict=inputs.model_dump() if inputs else {}, diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index 419c3d1f9..9ccdb0085 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -248,6 +248,7 @@ class ResultDataResponse(BaseModel): artifacts: Optional[Any] = Field(default_factory=dict) timedelta: Optional[float] = None duration: Optional[str] = None + used_frozen_result: Optional[bool] = False class VertexBuildResponse(BaseModel): diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 1712f84d2..956fda7bf 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -17,6 +17,8 @@ from langflow.graph.vertex.base import Vertex from langflow.graph.vertex.types import InterfaceVertex, StateVertex from langflow.schema import Record from langflow.schema.schema import INPUT_FIELD_NAME, InputType +from langflow.services.cache.utils import CacheMiss +from langflow.services.chat.service import ChatService from langflow.services.deps import get_chat_service if TYPE_CHECKING: @@ -704,7 +706,7 @@ class Graph: async def build_vertex( self, lock: asyncio.Lock, - set_cache_coro: Callable[["Graph", asyncio.Lock], Coroutine], + chat_service: ChatService, vertex_id: str, inputs_dict: Optional[Dict[str, str]] = None, user_id: Optional[str] = None, @@ -729,17 +731,35 @@ class Graph: """ vertex = self.get_vertex(vertex_id) try: - if not vertex.frozen or not vertex._built: + params = "" + if vertex.frozen: + # Check the cache for the vertex + cached_result = await chat_service.get_cache(key=vertex.id) + if isinstance(cached_result, CacheMiss): + await vertex.build(user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars) + await chat_service.set_cache(key=vertex.id, data=vertex) + else: + cached_vertex = cached_result["result"] + # Now set update the vertex with the cached vertex + vertex._built = cached_vertex._built + vertex.result = cached_vertex.result + vertex.artifacts = cached_vertex.artifacts + vertex._built_object = cached_vertex._built_object + vertex._custom_component = cached_vertex._custom_component + if vertex.result is not None: + vertex.result.used_frozen_result = True + + else: await vertex.build(user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars) if vertex.result is not None: - params = vertex._built_object_repr() + params = f"{vertex._built_object_repr()}{params}" valid = True result_dict = vertex.result artifacts = vertex.artifacts else: raise ValueError(f"No result found for vertex {vertex_id}") - + set_cache_coro = partial(chat_service.set_cache, key=self.flow_id) next_runnable_vertices, top_level_vertices = await self.get_next_and_top_level_vertices( lock, set_cache_coro, vertex ) @@ -810,11 +830,10 @@ class Graph: for vertex_id in current_batch: vertex = self.get_vertex(vertex_id) lock = chat_service._cache_locks[self.run_id] - set_cache_coro = partial(chat_service.set_cache, flow_id=self.run_id) task = asyncio.create_task( self.build_vertex( lock=lock, - set_cache_coro=set_cache_coro, + chat_service=chat_service, vertex_id=vertex_id, user_id=self.user_id, inputs_dict={}, diff --git a/src/backend/base/langflow/graph/schema.py b/src/backend/base/langflow/graph/schema.py index 18010de4c..60e7ab590 100644 --- a/src/backend/base/langflow/graph/schema.py +++ b/src/backend/base/langflow/graph/schema.py @@ -15,6 +15,7 @@ class ResultData(BaseModel): duration: Optional[str] = None component_display_name: Optional[str] = None component_id: Optional[str] = None + used_frozen_result: Optional[bool] = False @field_serializer("results") def serialize_results(self, value): diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 4ae4dc540..2aa187b22 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -9,6 +9,9 @@ from loguru import logger from langflow.services.base import Service from langflow.services.cache.base import AsyncBaseCacheService, CacheService +from langflow.services.cache.utils import CacheMiss + +CACHE_MISS = CacheMiss() class ThreadingInMemoryCache(CacheService, Service): @@ -341,12 +344,14 @@ class AsyncInMemoryCache(AsyncBaseCacheService, Service): async def _get(self, key): item = self.cache.get(key, None) - if item and (time.time() - item["time"] < self.expiration_time): - self.cache.move_to_end(key) - return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] if item: - await self.delete(key) - return None + if time.time() - item["time"] < self.expiration_time: + self.cache.move_to_end(key) + return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] + else: + logger.info(f"Cache item for key '{key}' has expired and will be deleted.") + await self.delete(key) # Log before deleting the expired item + return CACHE_MISS async def set(self, key, value, lock: Optional[asyncio.Lock] = None): if not lock: diff --git a/src/backend/base/langflow/services/cache/utils.py b/src/backend/base/langflow/services/cache/utils.py index 129e9a6b7..ff19836ef 100644 --- a/src/backend/base/langflow/services/cache/utils.py +++ b/src/backend/base/langflow/services/cache/utils.py @@ -19,6 +19,11 @@ CACHE_DIR = user_cache_dir("langflow", "langflow") PREFIX = "langflow_cache" +class CacheMiss: + def __repr__(self): + return "" + + def create_cache_folder(func): def wrapper(*args, **kwargs): # Get the destination folder diff --git a/src/backend/base/langflow/services/chat/service.py b/src/backend/base/langflow/services/chat/service.py index 072920418..042a541a3 100644 --- a/src/backend/base/langflow/services/chat/service.py +++ b/src/backend/base/langflow/services/chat/service.py @@ -13,7 +13,7 @@ class ChatService(Service): self._cache_locks = defaultdict(asyncio.Lock) self.cache_service = get_cache_service() - async def set_cache(self, flow_id: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool: + async def set_cache(self, key: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool: """ Set the cache for a client. """ @@ -23,17 +23,17 @@ class ChatService(Service): "result": data, "type": type(data), } - await self.cache_service.upsert(flow_id, result_dict, lock=lock or self._cache_locks[flow_id]) - return flow_id in self.cache_service + await self.cache_service.upsert(key, result_dict, lock=lock or self._cache_locks[key]) + return key in self.cache_service - async def get_cache(self, flow_id: str, lock: Optional[asyncio.Lock] = None) -> Any: + async def get_cache(self, key: str, lock: Optional[asyncio.Lock] = None) -> Any: """ Get the cache for a client. """ - return await self.cache_service.get(flow_id, lock=lock or self._cache_locks[flow_id]) + return await self.cache_service.get(key, lock=lock or self._cache_locks[key]) - async def clear_cache(self, flow_id: str, lock: Optional[asyncio.Lock] = None): + async def clear_cache(self, key: str, lock: Optional[asyncio.Lock] = None): """ Clear the cache for a client. """ - await self.cache_service.delete(flow_id, lock=lock or self._cache_locks[flow_id]) + await self.cache_service.delete(key, lock=lock or self._cache_locks[key]) diff --git a/src/frontend/src/customNodes/genericNode/components/parameterComponent/index.tsx b/src/frontend/src/customNodes/genericNode/components/parameterComponent/index.tsx index 1b39e25bd..43078ba08 100644 --- a/src/frontend/src/customNodes/genericNode/components/parameterComponent/index.tsx +++ b/src/frontend/src/customNodes/genericNode/components/parameterComponent/index.tsx @@ -89,7 +89,7 @@ export default function ParameterComponent({ setNode, renderTooltips, isLoading, - setIsLoading, + setIsLoading ); const { handleNodeClass: handleNodeClassHook } = useHandleNodeClass( @@ -98,7 +98,7 @@ export default function ParameterComponent({ takeSnapshot, setNode, updateNodeInternals, - renderTooltips, + renderTooltips ); const { handleRefreshButtonPress: handleRefreshButtonPressHook } = @@ -107,7 +107,7 @@ export default function ParameterComponent({ let disabled = edges.some( (edge) => - edge.targetHandle === scapedJSONStringfy(proxy ? { ...id, proxy } : id), + edge.targetHandle === scapedJSONStringfy(proxy ? { ...id, proxy } : id) ) ?? false; const handleRefreshButtonPress = async (name, data) => { @@ -120,12 +120,12 @@ export default function ParameterComponent({ handleUpdateValues, setNode, renderTooltips, - setIsLoading, + setIsLoading ); const handleOnNewValue = async ( newValue: string | string[] | boolean | Object[], - skipSnapshot: boolean | undefined = false, + skipSnapshot: boolean | undefined = false ): Promise => { handleOnNewValueHook(newValue, skipSnapshot); }; @@ -207,7 +207,7 @@ export default function ParameterComponent({ className={classNames( left ? "my-12 -ml-0.5 " : " my-12 -mr-0.5 ", "h-3 w-3 rounded-full border-2 bg-background", - !showNode ? "mt-0" : "", + !showNode ? "mt-0" : "" )} style={{ borderColor: color ?? nodeColors.unknown, @@ -238,7 +238,7 @@ export default function ParameterComponent({ (left ? "" : " justify-end") } > - +
@@ -296,7 +296,7 @@ export default function ParameterComponent({ } className={classNames( left ? "-ml-0.5" : "-mr-0.5", - "h-3 w-3 rounded-full border-2 bg-background", + "h-3 w-3 rounded-full border-2 bg-background" )} style={{ borderColor: color ?? nodeColors.unknown }} onClick={() => setFilterEdge(groupedEdge.current)} diff --git a/src/frontend/src/customNodes/genericNode/index.tsx b/src/frontend/src/customNodes/genericNode/index.tsx index 26dc0f764..5d70d6b18 100644 --- a/src/frontend/src/customNodes/genericNode/index.tsx +++ b/src/frontend/src/customNodes/genericNode/index.tsx @@ -28,9 +28,9 @@ import { NodeDataType } from "../../types/flow"; import { handleKeyDown, scapedJSONStringfy } from "../../utils/reactflowUtils"; import { nodeColors, nodeIconsLucide } from "../../utils/styleUtils"; import { classNames, cn } from "../../utils/utils"; -import ParameterComponent from "./components/parameterComponent"; import getFieldTitle from "../utils/get-field-title"; import sortFields from "../utils/sort-fields"; +import ParameterComponent from "./components/parameterComponent"; export default function GenericNode({ data, @@ -56,14 +56,14 @@ export default function GenericNode({ const [nodeName, setNodeName] = useState(data.node!.display_name); const [inputDescription, setInputDescription] = useState(false); const [nodeDescription, setNodeDescription] = useState( - data.node?.description!, + data.node?.description! ); const [isOutdated, setIsOutdated] = useState(false); const buildStatus = useFlowStore( - (state) => state.flowBuildStatus[data.id]?.status, + (state) => state.flowBuildStatus[data.id]?.status ); const lastRunTime = useFlowStore( - (state) => state.flowBuildStatus[data.id]?.timestamp, + (state) => state.flowBuildStatus[data.id]?.timestamp ); const [validationStatus, setValidationStatus] = useState(null); @@ -120,7 +120,7 @@ export default function GenericNode({ updateNodeInternals(data.id); }, - [data.id, data.node, setNode, setIsOutdated], + [data.id, data.node, setNode, setIsOutdated] ); if (!data.node!.template) { @@ -260,7 +260,7 @@ export default function GenericNode({ const isDark = useDarkStore((state) => state.dark); const renderIconStatus = ( buildStatus: BuildStatus | undefined, - validationStatus: validationStatusType | null, + validationStatus: validationStatusType | null ) => { if (buildStatus === BuildStatus.BUILDING) { return ; @@ -301,7 +301,7 @@ export default function GenericNode({ }; const getSpecificClassFromBuildStatus = ( buildStatus: BuildStatus | undefined, - validationStatus: validationStatusType | null, + validationStatus: validationStatusType | null ) => { let isInvalid = validationStatus && !validationStatus.valid; @@ -325,25 +325,31 @@ export default function GenericNode({ selected: boolean, showNode: boolean, buildStatus: BuildStatus | undefined, - validationStatus: validationStatusType | null, + validationStatus: validationStatusType | null ) => { const specificClassFromBuildStatus = getSpecificClassFromBuildStatus( buildStatus, - validationStatus, + validationStatus ); const baseBorderClass = getBaseBorderClass(selected); const nodeSizeClass = getNodeSizeClass(showNode); - return classNames( + const names = classNames( baseBorderClass, nodeSizeClass, "generic-node-div", - specificClassFromBuildStatus, + specificClassFromBuildStatus ); + console.log("names", names); + return names; }; - const getBaseBorderClass = (selected) => - selected ? "border border-ring" : "border"; + const getBaseBorderClass = (selected) => { + console.log("data.node?.frozen", data.node?.frozen); + let className = selected ? "border border-ring" : "border"; + let frozenClass = selected ? "border-ring-frozen" : "border-frozen"; + return data.node?.frozen ? frozenClass : className; + }; const getNodeSizeClass = (showNode) => showNode ? "w-96 rounded-lg" : "w-26 h-26 rounded-full"; @@ -394,7 +400,7 @@ export default function GenericNode({ selected, showNode, buildStatus, - validationStatus, + validationStatus )} > {data.node?.beta && showNode && ( @@ -539,7 +545,7 @@ export default function GenericNode({ } title={getFieldTitle( data.node?.template!, - templateField, + templateField )} info={data.node?.template[templateField].info} name={templateField} @@ -567,7 +573,7 @@ export default function GenericNode({ proxy={data.node?.template[templateField].proxy} showNode={showNode} /> - ), + ) )} { setInputDescription(true); @@ -786,13 +792,13 @@ export default function GenericNode({ } title={getFieldTitle( data.node?.template!, - templateField, + templateField )} info={data.node?.template[templateField].info} name={templateField} tooltipTitle={ data.node?.template[templateField].input_types?.join( - "\n", + "\n" ) ?? data.node?.template[templateField].type } required={data.node!.template[templateField].required} @@ -819,7 +825,7 @@ export default function GenericNode({
{" "} diff --git a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx index cbf941fbb..7d1d33d5f 100644 --- a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx +++ b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx @@ -29,7 +29,7 @@ import { expandGroupNode, updateFlowPosition, } from "../../../../utils/reactflowUtils"; -import { classNames } from "../../../../utils/utils"; +import { classNames, cn } from "../../../../utils/utils"; import ToolbarSelectItem from "./toolbarSelectItem"; export default function NodeToolbarComponent({ @@ -58,7 +58,7 @@ export default function NodeToolbarComponent({ data.node.template[templateField].type === "Any" || data.node.template[templateField].type === "int" || data.node.template[templateField].type === "dict" || - data.node.template[templateField].type === "NestedDict"), + data.node.template[templateField].type === "NestedDict") ).length; const templates = useTypesStore((state) => state.templates); const hasStore = useStoreStore((state) => state.hasStore); @@ -68,7 +68,7 @@ export default function NodeToolbarComponent({ const isMinimal = numberOfHandles <= 1; const isGroup = data.node?.flow ? true : false; - // const frozen = data.node?.frozen ?? false; + const frozen = data.node?.frozen ?? false; const paste = useFlowStore((state) => state.paste); const nodes = useFlowStore((state) => state.nodes); const edges = useFlowStore((state) => state.edges); @@ -85,7 +85,7 @@ export default function NodeToolbarComponent({ const [showconfirmShare, setShowconfirmShare] = useState(false); const [showOverrideModal, setShowOverrideModal] = useState(false); const [flowComponent, setFlowComponent] = useState( - createFlowComponent(cloneDeep(data), version), + createFlowComponent(cloneDeep(data), version) ); const openInNewTab = (url) => { @@ -100,7 +100,7 @@ export default function NodeToolbarComponent({ const updateNodeInternals = useUpdateNodeInternals(); const setLastCopiedSelection = useFlowStore( - (state) => state.setLastCopiedSelection, + (state) => state.setLastCopiedSelection ); const setSuccessData = useAlertStore((state) => state.setSuccessData); @@ -150,7 +150,7 @@ export default function NodeToolbarComponent({ nodes, edges, setNodes, - setEdges, + setEdges ); break; case "override": @@ -174,7 +174,7 @@ export default function NodeToolbarComponent({ y: 10, paneX: nodes.find((node) => node.id === data.id)?.position.x, paneY: nodes.find((node) => node.id === data.id)?.position.y, - }, + } ); break; case "update": @@ -212,13 +212,13 @@ export default function NodeToolbarComponent({ }; const isSaved = flows.some((flow) => - Object.values(flow).includes(data.node?.display_name!), + Object.values(flow).includes(data.node?.display_name!) ); const setNode = useFlowStore((state) => state.setNode); const handleOnNewValue = ( - newValue: string | string[] | boolean | Object[], + newValue: string | string[] | boolean | Object[] ): void => { if (data.node!.template[name].value !== newValue) { takeSnapshot(); @@ -401,7 +401,7 @@ export default function NodeToolbarComponent({ data-testid="save-button-modal" className={classNames( "relative -ml-px inline-flex items-center bg-background px-2 py-2 text-foreground shadow-md ring-1 ring-inset ring-ring transition-all duration-500 ease-in-out hover:bg-muted focus:z-10", - hasCode ? " " : " rounded-l-md ", + hasCode ? " " : " rounded-l-md " )} onClick={(event) => { event.preventDefault(); @@ -419,7 +419,7 @@ export default function NodeToolbarComponent({ - {/* + - */} +