Merge remote-tracking branch 'origin/dev' into v2

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-10-25 12:43:44 -03:00
commit 3983d0cdb1
159 changed files with 12323 additions and 3611 deletions

View file

@ -52,6 +52,20 @@ def display_results(results):
console.print() # Print a new line
def set_var_for_macos_issue():
# OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
# we need to set this var is we are running on MacOS
# otherwise we get an error when running gunicorn
if platform.system() in ["Darwin"]:
import os
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
# https://stackoverflow.com/questions/75747888/uwsgi-segmentation-fault-with-flask-python-app-behind-nginx-after-running-for-2 # noqa
os.environ["no_proxy"] = "*" # to avoid error with gunicorn
logger.debug("Set OBJC_DISABLE_INITIALIZE_FORK_SAFETY to YES to avoid error")
def update_settings(
config: str,
cache: Optional[str] = None,
@ -84,7 +98,7 @@ def run(
"127.0.0.1", help="Host to bind the server to.", envvar="LANGFLOW_HOST"
),
workers: int = typer.Option(
2, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"
1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"
),
timeout: int = typer.Option(300, help="Worker timeout in seconds."),
port: int = typer.Option(7860, help="Port to listen on.", envvar="LANGFLOW_PORT"),
@ -143,7 +157,10 @@ def run(
"""
Run the Langflow.
"""
set_var_for_macos_issue()
# override env variables with .env file
if env_file:
load_dotenv(env_file, override=True)
@ -165,7 +182,6 @@ def run(
options = {
"bind": f"{host}:{port}",
"workers": get_number_of_workers(workers),
"worker_class": "uvicorn.workers.UvicornWorker",
"timeout": timeout,
}

View file

@ -0,0 +1,85 @@
"""Change columns to be nullable
Revision ID: eb5866d51fd2
Revises: 67cc006d50bf
Create Date: 2023-10-04 10:18:25.640458
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import exc
import sqlmodel # noqa: F401
# revision identifiers, used by Alembic.
revision: str = "eb5866d51fd2"
down_revision: Union[str, None] = "67cc006d50bf"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
connection = op.get_bind()
try:
op.drop_table("flowstyle")
with op.batch_alter_table("component", schema=None) as batch_op:
batch_op.drop_index("ix_component_frontend_node_id")
batch_op.drop_index("ix_component_name")
except exc.SQLAlchemyError:
connection.execute("ROLLBACK")
except Exception:
pass
try:
op.drop_table("component")
except exc.SQLAlchemyError:
connection.execute("ROLLBACK")
except Exception:
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
op.create_table(
"component",
sa.Column("id", sa.CHAR(length=32), nullable=False),
sa.Column("frontend_node_id", sa.CHAR(length=32), nullable=False),
sa.Column("name", sa.VARCHAR(), nullable=False),
sa.Column("description", sa.VARCHAR(), nullable=True),
sa.Column("python_code", sa.VARCHAR(), nullable=True),
sa.Column("return_type", sa.VARCHAR(), nullable=True),
sa.Column("is_disabled", sa.BOOLEAN(), nullable=False),
sa.Column("is_read_only", sa.BOOLEAN(), nullable=False),
sa.Column("create_at", sa.DATETIME(), nullable=False),
sa.Column("update_at", sa.DATETIME(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("component", schema=None) as batch_op:
batch_op.create_index("ix_component_name", ["name"], unique=False)
batch_op.create_index(
"ix_component_frontend_node_id", ["frontend_node_id"], unique=False
)
except Exception:
pass
try:
op.create_table(
"flowstyle",
sa.Column("color", sa.VARCHAR(), nullable=False),
sa.Column("emoji", sa.VARCHAR(), nullable=False),
sa.Column("flow_id", sa.CHAR(length=32), nullable=True),
sa.Column("id", sa.CHAR(length=32), nullable=False),
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("id"),
)
except Exception:
pass
# ### end Alembic commands ###

View file

@ -1,15 +1,17 @@
import asyncio
from uuid import UUID
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langflow.api.v1.schemas import ChatResponse
from langflow.api.v1.schemas import ChatResponse, PromptResponse
from typing import Any, Dict, List, Union
from fastapi import WebSocket
from typing import Any, Dict, List, Optional
from langflow.services.getters import get_chat_service
from langchain.schema import AgentAction, LLMResult, AgentFinish
from langflow.utils.util import remove_ansi_escape_codes
from langchain.schema import AgentAction, AgentFinish
from loguru import logger
@ -17,39 +19,15 @@ from loguru import logger
class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket: WebSocket):
self.websocket = websocket
def __init__(self, client_id: str):
self.chat_service = get_chat_service()
self.client_id = client_id
self.websocket = self.chat_service.active_connections[self.client_id]
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
await self.websocket.send_json(resp.dict())
async def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> Any:
"""Run when LLM starts running."""
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
"""Run when LLM ends running."""
async def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when LLM errors."""
async def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> Any:
"""Run when chain starts running."""
async def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
"""Run when chain ends running."""
async def on_chain_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when chain errors."""
async def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
@ -95,8 +73,14 @@ class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
logger.error(f"Error sending response: {exc}")
async def on_tool_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when tool errors."""
async def on_text(self, text: str, **kwargs: Any) -> Any:
@ -104,6 +88,14 @@ class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
# This runs when first sending the prompt
# to the LLM, adding it will send the final prompt
# to the frontend
if "Prompt after formatting" in text:
text = text.replace("Prompt after formatting:\n", "")
text = remove_ansi_escape_codes(text)
resp = PromptResponse(
prompt=text,
)
await self.websocket.send_json(resp.dict())
self.chat_service.chat_history.add_message(self.client_id, resp)
async def on_agent_action(self, action: AgentAction, **kwargs: Any):
log = f"Thought: {action.log}"
@ -131,8 +123,10 @@ class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
class StreamingLLMCallbackHandler(BaseCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket):
self.websocket = websocket
def __init__(self, client_id: str):
self.chat_service = get_chat_service()
self.client_id = client_id
self.websocket = self.chat_service.active_connections[self.client_id]
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")

View file

@ -187,14 +187,18 @@ async def stream_build(
valid = False
update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
response = {
"valid": valid,
"params": params,
"id": vertex.id,
"progress": round(i / number_of_nodes, 2),
}
vertex_id = (
vertex.parent_node_id if vertex.parent_is_top_level else vertex.id
)
if vertex_id in graph.top_level_nodes:
response = {
"valid": valid,
"params": params,
"id": vertex_id,
"progress": round(i / number_of_nodes, 2),
}
yield str(StreamData(event="message", data=response))
yield str(StreamData(event="message", data=response))
langchain_object = graph.build()
# Now we need to check the input_keys to send them to the client
@ -239,8 +243,7 @@ def try_running_celery_task(vertex):
task = build_vertex.delay(vertex)
vertex.task_id = task.id
except Exception as exc:
logger.exception(exc)
logger.error("Error running task in celery, running locally")
logger.debug(f"Error running task in celery: {exc}")
vertex.task_id = None
vertex.build()
return vertex

View file

@ -69,7 +69,7 @@ def get_all(
"/process/{flow_id}",
response_model=ProcessResponse,
)
async def process_flow(
async def process(
session: Annotated[Session, Depends(get_session)],
flow_id: str,
inputs: Optional[dict] = None,
@ -156,7 +156,7 @@ async def process_flow(
result=task_result,
task=task_response,
session_id=session_id,
backend=str(type(task_service.backend)),
backend=task_service.backend_name,
)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')

View file

@ -78,6 +78,7 @@ class ChatMessage(BaseModel):
is_bot: bool = False
message: Union[str, None, dict] = None
chatKey: Optional[str] = None
type: str = "human"
@ -85,6 +86,7 @@ class ChatResponse(ChatMessage):
"""Chat response schema."""
intermediate_steps: str
type: str
is_bot: bool = True
files: list = []
@ -97,6 +99,14 @@ class ChatResponse(ChatMessage):
return v
class PromptResponse(ChatMessage):
"""Prompt response schema."""
prompt: str
type: str = "prompt"
is_bot: bool = True
class FileResponse(ChatMessage):
"""File response schema."""

View file

@ -79,4 +79,5 @@ class ConversationalAgent(CustomComponent):
memory=memory,
verbose=True,
return_intermediate_steps=True,
handle_parsing_errors=True,
)

View file

@ -1,7 +1,7 @@
from langflow import CustomComponent
from langchain.llms.base import BaseLLM
from langchain import PromptTemplate
from langchain.prompts import PromptTemplate
from langchain.schema import Document

View file

@ -0,0 +1,92 @@
from typing import Optional
from langflow import CustomComponent
from langchain.chat_models.baidu_qianfan_endpoint import QianfanChatEndpoint
from langchain.llms.base import BaseLLM
class QianfanChatEndpointComponent(CustomComponent):
display_name: str = "QianfanChatEndpoint"
description: str = (
"Baidu Qianfan chat models. Get more detail from "
"https://python.langchain.com/docs/integrations/chat/baidu_qianfan_endpoint."
)
def build_config(self):
return {
"model": {
"display_name": "Model Name",
"options": [
"ERNIE-Bot",
"ERNIE-Bot-turbo",
"BLOOMZ-7B",
"Llama-2-7b-chat",
"Llama-2-13b-chat",
"Llama-2-70b-chat",
"Qianfan-BLOOMZ-7B-compressed",
"Qianfan-Chinese-Llama-2-7B",
"ChatGLM2-6B-32K",
"AquilaChat-7B",
],
"info": "https://python.langchain.com/docs/integrations/chat/baidu_qianfan_endpoint",
"required": True,
},
"qianfan_ak": {
"display_name": "Qianfan Ak",
"required": True,
"password": True,
"info": "which you could get from https://cloud.baidu.com/product/wenxinworkshop",
},
"qianfan_sk": {
"display_name": "Qianfan Sk",
"required": True,
"password": True,
"info": "which you could get from https://cloud.baidu.com/product/wenxinworkshop",
},
"top_p": {
"display_name": "Top p",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 0.8,
},
"temperature": {
"display_name": "Temperature",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 0.95,
},
"penalty_score": {
"display_name": "Penalty Score",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 1.0,
},
"endpoint": {
"display_name": "Endpoint",
"info": "Endpoint of the Qianfan LLM, required if custom model used.",
},
"code": {"show": False},
}
def build(
self,
model: str = "ERNIE-Bot-turbo",
qianfan_ak: Optional[str] = None,
qianfan_sk: Optional[str] = None,
top_p: Optional[float] = None,
temperature: Optional[float] = None,
penalty_score: Optional[float] = None,
endpoint: Optional[str] = None,
) -> BaseLLM:
try:
output = QianfanChatEndpoint( # type: ignore
model=model,
qianfan_ak=qianfan_ak,
qianfan_sk=qianfan_sk,
top_p=top_p,
temperature=temperature,
penalty_score=penalty_score,
endpoint=endpoint,
)
except Exception as e:
raise ValueError("Could not connect to Baidu Qianfan API.") from e
return output # type: ignore

View file

@ -0,0 +1,92 @@
from typing import Optional
from langflow import CustomComponent
from langchain.llms.baidu_qianfan_endpoint import QianfanLLMEndpoint
from langchain.llms.base import BaseLLM
class QianfanLLMEndpointComponent(CustomComponent):
display_name: str = "QianfanLLMEndpoint"
description: str = (
"Baidu Qianfan hosted open source or customized models. "
"Get more detail from https://python.langchain.com/docs/integrations/chat/baidu_qianfan_endpoint"
)
def build_config(self):
return {
"model": {
"display_name": "Model Name",
"options": [
"ERNIE-Bot",
"ERNIE-Bot-turbo",
"BLOOMZ-7B",
"Llama-2-7b-chat",
"Llama-2-13b-chat",
"Llama-2-70b-chat",
"Qianfan-BLOOMZ-7B-compressed",
"Qianfan-Chinese-Llama-2-7B",
"ChatGLM2-6B-32K",
"AquilaChat-7B",
],
"info": "https://python.langchain.com/docs/integrations/chat/baidu_qianfan_endpoint",
"required": True,
},
"qianfan_ak": {
"display_name": "Qianfan Ak",
"required": True,
"password": True,
"info": "which you could get from https://cloud.baidu.com/product/wenxinworkshop",
},
"qianfan_sk": {
"display_name": "Qianfan Sk",
"required": True,
"password": True,
"info": "which you could get from https://cloud.baidu.com/product/wenxinworkshop",
},
"top_p": {
"display_name": "Top p",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 0.8,
},
"temperature": {
"display_name": "Temperature",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 0.95,
},
"penalty_score": {
"display_name": "Penalty Score",
"field_type": "float",
"info": "Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo",
"value": 1.0,
},
"endpoint": {
"display_name": "Endpoint",
"info": "Endpoint of the Qianfan LLM, required if custom model used.",
},
"code": {"show": False},
}
def build(
self,
model: str = "ERNIE-Bot-turbo",
qianfan_ak: Optional[str] = None,
qianfan_sk: Optional[str] = None,
top_p: Optional[float] = None,
temperature: Optional[float] = None,
penalty_score: Optional[float] = None,
endpoint: Optional[str] = None,
) -> BaseLLM:
try:
output = QianfanLLMEndpoint( # type: ignore
model=model,
qianfan_ak=qianfan_ak,
qianfan_sk=qianfan_sk,
top_p=top_p,
temperature=temperature,
penalty_score=penalty_score,
endpoint=endpoint,
)
except Exception as e:
raise ValueError("Could not connect to Baidu Qianfan API.") from e
return output # type: ignore

View file

@ -1,6 +1,6 @@
from typing import Optional
from langflow import CustomComponent
from langchain.llms import HuggingFaceEndpoint
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain.llms.base import BaseLLM
@ -13,7 +13,6 @@ class HuggingFaceEndpointsComponent(CustomComponent):
"endpoint_url": {"display_name": "Endpoint URL", "password": True},
"task": {
"display_name": "Task",
"type": "select",
"options": ["text2text-generation", "text-generation", "summarization"],
},
"huggingfacehub_api_token": {"display_name": "API token", "password": True},
@ -27,7 +26,7 @@ class HuggingFaceEndpointsComponent(CustomComponent):
def build(
self,
endpoint_url: str,
task="text2text-generation",
task: str = "text2text-generation",
huggingfacehub_api_token: Optional[str] = None,
model_kwargs: Optional[dict] = None,
) -> BaseLLM:
@ -36,6 +35,7 @@ class HuggingFaceEndpointsComponent(CustomComponent):
endpoint_url=endpoint_url,
task=task,
huggingfacehub_api_token=huggingfacehub_api_token,
model_kwargs=model_kwargs,
)
except Exception as e:
raise ValueError("Could not connect to HuggingFace Endpoints API.") from e

View file

@ -5,7 +5,6 @@ from langchain.vectorstores import Vectara
from langchain.schema import Document
from langchain.vectorstores.base import VectorStore
from langchain.schema import BaseRetriever
from langchain.embeddings.base import Embeddings
class VectaraComponent(CustomComponent):
@ -22,7 +21,6 @@ class VectaraComponent(CustomComponent):
"vectara_api_key": {"display_name": "Vectara API Key", "password": True},
"code": {"show": False},
"documents": {"display_name": "Documents"},
"embedding": {"display_name": "Embedding"},
}
def build(
@ -30,21 +28,21 @@ class VectaraComponent(CustomComponent):
vectara_customer_id: str,
vectara_corpus_id: str,
vectara_api_key: str,
embedding: Optional[Embeddings] = None,
documents: Optional[Document] = None,
) -> Union[VectorStore, BaseRetriever]:
# If documents, then we need to create a Vectara instance using .from_documents
if documents is not None and embedding is not None:
if documents is not None:
return Vectara.from_documents(
documents=documents, # type: ignore
vectara_customer_id=vectara_customer_id,
vectara_corpus_id=vectara_corpus_id,
vectara_api_key=vectara_api_key,
embedding=embedding,
source="langflow",
)
return Vectara(
vectara_customer_id=vectara_customer_id,
vectara_corpus_id=vectara_corpus_id,
vectara_api_key=vectara_api_key,
source="langflow",
)

View file

@ -1,3 +1,53 @@
from .base import NestedDict
# LANGCHAIN_BASE_TYPES = {
# "Chain": Chain,
# "AgentExecutor": AgentExecutor,
# "Tool": Tool,
# "BaseLLM": BaseLLM,
# "PromptTemplate": PromptTemplate,
# "BaseLoader": BaseLoader,
# "Document": Document,
# "TextSplitter": TextSplitter,
# "VectorStore": VectorStore,
# "Embeddings": Embeddings,
# "BaseRetriever": BaseRetriever,
# "BaseOutputParser": BaseOutputParser,
# "BaseMemory": BaseMemory,
# "BaseChatMemory": BaseChatMemory,
# }
from .constants import (
Tool,
PromptTemplate,
Chain,
BaseChatMemory,
BaseLLM,
BaseLoader,
BaseMemory,
BaseOutputParser,
BaseRetriever,
VectorStore,
Embeddings,
TextSplitter,
Document,
AgentExecutor,
NestedDict,
Data,
)
__all__ = ["NestedDict"]
__all__ = [
"NestedDict",
"Data",
"Tool",
"PromptTemplate",
"Chain",
"BaseChatMemory",
"BaseLLM",
"BaseLoader",
"BaseMemory",
"BaseOutputParser",
"BaseRetriever",
"VectorStore",
"Embeddings",
"TextSplitter",
"Document",
"AgentExecutor",
]

View file

@ -1,4 +0,0 @@
from typing import Union, Dict
# Type alias for more complex dicts
NestedDict = Dict[str, Union[str, Dict]]

View file

@ -0,0 +1,50 @@
from langchain.agents.agent import AgentExecutor
from langchain.chains.base import Chain
from langchain.document_loaders.base import BaseLoader
from langchain.llms.base import BaseLLM
from langchain.memory.chat_memory import BaseChatMemory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseOutputParser, BaseRetriever, Document
from langchain.schema.embeddings import Embeddings
from langchain.schema.memory import BaseMemory
from langchain.text_splitter import TextSplitter
from langchain.tools import Tool
from langchain.vectorstores.base import VectorStore
from typing import Union, Dict
# Type alias for more complex dicts
NestedDict = Dict[str, Union[str, Dict]]
class Data:
pass
LANGCHAIN_BASE_TYPES = {
"Chain": Chain,
"AgentExecutor": AgentExecutor,
"Tool": Tool,
"BaseLLM": BaseLLM,
"PromptTemplate": PromptTemplate,
"BaseLoader": BaseLoader,
"Document": Document,
"TextSplitter": TextSplitter,
"VectorStore": VectorStore,
"Embeddings": Embeddings,
"BaseRetriever": BaseRetriever,
"BaseOutputParser": BaseOutputParser,
"BaseMemory": BaseMemory,
"BaseChatMemory": BaseChatMemory,
}
# Langchain base types plus Python base types
CUSTOM_COMPONENT_SUPPORTED_TYPES = {
**LANGCHAIN_BASE_TYPES,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
"NestedDict": NestedDict,
"Data": Data,
}

View file

@ -1,28 +1,79 @@
from loguru import logger
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field
from typing import List, Optional
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
class SourceHandle(BaseModel):
baseClasses: List[str] = Field(
..., description="List of base classes for the source handle."
)
dataType: str = Field(..., description="Data type for the source handle.")
id: str = Field(..., description="Unique identifier for the source handle.")
class TargetHandle(BaseModel):
fieldName: str = Field(..., description="Field name for the target handle.")
id: str = Field(..., description="Unique identifier for the target handle.")
inputTypes: Optional[List[str]] = Field(
None, description="List of input types for the target handle."
)
type: str = Field(..., description="Type of the target handle.")
class Edge:
def __init__(self, source: "Vertex", target: "Vertex", edge: dict):
self.source: "Vertex" = source
self.target: "Vertex" = target
self.source_handle = edge.get("sourceHandle", "")
self.target_handle = edge.get("targetHandle", "")
# 'BaseLoader;BaseOutputParser|documents|PromptTemplate-zmTlD'
# target_param is documents
self.target_param = self.target_handle.split("|")[1]
if data := edge.get("data", {}):
self._source_handle = data.get("sourceHandle", {})
self._target_handle = data.get("targetHandle", {})
self.source_handle: SourceHandle = SourceHandle(**self._source_handle)
self.target_handle: TargetHandle = TargetHandle(**self._target_handle)
self.target_param = self.target_handle.fieldName
# validate handles
self.validate_handles()
else:
# Logging here because this is a breaking change
logger.error("Edge data is empty")
self._source_handle = edge.get("sourceHandle", "")
self._target_handle = edge.get("targetHandle", "")
# 'BaseLoader;BaseOutputParser|documents|PromptTemplate-zmTlD'
# target_param is documents
self.target_param = self._target_handle.split("|")[1]
# Validate in __init__ to fail fast
self.validate_edge()
def validate_handles(self) -> None:
if self.target_handle.inputTypes is None:
self.valid_handles = (
self.target_handle.type in self.source_handle.baseClasses
)
else:
self.valid_handles = (
any(
baseClass in self.target_handle.inputTypes
for baseClass in self.source_handle.baseClasses
)
or self.target_handle.type in self.source_handle.baseClasses
)
if not self.valid_handles:
logger.debug(self.source_handle)
logger.debug(self.target_handle)
raise ValueError(
f"Edge between {self.source.vertex_type} and {self.target.vertex_type} "
f"has invalid handles"
)
def __setstate__(self, state):
self.source = state["source"]
self.target = state["target"]
self.target_param = state["target_param"]
self.source_handle = state["source_handle"]
self.target_handle = state["target_handle"]
self.source_handle = state.get("source_handle")
self.target_handle = state.get("target_handle")
def reset(self) -> None:
self.source._build_params()

View file

@ -2,6 +2,7 @@ from typing import Dict, Generator, List, Type, Union
from langflow.graph.edge.base import Edge
from langflow.graph.graph.constants import lazy_load_vertex_dict
from langflow.graph.graph.utils import process_flow
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.types import (
FileToolVertex,
@ -19,11 +20,21 @@ class Graph:
def __init__(
self,
nodes: List[Dict[str, Union[str, Dict[str, Union[str, List[str]]]]]],
nodes: List[Dict],
edges: List[Dict[str, str]],
) -> None:
self._nodes = nodes
self._edges = edges
self.raw_graph_data = {"nodes": nodes, "edges": edges}
self.top_level_nodes = []
for node in self._nodes:
if node_id := node.get("id"):
self.top_level_nodes.append(node_id)
self._graph_data = process_flow(self.raw_graph_data)
self._nodes = self._graph_data["nodes"]
self._edges = self._graph_data["edges"]
self._build_graph()
def __setstate__(self, state):
@ -50,6 +61,7 @@ class Graph:
edges = payload["edges"]
return cls(nodes, edges)
except KeyError as exc:
logger.exception(exc)
raise ValueError(
f"Invalid payload. Expected keys 'nodes' and 'edges'. Found {list(payload.keys())}"
) from exc
@ -215,7 +227,9 @@ class Graph:
node_lc_type: str = node_data["node"]["template"]["_type"] # type: ignore
VertexClass = self._get_vertex_class(node_type, node_lc_type)
nodes.append(VertexClass(node))
vertex = VertexClass(node)
vertex.set_top_level(self.top_level_nodes)
nodes.append(vertex)
return nodes

View file

@ -0,0 +1,230 @@
from collections import deque
import copy
def find_last_node(nodes, edges):
"""
This function receives a flow and returns the last node.
"""
return next((n for n in nodes if all(e["source"] != n["id"] for e in edges)), None)
def add_parent_node_id(nodes, parent_node_id):
"""
This function receives a list of nodes and adds a parent_node_id to each node.
"""
for node in nodes:
node["parent_node_id"] = parent_node_id
def ungroup_node(group_node_data, base_flow):
template, flow = (
group_node_data["node"]["template"],
group_node_data["node"]["flow"],
)
parent_node_id = group_node_data["id"]
g_nodes = flow["data"]["nodes"]
add_parent_node_id(g_nodes, parent_node_id)
g_edges = flow["data"]["edges"]
# Redirect edges to the correct proxy node
updated_edges = get_updated_edges(
base_flow, g_nodes, g_edges, group_node_data["id"]
)
# Update template values
update_template(template, g_nodes)
nodes = [
n for n in base_flow["nodes"] if n["id"] != group_node_data["id"]
] + g_nodes
edges = (
[
e
for e in base_flow["edges"]
if e["target"] != group_node_data["id"]
and e["source"] != group_node_data["id"]
]
+ g_edges
+ updated_edges
)
base_flow["nodes"] = nodes
base_flow["edges"] = edges
return nodes
def process_flow(flow_object):
cloned_flow = copy.deepcopy(flow_object)
processed_nodes = set() # To keep track of processed nodes
def process_node(node):
node_id = node.get("id")
# If node already processed, skip
if node_id in processed_nodes:
return
if (
node.get("data")
and node["data"].get("node")
and node["data"]["node"].get("flow")
):
process_flow(node["data"]["node"]["flow"]["data"])
new_nodes = ungroup_node(node["data"], cloned_flow)
# Add new nodes to the queue for future processing
nodes_to_process.extend(new_nodes)
# Mark node as processed
processed_nodes.add(node_id)
nodes_to_process = deque(cloned_flow["nodes"])
while nodes_to_process:
node = nodes_to_process.popleft()
process_node(node)
return cloned_flow
def update_template(template, g_nodes):
"""
Updates the template of a node in a graph with the given template.
Args:
template (dict): The new template to update the node with.
g_nodes (list): The list of nodes in the graph.
Returns:
None
"""
for _, value in template.items():
if not value.get("proxy"):
continue
proxy_dict = value["proxy"]
field, id_ = proxy_dict["field"], proxy_dict["id"]
node_index = next((i for i, n in enumerate(g_nodes) if n["id"] == id_), -1)
if node_index != -1:
display_name = None
show = g_nodes[node_index]["data"]["node"]["template"][field]["show"]
advanced = g_nodes[node_index]["data"]["node"]["template"][field][
"advanced"
]
if "display_name" in g_nodes[node_index]["data"]["node"]["template"][field]:
display_name = g_nodes[node_index]["data"]["node"]["template"][field][
"display_name"
]
else:
display_name = g_nodes[node_index]["data"]["node"]["template"][field][
"name"
]
g_nodes[node_index]["data"]["node"]["template"][field] = value
g_nodes[node_index]["data"]["node"]["template"][field]["show"] = show
g_nodes[node_index]["data"]["node"]["template"][field][
"advanced"
] = advanced
g_nodes[node_index]["data"]["node"]["template"][field][
"display_name"
] = display_name
def update_target_handle(new_edge, g_nodes, group_node_id):
"""
Updates the target handle of a given edge if it is a proxy node.
Args:
new_edge (dict): The edge to update.
g_nodes (list): The list of nodes in the graph.
group_node_id (str): The ID of the group node.
Returns:
dict: The updated edge.
"""
target_handle = new_edge["data"]["targetHandle"]
if target_handle.get("proxy"):
proxy_id = target_handle["proxy"]["id"]
if node := next((n for n in g_nodes if n["id"] == proxy_id), None):
set_new_target_handle(proxy_id, new_edge, target_handle, node)
return new_edge
def set_new_target_handle(proxy_id, new_edge, target_handle, node):
"""
Sets a new target handle for a given edge.
Args:
proxy_id (str): The ID of the proxy.
new_edge (dict): The new edge to be created.
target_handle (dict): The target handle of the edge.
node (dict): The node containing the edge.
Returns:
None
"""
new_edge["target"] = proxy_id
_type = target_handle.get("type")
if _type is None:
raise KeyError("The 'type' key must be present in target_handle.")
field = target_handle["proxy"]["field"]
new_target_handle = {
"fieldName": field,
"type": _type,
"id": proxy_id,
}
if node["data"]["node"].get("flow"):
new_target_handle["proxy"] = {
"field": node["data"]["node"]["template"][field]["proxy"]["field"],
"id": node["data"]["node"]["template"][field]["proxy"]["id"],
}
if input_types := target_handle.get("inputTypes"):
new_target_handle["inputTypes"] = input_types
new_edge["data"]["targetHandle"] = new_target_handle
def update_source_handle(new_edge, g_nodes, g_edges):
"""
Updates the source handle of a given edge to the last node in the flow data.
Args:
new_edge (dict): The edge to update.
flow_data (dict): The flow data containing the nodes and edges.
Returns:
dict: The updated edge with the new source handle.
"""
last_node = copy.deepcopy(find_last_node(g_nodes, g_edges))
new_edge["source"] = last_node["id"]
new_source_handle = new_edge["data"]["sourceHandle"]
new_source_handle["id"] = last_node["id"]
new_edge["data"]["sourceHandle"] = new_source_handle
return new_edge
def get_updated_edges(base_flow, g_nodes, g_edges, group_node_id):
"""
Given a base flow, a list of graph nodes and a group node id, returns a list of updated edges.
An updated edge is an edge that has its target or source handle updated based on the group node id.
Args:
base_flow (dict): The base flow containing a list of edges.
g_nodes (list): A list of graph nodes.
group_node_id (str): The id of the group node.
Returns:
list: A list of updated edges.
"""
updated_edges = []
for edge in base_flow["edges"]:
new_edge = copy.deepcopy(edge)
if new_edge["target"] == group_node_id:
new_edge = update_target_handle(new_edge, g_nodes, group_node_id)
if new_edge["source"] == group_node_id:
new_edge = update_source_handle(new_edge, g_nodes, g_edges)
if edge["target"] == group_node_id or edge["source"] == group_node_id:
updated_edges.append(new_edge)
return updated_edges

View file

@ -38,6 +38,8 @@ class Vertex:
self.task_id: Optional[str] = None
self.is_task = is_task
self.params = params or {}
self.parent_node_id: Optional[str] = self._data.get("parent_node_id")
self.parent_is_top_level = False
def reset_params(self):
for edge in self.edges:
@ -88,6 +90,11 @@ class Vertex:
self._built = False
self.artifacts: Dict[str, Any] = {}
self.task_id: Optional[str] = None
self.parent_node_id = state["parent_node_id"]
self.parent_is_top_level = state["parent_is_top_level"]
def set_top_level(self, top_level_nodes: List[str]) -> None:
self.parent_is_top_level = self.parent_node_id in top_level_nodes
def _parse_data(self) -> None:
self.data = self._data["data"]
@ -209,6 +216,16 @@ class Vertex:
}
elif isinstance(_value, dict):
params[key] = _value
elif value.get("type") == "int" and value.get("value") is not None:
try:
params[key] = int(value.get("value"))
except ValueError:
params[key] = value.get("value")
elif value.get("type") == "float" and value.get("value") is not None:
try:
params[key] = float(value.get("value"))
except ValueError:
params[key] = value.get("value")
else:
params[key] = value.get("value")
@ -342,7 +359,7 @@ class Vertex:
except Exception as exc:
logger.exception(exc)
raise ValueError(
f"Error building node {self.vertex_type}: {str(exc)}"
f"Error building node {self.vertex_type}(ID:{self.id}): {str(exc)}"
) from exc
def _update_built_object_and_artifacts(self, result):

View file

@ -1,6 +1,6 @@
from typing import Any, List, Optional
from langchain import LLMChain
from langchain.chains.llm import LLMChain
from langchain.agents import (
AgentExecutor,
Tool,

View file

@ -1,4 +1,4 @@
from langchain import LLMChain
from langchain.chains.llm import LLMChain
from langchain.agents import AgentExecutor, ZeroShotAgent
from langchain.agents.agent_toolkits.json.prompt import JSON_PREFIX, JSON_SUFFIX
from langchain.agents.agent_toolkits.json.toolkit import JsonToolkit

View file

@ -1,65 +1,33 @@
from langchain import PromptTemplate
from langchain.chains.base import Chain
from langchain.document_loaders.base import BaseLoader
from langchain.embeddings.base import Embeddings
from langchain.llms.base import BaseLLM
from langchain.schema import BaseRetriever, Document
from langchain.text_splitter import TextSplitter
from langchain.tools import Tool
from langchain.vectorstores.base import VectorStore
from langchain.schema import BaseOutputParser
from langchain.schema.memory import BaseMemory
from langchain.memory.chat_memory import BaseChatMemory
from langchain.agents.agent import AgentExecutor
LANGCHAIN_BASE_TYPES = {
"Chain": Chain,
"AgentExecutor": AgentExecutor,
"Tool": Tool,
"BaseLLM": BaseLLM,
"PromptTemplate": PromptTemplate,
"BaseLoader": BaseLoader,
"Document": Document,
"TextSplitter": TextSplitter,
"VectorStore": VectorStore,
"Embeddings": Embeddings,
"BaseRetriever": BaseRetriever,
"BaseOutputParser": BaseOutputParser,
"BaseMemory": BaseMemory,
"BaseChatMemory": BaseChatMemory,
}
# Langchain base types plus Python base types
CUSTOM_COMPONENT_SUPPORTED_TYPES = {
**LANGCHAIN_BASE_TYPES,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
}
DEFAULT_CUSTOM_COMPONENT_CODE = """from langflow import CustomComponent
from langchain.llms.base import BaseLLM
from langchain.chains import LLMChain
from langchain import PromptTemplate
from langchain.schema import Document
from langflow.field_typing import (
Tool,
PromptTemplate,
Chain,
BaseChatMemory,
BaseLLM,
BaseLoader,
BaseMemory,
BaseOutputParser,
BaseRetriever,
VectorStore,
Embeddings,
TextSplitter,
Document,
AgentExecutor,
NestedDict,
Data,
)
import requests
class YourComponent(CustomComponent):
class Component(CustomComponent):
display_name: str = "Custom Component"
description: str = "Create any custom component you want!"
def build_config(self):
return { "url": { "multiline": True, "required": True } }
return {"param": {"display_name": "Parameter"}}
def build(self, param: Data) -> Data:
return param
def build(self, url: str, llm: BaseLLM, prompt: PromptTemplate) -> Document:
response = requests.get(url)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(response.text[:300])
return Document(page_content=str(result))
"""

View file

@ -1,7 +1,7 @@
from typing import Any, Callable, ClassVar, List, Optional, Union, Dict
from uuid import UUID
from fastapi import HTTPException
from langflow.interface.custom.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.interface.custom.component import Component
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.services.getters import get_db_service
@ -108,6 +108,9 @@ class CustomComponent(Component, extra=Extra.allow):
),
},
)
elif not arg.get("type"):
# Set the type to Data
arg["type"] = "Data"
return args
@property

View file

@ -3,7 +3,7 @@
import importlib
from typing import Any, Type
from langchain import PromptTemplate
from langchain.prompts import PromptTemplate
from langchain.agents import Agent
from langchain.base_language import BaseLanguageModel
from langchain.chains.base import Chain

View file

@ -111,7 +111,7 @@ def instantiate_based_on_type(class_object, base_type, node_type, params, user_i
elif base_type == "vectorstores":
return instantiate_vectorstore(class_object, params)
elif base_type == "documentloaders":
return instantiate_documentloader(class_object, params)
return instantiate_documentloader(node_type, class_object, params)
elif base_type == "textsplitters":
return instantiate_textsplitter(class_object, params)
elif base_type == "utilities":
@ -321,7 +321,9 @@ def instantiate_vectorstore(class_object: Type[VectorStore], params: Dict):
return vecstore
def instantiate_documentloader(class_object: Type[BaseLoader], params: Dict):
def instantiate_documentloader(
node_type: str, class_object: Type[BaseLoader], params: Dict
):
if "file_filter" in params:
# file_filter will be a string but we need a function
# that will be used to filter the files using file_filter
@ -341,6 +343,11 @@ def instantiate_documentloader(class_object: Type[BaseLoader], params: Dict):
raise ValueError(
"The metadata you provided is not a valid JSON string."
) from exc
if node_type == "WebBaseLoader":
if web_path := params.pop("web_path", None):
params["web_paths"] = [web_path]
docs = class_object(**params).load()
# Now if metadata is an empty dict, we will not add it to the documents
if metadata:

View file

@ -4,7 +4,7 @@ from typing import Any, List
from langflow.api.utils import get_new_key
from langflow.interface.agents.base import agent_creator
from langflow.interface.chains.base import chain_creator
from langflow.interface.custom.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
from langflow.interface.custom.utils import extract_inner_type
from langflow.interface.document_loaders.base import documentloader_creator
from langflow.interface.embeddings.base import embedding_creator
@ -288,6 +288,24 @@ def add_base_classes(frontend_node, return_types: List[str]):
frontend_node.get("base_classes").append(base_class)
def add_output_types(frontend_node, return_types: List[str]):
"""Add output types to the frontend node"""
for return_type in return_types:
if return_type not in CUSTOM_COMPONENT_SUPPORTED_TYPES or return_type is None:
raise HTTPException(
status_code=400,
detail={
"error": (
"Invalid return type should be one of: "
f"{list(CUSTOM_COMPONENT_SUPPORTED_TYPES.keys())}"
),
"traceback": traceback.format_exc(),
},
)
frontend_node.get("output_types").append(return_type)
def build_langchain_template_custom_component(custom_component: CustomComponent):
"""Build a custom component template for the langchain"""
try:
@ -314,6 +332,9 @@ def build_langchain_template_custom_component(custom_component: CustomComponent)
add_base_classes(
frontend_node, custom_component.get_function_entrypoint_return_type
)
add_output_types(
frontend_node, custom_component.get_function_entrypoint_return_type
)
logger.debug("Added base classes")
return frontend_node
except Exception as exc:

View file

@ -1,6 +1,6 @@
from typing import Dict, List, Optional, Type
from langchain import SQLDatabase, utilities
from langchain import utilities
from langflow.custom.customs import get_custom_nodes
from langflow.interface.base import LangChainTypeCreator
@ -32,7 +32,7 @@ class UtilityCreator(LangChainTypeCreator):
utility_name: import_class(f"langchain.utilities.{utility_name}")
for utility_name in utilities.__all__
}
self.type_dict["SQLDatabase"] = SQLDatabase
self.type_dict["SQLDatabase"] = utilities.SQLDatabase
# Filter according to settings.utilities
self.type_dict = {
name: utility

View file

@ -36,7 +36,7 @@ def pil_to_base64(image: Image) -> str:
return img_str.decode("utf-8")
def try_setting_streaming_options(langchain_object, websocket):
def try_setting_streaming_options(langchain_object):
# If the LLM type is OpenAI or ChatOpenAI,
# set streaming to True
# First we need to find the LLM

View file

@ -1,6 +1,6 @@
from typing import ClassVar, Dict, List, Optional
from langchain import requests, sql_database
from langchain.utilities import requests, sql_database
from langflow.interface.base import LangChainTypeCreator
from loguru import logger

View file

@ -34,7 +34,9 @@ def get_langfuse_callback(trace_id):
if langfuse := LangfuseInstance.get():
logger.debug("Langfuse credentials found")
try:
trace = langfuse.trace(CreateTrace(id=trace_id))
trace = langfuse.trace(
CreateTrace(name="langflow-" + trace_id, id=trace_id)
)
return trace.getNewHandler()
except Exception as exc:
logger.error(f"Error initializing langfuse callback: {exc}")

View file

@ -4,6 +4,8 @@ from gunicorn.app.base import BaseApplication # type: ignore
class LangflowApplication(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.options["worker_class"] = "uvicorn.workers.UvicornWorker"
self.application = app
super().__init__()

View file

@ -1,6 +1,7 @@
from collections import defaultdict
import uuid
from fastapi import WebSocket, status
from starlette.websockets import WebSocketState
from langflow.api.v1.schemas import ChatMessage, ChatResponse, FileResponse
from langflow.interface.utils import pil_to_base64
from langflow.services.base import Service
@ -125,7 +126,8 @@ class ChatService(Service):
):
# Process the graph data and chat message
chat_inputs = payload.pop("inputs", {})
chat_inputs = ChatMessage(message=chat_inputs)
chatkey = payload.pop("chatKey", None)
chat_inputs = ChatMessage(message=chat_inputs, chatKey=chatkey)
self.chat_history.add_message(client_id, chat_inputs)
# graph_data = payload
@ -140,7 +142,7 @@ class ChatService(Service):
result, intermediate_steps = await process_graph(
langchain_object=langchain_object,
chat_inputs=chat_inputs,
websocket=self.active_connections[client_id],
client_id=client_id,
session_id=self.connection_ids[client_id],
)
self.set_cache(client_id, langchain_object)
@ -200,11 +202,11 @@ class ChatService(Service):
while True:
json_payload = await websocket.receive_json()
try:
if isinstance(json_payload, str):
payload = orjson.loads(json_payload)
except Exception:
elif isinstance(json_payload, dict):
payload = json_payload
if "clear_history" in payload:
if "clear_history" in payload and payload["clear_history"]:
self.chat_history.history[client_id] = []
continue
@ -216,23 +218,29 @@ class ChatService(Service):
else:
raise RuntimeError(
f"Could not find a LangChain object for client_id {client_id}"
f"Could not find a build result for client_id {client_id}"
)
except Exception as exc:
# Handle any exceptions that might occur
logger.error(f"Error handling websocket: {exc}")
await self.close_connection(
client_id=client_id,
code=status.WS_1011_INTERNAL_ERROR,
reason=str(exc)[:120],
)
finally:
try:
logger.exception(f"Error handling websocket: {exc}")
if websocket.client_state == WebSocketState.CONNECTED:
await self.close_connection(
client_id=client_id,
code=status.WS_1000_NORMAL_CLOSURE,
reason="Client disconnected",
code=status.WS_1011_INTERNAL_ERROR,
reason=str(exc)[:120],
)
elif websocket.client_state == WebSocketState.DISCONNECTED:
self.disconnect(client_id)
finally:
try:
# first check if the connection is still open
if websocket.client_state == WebSocketState.CONNECTED:
await self.close_connection(
client_id=client_id,
code=status.WS_1000_NORMAL_CLOSURE,
reason="Client disconnected",
)
except Exception as exc:
logger.error(f"Error closing connection: {exc}")
self.disconnect(client_id)

View file

@ -1,4 +1,3 @@
from fastapi import WebSocket
from langflow.api.v1.schemas import ChatMessage
from langflow.processing.base import get_result_and_steps
from langflow.interface.utils import try_setting_streaming_options
@ -8,10 +7,10 @@ from loguru import logger
async def process_graph(
langchain_object,
chat_inputs: ChatMessage,
websocket: WebSocket,
client_id: str,
session_id: str,
):
langchain_object = try_setting_streaming_options(langchain_object, websocket)
langchain_object = try_setting_streaming_options(langchain_object)
logger.debug("Loaded langchain object")
if langchain_object is None:
@ -30,7 +29,7 @@ async def process_graph(
result, intermediate_steps = await get_result_and_steps(
langchain_object,
chat_inputs.message,
websocket=websocket,
client_id=client_id,
session_id=session_id,
)
logger.debug("Generated result and intermediate_steps")

View file

@ -6,6 +6,7 @@ from langflow.services.database.utils import Result, TableResults
from langflow.services.getters import get_settings_service
from sqlalchemy import inspect
import sqlalchemy as sa
from sqlalchemy.exc import OperationalError
from sqlmodel import SQLModel, Session, create_engine
from loguru import logger
from alembic.config import Config
@ -58,6 +59,27 @@ class DatabaseService(Service):
with Session(self.engine) as session:
yield session
def migrate_flows_if_auto_login(self):
# if auto_login is enabled, we need to migrate the flows
# to the default superuser if they don't have a user id
# associated with them
settings_service = get_settings_service()
if settings_service.auth_settings.AUTO_LOGIN:
with Session(self.engine) as session:
flows = (
session.query(models.Flow)
.filter(models.Flow.user_id == None) # noqa
.all()
)
if flows:
logger.debug("Migrating flows to default superuser")
username = settings_service.auth_settings.SUPERUSER
user = get_user_by_username(session, username)
for flow in flows:
flow.user_id = user.id
session.commit()
logger.debug("Flows migrated successfully")
def check_schema_health(self) -> bool:
inspector = inspect(self.engine)
@ -93,7 +115,35 @@ class DatabaseService(Service):
return True
def init_alembic(self):
logger.info("Initializing alembic")
alembic_cfg = Config()
alembic_cfg.set_main_option("script_location", str(self.script_location))
alembic_cfg.set_main_option("sqlalchemy.url", self.database_url)
command.stamp(alembic_cfg, "head")
logger.info("Alembic initialized")
def run_migrations(self):
# First we need to check if alembic has been initialized
# If not, we need to initialize it
# if not self.script_location.exists(): # this is not the correct way to check if alembic has been initialized
# We need to check if the alembic_version table exists
# if not, we need to initialize alembic
with Session(self.engine) as session:
# If the table does not exist it throws an error
# so we need to catch it
try:
session.execute("SELECT * FROM alembic_version")
except Exception:
logger.info("Alembic not initialized")
try:
self.init_alembic()
except Exception as exc:
logger.error(f"Error initializing alembic: {exc}")
raise RuntimeError("Error initializing alembic") from exc
else:
logger.info("Alembic already initialized")
logger.info(f"Running DB migrations in {self.script_location}")
alembic_cfg = Config()
alembic_cfg.set_main_option("script_location", str(self.script_location))
@ -133,19 +183,31 @@ class DatabaseService(Service):
return results
def create_db_and_tables(self):
logger.debug("Creating database and tables")
try:
SQLModel.metadata.create_all(self.engine)
except Exception as exc:
logger.error(f"Error creating database and tables: {exc}")
raise RuntimeError("Error creating database and tables") from exc
# Now check if the table "flow" exists, if not, something went wrong
# and we need to create the tables again.
from sqlalchemy import inspect
inspector = inspect(self.engine)
table_names = inspector.get_table_names()
current_tables = ["flow", "user", "apikey"]
if table_names and all(table in table_names for table in current_tables):
logger.debug("Database and tables already exist")
return
logger.debug("Creating database and tables")
for table in SQLModel.metadata.sorted_tables:
try:
table.create(self.engine, checkfirst=True)
except OperationalError as oe:
logger.warning(
f"Table {table} already exists, skipping. Exception: {oe}"
)
except Exception as exc:
logger.error(f"Error creating table {table}: {exc}")
raise RuntimeError(f"Error creating table {table}") from exc
# Now check if the required tables exist, if not, something went wrong.
inspector = inspect(self.engine)
table_names = inspector.get_table_names()
for table in current_tables:
if table not in table_names:

View file

@ -12,7 +12,7 @@ if TYPE_CHECKING:
class ApiKeyBase(SQLModelSerializable):
name: Optional[str] = Field(index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
last_used_at: Optional[datetime] = Field(default=None)
last_used_at: Optional[datetime] = Field(default=None, nullable=True)
total_uses: int = Field(default=0)
is_active: bool = Field(default=True)

View file

@ -22,7 +22,7 @@ def create_api_key(
session: Session, api_key_create: ApiKeyCreate, user_id: UUID
) -> UnmaskedApiKeyRead:
# Generate a random API key with 32 bytes of randomness
generated_api_key = f"lf-{secrets.token_urlsafe(32)}"
generated_api_key = f"sk-{secrets.token_urlsafe(32)}"
api_key = ApiKey(
api_key=generated_api_key,

View file

@ -2,6 +2,7 @@
from langflow.services.database.models.base import SQLModelSerializable
from pydantic import validator
from sqlmodel import Field, JSON, Column, Relationship
from uuid import UUID, uuid4
from typing import Dict, Optional, TYPE_CHECKING
@ -12,8 +13,8 @@ if TYPE_CHECKING:
class FlowBase(SQLModelSerializable):
name: str = Field(index=True)
description: Optional[str] = Field(index=True, default="")
data: Optional[Dict] = Field(default=None)
description: Optional[str] = Field(index=True)
data: Optional[Dict] = Field(default=None, nullable=True)
@validator("data")
def validate_json(v):

View file

@ -15,7 +15,7 @@ class User(SQLModelSerializable, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True)
username: str = Field(index=True, unique=True)
password: str = Field()
profile_image: Optional[str] = Field(default=None)
profile_image: Optional[str] = Field(default=None, nullable=True)
is_active: bool = Field(default=False)
is_superuser: bool = Field(default=False)
create_at: datetime = Field(default_factory=datetime.utcnow)

View file

@ -13,7 +13,17 @@ def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
database_service = service_manager.get(ServiceType.DATABASE_SERVICE)
database_service: "DatabaseService" = service_manager.get(
ServiceType.DATABASE_SERVICE
)
try:
database_service.create_db_and_tables()
except Exception as exc:
# if the exception involves tables already existing
# we can ignore it
if "already exists" not in str(exc):
logger.error(f"Error creating DB and tables: {exc}")
raise RuntimeError("Error creating DB and tables") from exc
try:
database_service.check_schema_health()
except Exception as exc:
@ -22,7 +32,11 @@ def initialize_database():
try:
database_service.run_migrations()
except CommandError as exc:
if "Can't locate revision identified by" not in str(exc):
# if "overlaps with other requested revisions" or "Can't locate revision identified by"
# are not in the exception, we can't handle it
if "overlaps with other requested revisions" not in str(
exc
) and "Can't locate revision identified by" not in str(exc):
raise exc
# This means there's wrong revision in the DB
# We need to delete the alembic_version table
@ -39,7 +53,6 @@ def initialize_database():
if "already exists" not in str(exc):
logger.error(f"Error running migrations: {exc}")
raise RuntimeError("Error running migrations") from exc
database_service.create_db_and_tables()
logger.debug("Database initialized")

View file

@ -90,7 +90,10 @@ class ServiceManager:
if service is None:
continue
logger.debug(f"Teardown service {service.name}")
service.teardown()
try:
service.teardown()
except Exception as exc:
logger.exception(exc)
self.services = {}
self.factories = {}
self.dependencies = {}
@ -99,51 +102,6 @@ class ServiceManager:
service_manager = ServiceManager()
def initialize_services():
"""
Initialize all the services needed.
"""
from langflow.services.database import factory as database_factory
from langflow.services.cache import factory as cache_factory
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.session import factory as session_service_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
service_manager.register_factory(settings_factory.SettingsServiceFactory())
service_manager.register_factory(
database_factory.DatabaseServiceFactory(),
dependencies=[ServiceType.SETTINGS_SERVICE],
)
service_manager.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
auth_factory.AuthServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(chat_factory.ChatServiceFactory())
service_manager.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
service_manager.register_factory(
task_factory.TaskServiceFactory(),
)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
def reinitialize_services():
"""
Reinitialize all the services needed.
@ -194,10 +152,3 @@ def initialize_session_service():
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
def teardown_services():
"""
Teardown all the services.
"""
service_manager.teardown()

View file

@ -35,7 +35,7 @@ class AuthSettings(BaseSettings):
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = False
AUTO_LOGIN: bool = True
NEW_USER_IS_ACTIVE: bool = False
SUPERUSER: str = DEFAULT_SUPERUSER
SUPERUSER_PASSWORD: str = DEFAULT_SUPERUSER_PASSWORD

View file

@ -34,6 +34,8 @@ class AnyIOTaskResult:
class AnyIOBackend(TaskBackend):
name = "anyio"
def __init__(self):
self.tasks = {}

View file

@ -5,6 +5,8 @@ from langflow.worker import celery_app
class CeleryBackend(TaskBackend):
name = "celery"
def __init__(self):
self.celery_app = celery_app

View file

@ -35,6 +35,10 @@ class TaskService(Service):
self.backend = self.get_backend()
self.use_celery = USE_CELERY
@property
def backend_name(self) -> str:
return self.backend.name
def get_backend(self) -> TaskBackend:
if USE_CELERY:
from langflow.services.task.backends.celery import CeleryBackend

View file

@ -1,4 +1,4 @@
from langflow.services.auth.utils import create_super_user
from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.database.utils import initialize_database
from langflow.services.manager import service_manager
from langflow.services.schema import ServiceType
@ -6,50 +6,121 @@ from langflow.services.settings.constants import (
DEFAULT_SUPERUSER,
DEFAULT_SUPERUSER_PASSWORD,
)
from .getters import get_session, get_settings_service
from sqlmodel import Session
from .getters import get_db_service, get_session, get_settings_service
from loguru import logger
def setup_superuser(settings_service, session):
"""
Setup the superuser.
"""
# We will use the SUPERUSER and SUPERUSER_PASSWORD
# vars on settings_manager.auth_settings to create the superuser
# if it does not exist.
def get_factories_and_deps():
from langflow.services.database import factory as database_factory
from langflow.services.cache import factory as cache_factory
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
from langflow.services.session import factory as session_service_factory # type: ignore
return [
(settings_factory.SettingsServiceFactory(), []),
(
auth_factory.AuthServiceFactory(),
[ServiceType.SETTINGS_SERVICE],
),
(
database_factory.DatabaseServiceFactory(),
[ServiceType.SETTINGS_SERVICE],
),
(
cache_factory.CacheServiceFactory(),
[ServiceType.SETTINGS_SERVICE],
),
(chat_factory.ChatServiceFactory(), []),
(task_factory.TaskServiceFactory(), []),
(
session_service_factory.SessionServiceFactory(),
[ServiceType.CACHE_SERVICE],
),
]
def get_or_create_super_user(session: Session, username, password, is_default):
from langflow.services.database.models.user.user import User
user = session.query(User).filter(User.username == username).first()
if user and user.is_superuser:
return None # Superuser already exists
if user and is_default:
if user.is_superuser:
if verify_password(password, user.password):
return None
else:
# Superuser exists but password is incorrect
# which means that the user has changed the
# base superuser credentials.
# This means that the user has already created
# a superuser and changed the password in the UI
# so we don't need to do anything.
logger.debug(
"Superuser exists but password is incorrect. "
"This means that the user has changed the "
"base superuser credentials."
)
return None
else:
logger.debug(
"User with superuser credentials exists but is not a superuser."
)
return None
if user:
if verify_password(password, user.password):
raise ValueError(
"User with superuser credentials exists but is not a superuser."
)
else:
raise ValueError("Incorrect superuser credentials")
if is_default:
logger.debug("Creating default superuser.")
else:
logger.debug("Creating superuser.")
try:
return create_super_user(username, password, db=session)
except Exception as exc:
if "UNIQUE constraint failed: user.username" in str(exc):
# This is to deal with workers running this
# at startup and trying to create the superuser
# at the same time.
logger.debug("Superuser already exists.")
return None
def setup_superuser(settings_service, session: Session):
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Creating default superuser.")
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
if username == DEFAULT_SUPERUSER and password == DEFAULT_SUPERUSER_PASSWORD:
logger.debug("Default superuser credentials detected.")
logger.debug("Creating default superuser.")
else:
logger.debug("Creating superuser.")
is_default = (username == DEFAULT_SUPERUSER) and (
password == DEFAULT_SUPERUSER_PASSWORD
)
try:
from langflow.services.database.models.user.user import User
user = session.query(User).filter(User.username == username).first()
if user and user.is_superuser is True:
return
user = get_or_create_super_user(
session=session, username=username, password=password, is_default=is_default
)
if user is not None:
logger.debug("Superuser created successfully.")
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
"Could not create superuser. Please create a superuser manually."
) from exc
try:
# create superuser
create_super_user(db=session, username=username, password=password)
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
"Could not create superuser. Please create a superuser manually."
) from exc
# reset superuser credentials
settings_service.auth_settings.reset_credentials()
logger.debug("Superuser created successfully.")
finally:
settings_service.auth_settings.reset_credentials()
def teardown_superuser(settings_service, session):
@ -60,17 +131,21 @@ def teardown_superuser(settings_service, session):
# from the database.
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Removing default superuser.")
username = settings_service.auth_settings.SUPERUSER
from langflow.services.database.models.user.user import User
try:
logger.debug("AUTO_LOGIN is set to True. Removing default superuser.")
username = settings_service.auth_settings.SUPERUSER
from langflow.services.database.models.user.user import User
user = session.query(User).filter(User.username == username).first()
if user and user.is_superuser:
session.delete(user)
session.commit()
logger.debug("Default superuser removed successfully.")
else:
logger.debug("Default superuser not found.")
user = session.query(User).filter(User.username == username).first()
if user and user.is_superuser:
session.delete(user)
session.commit()
logger.debug("Default superuser removed successfully.")
else:
logger.debug("Default superuser not found.")
except Exception as exc:
logger.exception(exc)
raise RuntimeError("Could not remove default superuser.") from exc
def teardown_services():
@ -79,6 +154,9 @@ def teardown_services():
"""
try:
teardown_superuser(get_settings_service(), next(get_session()))
except Exception as exc:
logger.exception(exc)
try:
service_manager.teardown()
except Exception as exc:
logger.exception(exc)
@ -116,38 +194,24 @@ def initialize_services():
"""
Initialize all the services needed.
"""
from langflow.services.database import factory as database_factory
from langflow.services.cache import factory as cache_factory
from langflow.services.chat import factory as chat_factory
from langflow.services.settings import factory as settings_factory
from langflow.services.auth import factory as auth_factory
from langflow.services.task import factory as task_factory
from langflow.services.session import factory as session_service_factory # type: ignore
for factory, dependencies in get_factories_and_deps():
try:
service_manager.register_factory(factory, dependencies=dependencies)
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
"Could not initialize services. Please check your settings."
) from exc
service_manager.register_factory(settings_factory.SettingsServiceFactory())
service_manager.register_factory(
auth_factory.AuthServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(
database_factory.DatabaseServiceFactory(),
dependencies=[ServiceType.SETTINGS_SERVICE],
)
service_manager.register_factory(
cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]
)
service_manager.register_factory(chat_factory.ChatServiceFactory())
service_manager.register_factory(task_factory.TaskServiceFactory())
service_manager.register_factory(
session_service_factory.SessionServiceFactory(),
dependencies=[ServiceType.CACHE_SERVICE],
)
# Test cache connection
service_manager.get(ServiceType.CACHE_SERVICE)
# Test database connection
service_manager.get(ServiceType.DATABASE_SERVICE)
# Setup the superuser
initialize_database()
session = next(get_session())
setup_superuser(service_manager.get(ServiceType.SETTINGS_SERVICE), session)
setup_superuser(
service_manager.get(ServiceType.SETTINGS_SERVICE), next(get_session())
)
try:
get_db_service().migrate_flows_if_auto_login()
except Exception as exc:
logger.error(f"Error migrating flows: {exc}")
raise RuntimeError("Error migrating flows") from exc

View file

@ -189,7 +189,7 @@ class InitializeAgentNode(FrontendNode):
),
TemplateField(
field_type="Tool",
required=False,
required=True,
show=True,
name="tools",
is_list=True,

View file

@ -44,7 +44,7 @@ class FieldFormatters(BaseModel):
class FrontendNode(BaseModel):
template: Template
description: str
description: Optional[str] = None
base_classes: List[str]
name: str = ""
display_name: str = ""
@ -164,7 +164,7 @@ class FrontendNode(BaseModel):
) -> None:
"""Handles specific field values for certain fields."""
if key == "headers":
field.value = """{'Authorization': 'Bearer <token>'}"""
field.value = """{"Authorization": "Bearer <token>"}"""
FrontendNode._handle_model_specific_field_values(field, key, name)
FrontendNode._handle_api_key_specific_field_values(field, key, name)
@ -249,4 +249,4 @@ class FrontendNode(BaseModel):
if "default" in value:
field.value = value["default"]
if key == "headers":
field.value = """{'Authorization': 'Bearer <token>'}"""
field.value = """{"Authorization": "Bearer <token>"}"""

View file

@ -65,4 +65,11 @@ INPUT_KEY_INFO = """The variable to be used as Chat Input when more than one var
OUTPUT_KEY_INFO = """The variable to be used as Chat Output (e.g. answer in a ConversationalRetrievalChain)"""
CLASSES_TO_REMOVE = ["Serializable", "BaseModel", "object", "Runnable", "Generic"]
CLASSES_TO_REMOVE = [
"RunnableSerializable",
"Serializable",
"BaseModel",
"object",
"Runnable",
"Generic",
]

View file

@ -2,6 +2,7 @@ from langflow.template.field.base import TemplateField
from langflow.template.frontend_node.base import FrontendNode
from langflow.template.template.base import Template
from langflow.interface.custom.constants import DEFAULT_CUSTOM_COMPONENT_CODE
from typing import Optional
class CustomComponentFrontendNode(FrontendNode):
@ -24,7 +25,7 @@ class CustomComponentFrontendNode(FrontendNode):
)
],
)
description: str = "Create any custom component you want!"
description: Optional[str] = None
base_classes: list[str] = []
def to_dict(self):

View file

@ -145,7 +145,7 @@ class HeadersDefaultValueFormatter(FieldFormatter):
def format(self, field: TemplateField, name: Optional[str] = None) -> None:
key = field.name
if key == "headers":
field.value = """{'Authorization': 'Bearer <token>'}"""
field.value = """{"Authorization": "Bearer <token>"}"""
class DictCodeFileFormatter(FieldFormatter):

View file

@ -44,6 +44,7 @@ class PromptFrontendNode(FrontendNode):
# All prompt fields should be password=False
field.password = False
field.dynamic = True
class PromptTemplateNode(FrontendNode):

View file

@ -11,6 +11,10 @@ from langflow.utils import constants
from langchain.schema import Document
def remove_ansi_escape_codes(text):
return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", text)
def build_template_from_function(
name: str, type_to_loader_dict: Dict, add_function: bool = False
):
@ -187,7 +191,9 @@ def get_base_classes(cls):
"""Get the base classes of a class.
These are used to determine the output of the nodes.
"""
if bases := cls.__bases__:
if hasattr(cls, "__bases__") and cls.__bases__:
bases = cls.__bases__
result = []
for base in bases:
if any(type in base.__module__ for type in ["pydantic", "abc"]):
@ -428,7 +434,7 @@ def set_headers_value(value: Dict[str, Any]) -> None:
"""
Sets the value for the 'headers' key.
"""
value["value"] = """{'Authorization': 'Bearer <token>'}"""
value["value"] = """{"Authorization": "Bearer <token>"}"""
def add_options_to_field(