Merge branch 'zustand/io/migration' of personal:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
anovazzi1 2024-02-07 15:30:28 -03:00
commit 36ccba9b78
16 changed files with 238 additions and 36 deletions

View file

@ -13,10 +13,112 @@ if TYPE_CHECKING:
from langflow.services.socket.service import SocketIOService
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming LLM responses.

    Streams tokens, tool activity and agent steps to one client over the
    websocket registered for ``client_id`` in the chat service.
    """

    def __init__(self, client_id: Optional[str] = None):
        self.chat_service = get_chat_service()
        self.client_id = client_id
        # A KeyError here means the client never opened a websocket connection.
        self.websocket = self.chat_service.active_connections[self.client_id]

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Send each newly generated token to the client as a stream chunk."""
        resp = ChatResponse(message=token, type="stream", intermediate_steps="")
        await self.websocket.send_json(resp.model_dump())

    async def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        """Run when tool starts running."""
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=f"Tool input: {input_str}",
        )
        await self.websocket.send_json(resp.model_dump())

    async def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Run when tool ends running.

        The output is sent word by word to emulate a token stream; only the
        first word carries the ``observation_prefix``.
        """
        observation_prefix = kwargs.get("observation_prefix", "Tool output: ")
        split_output = output.split()
        # Guard: empty or whitespace-only output makes split() return [],
        # and indexing [0] would raise IndexError.
        if not split_output:
            return
        first_word, *rest_of_output = split_output
        # Create a formatted message.
        intermediate_steps = f"{observation_prefix}{first_word}"
        # Create a ChatResponse instance.
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=intermediate_steps,
        )
        rest_of_resps = [
            ChatResponse(
                message="",
                type="stream",
                intermediate_steps=f"{word}",
            )
            for word in rest_of_output
        ]
        resps = [resp] + rest_of_resps
        # Try to send the response, handle potential errors.
        try:
            # This is to emulate the stream of tokens
            for resp in resps:
                await self.websocket.send_json(resp.model_dump())
        except Exception as exc:
            logger.error(f"Error sending response: {exc}")

    async def on_tool_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when tool errors."""

    async def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text."""
        # This runs when first sending the prompt
        # to the LLM, adding it will send the final prompt
        # to the frontend
        if "Prompt after formatting" in text:
            text = text.replace("Prompt after formatting:\n", "")
            text = remove_ansi_escape_codes(text)
            resp = PromptResponse(
                prompt=text,
            )
            await self.websocket.send_json(resp.model_dump())
            self.chat_service.chat_history.add_message(self.client_id, resp)

    async def on_agent_action(self, action: AgentAction, **kwargs: Any):
        """Stream the agent's thought to the client, one message per line."""
        log = f"Thought: {action.log}"
        # if there are line breaks, split them and send them
        # as separate messages
        if "\n" in log:
            logs = log.split("\n")
            for log in logs:
                resp = ChatResponse(message="", type="stream", intermediate_steps=log)
                await self.websocket.send_json(resp.model_dump())
        else:
            resp = ChatResponse(message="", type="stream", intermediate_steps=log)
            await self.websocket.send_json(resp.model_dump())

    async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run on agent end."""
        resp = ChatResponse(
            message="",
            type="stream",
            intermediate_steps=finish.log,
        )
        await self.websocket.send_json(resp.model_dump())
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, session_id: str):
self.chat_service = get_chat_service()
self.client_id = session_id

View file

@ -84,3 +84,24 @@ def update_credential(
return db_credential
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/{credential_id}", response_model=CredentialRead, status_code=200)
def delete_credential(
    *,
    session: Session = Depends(get_session),
    credential_id: UUID,
    current_user: User = Depends(get_current_active_user),
):
    """Delete a credential owned by the current user.

    Returns the deleted row; responds 404 when no credential with the given
    id belongs to the current user, 500 on unexpected database errors.
    """
    try:
        db_credential = session.exec(
            select(Credential).where(Credential.id == credential_id, Credential.user_id == current_user.id)
        ).first()
        if not db_credential:
            raise HTTPException(status_code=404, detail="Credential not found")
        session.delete(db_credential)
        session.commit()
        return db_credential
    except HTTPException:
        # Re-raise HTTP errors (e.g. the 404 above) unchanged; the generic
        # handler below would otherwise convert them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

View file

@ -4,12 +4,12 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
from langchain_core.documents import Document
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import FlowCreate, FlowRead
from langflow.services.database.models.user import UserRead
from pydantic import BaseModel, Field, field_serializer, field_validator
class BuildStatus(Enum):
@ -221,13 +221,41 @@ class VerticesOrderResponse(BaseModel):
class ResultDict(BaseModel):
    """Outputs of the vertex build process."""

    # results/artifacts default to empty dicts; typed Any because values may
    # be Documents, BaseModels, strings or arbitrary nested containers.
    results: Optional[Any] = Field(default_factory=dict)
    artifacts: Optional[Any] = Field(default_factory=dict)
    timedelta: Optional[float] = None
    duration: Optional[str] = None

    def serialize_field(self, value):
        """Unified serialization function for handling both BaseModel and Document types,
        including handling lists of these types."""
        if isinstance(value, (list, tuple)):
            # Recurse element-wise; note a tuple is serialized as a list.
            return [self.serialize_field(v) for v in value]
        elif isinstance(value, Document):
            return value.to_json()
        elif isinstance(value, BaseModel):
            return value.model_dump()
        elif isinstance(value, str):
            # Bare strings are wrapped in a dict — presumably so consumers
            # always receive a mapping; TODO confirm against the frontend.
            return {"result": value}
        return value

    @field_serializer("results")
    def serialize_results(self, value):
        """Pydantic serializer for `results`: serialize each value of a dict,
        or the value itself otherwise, via serialize_field."""
        if isinstance(value, dict):
            return {key: self.serialize_field(val) for key, val in value.items()}
        return self.serialize_field(value)
def serialize_list_of_documents_or_base_models(value):
    """Serialize Document/BaseModel items of a list in place.

    Non-list inputs and items of other types are returned untouched. Note the
    input list itself is mutated and then returned.
    """
    if not isinstance(value, list):
        return value
    for index, item in enumerate(value):
        if isinstance(item, Document):
            value[index] = item.to_json()
        elif isinstance(item, BaseModel):
            value[index] = item.model_dump()
    return value
class VertexBuildResponse(BaseModel):
id: Optional[str] = None
@ -242,3 +270,4 @@ class VertexBuildResponse(BaseModel):
class VerticesBuiltResponse(BaseModel):
vertices: List[VertexBuildResponse]
vertices: List[VertexBuildResponse]

View file

@ -2,8 +2,9 @@ from typing import Callable, Optional, Union
from langchain.chains.combine_documents.base import BaseCombineDocumentsChain
from langchain.chains.retrieval_qa.base import BaseRetrievalQA, RetrievalQA
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.field_typing import BaseMemory, BaseRetriever
from langflow.field_typing import BaseMemory, BaseRetriever, Text
class RetrievalQAComponent(CustomComponent):
@ -18,18 +19,20 @@ class RetrievalQAComponent(CustomComponent):
"input_key": {"display_name": "Input Key", "advanced": True},
"output_key": {"display_name": "Output Key", "advanced": True},
"return_source_documents": {"display_name": "Return Source Documents"},
"inputs": {"display_name": "Input", "input_types": ["Text", "Document"]},
}
def build(
self,
combine_documents_chain: BaseCombineDocumentsChain,
retriever: BaseRetriever,
inputs: str = "",
memory: Optional[BaseMemory] = None,
input_key: str = "query",
output_key: str = "result",
return_source_documents: bool = True,
) -> Union[BaseRetrievalQA, Callable]:
return RetrievalQA(
) -> Union[BaseRetrievalQA, Callable, Text]:
runnable = RetrievalQA(
combine_documents_chain=combine_documents_chain,
retriever=retriever,
memory=memory,
@ -37,3 +40,8 @@ class RetrievalQAComponent(CustomComponent):
output_key=output_key,
return_source_documents=return_source_documents,
)
if isinstance(inputs, Document):
inputs = inputs.page_content
result = runnable.invoke({input_key: inputs})
return result.content if hasattr(result, "content") else result

View file

@ -237,8 +237,12 @@ class Graph:
edges.append(ContractEdge(source, target, edge))
return edges
def _get_vertex_class(self, node_type: str, node_lc_type: str, node_id: str) -> Type[Vertex]:
def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type[Vertex]:
"""Returns the node class based on the node type."""
# First we check for the node_base_type
if node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type]
node_name = node_id.split("-")[0]
if node_name in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_name]
@ -248,8 +252,8 @@ class Graph:
if node_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_type]
return (
lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_lc_type]
if node_lc_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP
lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type]
if node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP
else Vertex
)
@ -312,3 +316,4 @@ class Graph:
return layers
return layers
return layers
return layers

View file

@ -303,7 +303,7 @@ class Vertex:
if isinstance(self._built_object, str):
self._built_result = self._built_object
result = generate_result(self._built_object, inputs, self.has_external_output, session_id)
result = await generate_result(self._built_object, inputs, self.has_external_output, session_id)
self._built_result = result
async def _build_each_node_in_params_dict(self, user_id=None):
@ -509,3 +509,4 @@ class StatelessVertex(Vertex):
pass
pass
pass
pass

View file

@ -14,7 +14,7 @@ class AgentVertex(StatelessVertex):
self.tools: List[Union[ToolkitVertex, ToolVertex]] = []
self.chains: List[ChainVertex] = []
self.steps: List[Callable] = [self._custom_build, self._run]
self.steps: List[Callable] = [self._custom_build]
def __getstate__(self):
state = super().__getstate__()
@ -201,7 +201,7 @@ class TextSplitterVertex(StatefulVertex):
class ChainVertex(StatelessVertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="chains")
self.steps = [self._custom_build, self._run]
self.steps = [self._custom_build]
async def _custom_build(self, *args, **kwargs):
force = kwargs.get("force", False)

View file

@ -11,7 +11,7 @@ def is_basic_type(obj):
async def invoke_lc_runnable(
langchain_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs
built_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs
) -> Union[str, BaseMessage]:
# Setup callbacks for asynchronous execution
from langflow.processing.base import setup_callbacks
@ -19,15 +19,15 @@ async def invoke_lc_runnable(
callbacks = setup_callbacks(sync=False, trace_id=session_id, **kwargs)
try:
if has_external_output and hasattr(langchain_object, "astream"):
if has_external_output and hasattr(built_object, "astream"):
# Asynchronous stream handling if supported and required
output = ""
async for chunk in langchain_object.astream(inputs, {"callbacks": callbacks}):
async for chunk in built_object.astream(inputs, {"callbacks": callbacks}):
output += chunk
return output
else:
# Direct asynchronous invocation
return await langchain_object.ainvoke(inputs, {"callbacks": callbacks})
return await built_object.ainvoke(inputs, {"callbacks": callbacks})
except Exception as async_exc:
logger.debug(f"Async error, falling back to sync: {str(async_exc)}")
@ -35,15 +35,15 @@ async def invoke_lc_runnable(
sync_callbacks = setup_callbacks(sync=True, trace_id=session_id, **kwargs)
try:
# Synchronous fallback if asynchronous execution fails
if has_external_output and hasattr(langchain_object, "stream"):
if has_external_output and hasattr(built_object, "stream"):
# Synchronous stream handling if supported and required
output = ""
for chunk in langchain_object.stream(inputs, {"callbacks": sync_callbacks}):
for chunk in built_object.stream(inputs, {"callbacks": sync_callbacks}):
output += chunk
return output
else:
# Direct synchronous invocation
return langchain_object.invoke(inputs, {"callbacks": sync_callbacks})
return built_object.invoke(inputs, {"callbacks": sync_callbacks})
except Exception as sync_exc:
logger.error(f"Sync error after async failure: {str(sync_exc)}")
# Handle or re-raise exception as appropriate for your application

View file

@ -256,7 +256,7 @@ class DirectoryReader:
output_types = [component_name_camelcase]
component_info = {
"name": "CustomComponent",
"name": component_name_camelcase,
"output_types": output_types,
"file": filename,
"code": result_content if validation_result else "",

View file

@ -124,12 +124,13 @@ def get_new_key(dictionary, original_key):
def determine_component_name(component):
"""Determine the name of the component."""
component_output_types = component["output_types"]
if len(component_output_types) == 1:
return component_output_types[0]
else:
file_name = component.get("file").split(".")[0]
return "".join(word.capitalize() for word in file_name.split("_")) if "_" in file_name else file_name
# component_output_types = component["output_types"]
# if len(component_output_types) == 1:
# return component_output_types[0]
# else:
# file_name = component.get("file").split(".")[0]
# return "".join(word.capitalize() for word in file_name.split("_")) if "_" in file_name else file_name
return component["name"]
def build_menu_items(menu_item):
@ -143,3 +144,4 @@ def build_menu_items(menu_item):
logger.error(f"Error loading Component: {component['output_types']}")
logger.exception(f"Error while building custom component {component['output_types']}: {exc}")
return menu_items
return menu_items

View file

@ -366,7 +366,7 @@ def sanitize_field_config(field_config: Dict):
def build_component(component):
"""Build a single component."""
logger.debug(f"Building component: {component.get('name'), component.get('output_types')}")
component_name = determine_component_name(component)
component_template = create_component_template(component)
logger.debug(f"Building component: {component_name, component.get('output_types')}")
return component_name, component_template

View file

@ -2,7 +2,7 @@ import json
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, field_serializer, validator
class TransactionModel(BaseModel):
@ -60,6 +60,13 @@ class VertexBuildModel(BaseModel):
from_attributes = True
populate_by_name = True
@field_serializer("data", "artifacts")
def serialize_dict(v):
    # NOTE(review): defined without `self` — pydantic inspects the signature
    # and treats single-parameter serializers as plain functions; confirm
    # this is intentional rather than a missing `self, value` pair.
    if isinstance(v, dict):
        # Dict fields are persisted as JSON strings.
        return json.dumps(v)
    return v
@validator("params", pre=True)
def validate_params(cls, v):
if isinstance(v, str):
@ -82,6 +89,24 @@ class VertexBuildModel(BaseModel):
return v
def to_map(value: dict) -> dict:
    """Split a dict into parallel "key" and "value" lists.

    Example: ``to_map({"a": 1, "b": 2})`` returns
    ``{"key": ["a", "b"], "value": [1, 2]}`` — keys and values appear in the
    dict's insertion order.
    """
    return {"key": list(value.keys()), "value": list(value.values())}
class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildModel]]

View file

@ -29,7 +29,7 @@ class MonitorService(Service):
try:
self.ensure_tables_exist()
except Exception as e:
logger.error(f"Error initializing monitor service: {e}")
logger.exception(f"Error initializing monitor service: {e}")
def to_df(self, table_name):
return self.load_table_as_dataframe(table_name)
@ -145,3 +145,4 @@ class MonitorService(Service):
df = conn.execute(query).df()
return df.to_dict(orient="records")
return df.to_dict(orient="records")

View file

@ -1,11 +1,10 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
import duckdb
from langflow.services.deps import get_monitor_service
from loguru import logger
from pydantic import BaseModel
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDict
@ -35,7 +34,7 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
elif field_info.__name__ == "bool":
sql_type = "BOOLEAN"
elif field_info.__name__ == "dict":
sql_type = "JSON"
sql_type = "VARCHAR"
elif field_info.__name__ == "Any":
sql_type = "VARCHAR"
else:
@ -91,7 +90,14 @@ def add_row_to_table(
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
# Execute the insert statement
conn.execute(insert_sql, values)
try:
conn.execute(insert_sql, values)
except Exception as e:
# Log values types
for key, value in validated_dict.items():
logger.error(f"{key}: {type(value)}")
logger.error(f"Error adding row to table: {e}")
async def log_message(
@ -131,6 +137,7 @@ async def log_vertex_build(
):
try:
monitor_service = get_monitor_service()
row = {
"flow_id": flow_id,
"id": vertex_id,

View file

@ -366,7 +366,7 @@ export default function GenericNode({
) : (
<div className="max-h-96 overflow-auto">
{typeof validationStatus.params === "string"
? `Duration: ${validationStatus.duration}\n${validationStatus.params}`
? `Duration: ${validationStatus.data.duration}\n${validationStatus.params}`
.split("\n")
.map((line, index) => (
<div key={index}>{line}</div>

View file

@ -620,6 +620,7 @@ export type crashComponentPropsType = {
export type validationStatusType = {
id: string;
data: object;
params: string;
progress: number;
valid: boolean;