🐛 (chat.py): Fix issue with logs assignment in build_vertex function

🐛 (schemas.py): Fix issue with logs field type in ResultDataResponse class
🐛 (Prompt.py): Fix issue with kwargs assignment in build_prompt method
🐛 (component.py): Fix issue with results and arguments assignment in Component class
🐛 (schema.py): Fix issue with logs field type in ResultData class

📝 (types.py): Add ResultData import and build_logs_from_artifacts function for better logging and message extraction
📝 (setup.py): Set DEFAULT_PROMPT_INTUT_TYPES for missing template keys to provide default input types
📝 (loading.py): Update build_component function to return artifacts along with build_results
📝 (schema.py): Add build_logs_from_artifacts function to generate logs from artifacts
📝 (utils.py): Update log_transaction function to use source instead of vertex and handle source result for outputs
📝 (prompt.py): Define DEFAULT_PROMPT_INTUT_TYPES constant and use it for missing template input types
This commit is contained in:
ogabrielluiz 2024-06-11 15:44:05 -03:00
commit cda65c07de
13 changed files with 148 additions and 35 deletions

View file

@ -180,14 +180,21 @@ async def build_vertex(
inputs_dict=inputs.model_dump() if inputs else {},
files=files,
)
log_obj = Log(message=vertex.artifacts_raw, type=vertex.artifacts_type)
if isinstance(vertex.artifacts_raw, dict):
logs = {}
for key in vertex.artifacts_raw:
log_obj = Log(message=vertex.artifacts_raw[key], type=vertex.artifacts_type[key])
logs[key] = log_obj
else:
logs = [Log(message=vertex.artifacts_raw, type=vertex.artifacts_type)]
result_data_response = ResultDataResponse(**result_dict.model_dump())
except Exception as exc:
logger.exception(f"Error building vertex: {exc}")
params = format_exception_message(exc)
valid = False
log_obj = Log(message=params, type="error")
logs = [Log(message=params, type="error")]
result_data_response = ResultDataResponse(results={})
artifacts = {}
# If there's an error building the vertex
@ -195,7 +202,7 @@ async def build_vertex(
await chat_service.clear_cache(flow_id_str)
result_data_response.message = artifacts
result_data_response.logs.append(log_obj)
result_data_response.logs = logs
# Log the vertex build
if not vertex.will_stream:

View file

@ -245,7 +245,7 @@ class VerticesOrderResponse(BaseModel):
class ResultDataResponse(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
logs: List[Log | None] = Field(default_factory=list)
logs: dict[str, list[Log]] = Field(default_factory=dict)
message: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
timedelta: Optional[float] = None

View file

@ -33,8 +33,8 @@ class ChatInput(ChatComponent):
),
]
outputs = [
Output(display_name="Message", name="message", method="text_response"),
Output(display_name="Record", name="record", method="message_response"),
Output(display_name="Text", name="text", method="text_response"),
Output(display_name="Message", name="message", method="message_response"),
]
def text_response(self) -> Text:

View file

@ -31,6 +31,7 @@ class PromptComponent(Component):
async def build_prompt(
self,
) -> Prompt:
prompt = await Prompt.from_template_and_variables(self.template, self.kwargs)
kwargs = {k: v for k, v in self._arguments.items() if k != "template"}
prompt = await Prompt.from_template_and_variables(self.template, kwargs)
self.status = prompt.format_text()
return prompt

View file

@ -17,6 +17,7 @@ import yaml
from loguru import logger
from pydantic import BaseModel
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.schema.record import Record
from langflow.template.field.base import UNDEFINED, Input, Output
@ -51,12 +52,15 @@ class Component(CustomComponent):
inputs: Optional[List[Input]] = None
outputs: Optional[List[Output]] = None
code_class_base_inheritance: ClassVar[str] = "Component"
_results: dict = {}
_arguments: dict = {}
def set_attributes(self, params: dict):
for key, value in params.items():
if key in self.__dict__:
raise ValueError(f"Key {key} already exists in {self.__class__.__name__}")
setattr(self, key, value)
self._arguments = params
def _set_outputs(self, outputs: List[dict]):
self.outputs = [Output(**output) for output in outputs]
@ -65,7 +69,7 @@ class Component(CustomComponent):
async def build_results(self, vertex: "Vertex"):
_results = {}
_artifacts = {}
if hasattr(self, "outputs"):
self._set_outputs(vertex.outputs)
for output in self.outputs:
@ -73,7 +77,7 @@ class Component(CustomComponent):
# or if it's not connected to any vertex
if not vertex.outgoing_edges or output.name in vertex.edges_source_names:
method: Callable | Awaitable = getattr(self, output.method)
if output.cache and not isinstance(output.value, UNDEFINED):
if output.cache and output.value != UNDEFINED:
_results[output.name] = output.value
else:
result = method()
@ -82,8 +86,24 @@ class Component(CustomComponent):
result = await result
_results[output.name] = result
output.value = result
custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, (dict, Record, str)):
custom_repr = result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
raw = self.status
if hasattr(raw, "data") and raw is not None:
raw = raw.data
return _results
elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
artifact_type = get_artifact_type(self.status, result)
raw = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
_artifacts[output.name] = artifact
self._artifacts = _artifacts
self._results = _results
return _results, _artifacts
def custom_repr(self):
# ! Temporary REPR

View file

@ -11,7 +11,7 @@ from langflow.utils.schemas import ChatOutputResponse, ContainsEnumMeta
class ResultData(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
logs: Optional[List[dict]] = Field(default_factory=list)
logs: Optional[dict] = Field(default_factory=dict)
messages: Optional[list[ChatOutputResponse]] = Field(default_factory=list)
timedelta: Optional[float] = None
duration: Optional[str] = None
@ -30,17 +30,19 @@ class ResultData(BaseModel):
def validate_model(cls, values):
if not values.get("logs") and values.get("artifacts"):
# Build the log from the artifacts
message = values["artifacts"]
# ! Temporary fix
if not isinstance(message, dict):
message = {"message": message}
for key in values["artifacts"]:
message = values["artifacts"][key]
if "stream_url" in message and "type" in message:
stream_url = StreamURL(location=message["stream_url"])
values["logs"] = [Log(message=stream_url, type=message["type"])]
elif "type" in message:
values["logs"] = [Log(message=message, type=message["type"])]
# ! Temporary fix
if not isinstance(message, dict):
message = {"message": message}
if "stream_url" in message and "type" in message:
stream_url = StreamURL(location=message["stream_url"])
values["logs"].update({key: Log(message=stream_url, type=message["type"])})
elif "type" in message:
values["logs"].update({Log(message=message, type=message["type"])})
return values

View file

@ -64,8 +64,8 @@ class Vertex:
self._built_result = None
self._built = False
self.artifacts: Dict[str, Any] = {}
self.artifacts_raw: Any = None
self.artifacts_type: Optional[str] = None
self.artifacts_raw: Dict[str, Any] = {}
self.artifacts_type: Dict[str, str] = {}
self.steps: List[Callable] = [self._build]
self.steps_ran: List[Callable] = []
self.task_id: Optional[str] = None
@ -555,11 +555,11 @@ class Vertex:
"""
flow_id = self.graph.flow_id
if not self._built:
log_transaction(flow_id, vertex=self, target=requester, status="error")
log_transaction(flow_id, source=self, target=requester, status="error")
raise ValueError(f"Component {self.display_name} has not been built yet")
result = self._built_result if self.use_result else self._built_object
log_transaction(flow_id, vertex=self, target=requester, status="success")
log_transaction(flow_id, source=self, target=requester, status="success")
return result
async def _build_vertex_and_update_params(self, key, vertex: "Vertex"):

View file

@ -5,12 +5,12 @@ import yaml
from langchain_core.messages import AIMessage, AIMessageChunk
from loguru import logger
from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes
from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, serialize_field
from langflow.graph.vertex.base import Vertex
from langflow.schema import Record
from langflow.schema.artifact import ArtifactType
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.schema.schema import INPUT_FIELD_NAME, Log, build_logs_from_artifacts
from langflow.services.monitor.utils import log_transaction, log_vertex_build
from langflow.utils.schemas import ChatOutputResponse, RecordOutputResponse
from langflow.utils.util import unescape_string
@ -22,6 +22,7 @@ if TYPE_CHECKING:
class CustomComponentVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components")
self.logs: Dict[str, Log] = {}
def _built_object_repr(self):
if self.artifacts and "repr" in self.artifacts:
@ -45,6 +46,10 @@ class ComponentVertex(Vertex):
self._built_object, self.artifacts = result
elif len(result) == 3:
self._custom_component, self._built_object, self.artifacts = result
for key in self.artifacts:
self.artifacts_raw[key] = self.artifacts[key].get("raw", None)
self.artifacts_type[key] = self.artifacts[key].get("type", None) or ArtifactType.UNKNOWN.value
self.logs = build_logs_from_artifacts(self.artifacts)
else:
self._built_object = result
@ -92,6 +97,58 @@ class ComponentVertex(Vertex):
log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="success")
return result
def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[dict]:
"""
Extracts messages from the artifacts.
Args:
artifacts (Dict[str, Any]): The artifacts to extract messages from.
Returns:
List[str]: The extracted messages.
"""
messages = []
for key in artifacts:
artifact = artifacts[key]
if any(key not in artifact for key in ["text", "sender", "sender_name", "session_id", "stream_url"]):
continue
try:
messages.append(
ChatOutputResponse(
message=artifacts[key]["text"],
sender=artifacts[key].get("sender"),
sender_name=artifacts[key].get("sender_name"),
session_id=artifacts[key].get("session_id"),
stream_url=artifacts[key].get("stream_url"),
files=[
{"path": file} if isinstance(file, str) else file
for file in artifacts[key].get("files", [])
],
component_id=self.id,
type=self.artifacts_type[key],
).model_dump(exclude_none=True)
)
except KeyError:
pass
return messages
def _finalize_build(self):
result_dict = self.get_built_result()
# We need to set the artifacts to pass information
# to the frontend
self.set_artifacts()
artifacts = self.artifacts_raw
messages = self.extract_messages_from_artifacts(artifacts)
result_dict = ResultData(
results=result_dict,
artifacts=artifacts,
logs=self.logs,
messages=messages,
component_display_name=self.display_name,
component_id=self.id,
)
self.set_result(result_dict)
class InterfaceVertex(ComponentVertex):
def __init__(self, data: Dict, graph):

View file

@ -23,6 +23,7 @@ from langflow.services.database.models.folder.model import Folder, FolderCreate
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.deps import get_settings_service, get_storage_service, get_variable_service, session_scope
from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES
STARTER_FOLDER_NAME = "Starter Projects"
STARTER_FOLDER_DESCRIPTION = "Starter projects to help you get started in Langflow."
@ -72,6 +73,9 @@ def update_projects_components_with_latest_component_versions(project_data, all_
}
)
node_data["template"][key]["value"] = value["value"]
for key, value in node_data["template"].items():
if key not in latest_template:
node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES
node_changes_log[node_data["display_name"]].append(
{
"attr": "_type",
@ -173,6 +177,7 @@ def update_new_output(data):
"types": types,
"selected": selected,
"name": " | ".join(types),
"display_name": " | ".join(types),
}
)
deduplicated_outputs = []
@ -557,7 +562,7 @@ async def create_or_update_starter_projects():
try:
Graph.from_payload(updated_project_data)
except Exception as e:
raise ValueError(f"Error loading graph from project {project_name}: {e}")
logger.error(e)
if updated_project_data != project_data:
project_data = updated_project_data
# We also need to update the project data in the file

View file

@ -122,10 +122,9 @@ async def build_component(
# Now set the params as attributes of the custom_component
custom_component.set_attributes(params)
build_results = await custom_component.build_results(vertex)
custom_repr = custom_component.custom_repr()
build_results, artifacts = await custom_component.build_results(vertex)
return custom_component, build_results, {"repr": custom_repr}
return custom_component, build_results, artifacts
async def build_custom_component(params: dict, custom_component: "CustomComponent"):

View file

@ -1,3 +1,4 @@
from collections import defaultdict
from typing import Literal
from typing_extensions import TypedDict
@ -15,3 +16,22 @@ class StreamURL(TypedDict):
class Log(TypedDict):
message: str | dict | StreamURL
type: str
def build_logs_from_artifacts(artifacts: dict) -> dict:
logs = defaultdict(list)
for key in artifacts:
message = artifacts[key]
if not isinstance(message, dict):
message = {"message": message}
if "stream_url" in message and "type" in message:
stream_url = StreamURL(location=message["stream_url"])
log = Log(message=stream_url, type=message["type"])
elif "type" in message:
log = Log(message=message, type=message["type"])
logs[key].append(log)
return logs

View file

@ -178,15 +178,15 @@ def build_clean_params(target: "Vertex") -> dict:
return params
def log_transaction(flow_id, vertex: "Vertex", status, target: Optional["Vertex"] = None, error=None):
def log_transaction(flow_id, source: "Vertex", status, target: Optional["Vertex"] = None, error=None):
try:
monitor_service = get_monitor_service()
clean_params = build_clean_params(vertex)
clean_params = build_clean_params(source)
data = {
"vertex_id": str(vertex.id),
"vertex_id": str(source.id),
"target_id": str(target.id) if target else None,
"inputs": clean_params,
"outputs": vertex.result.model_dump_json() if vertex.result else None,
"outputs": source.result.model_dump_json() if source.result else None,
"timestamp": monitor_service.get_timestamp(),
"status": status,
"error": error,

View file

@ -2,6 +2,8 @@ from typing import Optional
from langflow.template.field.base import Input
DEFAULT_PROMPT_INTUT_TYPES = ["Document", "Message", "Record", "Text"]
class DefaultPromptField(Input):
name: str
@ -10,5 +12,5 @@ class DefaultPromptField(Input):
advanced: bool = False
multiline: bool = True
input_types: list[str] = ["Document", "Message", "Record", "Text"]
input_types: list[str] = DEFAULT_PROMPT_INTUT_TYPES
value: str = "" # Set the value to empty string