feat: add logs field to ResultData and Vertex classes (#2732)

* feat: add logs to ResultDataResponse in schemas.py

* feat(schema.py): add logs field to ResultData class to store log messages for better debugging and monitoring

* feat(vertex): add logs attribute to Vertex class to store logs for each vertex operation
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-07-16 15:24:55 -03:00 committed by GitHub
commit 5346db0d0c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 34 additions and 16 deletions

View file

@ -25,7 +25,7 @@ from langflow.api.v1.schemas import (
)
from langflow.exceptions.component import ComponentBuildException
from langflow.graph.graph.base import Graph
from langflow.schema.schema import OutputLog
from langflow.schema.schema import OutputValue
from langflow.services.auth.utils import get_current_active_user
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service, get_session, get_session_service, get_telemetry_service
@ -218,7 +218,7 @@ async def build_vertex(
valid = False
error_message = params
output_label = vertex.outputs[0]["name"] if vertex.outputs else "output"
outputs = {output_label: OutputLog(message=message, type="error")}
outputs = {output_label: OutputValue(message=message, type="error")}
result_data_response = ResultDataResponse(results={}, outputs=outputs)
artifacts = {}
background_tasks.add_task(graph.end_all_traces, error=exc)

View file

@ -9,11 +9,12 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator, model_serial
from langflow.graph.schema import RunOutputs
from langflow.schema import dotdict
from langflow.schema.graph import Tweaks
from langflow.schema.schema import InputType, OutputLog, OutputType
from langflow.schema.schema import InputType, OutputType, OutputValue
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import FlowCreate, FlowRead
from langflow.services.database.models.user import UserRead
from langflow.services.tracing.schema import Log
class BuildStatus(Enum):
@ -250,7 +251,8 @@ class VerticesOrderResponse(BaseModel):
class ResultDataResponse(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
outputs: dict[str, OutputLog] = Field(default_factory=dict)
outputs: dict[str, OutputValue] = Field(default_factory=dict)
logs: dict[str, Log] = Field(default_factory=dict)
message: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
timedelta: Optional[float] = None

View file

@ -9,6 +9,7 @@ from langflow.inputs.inputs import InputTypes
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.services.tracing.schema import Log
from langflow.template.field.base import UNDEFINED, Output
from .custom_component import CustomComponent
@ -38,12 +39,14 @@ class Component(CustomComponent):
inputs: List[InputTypes] = []
outputs: List[Output] = []
code_class_base_inheritance: ClassVar[str] = "Component"
_output_logs: dict[str, Log] = {}
def __init__(self, **data):
self._inputs: dict[str, InputTypes] = {}
self._results: dict[str, Any] = {}
self._attributes: dict[str, Any] = {}
self._parameters: dict[str, Any] = {}
self._output_logs = {}
super().__init__(**data)
if not hasattr(self, "trace_type"):
self.trace_type = "chain"
@ -189,6 +192,8 @@ class Component(CustomComponent):
raw, artifact_type = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
_artifacts[output.name] = artifact
self._output_logs[output.name] = self._logs
self._logs = []
self._artifacts = _artifacts
self._results = _results
if self.tracing_service:

View file

@ -13,7 +13,7 @@ from langflow.schema import Data
from langflow.schema.artifact import get_artifact_type
from langflow.schema.dotdict import dotdict
from langflow.schema.log import LoggableType
from langflow.schema.schema import OutputLog
from langflow.schema.schema import OutputValue
from langflow.services.deps import get_storage_service, get_tracing_service, get_variable_service, session_scope
from langflow.services.storage.service import StorageService
from langflow.services.tracing.schema import Log
@ -84,7 +84,7 @@ class CustomComponent(BaseComponent):
status: Optional[Any] = None
"""The status of the component. This is displayed on the frontend. Defaults to None."""
_flows_data: Optional[List[Data]] = None
_outputs: List[OutputLog] = []
_outputs: List[OutputValue] = []
_logs: List[Log] = []
tracing_service: Optional["TracingService"] = None

View file

@ -4,7 +4,7 @@ from typing import Any, List, Optional
from pydantic import BaseModel, Field, field_serializer, model_validator
from langflow.graph.utils import serialize_field
from langflow.schema.schema import OutputLog, StreamURL
from langflow.schema.schema import OutputValue, StreamURL
from langflow.utils.schemas import ChatOutputResponse, ContainsEnumMeta
@ -12,6 +12,7 @@ class ResultData(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
outputs: Optional[dict] = Field(default_factory=dict)
logs: Optional[dict] = Field(default_factory=dict)
messages: Optional[list[ChatOutputResponse]] = Field(default_factory=list)
timedelta: Optional[float] = None
duration: Optional[str] = None
@ -40,9 +41,9 @@ class ResultData(BaseModel):
if "stream_url" in message and "type" in message:
stream_url = StreamURL(location=message["stream_url"])
values["outputs"].update({key: OutputLog(message=stream_url, type=message["type"])})
values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])})
elif "type" in message:
values["outputs"].update({OutputLog(message=message, type=message["type"])})
values["outputs"].update({OutputValue(message=message, type=message["type"])})
return values

View file

@ -17,14 +17,16 @@ from langflow.interface.listing import lazy_load_dict
from langflow.schema.artifact import ArtifactType
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME, OutputLog, build_output_logs
from langflow.schema.schema import INPUT_FIELD_NAME, OutputValue, build_output_logs
from langflow.services.deps import get_storage_service
from langflow.services.monitor.utils import log_transaction
from langflow.services.tracing.schema import Log
from langflow.utils.constants import DIRECT_TYPES
from langflow.utils.schemas import ChatOutputResponse
from langflow.utils.util import sync_to_async, unescape_string
if TYPE_CHECKING:
from langflow.custom import Component
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.base import Graph
@ -82,7 +84,8 @@ class Vertex:
self.layer = None
self.result: Optional[ResultData] = None
self.results: Dict[str, Any] = {}
self.outputs_logs: Dict[str, OutputLog] = {}
self.outputs_logs: Dict[str, OutputValue] = {}
self.logs: Dict[str, Log] = {}
try:
self.is_interface_component = self.vertex_type in InterfaceComponentTypes
except ValueError:
@ -480,6 +483,7 @@ class Vertex:
results=result_dict,
artifacts=artifacts,
outputs=self.outputs_logs,
logs=self.logs,
messages=messages,
component_display_name=self.display_name,
component_id=self.id,
@ -643,13 +647,14 @@ class Vertex:
vertex=self,
)
self.outputs_logs = build_output_logs(self, result)
self._update_built_object_and_artifacts(result)
except Exception as exc:
tb = traceback.format_exc()
logger.exception(exc)
raise ComponentBuildException(f"Error building Component {self.display_name}:\n\n{exc}", tb) from exc
def _update_built_object_and_artifacts(self, result):
def _update_built_object_and_artifacts(self, result: Any | tuple[Any, dict] | tuple["Component", Any, dict]):
"""
Updates the built object and its artifacts.
"""
@ -658,8 +663,11 @@ class Vertex:
self._built_object, self.artifacts = result
elif len(result) == 3:
self._custom_component, self._built_object, self.artifacts = result
self.logs = self._custom_component._output_logs
self.artifacts_raw = self.artifacts.get("raw", None)
self.artifacts_type = self.artifacts.get("type", None) or ArtifactType.UNKNOWN.value
self.artifacts_type = {
self.outputs[0]["name"]: self.artifacts.get("type", None) or ArtifactType.UNKNOWN.value
}
self.artifacts = {self.outputs[0]["name"]: self.artifacts}
else:
self._built_object = result

View file

@ -47,6 +47,7 @@ class ComponentVertex(Vertex):
self._built_object, self.artifacts = result
elif len(result) == 3:
self._custom_component, self._built_object, self.artifacts = result
self.logs = self._custom_component._output_logs
for key in self.artifacts:
self.artifacts_raw[key] = self.artifacts[key].get("raw", None)
self.artifacts_type[key] = self.artifacts[key].get("type", None) or ArtifactType.UNKNOWN.value
@ -149,6 +150,7 @@ class ComponentVertex(Vertex):
results=result_dict,
artifacts=self.artifacts,
outputs=self.outputs_logs,
logs=self.logs,
messages=messages,
component_display_name=self.display_name,
component_id=self.id,

View file

@ -32,7 +32,7 @@ class ErrorLog(TypedDict):
stackTrace: str
class OutputLog(BaseModel):
class OutputValue(BaseModel):
message: Union[ErrorLog, StreamURL, dict, list, str]
type: str
@ -80,7 +80,7 @@ def get_message(payload):
def build_output_logs(vertex, result) -> dict:
outputs: dict[str, OutputLog] = dict()
outputs: dict[str, OutputValue] = dict()
component_instance = result[0]
for index, output in enumerate(vertex.outputs):
if component_instance.status is None:
@ -105,6 +105,6 @@ def build_output_logs(vertex, result) -> dict:
case LogType.UNKNOWN:
message = ""
name = output.get("name", f"output_{index}")
outputs |= {name: OutputLog(message=message, type=_type).model_dump()}
outputs |= {name: OutputValue(message=message, type=_type).model_dump()}
return outputs