✨ (chat.py): Add telemetry service to log playground and component data for monitoring and debugging purposes.
This commit is contained in:
parent
59fae0830b
commit
6577a91c59
2 changed files with 46 additions and 3 deletions
|
|
@ -29,8 +29,10 @@ from langflow.graph.graph.base import Graph
|
|||
from langflow.schema.schema import OutputLog
|
||||
from langflow.services.auth.utils import get_current_active_user
|
||||
from langflow.services.chat.service import ChatService
|
||||
from langflow.services.deps import get_chat_service, get_session, get_session_service
|
||||
from langflow.services.deps import get_chat_service, get_session, get_session_service, get_telemetry_service
|
||||
from langflow.services.monitor.utils import log_vertex_build
|
||||
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
|
||||
from langflow.services.telemetry.service import TelemetryService
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.graph.vertex.types import InterfaceVertex
|
||||
|
|
@ -58,11 +60,13 @@ async def try_running_celery_task(vertex, user_id):
|
|||
@router.post("/build/{flow_id}/vertices", response_model=VerticesOrderResponse)
|
||||
async def retrieve_vertices_order(
|
||||
flow_id: uuid.UUID,
|
||||
background_tasks: BackgroundTasks,
|
||||
data: Optional[Annotated[Optional[FlowDataRequest], Body(embed=True)]] = None,
|
||||
stop_component_id: Optional[str] = None,
|
||||
start_component_id: Optional[str] = None,
|
||||
chat_service: "ChatService" = Depends(get_chat_service),
|
||||
session=Depends(get_session),
|
||||
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
|
||||
):
|
||||
"""
|
||||
Retrieve the vertices order for a given flow.
|
||||
|
|
@ -81,6 +85,7 @@ async def retrieve_vertices_order(
|
|||
Raises:
|
||||
HTTPException: If there is an error checking the build status.
|
||||
"""
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
flow_id_str = str(flow_id)
|
||||
# First, we need to check if the flow_id is in the cache
|
||||
|
|
@ -109,11 +114,28 @@ async def retrieve_vertices_order(
|
|||
# Now vertices is a list of lists
|
||||
# We need to get the id of each vertex
|
||||
# and return the same structure but only with the ids
|
||||
components_count = len(graph.vertices)
|
||||
vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run)
|
||||
await chat_service.set_cache(str(flow_id), graph)
|
||||
background_tasks.add_task(
|
||||
telemetry_service.log_package_playground,
|
||||
PlaygroundPayload(
|
||||
seconds=int(time.perf_counter() - start_time),
|
||||
componentCount=components_count,
|
||||
success=True,
|
||||
),
|
||||
)
|
||||
return VerticesOrderResponse(ids=first_layer, run_id=graph._run_id, vertices_to_run=vertices_to_run)
|
||||
|
||||
except Exception as exc:
|
||||
background_tasks.add_task(
|
||||
telemetry_service.log_package_playground,
|
||||
PlaygroundPayload(
|
||||
seconds=int(time.perf_counter() - start_time),
|
||||
componentCount=components_count,
|
||||
success=False,
|
||||
errorMessage=str(exc),
|
||||
),
|
||||
)
|
||||
if "stream or streaming set to True" in str(exc):
|
||||
raise HTTPException(status_code=400, detail=str(exc))
|
||||
logger.error(f"Error checking build status: {exc}")
|
||||
|
|
@ -130,6 +152,7 @@ async def build_vertex(
|
|||
files: Optional[list[str]] = None,
|
||||
chat_service: "ChatService" = Depends(get_chat_service),
|
||||
current_user=Depends(get_current_active_user),
|
||||
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
|
||||
):
|
||||
"""Build a vertex instead of the entire graph.
|
||||
|
||||
|
|
@ -152,8 +175,8 @@ async def build_vertex(
|
|||
|
||||
next_runnable_vertices = []
|
||||
top_level_vertices = []
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
start_time = time.perf_counter()
|
||||
cache = await chat_service.get_cache(flow_id_str)
|
||||
if not cache:
|
||||
# If there's no cache
|
||||
|
|
@ -253,8 +276,26 @@ async def build_vertex(
|
|||
id=vertex.id,
|
||||
data=result_data_response,
|
||||
)
|
||||
background_tasks.add_task(
|
||||
telemetry_service.log_package_component,
|
||||
ComponentPayload(
|
||||
name=vertex_id,
|
||||
seconds=int(time.perf_counter() - start_time),
|
||||
success=valid,
|
||||
errorMessage=params,
|
||||
),
|
||||
)
|
||||
return build_response
|
||||
except Exception as exc:
|
||||
background_tasks.add_task(
|
||||
telemetry_service.log_package_component,
|
||||
ComponentPayload(
|
||||
name=vertex_id,
|
||||
seconds=int(time.perf_counter() - start_time),
|
||||
success=False,
|
||||
errorMessage=str(exc),
|
||||
),
|
||||
)
|
||||
logger.error(f"Error building Component:\n\n{exc}")
|
||||
logger.exception(exc)
|
||||
message = parse_exception(exc)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
from os import error
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
|
|
@ -26,6 +27,7 @@ class PlaygroundPayload(BaseModel):
|
|||
seconds: int
|
||||
componentCount: int
|
||||
success: bool
|
||||
errorMessage: str = ""
|
||||
|
||||
|
||||
class ComponentPayload(BaseModel):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue