Merge dev into shortcuts_settings

This commit is contained in:
igorrCarvalho 2024-05-23 15:40:53 -03:00
commit ec5d5f2b38
275 changed files with 19166 additions and 14947 deletions

View file

@ -4,6 +4,19 @@
# Do not commit .env file to git
# Do not change .env.example file
# Config directory
# Directory where files, logs and database will be stored
# Example: LANGFLOW_CONFIG_DIR=~/.langflow
LANGFLOW_CONFIG_DIR=
# Save database in the config directory
# Values: true, false
# If false, the database will be saved in Langflow's root directory
# This means that the database will be deleted when Langflow is uninstalled
# and that the database will not be shared between different virtual environments
# Example: LANGFLOW_SAVE_DB_IN_CONFIG_DIR=true
LANGFLOW_SAVE_DB_IN_CONFIG_DIR=
# Database URL
# Postgres example: LANGFLOW_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/langflow
# SQLite example:
@ -56,7 +69,6 @@ LANGFLOW_REMOVE_API_KEYS=
# LANGFLOW_REDIS_CACHE_EXPIRE (default: 3600)
LANGFLOW_CACHE_TYPE=
# Set AUTO_LOGIN to false if you want to disable auto login
# and use the login form to login. LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD
# must be set if AUTO_LOGIN is set to false

View file

@ -22,8 +22,9 @@ jobs:
strategy:
matrix:
python-version:
- "3.10"
- "3.12"
- "3.11"
- "3.10"
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }} + Poetry ${{ env.POETRY_VERSION }}

View file

@ -23,8 +23,9 @@ jobs:
strategy:
matrix:
python-version:
- "3.10"
- "3.12"
- "3.11"
- "3.10"
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
steps:

View file

@ -8,7 +8,7 @@ on:
env:
POETRY_VERSION: "1.8.2"
NODE_VERSION: "21"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
# Define the directory where Playwright browsers will be installed.
# Adjust if your project uses a different path.
PLAYWRIGHT_BROWSERS_PATH: "ms-playwright"

View file

@ -141,7 +141,7 @@ backend:
@echo 'Setting up the environment'
@make setup_env
make install_backend
@-kill -9 `lsof -t -i:7860`
@-kill -9 $(lsof -t -i:7860)
ifdef login
@echo "Running backend autologin is $(login)";
LANGFLOW_AUTO_LOGIN=$(login) poetry run uvicorn --factory langflow.main:create_app --host 0.0.0.0 --port 7860 --reload --env-file .env --loop asyncio

View file

@ -100,6 +100,12 @@ Alternatively, click the **"Open in Cloud Shell"** button below to launch Google
## Deploy on Railway
Use this template to deploy Langflow 1.0 Preview on Railway:
[![Deploy 1.0 Preview on Railway](https://railway.app/button.svg)](https://railway.app/template/UsJ1uB?referralCode=MnPSdg)
Or this one to deploy Langflow 0.6.x:
[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/template/JMXEWp?referralCode=MnPSdg)
## Deploy on Render

View file

@ -10,7 +10,7 @@
# PYTHON-BASE
# Sets up all our shared environment variables
################################
FROM python:3.10-slim as python-base
FROM python:3.12-slim as python-base
# python
ENV PYTHONUNBUFFERED=1 \
@ -47,7 +47,7 @@ ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
# Used to build deps + create our virtual environment
################################
FROM python-base as builder-base
RUN
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
# deps for installing poetry
@ -55,7 +55,12 @@ RUN apt-get update \
# deps for building python deps
build-essential \
# npm
npm
npm \
# gcc
gcc \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# Now we need to copy the entire project into the image
@ -70,15 +75,12 @@ RUN --mount=type=cache,target=/root/.cache \
RUN python -m pip install requests && cd ./scripts && python update_dependencies.py
RUN $POETRY_HOME/bin/poetry lock
RUN $POETRY_HOME/bin/poetry build
# Final stage for the application
FROM python-base as final
# Copy virtual environment and built .tar.gz from builder base
RUN useradd -m -u 1000 user
COPY --from=builder-base /app/dist/*.tar.gz ./
# Install the package from the .tar.gz
RUN python -m pip install *.tar.gz --user
RUN python -m pip install /app/dist/*.tar.gz --user
WORKDIR /app
ENTRYPOINT ["python", "-m", "langflow", "run"]
CMD ["--host", "0.0.0.0", "--port", "7860"]

View file

@ -10,7 +10,7 @@
# PYTHON-BASE
# Sets up all our shared environment variables
################################
FROM python:3.10-slim as python-base
FROM python:3.12-slim as python-base
# python
ENV PYTHONUNBUFFERED=1 \
@ -55,6 +55,8 @@ RUN apt-get update \
build-essential \
# npm
npm \
# gcc
gcc \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
@ -78,15 +80,11 @@ RUN cp -r src/frontend/build src/backend/base/langflow/frontend
RUN rm -rf src/backend/base/dist
RUN cd src/backend/base && $POETRY_HOME/bin/poetry build --format sdist
# Final stage for the application
FROM python-base as final
# Copy virtual environment and built .tar.gz from builder base
RUN useradd -m -u 1000 user
COPY --from=builder-base /app/src/backend/base/dist/*.tar.gz ./
# Install the package from the .tar.gz
RUN pip install *.tar.gz --user
RUN python -m pip install /app/dist/*.tar.gz --user
WORKDIR /app
ENTRYPOINT ["python", "-m", "langflow", "run"]
CMD ["--host", "0.0.0.0", "--port", "7860"]

View file

@ -0,0 +1,45 @@
import ZoomableImage from "/src/theme/ZoomableImage.js";
import Admonition from "@theme/Admonition";
# Global environment variables
Langflow 1.0 alpha includes the option to add **Global Environment Variables** for your application.
## Add a global variable to a project
In this example, you'll add the `openai_api_key` credential as a global environment variable to the **Basic Prompting** starter project.
For more information on the starter flow, see [Basic prompting](../starter-projects/basic-prompting.mdx).
1. From the Langflow dashboard, click **New Project**.
2. Select **Basic Prompting**.
The **Basic Prompting** flow is created.
3. To create an environment variable for the **OpenAI** component:
1. In the **OpenAI API Key** field, click the **Globe** button, and then click **Add New Variable**.
2. In the **Variable Name** field, enter `openai_api_key`.
3. In the **Value** field, paste your OpenAI API Key (`sk-...`).
4. For the variable **Type**, select **Credential**.
5. In the **Apply to Fields** field, select **OpenAI API Key** to apply this variable to all fields named **OpenAI API Key**.
6. Click **Save Variable**.
You now have an `openai_api_key` global environment variable for your Langflow project.
<Admonition type="tip">
You can also create global variables in **Settings** > **Variables and
Secrets**.
</Admonition>
<ZoomableImage
alt="Docusaurus themed image"
sources={{
light: "img/global-env.png",
dark: "img/global-env.png",
}}
style={{ width: "40%", margin: "20px auto" }}
/>
4. To view and manage your project's global environment variables, visit **Settings** > **Variables and Secrets**.
For more on variables in HuggingFace Spaces, see [Managing Secrets](https://huggingface.co/docs/hub/spaces-overview#managing-secrets).

View file

@ -0,0 +1,29 @@
import ThemedImage from "@theme/ThemedImage";
import useBaseUrl from "@docusaurus/useBaseUrl";
import ZoomableImage from "/src/theme/ZoomableImage.js";
import ReactPlayer from "react-player";
import Admonition from "@theme/Admonition";
# Playground
In Langflow 1.0 alpha, the **Playground** replaces the **Interaction Panel**.
The **Playground** provides an interface for interacting with flows without opening them in the flow editor.
It even works for flows hosted on the Langflow store!
As long as you have a flow's environment variables set, you can run it by clicking the **Playground** button.
1. From your **Collections** page, click **Playground** in one of your flows.
The **Playground** window opens.
<ZoomableImage
alt="Docusaurus themed image"
sources={{
light: useBaseUrl("img/playground-chat.png"),
dark: useBaseUrl("img/playground-chat.png"),
}}
style={{ width: "50%", maxWidth: "600px", margin: "0 auto" }}
/>
2. Chat with your bot as you normally would, all without having to open the editor.

View file

@ -0,0 +1,45 @@
import ZoomableImage from "/src/theme/ZoomableImage.js";
# How to contribute components?
As of Langflow 1.0 alpha, new components are added as objects of the [CustomComponent](https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/interface/custom/custom_component/custom_component.py) class and any dependencies are added to the [pyproject.toml](https://github.com/langflow-ai/langflow/blob/dev/pyproject.toml#L27) file.
## Add an example component
You have a new document loader called **MyCustomDocumentLoader** and it would look awesome in Langflow.
1. Write your loader as an object of the [CustomComponent](https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/interface/custom/custom_component/custom_component.py) class. You'll create a new class, `MyCustomDocumentLoader`, that will inherit from `CustomComponent` and override the base class's methods.
2. Define optional attributes like `display_name`, `description`, and `documentation` to provide information about your custom component.
3. Implement the `build_config` method to define the configuration options for your custom component.
4. Implement the `build` method to define the logic for taking input parameters specified in the `build_config` method and returning the desired output.
5. Add the code to the [/components/documentloaders](https://github.com/langflow-ai/langflow/tree/dev/src/backend/base/langflow/components) folder.
6. Add the dependency to [/documentloaders/\_\_init\_\_.py](https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/components/documentloaders/__init__.py) as `from .MyCustomDocumentLoader import MyCustomDocumentLoader`.
7. Add any new dependencies to the outer [pyproject.toml](https://github.com/langflow-ai/langflow/blob/dev/pyproject.toml#L27) file.
8. Submit documentation for your component. For this example, you'd submit documentation to the [loaders page](https://github.com/langflow-ai/langflow/blob/dev/docs/docs/components/loaders.mdx).
9. Submit your changes as a pull request. The Langflow team will have a look, suggest changes, and add your component to Langflow.
## User Sharing
You might want to share and test your custom component with others, but don't need it merged into the main source code.
If so, you can share your component on the Langflow store.
1. [Register at the Langflow store](https://www.langflow.store/login/).
2. Undergo pre-validation before receiving an API key.
3. To deploy your amazing component directly to the Langflow store, without it being merged into the main source code, navigate to your flow, and then click **Share**.
The share window appears:
<ZoomableImage
alt="Docusaurus themed image"
sources={{
light: "img/add-component-to-store.png",
dark: "img/add-component-to-store.png",
}}
style={{ width: "50%", margin: "20px auto" }}
/>
4. Choose whether you want the flow to be public or private.
You can also **Export** your flow as a JSON file from this window.
When you're ready to share the flow, click **Share Flow**.
You should see a **Flow shared successfully** popup.
5. To confirm, navigate to the **Langflow Store** and filter results by **Created By Me**. You should see your new flow on the **Langflow Store**.

View file

@ -38,6 +38,8 @@ module.exports = {
"administration/login",
"administration/api",
"administration/cli",
"administration/playground",
"administration/global-env",
"administration/components",
"administration/collection",
"administration/prompt-customization",
@ -126,6 +128,7 @@ module.exports = {
"contributing/how-contribute",
"contributing/github-issues",
"contributing/community",
"contributing/contribute-component",
],
},
],

Binary file not shown.

After

Width:  |  Height:  |  Size: 136 KiB

BIN
docs/static/img/global-env.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

BIN
docs/static/img/playground-chat.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

File diff suppressed because one or more lines are too long

2038
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "langflow"
version = "1.0.0a30"
version = "1.0.0a36"
description = "A Python package with a built-in web application"
authors = ["Langflow <contact@langflow.org>"]
maintainers = [
@ -25,7 +25,7 @@ langflow = "langflow.__main__:main"
[tool.poetry.dependencies]
python = ">=3.10,<3.12"
python = ">=3.10,<3.13"
langflow-base = { path = "./src/backend/base", develop = true }
beautifulsoup4 = "^4.12.2"
google-search-results = "^2.4.1"
@ -72,7 +72,7 @@ langchain-cohere = "^0.1.0rc1"
elasticsearch = "^8.12.0"
pytube = "^15.0.0"
llama-index = "^0.10.13"
unstructured = { extras = ["md"], version = "^0.12.4" }
# unstructured = { extras = ["md"], version = "^0.12.4" }
dspy-ai = "^2.4.0"
html2text = "^2024.2.26"
assemblyai = "^0.23.1"
@ -85,6 +85,8 @@ zep-python = { version = "^2.0.0rc5", allow-prereleases = true }
langchain-google-vertexai = "^1.0.3"
langchain-groq = "^0.1.3"
langchain-pinecone = "^0.1.0"
langchain-mistralai = "^0.1.6"
[tool.poetry.group.dev.dependencies]
types-redis = "^4.6.0.5"

View file

@ -1,65 +1,85 @@
import { Construct } from 'constructs';
import * as ec2 from 'aws-cdk-lib/aws-ec2'
import { Construct } from "constructs";
import * as ec2 from "aws-cdk-lib/aws-ec2";
import * as rds from "aws-cdk-lib/aws-rds";
import * as cdk from 'aws-cdk-lib';
import * as cdk from "aws-cdk-lib";
interface RdsProps {
vpc: ec2.Vpc
dbSG:ec2.SecurityGroup
vpc: ec2.Vpc;
dbSG: ec2.SecurityGroup;
}
export class Rds extends Construct{
readonly rdsCluster: rds.DatabaseCluster
export class Rds extends Construct {
readonly rdsCluster: rds.DatabaseCluster;
constructor(scope: Construct, id:string, props: RdsProps){
constructor(scope: Construct, id: string, props: RdsProps) {
super(scope, id);
const {vpc, dbSG} = props
const instanceType = ec2.InstanceType.of(ec2.InstanceClass.BURSTABLE4_GRAVITON, ec2.InstanceSize.MEDIUM)
const { vpc, dbSG } = props;
const instanceType = ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE4_GRAVITON,
ec2.InstanceSize.MEDIUM,
);
// RDSのパスワードを自動生成してSecrets Managerに格納
const rdsCredentials = rds.Credentials.fromGeneratedSecret('db_user',{
secretName: 'langflow-DbSecret',
})
const rdsCredentials = rds.Credentials.fromGeneratedSecret("db_user", {
secretName: "langflow-DbSecret",
});
// DB クラスターのパラメータグループ作成
const clusterParameterGroup = new rds.ParameterGroup(scope, 'ClusterParameterGroup',{
engine: rds.DatabaseClusterEngine.auroraMysql({
version: rds.AuroraMysqlEngineVersion.VER_3_02_0
}),
description: 'for-langflow',
})
clusterParameterGroup.bindToCluster({})
const clusterParameterGroup = new rds.ParameterGroup(
scope,
"ClusterParameterGroup",
{
engine: rds.DatabaseClusterEngine.auroraMysql({
version: rds.AuroraMysqlEngineVersion.of(
"8.0.mysql_aurora.3.05.2",
"8.0",
),
}),
description: "for-langflow",
},
);
clusterParameterGroup.bindToCluster({});
// DB インスタンスのパラメタグループ作成
const instanceParameterGroup = new rds.ParameterGroup(scope, 'InstanceParameterGroup',{
engine: rds.DatabaseClusterEngine.auroraMysql({
version: rds.AuroraMysqlEngineVersion.VER_3_02_0,
}),
description: 'for-langflow',
})
instanceParameterGroup.bindToInstance({})
const instanceParameterGroup = new rds.ParameterGroup(
scope,
"InstanceParameterGroup",
{
engine: rds.DatabaseClusterEngine.auroraMysql({
version: rds.AuroraMysqlEngineVersion.of(
"8.0.mysql_aurora.3.05.2",
"8.0",
),
}),
description: "for-langflow",
},
);
instanceParameterGroup.bindToInstance({});
this.rdsCluster = new rds.DatabaseCluster(scope, 'LangflowDbCluster', {
this.rdsCluster = new rds.DatabaseCluster(scope, "LangflowDbCluster", {
engine: rds.DatabaseClusterEngine.auroraMysql({
version: rds.AuroraMysqlEngineVersion.VER_3_02_0,
version: rds.AuroraMysqlEngineVersion.of(
"8.0.mysql_aurora.3.05.2",
"8.0",
),
}),
storageEncrypted: true,
credentials: rdsCredentials,
instanceIdentifierBase: 'langflow-instance',
vpc:vpc,
vpcSubnets:vpc.selectSubnets({
subnetGroupName: 'langflow-Isolated',
instanceIdentifierBase: "langflow-instance",
vpc: vpc,
vpcSubnets: vpc.selectSubnets({
subnetGroupName: "langflow-Isolated",
}),
securityGroups:[dbSG],
securityGroups: [dbSG],
writer: rds.ClusterInstance.provisioned("WriterInstance", {
instanceType: instanceType,
enablePerformanceInsights: true,
parameterGroup:instanceParameterGroup,
parameterGroup: instanceParameterGroup,
}),
// 2台目以降はreaders:で設定
// 2台目以降はreaders:で設定
parameterGroup: clusterParameterGroup,
defaultDatabaseName: 'langflow',
})
defaultDatabaseName: "langflow",
});
}
}

File diff suppressed because it is too large Load diff

View file

@ -11,21 +11,21 @@
"cdk": "cdk"
},
"devDependencies": {
"@types/jest": "^29.5.1",
"@types/node": "20.1.7",
"aws-cdk": "^2.86.0",
"jest": "^29.5.0",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"typescript": "~5.1.3"
"@types/jest": "^29.5.12",
"@types/node": "^20.12.12",
"aws-cdk": "^2.141.0",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"ts-node": "^10.9.2",
"typescript": "^5.4.5"
},
"dependencies": {
"@aws-solutions-constructs/aws-cloudfront-s3": "^2.49.0",
"aws-cdk-lib": "^2.124.0",
"cdk-ecr-deployment": "^2.5.30",
"constructs": "^10.0.0",
"deploy-time-build": "^0.3.12",
"dotenv": "^16.3.1",
"@aws-solutions-constructs/aws-cloudfront-s3": "^2.57.0",
"aws-cdk-lib": "^2.141.0",
"cdk-ecr-deployment": "^3.0.55",
"constructs": "^10.3.0",
"deploy-time-build": "^0.3.21",
"dotenv": "^16.4.5",
"source-map-support": "^0.5.21"
}
}

View file

@ -48,8 +48,10 @@ apt -y upgrade
# Install Python 3 pip, Langflow, and Nginx
apt -y install python3-pip
pip install langflow
langflow --host 0.0.0.0 --port 7860
pip3 install pip -U
apt -y update
pip3 install langflow
langflow run --host 0.0.0.0 --port 7860
EOF
)

View file

@ -131,3 +131,4 @@ dmypy.json
# Pyre type checker
.pyre/
*.db

View file

@ -2,6 +2,7 @@ import platform
import socket
import sys
import time
import warnings
from pathlib import Path
from typing import Optional
@ -16,8 +17,10 @@ from rich import print as rprint
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from sqlmodel import select
from langflow.main import setup_app
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from langflow.services.utils import initialize_services
@ -431,17 +434,57 @@ def superuser(
# Verify that the superuser was created
from langflow.services.database.models.user.model import User
user: User = session.query(User).filter(User.username == username).first()
user: User = session.exec(select(User).where(User.username == username)).first()
if user is None or not user.is_superuser:
typer.echo("Superuser creation failed.")
return
# Now create the first folder for the user
result = create_default_folder_if_it_doesnt_exist(session, user.id)
if result:
typer.echo("Default folder created successfully.")
else:
raise RuntimeError("Could not create default folder.")
typer.echo("Superuser created successfully.")
else:
typer.echo("Superuser creation failed.")
# command to copy the langflow database from the cache to the current directory
# because now the database is stored per installation
@app.command()
def copy_db():
"""
Copy the database files to the current directory.
This function copies the 'langflow.db' and 'langflow-pre.db' files from the cache directory to the current directory.
If the files exist in the cache directory, they will be copied to the same directory as this script (__main__.py).
Returns:
None
"""
import shutil
from platformdirs import user_cache_dir
cache_dir = Path(user_cache_dir("langflow"))
db_path = cache_dir / "langflow.db"
pre_db_path = cache_dir / "langflow-pre.db"
# It should be copied to the current directory
# this file is __main__.py and it should be in the same directory as the database
destination_folder = Path(__file__).parent
if db_path.exists():
shutil.copy(db_path, destination_folder)
typer.echo(f"Database copied to {destination_folder}")
else:
typer.echo("Database not found in the cache directory.")
if pre_db_path.exists():
shutil.copy(pre_db_path, destination_folder)
typer.echo(f"Pre-release database copied to {destination_folder}")
else:
typer.echo("Pre-release database not found in the cache directory.")
@app.command()
def migration(
test: bool = typer.Option(True, help="Run migrations in test mode."),
@ -468,7 +511,9 @@ def migration(
def main():
app()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
app()
if __name__ == "__main__":

View file

@ -0,0 +1,78 @@
"""Add Folder table
Revision ID: 012fb73ac359
Revises: c153816fd85f
Create Date: 2024-05-07 12:52:16.954691
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = "012fb73ac359"
down_revision: Union[str, None] = "c153816fd85f"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
# ### commands auto generated by Alembic - please adjust! ###
if "folder" not in table_names:
op.create_table(
"folder",
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("parent_id", sqlmodel.sql.sqltypes.GUID(), nullable=True),
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=True),
sa.ForeignKeyConstraint(
["parent_id"],
["folder.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
indexes = inspector.get_indexes("folder")
if "ix_folder_name" not in [index["name"] for index in indexes]:
with op.batch_alter_table("folder", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_folder_name"), ["name"], unique=False)
if "folder_id" not in inspector.get_columns("flow"):
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.add_column(sa.Column("folder_id", sqlmodel.sql.sqltypes.GUID(), nullable=True))
batch_op.create_foreign_key("flow_folder_id_fkey", "folder", ["folder_id"], ["id"])
batch_op.drop_column("folder")
# ### end Alembic commands ###
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
table_names = inspector.get_table_names()
# ### commands auto generated by Alembic - please adjust! ###
if "folder_id" in inspector.get_columns("flow"):
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.add_column(sa.Column("folder", sa.VARCHAR(), nullable=True))
batch_op.drop_constraint("flow_folder_id_fkey", type_="foreignkey")
batch_op.drop_column("folder_id")
indexes = inspector.get_indexes("folder")
if "ix_folder_name" in [index["name"] for index in indexes]:
with op.batch_alter_table("folder", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_folder_name"))
if "folder" in table_names:
op.drop_table("folder")
# ### end Alembic commands ###

View file

@ -0,0 +1,43 @@
"""Add missing index
Revision ID: 29fe8f1f806b
Revises: 012fb73ac359
Create Date: 2024-05-21 09:23:48.772367
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.engine.reflection import Inspector
revision: str = "29fe8f1f806b"
down_revision: Union[str, None] = "012fb73ac359"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes = inspector.get_indexes("flow")
with op.batch_alter_table("flow", schema=None) as batch_op:
indexes_names = [index["name"] for index in indexes]
if "ix_flow_folder_id" not in indexes_names:
batch_op.create_index(batch_op.f("ix_flow_folder_id"), ["folder_id"], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
indexes = inspector.get_indexes("flow")
with op.batch_alter_table("flow", schema=None) as batch_op:
indexes_names = [index["name"] for index in indexes]
if "ix_flow_folder_id" in indexes_names:
batch_op.drop_index(batch_op.f("ix_flow_folder_id"))
# ### end Alembic commands ###

View file

@ -13,6 +13,7 @@ from langflow.api.v1 import (
users_router,
validate_router,
variables_router,
folders_router,
)
router = APIRouter(
@ -29,3 +30,4 @@ router.include_router(login_router)
router.include_router(variables_router)
router.include_router(files_router)
router.include_router(monitor_router)
router.include_router(folders_router)

View file

@ -9,6 +9,7 @@ from langflow.api.v1.store import router as store_router
from langflow.api.v1.users import router as users_router
from langflow.api.v1.validate import router as validate_router
from langflow.api.v1.variable import router as variables_router
from langflow.api.v1.folders import router as folders_router
__all__ = [
"chat_router",
@ -22,4 +23,5 @@ __all__ = [
"variables_router",
"monitor_router",
"files_router",
"folders_router",
]

View file

@ -29,7 +29,7 @@ from langflow.services.deps import get_chat_service, get_session, get_session_se
from langflow.services.monitor.utils import log_vertex_build
if TYPE_CHECKING:
from langflow.graph.vertex.types import ChatVertex
from langflow.graph.vertex.types import InterfaceVertex
from langflow.services.session.service import SessionService
router = APIRouter(tags=["Chat"])
@ -288,7 +288,7 @@ async def build_vertex_stream(
if not graph:
raise ValueError(f"No graph found for {flow_id}.")
vertex: "ChatVertex" = graph.get_vertex(vertex_id)
vertex: "InterfaceVertex" = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
raise ValueError(f"Vertex {vertex_id} does not support streaming")
if isinstance(vertex._built_result, str) and vertex._built_result:

View file

@ -4,7 +4,7 @@ from typing import Annotated, List, Optional, Union
import sqlalchemy as sa
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status
from loguru import logger
from sqlmodel import Session, select
from sqlmodel import Session, col, select
from langflow.api.utils import update_frontend_node_with_template_values
from langflow.api.v1.schemas import (

View file

@ -1,18 +1,20 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import List
from uuid import UUID
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
import orjson
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.encoders import jsonable_encoder
from loguru import logger
from sqlmodel import Session, select
from sqlmodel import Session, col, select
from langflow.api.utils import remove_api_keys, validate_is_component
from langflow.api.v1.schemas import FlowListCreate, FlowListRead
from langflow.api.v1.schemas import FlowListCreate, FlowListIds, FlowListRead
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate
from langflow.services.database.models.folder.model import Folder
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_session, get_settings_service
from langflow.services.settings.service import SettingsService
@ -33,7 +35,12 @@ def create_flow(
flow.user_id = current_user.id
db_flow = Flow.model_validate(flow, from_attributes=True)
db_flow.updated_at = datetime.utcnow()
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
@ -67,7 +74,7 @@ def read_flows(
example_flows = session.exec(
select(Flow).where(
Flow.user_id == None, # noqa
Flow.folder == STARTER_FOLDER_NAME,
Flow.folder.has(Folder.name == STARTER_FOLDER_NAME),
)
).all()
for example_flow in example_flows:
@ -128,7 +135,11 @@ def update_flow(
for key, value in flow_data.items():
if value is not None:
setattr(db_flow, key, value)
db_flow.updated_at = datetime.utcnow()
db_flow.updated_at = datetime.now(timezone.utc)
if db_flow.folder_id is None:
default_folder = session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME)).first()
if default_folder:
db_flow.folder_id = default_folder.id
session.add(db_flow)
session.commit()
session.refresh(db_flow)
@ -208,3 +219,31 @@ async def download_file(
"""Download all flows as a file."""
flows = read_flows(current_user=current_user, session=session, settings_service=settings_service)
return FlowListRead(flows=flows)
@router.post("/multiple_delete/")
async def delete_multiple_flows(
flow_ids: FlowListIds, user: User = Depends(get_current_active_user), db: Session = Depends(get_session)
):
"""
Delete multiple flows by their IDs.
Args:
flow_ids (List[str]): The list of flow IDs to delete.
user (User, optional): The user making the request. Defaults to the current active user.
Returns:
dict: A dictionary containing the number of flows deleted.
"""
try:
deleted_flows = db.exec(
select(Flow).where(col(Flow.id).in_(flow_ids.flow_ids)).where(Flow.user_id == user.id)
).all()
for flow in deleted_flows:
db.delete(flow)
db.commit()
return {"deleted": len(deleted_flows)}
except Exception as exc:
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc

View file

@ -0,0 +1,243 @@
from typing import List
from uuid import UUID
from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status
from langflow.api.v1.flows import create_flows
from langflow.api.v1.schemas import FlowListCreate, FlowListReadWithFolderName
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.database.models.flow.model import Flow, FlowCreate, FlowRead
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME
import orjson
from sqlalchemy import update
from sqlmodel import Session, select
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.folder.model import (
Folder,
FolderCreate,
FolderRead,
FolderReadWithFlows,
FolderUpdate,
)
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_session
router = APIRouter(prefix="/folders", tags=["Folders"])
@router.post("/", response_model=FolderRead, status_code=201)
def create_folder(
*,
session: Session = Depends(get_session),
folder: FolderCreate,
current_user: User = Depends(get_current_active_user),
):
try:
new_folder = Folder.model_validate(folder, from_attributes=True)
new_folder.user_id = current_user.id
session.add(new_folder)
session.commit()
session.refresh(new_folder)
if folder.components_list.__len__() > 0:
update_statement_components = (
update(Flow).where(Flow.id.in_(folder.components_list)).values(folder_id=new_folder.id)
)
session.exec(update_statement_components)
session.commit()
if folder.flows_list.__len__() > 0:
update_statement_flows = update(Flow).where(Flow.id.in_(folder.flows_list)).values(folder_id=new_folder.id)
session.exec(update_statement_flows)
session.commit()
return new_folder
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/", response_model=List[FolderRead], status_code=200)
def read_folders(
*,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_active_user),
):
try:
folders = session.exec(select(Folder).where(Folder.user_id == current_user.id)).all()
return folders
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/starter-projects", response_model=FolderReadWithFlows, status_code=200)
def read_starter_folders(*, session: Session = Depends(get_session)):
    """Return the shared starter-projects folder (no auth required).

    Returns None (serialized as null) if the starter folder does not exist.
    """
    try:
        folders = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
        return folders
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/{folder_id}", response_model=FolderReadWithFlows, status_code=200)
def read_folder(
    *,
    session: Session = Depends(get_session),
    folder_id: UUID,
    current_user: User = Depends(get_current_active_user),
):
    """Return one of the current user's folders together with its flows.

    Raises 404 when the folder does not exist or belongs to another user.
    """
    try:
        folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
        if not folder:
            raise HTTPException(status_code=404, detail="Folder not found")
        folder.flows = session.exec(select(Flow).where(Flow.folder_id == folder_id)).all()
        return folder
    except HTTPException:
        # Bug fix: re-raise explicit HTTP errors (e.g. the 404 above) instead of
        # letting the generic handler below convert them into 500s.
        raise
    except Exception as e:
        if "No result found" in str(e):
            raise HTTPException(status_code=404, detail="Folder not found") from e
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.patch("/{folder_id}", response_model=FolderRead, status_code=200)
def update_folder(
    *,
    session: Session = Depends(get_session),
    folder_id: UUID,
    folder: FolderUpdate,  # Pydantic model defining the updatable fields
    current_user: User = Depends(get_current_active_user),
):
    """Update a folder's fields and reconcile which flows belong to it.

    Flows that are currently in the folder but absent from the update payload
    are moved back to the user's default folder; flows/components listed in
    the payload are attached to this folder. Raises 404 for unknown folders.
    """
    try:
        existing_folder = session.exec(
            select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)
        ).first()
        if not existing_folder:
            raise HTTPException(status_code=404, detail="Folder not found")

        folder_data = folder.model_dump(exclude_unset=True)
        for key, value in folder_data.items():
            # "components" and "flows" are reconciled separately below.
            if key not in ("components", "flows"):
                setattr(existing_folder, key, value)
        session.add(existing_folder)
        session.commit()
        session.refresh(existing_folder)

        concat_folder_components = folder.components + folder.flows
        flows_ids = session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id)).all()
        excluded_flows = list(set(flows_ids) - set(concat_folder_components))
        # Bug fix: scope the default-folder lookup to the current user so flows
        # are never re-homed into another user's folder of the same name.
        my_collection_folder = session.exec(
            select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == current_user.id)
        ).first()
        if my_collection_folder:
            update_statement_my_collection = (
                update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id)
            )
            session.exec(update_statement_my_collection)
            session.commit()

        if concat_folder_components:
            update_statement_components = (
                update(Flow).where(Flow.id.in_(concat_folder_components)).values(folder_id=existing_folder.id)
            )
            session.exec(update_statement_components)
            session.commit()

        return existing_folder
    except HTTPException:
        # Bug fix: preserve the intended status code (e.g. the 404 above)
        # instead of converting it into a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/{folder_id}", status_code=204)
def delete_folder(
    *,
    session: Session = Depends(get_session),
    folder_id: UUID,
    current_user: User = Depends(get_current_active_user),
):
    """Delete one of the current user's folders together with all flows in it.

    Raises 404 when the folder does not exist or belongs to another user.
    """
    try:
        folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first()
        if not folder:
            raise HTTPException(status_code=404, detail="Folder not found")
        # Bug fix: filter on Flow.user_id directly — the previous filter on
        # Folder.user_id without a join produced an implicit cross product and
        # referenced the folder row that was already marked deleted.
        flows = session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)).all()
        for flow in flows:
            session.delete(flow)
        session.delete(folder)
        session.commit()
        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except HTTPException:
        # Bug fix: keep the 404 above a 404 instead of downgrading it to 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/download/{folder_id}", response_model=FlowListReadWithFolderName, status_code=200)
async def download_file(
    *,
    session: Session = Depends(get_session),
    folder_id: UUID,
    current_user: User = Depends(get_current_active_user),
):
    """Download all flows from folder.

    Returns the folder's flows plus its name and description. Raises 404 when
    the folder does not exist or belongs to another user.
    """
    try:
        # Bug fix: fetch the folder once and 404 on a miss — the previous code
        # called `.first().name` on a possibly-None result (AttributeError -> 500)
        # and ran the same folder query twice.
        folder = session.exec(
            select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)
        ).first()
        if not folder:
            raise HTTPException(status_code=404, detail="Folder not found")
        flows = session.exec(
            select(Flow).distinct().join(Folder).where(Flow.folder_id == folder_id, Folder.user_id == current_user.id)
        ).all()
        return FlowListReadWithFolderName(
            flows=list(flows), folder_name=folder.name, folder_description=folder.description
        )
    except HTTPException:
        raise
    except Exception as e:
        if "No result found" in str(e):
            raise HTTPException(status_code=404, detail="Folder not found") from e
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/upload/", response_model=List[FlowRead], status_code=201)
async def upload_file(
    *,
    session: Session = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user),
):
    """Upload flows from a file.

    The file is a JSON export containing ``folder_name``, ``folder_description``
    and a ``flows`` list. A new folder is created (name de-duplicated against
    the user's existing folders) and every flow is attached to it.
    """
    contents = await file.read()
    data = orjson.loads(contents)
    if not data:
        raise HTTPException(status_code=400, detail="No flows found in the file")

    # De-duplicate the folder name against the user's existing folders.
    folder_results = session.exec(
        select(Folder).where(Folder.name.like(f"{data['folder_name']}%"), Folder.user_id == current_user.id)
    )
    existing_folder_names = [folder.name for folder in folder_results]
    if existing_folder_names:
        data["folder_name"] = f"{data['folder_name']} ({len(existing_folder_names) + 1})"

    folder = FolderCreate(name=data["folder_name"], description=data["folder_description"])
    new_folder = Folder.model_validate(folder, from_attributes=True)
    new_folder.id = None  # let the database assign a fresh id
    new_folder.user_id = current_user.id
    session.add(new_folder)
    session.commit()
    session.refresh(new_folder)

    del data["folder_name"]
    del data["folder_description"]
    if "flows" in data:
        flow_list = FlowListCreate(flows=[FlowCreate(**flow) for flow in data["flows"]])
    else:
        raise HTTPException(status_code=400, detail="No flows found in the data")
    # Now we set the user_id for all flows
    for flow in flow_list.flows:
        flow.user_id = current_user.id
        flow.folder_id = new_folder.id
    return create_flows(session=session, flow_list=flow_list, current_user=current_user)

View file

@ -1,5 +1,7 @@
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session
from langflow.api.v1.schemas import Token
from langflow.services.auth.utils import (
authenticate_user,
@ -7,14 +9,10 @@ from langflow.services.auth.utils import (
create_user_longterm_token,
create_user_tokens,
)
from langflow.services.deps import (
get_session,
get_settings_service,
get_variable_service,
)
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.deps import get_session, get_settings_service, get_variable_service
from langflow.services.settings.manager import SettingsService
from langflow.services.variable.service import VariableService
from sqlmodel import Session
router = APIRouter(tags=["Login"])
@ -58,6 +56,8 @@ async def login_to_get_access_token(
expires=auth_settings.ACCESS_TOKEN_EXPIRE_SECONDS,
)
variable_service.initialize_user_variables(user.id, db)
# Create default folder for user if it doesn't exist
create_default_folder_if_it_doesnt_exist(db, user.id)
return tokens
else:
raise HTTPException(
@ -86,6 +86,7 @@ async def auto_login(
expires=None, # Set to None to make it a session cookie
)
variable_service.initialize_user_variables(user_id, db)
create_default_folder_if_it_doesnt_exist(db, user_id)
return tokens
raise HTTPException(
@ -139,4 +140,3 @@ async def logout(response: Response):
response.delete_cookie("refresh_token_lf")
response.delete_cookie("access_token_lf")
return {"message": "Logout successful"}
return {"message": "Logout successful"}

View file

@ -1,10 +1,13 @@
from typing import Optional
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from langflow.services.deps import get_monitor_service
from langflow.services.monitor.schema import VertexBuildMapModel
from langflow.services.monitor.schema import (
MessageModelResponse,
TransactionModelResponse,
VertexBuildMapModel,
)
from langflow.services.monitor.service import MonitorService
router = APIRouter(prefix="/monitor", tags=["Monitor"])
@ -40,8 +43,9 @@ async def delete_vertex_builds(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/messages")
@router.get("/messages", response_model=List[MessageModelResponse])
async def get_messages(
flow_id: Optional[str] = Query(None),
session_id: Optional[str] = Query(None),
sender: Optional[str] = Query(None),
sender_name: Optional[str] = Query(None),
@ -49,25 +53,32 @@ async def get_messages(
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
return monitor_service.get_messages(
df = monitor_service.get_messages(
flow_id=flow_id,
sender=sender,
sender_name=sender_name,
session_id=session_id,
order_by=order_by,
)
dicts = df.to_dict(orient="records")
return [MessageModelResponse(**d) for d in dicts]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions")
@router.get("/transactions", response_model=List[TransactionModelResponse])
async def get_transactions(
source: Optional[str] = Query(None),
target: Optional[str] = Query(None),
status: Optional[str] = Query(None),
order_by: Optional[str] = Query("timestamp"),
flow_id: Optional[str] = Query(None),
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
return monitor_service.get_transactions(source=source, target=target, status=status, order_by=order_by)
dicts = monitor_service.get_transactions(
source=source, target=target, status=status, order_by=order_by, flow_id=flow_id
)
return [TransactionModelResponse(**d) for d in dicts]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View file

@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
@ -139,10 +139,20 @@ class FlowListCreate(BaseModel):
flows: List[FlowCreate]
class FlowListIds(BaseModel):
flow_ids: List[str]
class FlowListRead(BaseModel):
flows: List[FlowRead]
class FlowListReadWithFolderName(BaseModel):
flows: List[FlowRead]
folder_name: str
folder_description: str
class InitResponse(BaseModel):
flowId: str
@ -250,7 +260,7 @@ class VertexBuildResponse(BaseModel):
"""JSON string of the params."""
data: ResultDataResponse
"""Mapping of vertex ids to result dict containing the param name and result value."""
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
timestamp: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc))
"""Timestamp of the build."""

View file

@ -13,6 +13,7 @@ from langflow.services.auth.utils import (
get_password_hash,
verify_password,
)
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.database.models.user import User, UserCreate, UserRead, UserUpdate
from langflow.services.database.models.user.crud import get_user_by_id, update_user
from langflow.services.deps import get_session, get_settings_service
@ -36,6 +37,9 @@ def add_user(
session.add(new_user)
session.commit()
session.refresh(new_user)
folder = create_default_folder_if_it_doesnt_exist(session, new_user.id)
if not folder:
raise HTTPException(status_code=500, detail="Error creating default folder")
except IntegrityError as e:
session.rollback()
raise HTTPException(status_code=400, detail="This username is unavailable.") from e

View file

@ -67,23 +67,25 @@ def retrieve_file_paths(
return file_paths
def partition_file_to_record(file_path: str, silent_errors: bool) -> Optional[Record]:
# Use the partition function to load the file
from unstructured.partition.auto import partition # type: ignore
# ! Removing unstructured dependency until
# ! 3.12 is supported
# def partition_file_to_record(file_path: str, silent_errors: bool) -> Optional[Record]:
# # Use the partition function to load the file
# from unstructured.partition.auto import partition # type: ignore
try:
elements = partition(file_path)
except Exception as e:
if not silent_errors:
raise ValueError(f"Error loading file {file_path}: {e}") from e
return None
# try:
# elements = partition(file_path)
# except Exception as e:
# if not silent_errors:
# raise ValueError(f"Error loading file {file_path}: {e}") from e
# return None
# Create a Record
text = "\n\n".join([Text(el) for el in elements])
metadata = elements.metadata if hasattr(elements, "metadata") else {}
metadata["file_path"] = file_path
record = Record(text=text, data=metadata)
return record
# # Create a Record
# text = "\n\n".join([Text(el) for el in elements])
# metadata = elements.metadata if hasattr(elements, "metadata") else {}
# metadata["file_path"] = file_path
# record = Record(text=text, data=metadata)
# return record
def read_text_file(file_path: str) -> str:
@ -131,18 +133,20 @@ def parse_text_file_to_record(file_path: str, silent_errors: bool) -> Optional[R
return record
def get_elements(
file_paths: List[str],
silent_errors: bool,
max_concurrency: int,
use_multithreading: bool,
) -> List[Optional[Record]]:
if use_multithreading:
records = parallel_load_records(file_paths, silent_errors, max_concurrency)
else:
records = [partition_file_to_record(file_path, silent_errors) for file_path in file_paths]
records = list(filter(None, records))
return records
# ! Removing unstructured dependency until
# ! 3.12 is supported
# def get_elements(
# file_paths: List[str],
# silent_errors: bool,
# max_concurrency: int,
# use_multithreading: bool,
# ) -> List[Optional[Record]]:
# if use_multithreading:
# records = parallel_load_records(file_paths, silent_errors, max_concurrency)
# else:
# records = [partition_file_to_record(file_path, silent_errors) for file_path in file_paths]
# records = list(filter(None, records))
# return records
def parallel_load_records(

View file

@ -49,12 +49,12 @@ class ChatComponent(CustomComponent):
sender: Optional[str] = None,
sender_name: Optional[str] = None,
) -> list[Record]:
records = store_message(
message,
session_id=session_id,
sender=sender,
sender_name=sender_name,
flow_id=self.graph.flow_id,
)
self.status = records

View file

@ -57,7 +57,7 @@ class LCModelComponent(CustomComponent):
prompt_tokens = token_usage["prompt_tokens"]
total_tokens = token_usage["total_tokens"]
finish_reason = response_metadata["finish_reason"]
status_message = f"Tokens:\n- Input: {prompt_tokens}\nOutput: {completion_tokens}\nTotal Tokens: {total_tokens}\nStop Reason: {finish_reason}\nResponse: {content}"
status_message = f"Tokens:\nInput: {prompt_tokens}\nOutput: {completion_tokens}\nTotal Tokens: {total_tokens}\nStop Reason: {finish_reason}\nResponse: {content}"
elif all(key in response_metadata for key in anthropic_keys) and all(
key in response_metadata["usage"] for key in inner_anthropic_keys
):
@ -65,7 +65,7 @@ class LCModelComponent(CustomComponent):
input_tokens = usage["input_tokens"]
output_tokens = usage["output_tokens"]
stop_reason = response_metadata["stop_reason"]
status_message = f"Tokens:\n- Input: {input_tokens}\n- Output: {output_tokens}\nStop Reason: {stop_reason}\nResponse: {content}"
status_message = f"Tokens:\nInput: {input_tokens}\nOutput: {output_tokens}\nStop Reason: {stop_reason}\nResponse: {content}"
else:
status_message = f"Response: {content}"
else:

View file

@ -1 +1 @@
MODEL_NAMES = ["gpt-4-turbo", "gpt-4-turbo-preview", "gpt-3.5-turbo", "gpt-3.5-turbo-0125"]
MODEL_NAMES = ["gpt-4o", "gpt-4-turbo", "gpt-4-turbo-preview", "gpt-3.5-turbo", "gpt-3.5-turbo-0125"]

View file

@ -1,3 +1,6 @@
from copy import deepcopy
from langchain_core.documents import Document
from langflow.schema import Record
@ -27,19 +30,20 @@ def dict_values_to_string(d: dict) -> dict:
dict: The dictionary with values converted to strings.
"""
# Do something similar to the above
for key, value in d.items():
d_copy = deepcopy(d)
for key, value in d_copy.items():
# it could be a list of records or documents or strings
if isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, Record):
d[key][i] = record_to_string(item)
d_copy[key][i] = record_to_string(item)
elif isinstance(item, Document):
d[key][i] = document_to_string(item)
d_copy[key][i] = document_to_string(item)
elif isinstance(value, Record):
d[key] = record_to_string(value)
d_copy[key] = record_to_string(value)
elif isinstance(value, Document):
d[key] = document_to_string(value)
return d
d_copy[key] = document_to_string(value)
return d_copy
def document_to_string(document: Document) -> str:

View file

@ -8,7 +8,7 @@ from langchain.prompts import SystemMessagePromptTemplate
from langchain.prompts.chat import MessagesPlaceholder
from langchain.schema.memory import BaseMemory
from langchain.tools import Tool
from langchain_community.chat_models import ChatOpenAI
from langchain_openai import ChatOpenAI
from langflow.field_typing.range_spec import RangeSpec
from langflow.interface.custom.custom_component import CustomComponent

View file

@ -1,7 +1,7 @@
from typing import Callable, Union
from langchain.agents import AgentExecutor
from langchain.sql_database import SQLDatabase
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.agent_toolkits.sql.base import create_sql_agent

View file

@ -0,0 +1,64 @@
from pydantic.v1 import SecretStr
from langchain_mistralai.embeddings import MistralAIEmbeddings
from langflow.interface.custom.custom_component import CustomComponent
from langflow.field_typing import Embeddings
class MistralAIEmbeddingsComponent(CustomComponent):
    """Builds a LangChain MistralAI embeddings model from UI-configured settings."""

    display_name = "MistralAI Embeddings"
    description = "Generate embeddings using MistralAI models."

    def build_config(self):
        """Describe the fields this component exposes in the UI."""
        config = {}
        config["model"] = {
            "display_name": "Model",
            "advanced": False,
            "options": ["mistral-embed"],
            "value": "mistral-embed",
        }
        config["mistral_api_key"] = {
            "display_name": "Mistral API Key",
            "password": True,
            "advanced": False,
        }
        config["max_concurrent_requests"] = {
            "display_name": "Max Concurrent Requests",
            "advanced": True,
            "value": 64,
        }
        config["max_retries"] = {"display_name": "Max Retries", "advanced": True, "value": 5}
        config["timeout"] = {"display_name": "Request Timeout", "advanced": True, "value": 120}
        config["endpoint"] = {"display_name": "API Endpoint", "advanced": True, "value": "https://api.mistral.ai/v1/"}
        return config

    def build(
        self,
        mistral_api_key: str,
        model: str = "mistral-embed",
        max_concurrent_requests: int = 64,
        max_retries: int = 5,
        timeout: int = 120,
        endpoint: str = "https://api.mistral.ai/v1/",
    ) -> Embeddings:
        """Instantiate MistralAIEmbeddings with the given connection settings."""
        # Wrap the key in a SecretStr so it is not echoed in reprs or logs.
        api_key = SecretStr(mistral_api_key) if mistral_api_key else None
        return MistralAIEmbeddings(
            api_key=api_key,
            model=model,
            endpoint=endpoint,
            max_concurrent_requests=max_concurrent_requests,
            max_retries=max_retries,
            timeout=timeout,
        )

View file

@ -0,0 +1,29 @@
from typing import Union
from langflow.interface.custom.custom_component import CustomComponent
from langflow.schema import Record
from langflow.field_typing import Text
class PassComponent(CustomComponent):
    """Forwards its second input unchanged; the first input exists only to order the graph."""

    display_name = "Pass"
    description = "A pass-through component that forwards the second input while ignoring the first, used for controlling workflow direction."
    field_order = ["ignored_input", "forwarded_input"]

    def build_config(self) -> dict:
        """UI configuration for the two inputs."""
        ignored = {
            "display_name": "Ignored Input",
            "info": "This input is ignored. It's used to control the flow in the graph.",
            "input_types": ["Text", "Record"],
        }
        forwarded = {
            "display_name": "Input",
            "info": "This input is forwarded by the component.",
            "input_types": ["Text", "Record"],
        }
        return {"ignored_input": ignored, "forwarded_input": forwarded}

    def build(self, ignored_input: Text, forwarded_input: Text) -> Union[Text, Record]:
        """Return ``forwarded_input`` untouched; ``ignored_input`` is only a wiring hook."""
        self.status = forwarded_input
        return forwarded_input

View file

@ -1,5 +1,5 @@
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langchain_experimental.sql.base import SQLDatabase
from langchain_community.utilities import SQLDatabase
from langflow.field_typing import Text
from langflow.interface.custom.custom_component import CustomComponent

View file

@ -0,0 +1,49 @@
from typing import Optional
from langflow.field_typing import Text
from langflow.interface.custom.custom_component import CustomComponent
from langflow.schema import Record
from langflow.utils.util import unescape_string
class SplitTextComponent(CustomComponent):
    """Splits each input text on a separator, wrapping every chunk in a Record."""

    display_name: str = "Split Text"
    description: str = "Split text into chunks of a specified length."

    def build_config(self):
        """UI configuration for the splitter options."""
        inputs_field = {
            "display_name": "Inputs",
            "info": "Texts to split.",
            "input_types": ["Record", "Text"],
        }
        separator_field = {
            "display_name": "Separator",
            "info": 'The character to split on. Defaults to " ".',
        }
        truncate_field = {
            "display_name": "Truncate Size",
            "info": "The maximum length (in number of characters) of each chunk to keep. Defaults to 0 (no truncation).",
        }
        return {"inputs": inputs_field, "separator": separator_field, "truncate_size": truncate_field}

    def build(
        self,
        inputs: list[Text],
        separator: str = " ",
        truncate_size: Optional[int] = 0,
    ) -> list[Record]:
        """Split every input on ``separator`` and emit one Record per chunk.

        Each Record carries the originating text under ``data["parent"]``.
        A truthy ``truncate_size`` caps each chunk's length.
        """
        separator = unescape_string(separator)
        records = []
        for source_text in inputs:
            pieces = source_text.split(separator)
            if truncate_size:
                pieces = [piece[:truncate_size] for piece in pieces]
            records.extend(Record(text=piece, data={"parent": source_text}) for piece in pieces)
        self.status = records
        return records

View file

@ -32,7 +32,6 @@ class StoreMessageComponent(CustomComponent):
session_id: Optional[str] = None,
message: str = "",
) -> List[Record]:
store_message(
sender=sender,
sender_name=sender_name,

View file

@ -35,7 +35,7 @@ class SubFlowComponent(CustomComponent):
build_config["flow_name"]["options"] = self.get_flow_names()
# Clean up the build config
for key in list(build_config.keys()):
if key not in self.field_order + ["code", "_type"]:
if key not in self.field_order + ["code", "_type", "get_final_results_only"]:
del build_config[key]
if field_value is not None and field_name == "flow_name":
try:
@ -85,20 +85,29 @@ class SubFlowComponent(CustomComponent):
"display_name": "Tweaks",
"info": "Tweaks to apply to the flow.",
},
"get_final_results_only": {
"display_name": "Get Final Results Only",
"info": "If False, the output will contain all outputs from the flow.",
"advanced": True,
},
}
def build_records_from_result_data(self, result_data: ResultData) -> List[Record]:
def build_records_from_result_data(self, result_data: ResultData, get_final_results_only: bool) -> List[Record]:
messages = result_data.messages
if not messages:
return []
records = []
for message in messages:
message_dict = message if isinstance(message, dict) else message.model_dump()
record = Record(data={"result": result_data.model_dump(), "message": message_dict.get("message", "")})
if get_final_results_only:
result_data_dict = result_data.model_dump()
results = result_data_dict.get("results", {})
inner_result = results.get("result", {})
record = Record(data={"result": inner_result, "message": message_dict}, text_key="result")
records.append(record)
return records
async def build(self, flow_name: str, **kwargs) -> List[Record]:
async def build(self, flow_name: str, get_final_results_only: bool = True, **kwargs) -> List[Record]:
tweaks = {key: {"input_value": value} for key, value in kwargs.items()}
run_outputs: List[Optional[RunOutputs]] = await self.run_flow(
tweaks=tweaks,
@ -112,7 +121,7 @@ class SubFlowComponent(CustomComponent):
if run_output is not None:
for output in run_output.outputs:
if output:
records.extend(self.build_records_from_result_data(output))
records.extend(self.build_records_from_result_data(output, get_final_results_only))
self.status = records
logger.debug(records)

View file

@ -1,7 +1,10 @@
from typing import Optional, Union
from langflow.interface.custom.custom_component import CustomComponent
from langflow.schema import Record
from langflow.field_typing import Text
class TextOperatorComponent(CustomComponent):
display_name = "Text Operator"
description = "Compares two text inputs based on a specified condition such as equality or inequality, with optional case sensitivity."
@ -19,17 +22,29 @@ class TextOperatorComponent(CustomComponent):
"operator": {
"display_name": "Operator",
"info": "The operator to apply for comparing the texts.",
"options": ["equals", "not equals", "contains", "starts with", "ends with"],
"options": ["equals", "not equals", "contains", "starts with", "ends with", "exists"],
},
"case_sensitive": {
"display_name": "Case Sensitive",
"info": "If true, the comparison will be case sensitive.",
"field_type": "bool",
"default": False,
}
},
"true_output": {
"display_name": "Output",
"info": "The output to return or display when the comparison is true.",
"input_types": ["Text", "Record"], # Allow both text and record types
},
}
def build(self, input_text: Text, match_text: Text, operator: Text, case_sensitive: bool = False) -> Text:
def build(
self,
input_text: Text,
match_text: Text,
operator: Text,
case_sensitive: bool = False,
true_output: Optional[Text] = "",
) -> Union[Text, Record]:
if not input_text or not match_text:
raise ValueError("Both 'input_text' and 'match_text' must be provided and non-empty.")
@ -49,7 +64,13 @@ class TextOperatorComponent(CustomComponent):
elif operator == "ends with":
result = input_text.endswith(match_text)
if not result:
output_record = true_output if true_output else input_text
if result:
self.status = output_record
return output_record
else:
self.status = "Comparison failed, stopping execution."
self.stop()
self.status = f"{result} \n\n {input_text}"
return input_text
return output_record

View file

@ -1,87 +0,0 @@
from typing import Optional, Union
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langflow.field_typing import Text
from langflow.interface.custom.custom_component import CustomComponent
from langflow.schema import Record
from langflow.utils.util import unescape_string
class SplitTextComponent(CustomComponent):
    """Split input texts/Records into chunks using LangChain text splitters."""

    display_name: str = "Split Text"
    description: str = "Split text into chunks of a specified length."

    def build_config(self):
        """UI configuration: separators, chunk size/overlap, recursive toggle."""
        return {
            "inputs": {
                "display_name": "Inputs",
                "info": "Texts to split.",
                "input_types": ["Record", "Text"],
            },
            "separators": {
                "display_name": "Separators",
                "info": 'The characters to split on. Defaults to [" "].',
                "is_list": True,
            },
            "chunk_size": {
                "display_name": "Max Chunk Size",
                "info": "The maximum length (in number of characters) of each chunk.",
                "field_type": "int",
                "value": 1000,
            },
            "chunk_overlap": {
                "display_name": "Chunk Overlap",
                "info": "The amount of character overlap between chunks.",
                "field_type": "int",
                "value": 200,
            },
            "recursive": {
                "display_name": "Recursive",
            },
            "code": {"show": False},
        }

    def build(
        self,
        # NOTE(review): `[" "]` is a mutable default argument; it is never
        # mutated here (a new list is rebuilt below), but `None` + fallback
        # would be the safer idiom.
        inputs: list[Text],
        separators: Optional[list[str]] = [" "],
        chunk_size: Optional[int] = 1000,
        chunk_overlap: Optional[int] = 200,
        recursive: bool = False,
    ) -> list[Record]:
        """Split the inputs into Records via a (Recursive)CharacterTextSplitter."""
        if separators is None:
            separators = []
        # Turn escaped sequences like "\\n" into their literal characters.
        separators = [unescape_string(x) for x in separators]
        # Make sure chunk_size and chunk_overlap are ints
        if isinstance(chunk_size, str):
            chunk_size = int(chunk_size)
        if isinstance(chunk_overlap, str):
            chunk_overlap = int(chunk_overlap)
        splitter: Optional[Union[CharacterTextSplitter, RecursiveCharacterTextSplitter]] = None
        if recursive:
            # Recursive splitter tries each separator in order until chunks fit.
            splitter = RecursiveCharacterTextSplitter(
                separators=separators,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
        else:
            # Plain splitter uses only the first separator.
            # NOTE(review): raises IndexError if `separators` is empty — confirm callers.
            splitter = CharacterTextSplitter(
                separator=separators[0],
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )

        # Normalize every input to a LangChain Document before splitting.
        documents = []
        for _input in inputs:
            if isinstance(_input, Record):
                documents.append(_input.to_lc_document())
            else:
                documents.append(Document(page_content=_input))
        records = self.to_records(splitter.split_documents(documents))
        self.status = records
        return records

View file

@ -54,6 +54,10 @@ class ZepMessageReaderComponent(BaseMemoryComponent):
"info": "Limit of search results.",
"advanced": True,
},
"api_base_path": {
"display_name": "API Base Path",
"options": ["api/v1", "api/v2"],
},
}
def get_messages(self, **kwargs) -> list[Record]:
@ -108,6 +112,7 @@ class ZepMessageReaderComponent(BaseMemoryComponent):
def build(
self,
session_id: Text,
api_base_path: str = "api/v1",
url: Optional[Text] = None,
api_key: Optional[Text] = None,
query: Optional[Text] = None,
@ -118,12 +123,21 @@ class ZepMessageReaderComponent(BaseMemoryComponent):
try:
from zep_python import ZepClient
from zep_python.langchain import ZepChatMessageHistory
# Monkeypatch API_BASE_PATH to
# avoid 404
# This is a workaround for the local Zep instance
# cloud Zep works with v2
import zep_python.zep_client
zep_python.zep_client.API_BASE_PATH = api_base_path
except ImportError:
raise ImportError(
"Could not import zep-python package. " "Please install it with `pip install zep-python`."
)
if url == "":
url = None
zep_client = ZepClient(api_url=url, api_key=api_key)
memory = ZepChatMessageHistory(session_id=session_id, zep_client=zep_client)
records = self.get_messages(

View file

@ -1,5 +1,4 @@
from typing import Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from langflow.base.memory.memory import BaseMemoryComponent
from langflow.field_typing import Text
@ -39,6 +38,10 @@ class ZepMessageWriterComponent(BaseMemoryComponent):
"display_name": "Input Record",
"info": "Record to write to Zep.",
},
"api_base_path": {
"display_name": "API Base Path",
"options": ["api/v1", "api/v2"],
},
}
def add_message(
@ -77,18 +80,27 @@ class ZepMessageWriterComponent(BaseMemoryComponent):
self,
input_value: Record,
session_id: Text,
api_base_path: str = "api/v1",
url: Optional[Text] = None,
api_key: Optional[Text] = None,
) -> Record:
try:
# Monkeypatch API_BASE_PATH to
# avoid 404
# This is a workaround for the local Zep instance
# cloud Zep works with v2
import zep_python.zep_client
from zep_python import ZepClient
from zep_python.langchain import ZepChatMessageHistory
zep_python.zep_client.API_BASE_PATH = api_base_path
except ImportError:
raise ImportError(
"Could not import zep-python package. " "Please install it with `pip install zep-python`."
)
if url == "":
url = None
zep_client = ZepClient(api_url=url, api_key=api_key)
memory = ZepChatMessageHistory(session_id=session_id, zep_client=zep_client)
self.add_message(**input_value.data, memory=memory)

View file

@ -1,6 +1,6 @@
from typing import Optional
from langchain.llms.baidu_qianfan_endpoint import QianfanLLMEndpoint
from langchain_community.llms.baidu_qianfan_endpoint import QianfanLLMEndpoint
from langflow.interface.custom.custom_component import CustomComponent
from langflow.field_typing import BaseLanguageModel

View file

@ -0,0 +1,87 @@
from typing import Optional
from langchain_mistralai import ChatMistralAI
from pydantic.v1 import SecretStr
from langflow.custom import CustomComponent
from langflow.field_typing import BaseLanguageModel
class MistralAIModelComponent(CustomComponent):
    """Builds a ChatMistralAI language model from UI-configured settings."""

    display_name: str = "MistralAI"
    description: str = "Generate text using MistralAI LLMs."
    icon = "MistralAI"
    field_order = [
        "model",
        "mistral_api_key",
        "max_tokens",
        "temperature",
        "mistral_api_base",
    ]

    def build_config(self):
        """Describe the fields this component exposes in the UI."""
        config = {}
        config["model"] = {
            "display_name": "Model Name",
            "options": [
                "open-mistral-7b",
                "open-mixtral-8x7b",
                "open-mixtral-8x22b",
                "mistral-small-latest",
                "mistral-medium-latest",
                "mistral-large-latest",
            ],
            "info": "Name of the model to use.",
            "required": True,
            "value": "open-mistral-7b",
        }
        config["mistral_api_key"] = {
            "display_name": "Mistral API Key",
            "required": True,
            "password": True,
            "info": "Your Mistral API key.",
        }
        config["max_tokens"] = {
            "display_name": "Max Tokens",
            "field_type": "int",
            "advanced": True,
            "value": 256,
        }
        config["temperature"] = {
            "display_name": "Temperature",
            "field_type": "float",
            "value": 0.1,
        }
        config["mistral_api_base"] = {
            "display_name": "Mistral API Base",
            "advanced": True,
            "info": "Endpoint of the Mistral API. Defaults to 'https://api.mistral.ai' if not specified.",
        }
        config["code"] = {"show": False}
        return config

    def build(
        self,
        model: str,
        temperature: float = 0.1,
        mistral_api_key: Optional[str] = None,
        max_tokens: Optional[int] = None,
        mistral_api_base: Optional[str] = None,
    ) -> BaseLanguageModel:
        """Instantiate ChatMistralAI; raises ValueError when the client cannot be created."""
        # Fall back to the public endpoint when none was supplied.
        endpoint = mistral_api_base if mistral_api_base else "https://api.mistral.ai"
        # Keep the key out of logs/reprs by wrapping it in a SecretStr.
        secret_key = SecretStr(mistral_api_key) if mistral_api_key else None
        try:
            output = ChatMistralAI(
                model_name=model,
                api_key=secret_key,
                max_tokens=max_tokens,
                temperature=temperature,
                endpoint=endpoint,
            )
        except Exception as e:
            raise ValueError("Could not connect to Mistral API.") from e
        return output

View file

@ -1,6 +1,6 @@
from typing import Optional
from langchain_community.chat_models.openai import ChatOpenAI
from langchain_openai import ChatOpenAI
from langflow.base.models.openai_constants import MODEL_NAMES
from langflow.field_typing import BaseLanguageModel, NestedDict
@ -52,7 +52,7 @@ class ChatOpenAIComponent(CustomComponent):
self,
max_tokens: Optional[int] = 256,
model_kwargs: NestedDict = {},
model_name: str = "gpt-4-1106-preview",
model_name: str = "gpt-4o",
openai_api_base: Optional[str] = None,
openai_api_key: Optional[str] = None,
temperature: float = 0.7,

View file

@ -1,7 +1,7 @@
from typing import Optional
from langflow.field_typing import BaseLanguageModel
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain_community.llms.huggingface_endpoint import HuggingFaceEndpoint
from langflow.interface.custom.custom_component import CustomComponent

View file

@ -0,0 +1,141 @@
from typing import Optional
from langchain_mistralai import ChatMistralAI
from pydantic.v1 import SecretStr
from langflow.base.constants import STREAM_INFO_TEXT
from langflow.base.models.model import LCModelComponent
from langflow.field_typing import Text
class MistralAIModelComponent(LCModelComponent):
display_name = "MistralAI"
description = "Generates text using MistralAI LLMs."
icon = "MistralAI"
field_order = [
"max_tokens",
"model_kwargs",
"model_name",
"mistral_api_base",
"mistral_api_key",
"temperature",
"input_value",
"system_message",
"stream",
]
def build_config(self):
return {
"input_value": {"display_name": "Input"},
"max_tokens": {
"display_name": "Max Tokens",
"advanced": True,
},
"model_name": {
"display_name": "Model Name",
"advanced": False,
"options": [
"open-mistral-7b",
"open-mixtral-8x7b",
"open-mixtral-8x22b",
"mistral-small-latest",
"mistral-medium-latest",
"mistral-large-latest",
],
"value": "open-mistral-7b",
},
"mistral_api_base": {
"display_name": "Mistral API Base",
"advanced": True,
"info": (
"The base URL of the Mistral API. Defaults to https://api.mistral.ai.\n\n"
"You can change this to use other APIs like JinaChat, LocalAI and Prem."
),
},
"mistral_api_key": {
"display_name": "Mistral API Key",
"info": "The Mistral API Key to use for the Mistral model.",
"advanced": False,
"password": True,
},
"temperature": {
"display_name": "Temperature",
"advanced": False,
"value": 0.1,
},
"stream": {
"display_name": "Stream",
"info": STREAM_INFO_TEXT,
"advanced": True,
},
"system_message": {
"display_name": "System Message",
"info": "System message to pass to the model.",
"advanced": True,
},
"max_retries": {
"display_name": "Max Retries",
"advanced": True,
},
"timeout": {
"display_name": "Timeout",
"advanced": True,
},
"max_concurrent_requests": {
"display_name": "Max Concurrent Requests",
"advanced": True,
},
"top_p": {
"display_name": "Top P",
"advanced": True,
},
"random_seed": {
"display_name": "Random Seed",
"advanced": True,
},
"safe_mode": {
"display_name": "Safe Mode",
"advanced": True,
},
}
def build(
self,
input_value: Text,
mistral_api_key: str,
model_name: str,
temperature: float = 0.1,
max_tokens: Optional[int] = 256,
mistral_api_base: Optional[str] = None,
stream: bool = False,
system_message: Optional[str] = None,
max_retries: int = 5,
timeout: int = 120,
max_concurrent_requests: int = 64,
top_p: float = 1,
random_seed: Optional[int] = None,
safe_mode: bool = False,
) -> Text:
if not mistral_api_base:
mistral_api_base = "https://api.mistral.ai"
if mistral_api_key:
api_key = SecretStr(mistral_api_key)
else:
api_key = None
chat_model = ChatMistralAI(
max_tokens=max_tokens,
model_name=model_name,
endpoint=mistral_api_base,
api_key=api_key,
temperature=temperature,
max_retries=max_retries,
timeout=timeout,
max_concurrent_requests=max_concurrent_requests,
top_p=top_p,
random_seed=random_seed,
safe_mode=safe_mode,
)
return self.get_chat_result(chat_model, stream, input_value, system_message)

View file

@ -41,7 +41,6 @@ class OpenAIModelComponent(LCModelComponent):
"display_name": "Model Name",
"advanced": False,
"options": MODEL_NAMES,
"value": "gpt-4-turbo-preview",
},
"openai_api_base": {
"display_name": "OpenAI API Base",
@ -79,7 +78,7 @@ class OpenAIModelComponent(LCModelComponent):
input_value: Text,
openai_api_key: str,
temperature: float,
model_name: str,
model_name: str = "gpt-4o",
max_tokens: Optional[int] = 256,
model_kwargs: NestedDict = {},
openai_api_base: Optional[str] = None,

View file

@ -0,0 +1,10 @@
from langflow.base.io.text import TextComponent
from langflow.schema import Record
class RecordsOutput(TextComponent):
display_name = "Records Output"
description = "Display Records as a Table"
def build(self, input_value: Record) -> Record:
return input_value

View file

@ -224,11 +224,6 @@ wrappers:
documentation: ""
SQLDatabase:
documentation: ""
output_parsers:
StructuredOutputParser:
documentation: "https://python.langchain.com/docs/modules/model_io/output_parsers/structured"
ResponseSchema:
documentation: "https://python.langchain.com/docs/modules/model_io/output_parsers/structured"
custom_components:
CustomComponent:
documentation: "https://docs.langflow.org/guidelines/custom-component"

View file

@ -3,9 +3,8 @@ from typing import TYPE_CHECKING, Any, List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from langflow.graph.edge.utils import build_clean_params
from langflow.graph.vertex.utils import log_transaction
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.deps import get_monitor_service
from langflow.services.monitor.utils import log_message
if TYPE_CHECKING:
@ -157,26 +156,9 @@ class ContractEdge(Edge):
message=target.params.get(INPUT_FIELD_NAME, {}),
session_id=target.params.get("session_id", ""),
artifacts=target.artifacts,
flow_id=target.graph.flow_id,
)
return self.result
def __repr__(self) -> str:
return f"{self.source_id} -[{self.target_param}]-> {self.target_id}"
def log_transaction(edge: ContractEdge, source: "Vertex", target: "Vertex", status, error=None):
try:
monitor_service = get_monitor_service()
clean_params = build_clean_params(target)
data = {
"source": source.vertex_type,
"target": target.vertex_type,
"target_args": clean_params,
"timestamp": monitor_service.get_timestamp(),
"status": status,
"error": error,
}
monitor_service.add_row(table_name="transactions", data=data)
except Exception as e:
logger.error(f"Error logging transaction: {e}")
logger.error(f"Error logging transaction: {e}")

View file

@ -1,19 +0,0 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
def build_clean_params(target: "Vertex") -> dict:
"""
Cleans the parameters of the target vertex.
"""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict))
}
# if it is a list we need to check if the contents are python types
for key, value in params.items():
if isinstance(value, list):
params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))]
return params

View file

@ -14,7 +14,7 @@ from langflow.graph.graph.state_manager import GraphStateManager
from langflow.graph.graph.utils import process_flow
from langflow.graph.schema import InterfaceComponentTypes, RunOutputs
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.types import ChatVertex, FileToolVertex, LLMVertex, StateVertex, ToolkitVertex
from langflow.graph.vertex.types import FileToolVertex, InterfaceVertex, LLMVertex, StateVertex, ToolkitVertex
from langflow.interface.tools.constants import FILE_TOOLS
from langflow.schema import Record
from langflow.schema.schema import INPUT_FIELD_NAME, InputType
@ -242,6 +242,7 @@ class Graph:
outputs: list[str],
stream: bool,
session_id: str,
fallback_to_env_vars: bool,
) -> List[Optional["ResultData"]]:
"""
Runs the graph with the given inputs.
@ -289,7 +290,7 @@ class Graph:
start_component_id = next(
(vertex_id for vertex_id in self._is_input_vertices if "chat" in vertex_id.lower()), None
)
await self.process(start_component_id=start_component_id)
await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars)
self.increment_run_count()
except Exception as exc:
logger.exception(exc)
@ -315,6 +316,7 @@ class Graph:
outputs: Optional[list[str]] = None,
session_id: Optional[str] = None,
stream: bool = False,
fallback_to_env_vars: bool = False,
) -> List[RunOutputs]:
"""
Run the graph with the given inputs and return the outputs.
@ -340,6 +342,7 @@ class Graph:
outputs=outputs,
session_id=session_id,
stream=stream,
fallback_to_env_vars=fallback_to_env_vars,
)
try:
@ -362,6 +365,7 @@ class Graph:
outputs: Optional[list[str]] = None,
session_id: Optional[str] = None,
stream: bool = False,
fallback_to_env_vars: bool = False,
) -> List[RunOutputs]:
"""
Runs the graph with the given inputs.
@ -403,6 +407,7 @@ class Graph:
outputs=outputs or [],
stream=stream,
session_id=session_id or "",
fallback_to_env_vars=fallback_to_env_vars,
)
run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs)
logger.debug(f"Run outputs: {run_output_object}")
@ -468,9 +473,9 @@ class Graph:
"""Marks a branch of the graph."""
if visited is None:
visited = set()
visited.add(vertex_id)
if vertex_id in visited:
return
visited.add(vertex_id)
self.mark_vertex(vertex_id, state)
@ -712,6 +717,7 @@ class Graph:
vertex_id: str,
inputs_dict: Optional[Dict[str, str]] = None,
user_id: Optional[str] = None,
fallback_to_env_vars: bool = False,
):
"""
Builds a vertex in the graph.
@ -733,7 +739,7 @@ class Graph:
vertex = self.get_vertex(vertex_id)
try:
if not vertex.frozen or not vertex._built:
await vertex.build(user_id=user_id, inputs=inputs_dict)
await vertex.build(user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars)
if vertex.result is not None:
params = vertex._built_object_repr()
@ -796,7 +802,7 @@ class Graph:
vertices.append(vertex)
return vertices
async def process(self, start_component_id: Optional[str] = None) -> "Graph":
async def process(self, fallback_to_env_vars: bool, start_component_id: Optional[str] = None) -> "Graph":
"""Processes the graph with vertices in each layer run in parallel."""
first_layer = self.sort_vertices(start_component_id=start_component_id)
@ -821,6 +827,7 @@ class Graph:
vertex_id=vertex_id,
user_id=self.user_id,
inputs_dict={},
fallback_to_env_vars=fallback_to_env_vars,
),
name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}",
)
@ -987,8 +994,8 @@ class Graph:
"""Returns the node class based on the node type."""
# First we check for the node_base_type
node_name = node_id.split("-")[0]
if node_name in ["ChatOutput", "ChatInput"]:
return ChatVertex
if node_name in InterfaceComponentTypes:
return InterfaceVertex
elif node_name in ["SharedState", "Notify", "Listen"]:
return StateVertex
elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP:

View file

@ -1,3 +1,4 @@
from langflow.graph.schema import CHAT_COMPONENTS
from langflow.graph.vertex import types
from langflow.interface.agents.base import agent_creator
from langflow.interface.custom.base import custom_component_creator
@ -5,7 +6,6 @@ from langflow.interface.document_loaders.base import documentloader_creator
from langflow.interface.embeddings.base import embedding_creator
from langflow.interface.llms.base import llm_creator
from langflow.interface.memories.base import memory_creator
from langflow.interface.output_parsers.base import output_parser_creator
from langflow.interface.prompts.base import prompt_creator
from langflow.interface.retrievers.base import retriever_creator
from langflow.interface.text_splitters.base import textsplitter_creator
@ -14,8 +14,6 @@ from langflow.interface.tools.base import tool_creator
from langflow.interface.wrappers.base import wrapper_creator
from langflow.utils.lazy_load import LazyLoadDictBase
CHAT_COMPONENTS = ["ChatInput", "ChatOutput", "TextInput", "SessionID"]
class VertexTypesDict(LazyLoadDictBase):
def __init__(self):
@ -46,10 +44,9 @@ class VertexTypesDict(LazyLoadDictBase):
# **{t: types.VectorStoreVertex for t in vectorstore_creator.to_list()},
**{t: types.DocumentLoaderVertex for t in documentloader_creator.to_list()},
**{t: types.TextSplitterVertex for t in textsplitter_creator.to_list()},
**{t: types.OutputParserVertex for t in output_parser_creator.to_list()},
**{t: types.CustomComponentVertex for t in custom_component_creator.to_list()},
**{t: types.RetrieverVertex for t in retriever_creator.to_list()},
**{t: types.ChatVertex for t in CHAT_COMPONENTS},
**{t: types.InterfaceVertex for t in CHAT_COMPONENTS},
}
def get_custom_component_vertex_type(self):

View file

@ -30,6 +30,7 @@ class InterfaceComponentTypes(str, Enum, metaclass=ContainsEnumMeta):
ChatOutput = "ChatOutput"
TextInput = "TextInput"
TextOutput = "TextOutput"
RecordsOutput = "RecordsOutput"
def __contains__(cls, item):
try:
@ -40,6 +41,8 @@ class InterfaceComponentTypes(str, Enum, metaclass=ContainsEnumMeta):
return True
CHAT_COMPONENTS = [InterfaceComponentTypes.ChatInput, InterfaceComponentTypes.ChatOutput]
RECORDS_COMPONENTS = [InterfaceComponentTypes.RecordsOutput]
INPUT_COMPONENTS = [
InterfaceComponentTypes.ChatInput,
InterfaceComponentTypes.TextInput,

View file

@ -10,7 +10,7 @@ from loguru import logger
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.graph.vertex.utils import generate_result
from langflow.graph.vertex.utils import generate_result, log_transaction
from langflow.interface.initialize import loading
from langflow.interface.listing import lazy_load_dict
from langflow.schema.schema import INPUT_FIELD_NAME
@ -315,7 +315,11 @@ class Vertex:
params[field_name] = full_path
elif field.get("required"):
field_display_name = field.get("display_name")
raise ValueError(f"File path not found for {field_display_name} in component {self.display_name}")
logger.warning(
f"File path not found for {field_display_name} in component {self.display_name}. Setting to None."
)
params[field_name] = None
elif field.get("type") in DIRECT_TYPES and params.get(field_name) is None:
val = field.get("value")
if field.get("type") == "code":
@ -390,13 +394,17 @@ class Vertex:
self.params = self._raw_params.copy()
self.updated_raw_params = True
async def _build(self, user_id=None):
async def _build(
self,
fallback_to_env_vars,
user_id=None,
):
"""
Initiate the build process.
"""
logger.debug(f"Building {self.display_name}")
await self._build_each_vertex_in_params_dict(user_id)
await self._get_and_instantiate_class(user_id)
await self._get_and_instantiate_class(user_id, fallback_to_env_vars)
self._validate_built_object()
self._built = True
@ -432,7 +440,11 @@ class Vertex:
# to the frontend
self.set_artifacts()
artifacts = self.artifacts
messages = self.extract_messages_from_artifacts(artifacts)
if isinstance(artifacts, dict):
messages = self.extract_messages_from_artifacts(artifacts)
else:
messages = []
result_dict = ResultData(
results=result_dict,
artifacts=artifacts,
@ -500,7 +512,7 @@ class Vertex:
if not self._is_vertex(value):
self.params[key][sub_key] = value
else:
result = await value.get_result()
result = await value.get_result(self)
self.params[key][sub_key] = result
def _is_vertex(self, value):
@ -515,9 +527,7 @@ class Vertex:
"""
return all(self._is_vertex(vertex) for vertex in value)
async def get_result(
self,
) -> Any:
async def get_result(self, requester: "Vertex") -> Any:
"""
Retrieves the result of the vertex.
@ -527,9 +537,9 @@ class Vertex:
The result of the vertex.
"""
async with self._lock:
return await self._get_result()
return await self._get_result(requester)
async def _get_result(self) -> Any:
async def _get_result(self, requester: "Vertex") -> Any:
"""
Retrieves the result of the built component.
@ -539,15 +549,19 @@ class Vertex:
The built result if use_result is True, else the built object.
"""
if not self._built:
log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="error")
raise ValueError(f"Component {self.display_name} has not been built yet")
return self._built_result if self.use_result else self._built_object
result = self._built_result if self.use_result else self._built_object
log_transaction(source=self, target=requester, flow_id=self.graph.flow_id, status="success")
return result
async def _build_vertex_and_update_params(self, key, vertex: "Vertex"):
"""
Builds a given vertex and updates the params dictionary accordingly.
"""
result = await vertex.get_result()
result = await vertex.get_result(self)
self._handle_func(key, result)
if isinstance(result, list):
self._extend_params_list_with_result(key, result)
@ -563,7 +577,7 @@ class Vertex:
"""
self.params[key] = []
for vertex in vertices:
result = await vertex.get_result()
result = await vertex.get_result(self)
# Weird check to see if the params[key] is a list
# because sometimes it is a Record and breaks the code
if not isinstance(self.params[key], list):
@ -606,7 +620,7 @@ class Vertex:
if isinstance(self.params[key], list):
self.params[key].extend(result)
async def _get_and_instantiate_class(self, user_id=None):
async def _get_and_instantiate_class(self, user_id=None, fallback_to_env_vars=False):
"""
Gets the class from a dictionary and instantiates it with the params.
"""
@ -615,6 +629,7 @@ class Vertex:
try:
result = await loading.instantiate_class(
user_id=user_id,
fallback_to_env_vars=fallback_to_env_vars,
vertex=self,
)
self._update_built_object_and_artifacts(result)

View file

@ -6,14 +6,14 @@ import yaml
from langchain_core.messages import AIMessage
from loguru import logger
from langflow.graph.schema import InterfaceComponentTypes
from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes
from langflow.graph.utils import UnbuiltObject, flatten_list, serialize_field
from langflow.graph.vertex.base import Vertex
from langflow.interface.utils import extract_input_variables_from_prompt
from langflow.schema import Record
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.monitor.utils import log_vertex_build
from langflow.utils.schemas import ChatOutputResponse
from langflow.utils.schemas import ChatOutputResponse, RecordOutputResponse
from langflow.utils.util import unescape_string
@ -300,11 +300,6 @@ class PromptVertex(Vertex):
return str(self._built_object)
class OutputParserVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="output_parsers")
class CustomComponentVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components")
@ -314,7 +309,7 @@ class CustomComponentVertex(Vertex):
return self.artifacts["repr"] or super()._built_object_repr()
class ChatVertex(Vertex):
class InterfaceVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components", is_task=True)
self.steps = [self._build, self._run]
@ -330,56 +325,131 @@ class ChatVertex(Vertex):
return f"Task {self.task_id} is not running"
if self.artifacts:
# dump as a yaml string
artifacts = {k.title().replace("_", " "): v for k, v in self.artifacts.items() if v is not None}
if isinstance(self.artifacts, dict):
_artifacts = [self.artifacts]
elif hasattr(self.artifacts, "records"):
_artifacts = self.artifacts.records
else:
_artifacts = self.artifacts
artifacts = []
for artifact in _artifacts:
# artifacts = {k.title().replace("_", " "): v for k, v in self.artifacts.items() if v is not None}
artifact = {k.title().replace("_", " "): v for k, v in artifact.items() if v is not None}
artifacts.append(artifact)
yaml_str = yaml.dump(artifacts, default_flow_style=False, allow_unicode=True)
return yaml_str
return super()._built_object_repr()
def _process_chat_component(self):
"""
Process the chat component and return the message.
This method processes the chat component by extracting the necessary parameters
such as sender, sender_name, and message from the `params` dictionary. It then
performs additional operations based on the type of the `_built_object` attribute.
If `_built_object` is an instance of `AIMessage`, it creates a `ChatOutputResponse`
object using the `from_message` method. If `_built_object` is not an instance of
`UnbuiltObject`, it checks the type of `_built_object` and performs specific
operations accordingly. If `_built_object` is a dictionary, it converts it into a
code block. If `_built_object` is an instance of `Record`, it assigns the `text`
attribute to the `message` variable. If `message` is an instance of `AsyncIterator`
or `Iterator`, it builds a stream URL and sets `message` to an empty string. If
`_built_object` is not a string, it converts it to a string. If `message` is a
generator or iterator, it assigns it to the `message` variable. Finally, it creates
a `ChatOutputResponse` object using the extracted parameters and assigns it to the
`artifacts` attribute. If `artifacts` is not None, it calls the `model_dump` method
on it and assigns the result to the `artifacts` attribute. It then returns the
`message` variable.
Returns:
str: The processed message.
"""
artifacts = None
sender = self.params.get("sender", None)
sender_name = self.params.get("sender_name", None)
message = self.params.get(INPUT_FIELD_NAME, None)
if isinstance(message, str):
message = unescape_string(message)
stream_url = None
if isinstance(self._built_object, AIMessage):
artifacts = ChatOutputResponse.from_message(
self._built_object,
sender=sender,
sender_name=sender_name,
)
elif not isinstance(self._built_object, UnbuiltObject):
if isinstance(self._built_object, dict):
# Turn the dict into a pleasing to
# read JSON inside a code block
message = dict_to_codeblock(self._built_object)
elif isinstance(self._built_object, Record):
message = self._built_object.text
elif isinstance(message, (AsyncIterator, Iterator)):
stream_url = self.build_stream_url()
message = ""
elif not isinstance(self._built_object, str):
message = str(self._built_object)
# if the message is a generator or iterator
# it means that it is a stream of messages
else:
message = self._built_object
artifacts = ChatOutputResponse(
message=message,
sender=sender,
sender_name=sender_name,
stream_url=stream_url,
)
self.will_stream = stream_url is not None
if artifacts:
self.artifacts = artifacts.model_dump(exclude_none=True)
return message
def _process_record_component(self):
"""
Process the record component of the vertex.
If the built object is an instance of `Record`, it calls the `model_dump` method
and assigns the result to the `artifacts` attribute.
If the built object is a list, it iterates over each element and checks if it is
an instance of `Record`. If it is, it calls the `model_dump` method and appends
the result to the `artifacts` list. If it is not, it raises a `ValueError` if the
`ignore_errors` parameter is set to `False`, or logs an error message if it is set
to `True`.
Returns:
The built object.
Raises:
ValueError: If an element in the list is not an instance of `Record` and
`ignore_errors` is set to `False`.
"""
if isinstance(self._built_object, Record):
artifacts = [self._built_object.data]
elif isinstance(self._built_object, list):
artifacts = []
ignore_errors = self.params.get("ignore_errors", False)
for record in self._built_object:
if isinstance(record, Record):
artifacts.append(record.data)
elif ignore_errors:
logger.error(f"Record expected, but got {record} of type {type(record)}")
else:
raise ValueError(f"Record expected, but got {record} of type {type(record)}")
self.artifacts = RecordOutputResponse(records=artifacts)
return self._built_object
async def _run(self, *args, **kwargs):
if self.is_interface_component:
if self.vertex_type in ["ChatOutput", "ChatInput"]:
artifacts = None
sender = self.params.get("sender", None)
sender_name = self.params.get("sender_name", None)
message = self.params.get(INPUT_FIELD_NAME, None)
if isinstance(message, str):
message = unescape_string(message)
stream_url = None
if isinstance(self._built_object, AIMessage):
artifacts = ChatOutputResponse.from_message(
self._built_object,
sender=sender,
sender_name=sender_name,
)
elif not isinstance(self._built_object, UnbuiltObject):
if isinstance(self._built_object, dict):
# Turn the dict into a pleasing to
# read JSON inside a code block
message = dict_to_codeblock(self._built_object)
elif isinstance(self._built_object, Record):
message = self._built_object.text
elif isinstance(message, (AsyncIterator, Iterator)):
stream_url = self.build_stream_url()
message = ""
elif not isinstance(self._built_object, str):
message = str(self._built_object)
# if the message is a generator or iterator
# it means that it is a stream of messages
else:
message = self._built_object
artifacts = ChatOutputResponse(
message=message,
sender=sender,
sender_name=sender_name,
stream_url=stream_url,
)
self.will_stream = stream_url is not None
if artifacts:
self.artifacts = artifacts.model_dump(exclude_none=True)
if self.vertex_type in CHAT_COMPONENTS:
message = self._process_chat_component()
elif self.vertex_type in RECORDS_COMPONENTS:
message = self._process_record_component()
if isinstance(self._built_object, (AsyncIterator, Iterator)):
if self.params["return_record"]:
if self.params.get("return_record", False):
self._built_object = Record(text=message, data=self.artifacts)
else:
self._built_object = message

View file

@ -1,11 +1,15 @@
from typing import Any, Optional, Union
from typing import Any, Optional, Union, TYPE_CHECKING
from langchain_core.messages import BaseMessage
from langchain_core.runnables import Runnable
from loguru import logger
from langflow.services.deps import get_monitor_service
from langflow.utils.constants import PYTHON_BASIC_TYPES
if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
def is_basic_type(obj):
return type(obj) in PYTHON_BASIC_TYPES
@ -63,3 +67,49 @@ async def generate_result(built_object: Any, inputs: dict, has_external_output:
else:
result = built_object
return result
def build_clean_params(target: "Vertex") -> dict:
"""
Cleans the parameters of the target vertex.
"""
# Removes all keys that the values aren't python types like str, int, bool, etc.
params = {
key: value for key, value in target.params.items() if isinstance(value, (str, int, bool, float, list, dict))
}
# if it is a list we need to check if the contents are python types
for key, value in params.items():
if isinstance(value, list):
params[key] = [item for item in value if isinstance(item, (str, int, bool, float, list, dict))]
return params
def log_transaction(source: "Vertex", target: "Vertex", flow_id, status, error=None):
"""
Logs a transaction between two vertices.
Args:
source (Vertex): The source vertex of the transaction.
target (Vertex): The target vertex of the transaction.
status: The status of the transaction.
error (Optional): Any error associated with the transaction.
Raises:
Exception: If there is an error while logging the transaction.
"""
try:
monitor_service = get_monitor_service()
clean_params = build_clean_params(target)
data = {
"source": source.vertex_type,
"target": target.vertex_type,
"target_args": clean_params,
"timestamp": monitor_service.get_timestamp(),
"status": status,
"error": error,
"flow_id": flow_id,
}
monitor_service.add_row(table_name="transactions", data=data)
except Exception as e:
logger.error(f"Error logging transaction: {e}")

View file

@ -11,10 +11,11 @@ from sqlmodel import select
from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES
from langflow.interface.types import get_all_components
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.folder.model import Folder, FolderCreate
from langflow.services.deps import get_settings_service, session_scope
STARTER_FOLDER_NAME = "Starter Projects"
STARTER_FOLDER_DESCRIPTION = "Starter projects to help you get started in Langflow."
# In the folder ./starter_projects we have a few JSON files that represent
# starter projects. We want to load these into the database so that users
@ -158,6 +159,7 @@ def create_new_project(
project_data,
project_icon,
project_icon_bg_color,
new_folder_id
):
logger.debug(f"Creating starter project {project_name}")
new_project = FlowCreate(
@ -168,33 +170,41 @@ def create_new_project(
data=project_data,
is_component=project_is_component,
updated_at=updated_at_datetime,
folder=STARTER_FOLDER_NAME,
folder_id=new_folder_id,
)
db_flow = Flow.model_validate(new_project, from_attributes=True)
session.add(db_flow)
def get_all_flows_similar_to_project(session, project_name):
flows = session.exec(
select(Flow).where(
Flow.name == project_name,
Flow.folder == STARTER_FOLDER_NAME,
)
).all()
def get_all_flows_similar_to_project(session, folder_id):
flows = session.exec(select(Folder).where(Folder.id == folder_id)).first().flows
return flows
def delete_start_projects(session):
flows = session.exec(
select(Flow).where(
Flow.folder == STARTER_FOLDER_NAME,
)
).all()
def delete_start_projects(session, folder_id):
flows = session.exec(select(Folder).where(Folder.id == folder_id)).first().flows
for flow in flows:
session.delete(flow)
session.commit()
def folder_exists(session, folder_name):
folder = session.exec(select(Folder).where(Folder.name == folder_name)).first()
return folder is not None
def create_starter_folder(session):
if not folder_exists(session, STARTER_FOLDER_NAME):
new_folder = FolderCreate(name=STARTER_FOLDER_NAME, description=STARTER_FOLDER_DESCRIPTION)
db_folder = Folder.model_validate(new_folder, from_attributes=True)
session.add(db_folder)
session.commit()
session.refresh(db_folder)
return db_folder
else:
return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
def create_or_update_starter_projects():
components_paths = get_settings_service().settings.COMPONENTS_PATH
try:
@ -203,8 +213,9 @@ def create_or_update_starter_projects():
logger.exception(f"Error loading components: {e}")
raise e
with session_scope() as session:
new_folder = create_starter_folder(session)
starter_projects = load_starter_projects()
delete_start_projects(session)
delete_start_projects(session, new_folder.id)
for project_path, project in starter_projects:
(
project_name,
@ -224,7 +235,7 @@ def create_or_update_starter_projects():
update_project_file(project_path, project, updated_project_data)
if project_name and project_data:
for existing_project in get_all_flows_similar_to_project(session, project_name):
for existing_project in get_all_flows_similar_to_project(session, new_folder.id):
session.delete(existing_project)
create_new_project(
@ -236,4 +247,5 @@ def create_or_update_starter_projects():
project_data,
project_icon,
project_icon_bg_color,
new_folder.id
)

File diff suppressed because one or more lines are too long

View file

@ -7,7 +7,7 @@ from langchain.agents.agent_toolkits.vectorstore.prompt import ROUTER_PREFIX as
from langchain.agents.mrkl.prompt import FORMAT_INSTRUCTIONS
from langchain.base_language import BaseLanguageModel
from langchain.chains.llm import LLMChain
from langchain.sql_database import SQLDatabase
from langchain_community.utilities import SQLDatabase
from langchain.tools.sql_database.prompt import QUERY_CHECKER
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.agent_toolkits.json.prompt import JSON_PREFIX, JSON_SUFFIX

View file

@ -1,7 +1,7 @@
import inspect
from typing import Any
from langchain import llms, memory, requests, text_splitter
from langchain import llms, memory, text_splitter
from langchain_community import agent_toolkits, document_loaders, embeddings
from langchain_community.chat_models import AzureChatOpenAI, ChatAnthropic, ChatOpenAI, ChatVertexAI
@ -43,8 +43,6 @@ memory_type_to_cls_dict: dict[str, Any] = {
memory_name: import_class(f"langchain.memory.{memory_name}") for memory_name in memory.__all__
}
# Wrappers
wrapper_type_to_cls_dict: dict[str, Any] = {wrapper.__name__: wrapper for wrapper in [requests.RequestsWrapper]}
# Embeddings
embedding_type_to_cls_dict: dict[str, Any] = {

View file

@ -45,7 +45,6 @@ def import_by_type(_type: str, name: str) -> Any:
"documentloaders": import_documentloader,
"textsplitters": import_textsplitter,
"utilities": import_utility,
"output_parsers": import_output_parser,
"retrievers": import_retriever,
}
if _type == "models":
@ -57,11 +56,6 @@ def import_by_type(_type: str, name: str) -> Any:
return loaded_func(name)
def import_output_parser(output_parser: str) -> Any:
    """Import output parser from output parser name.

    NOTE(review): the string handed to ``import_module`` is a full import
    *statement*, not a dotted module path — confirm ``import_module`` here is
    the project helper that evaluates such statements, not
    ``importlib.import_module``.
    """
    return import_module(f"from langchain.output_parsers import {output_parser}")
def import_chat_llm(llm: str) -> BaseChatModel:
    """Import a chat LLM class by name, resolved under ``langchain_community.chat_models``."""
    return import_class(f"langchain_community.chat_models.{llm}")

View file

@ -1,8 +1,8 @@
import inspect
import json
import os
from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence, Type
import orjson
from langchain.agents import agent as agent_module
from langchain.agents.agent import AgentExecutor
@ -20,7 +20,6 @@ from langflow.interface.importing.utils import import_by_type
from langflow.interface.initialize.llm import initialize_vertexai
from langflow.interface.initialize.utils import handle_format_kwargs, handle_node_type, handle_partial_variables
from langflow.interface.initialize.vector_store import vecstore_initializer
from langflow.interface.output_parsers.base import output_parser_creator
from langflow.interface.retrievers.base import retriever_creator
from langflow.interface.toolkits.base import toolkits_creator
from langflow.interface.utils import load_file_into_dict
@ -36,6 +35,7 @@ if TYPE_CHECKING:
async def instantiate_class(
vertex: "Vertex",
fallback_to_env_vars,
user_id=None,
) -> Any:
"""Instantiate class from module type and key, and params"""
@ -58,7 +58,7 @@ async def instantiate_class(
if not base_type:
raise ValueError("No base type provided for vertex")
if base_type == "custom_components":
return await instantiate_custom_component(params, user_id, vertex)
return await instantiate_custom_component(params, user_id, vertex, fallback_to_env_vars=fallback_to_env_vars)
class_object = import_by_type(_type=base_type, name=vertex_type)
return await instantiate_based_on_type(
class_object=class_object,
@ -67,6 +67,7 @@ async def instantiate_class(
params=params,
user_id=user_id,
vertex=vertex,
fallback_to_env_vars=fallback_to_env_vars,
)
@ -94,14 +95,7 @@ def convert_kwargs(params):
return params
async def instantiate_based_on_type(
class_object,
base_type,
node_type,
params,
user_id,
vertex,
):
async def instantiate_based_on_type(class_object, base_type, node_type, params, user_id, vertex, fallback_to_env_vars):
if base_type == "agents":
return instantiate_agent(node_type, class_object, params)
elif base_type == "prompts":
@ -126,8 +120,6 @@ async def instantiate_based_on_type(
return instantiate_utility(node_type, class_object, params)
elif base_type == "chains":
return instantiate_chains(node_type, class_object, params)
elif base_type == "output_parsers":
return instantiate_output_parser(node_type, class_object, params)
elif base_type == "models":
return instantiate_llm(node_type, class_object, params)
elif base_type == "retrievers":
@ -135,33 +127,49 @@ async def instantiate_based_on_type(
elif base_type == "memory":
return instantiate_memory(node_type, class_object, params)
elif base_type == "custom_components":
return await instantiate_custom_component(
params,
user_id,
vertex,
)
return await instantiate_custom_component(params, user_id, vertex, fallback_to_env_vars=fallback_to_env_vars)
elif base_type == "wrappers":
return instantiate_wrapper(node_type, class_object, params)
else:
return class_object(**params)
def update_params_with_load_from_db_fields(custom_component: "CustomComponent", params, load_from_db_fields):
def update_params_with_load_from_db_fields(
custom_component: "CustomComponent", params, load_from_db_fields, fallback_to_env_vars=False
):
# For each field in load_from_db_fields, we will check if it's in the params
# and if it is, we will get the value from the custom_component.keys(name)
# and update the params with the value
for field in load_from_db_fields:
if field in params:
try:
key = custom_component.variables(params[field])
params[field] = key if key else params[field]
key = None
try:
key = custom_component.variables(params[field])
except ValueError as e:
# check if "User id is not set" is in the error message
if "User id is not set" in str(e) and not fallback_to_env_vars:
raise e
logger.debug(str(e))
if fallback_to_env_vars and key is None:
var = os.getenv(params[field])
if var is None:
raise ValueError(f"Environment variable {params[field]} is not set.")
key = var
logger.info(f"Using environment variable {params[field]} for {field}")
if key is None:
logger.warning(f"Could not get value for {field}. Setting it to None.")
params[field] = key
except Exception as exc:
logger.error(f"Failed to get value for {field} from custom component. Error: {exc}")
pass
logger.error(f"Failed to get value for {field} from custom component. Setting it to None. Error: {exc}")
params[field] = None
return params
async def instantiate_custom_component(params, user_id, vertex):
async def instantiate_custom_component(params, user_id, vertex, fallback_to_env_vars: bool = False):
params_copy = params.copy()
class_object: Type["CustomComponent"] = eval_custom_component_code(params_copy.pop("code"))
custom_component: "CustomComponent" = class_object(
@ -170,7 +178,9 @@ async def instantiate_custom_component(params, user_id, vertex):
vertex=vertex,
selected_output_type=vertex.selected_output_type,
)
params_copy = update_params_with_load_from_db_fields(custom_component, params_copy, vertex.load_from_db_fields)
params_copy = update_params_with_load_from_db_fields(
custom_component, params_copy, vertex.load_from_db_fields, fallback_to_env_vars
)
if "retriever" in params_copy and hasattr(params_copy["retriever"], "as_retriever"):
params_copy["retriever"] = params_copy["retriever"].as_retriever()
@ -201,15 +211,6 @@ def instantiate_wrapper(node_type, class_object, params):
return class_object(**params)
def instantiate_output_parser(node_type, class_object, params):
    """Build an output parser, honoring registered classmethod factories.

    Parsers listed in ``output_parser_creator.from_method_nodes`` are created
    through the named classmethod; everything else uses the plain constructor.
    """
    if node_type not in output_parser_creator.from_method_nodes:
        # No special factory registered — plain constructor call.
        return class_object(**params)
    method = output_parser_creator.from_method_nodes[node_type]
    factory = getattr(class_object, method, None)
    if not factory:
        raise ValueError(f"Method {method} not found in {class_object}")
    return factory(**params)
def instantiate_llm(node_type, class_object, params: Dict):
# This is a workaround so JinaChat works until streaming is implemented
# if "openai_api_base" in params and "jina" in params["openai_api_base"]:
@ -514,15 +515,6 @@ def build_prompt_template(prompt, tools):
"show": False,
"multiline": False,
},
"output_parser": {
"type": "BaseOutputParser",
"required": False,
"placeholder": "",
"list": False,
"show": False,
"multline": False,
"value": None,
},
"template": {
"type": "str",
"required": True,

View file

@ -1,63 +0,0 @@
from typing import ClassVar, Dict, List, Optional, Type
from langchain import output_parsers
from langflow.interface.base import LangChainTypeCreator
from langflow.interface.importing.utils import import_class
from langflow.interface.utils import build_template_from_class
from langflow.services.deps import get_settings_service
from langflow.template.frontend_node.output_parsers import OutputParserFrontendNode
from langflow.utils.util import build_template_from_method
from loguru import logger
class OutputParserCreator(LangChainTypeCreator):
    """Type creator exposing LangChain output parsers to the frontend.

    Filters ``langchain.output_parsers`` down to the parsers enabled in
    settings (or everything when DEV mode is on) and builds frontend
    templates for them.
    """

    type_name: str = "output_parsers"

    # Parsers that must be instantiated via a classmethod factory rather than
    # their plain constructor.
    from_method_nodes: ClassVar[Dict] = {
        "StructuredOutputParser": "from_response_schemas",
    }

    @property
    def frontend_node_class(self) -> Type[OutputParserFrontendNode]:
        # Frontend node type used to render these parsers in the UI.
        return OutputParserFrontendNode

    @property
    def type_to_loader_dict(self) -> Dict:
        """Lazily build and cache the name -> class mapping of allowed parsers."""
        if self.type_dict is None:
            settings_service = get_settings_service()
            self.type_dict = {
                output_parser_name: import_class(f"langchain.output_parsers.{output_parser_name}")
                # if output_parser_name is not lower case it is a class
                for output_parser_name in output_parsers.__all__
            }
            # Keep only parsers explicitly enabled in settings (DEV allows all).
            self.type_dict = {
                name: output_parser
                for name, output_parser in self.type_dict.items()
                if name in settings_service.settings.OUTPUT_PARSERS or settings_service.settings.DEV
            }
        return self.type_dict

    def get_signature(self, name: str) -> Optional[Dict]:
        """Return the frontend template for *name*, or None when unavailable."""
        try:
            if name in self.from_method_nodes:
                # Factory-built parsers get their template from the classmethod.
                return build_template_from_method(
                    name,
                    type_to_cls_dict=self.type_to_loader_dict,
                    method_name=self.from_method_nodes[name],
                )
            else:
                return build_template_from_class(
                    name,
                    type_to_cls_dict=self.type_to_loader_dict,
                )
        except ValueError as exc:
            # raise ValueError("OutputParser not found") from exc
            logger.error(f"OutputParser {name} not found: {exc}")
        except AttributeError as exc:
            logger.error(f"OutputParser {name} not loaded: {exc}")
        return None

    def to_list(self) -> List[str]:
        # Names of all parsers currently exposed.
        return list(self.type_to_loader_dict.keys())


# Module-level singleton used by the interface layer.
output_parser_creator = OutputParserCreator()

View file

@ -8,7 +8,6 @@ from langflow.interface.document_loaders.base import documentloader_creator
from langflow.interface.embeddings.base import embedding_creator
from langflow.interface.llms.base import llm_creator
from langflow.interface.memories.base import memory_creator
from langflow.interface.output_parsers.base import output_parser_creator
from langflow.interface.retrievers.base import retriever_creator
from langflow.interface.text_splitters.base import textsplitter_creator
from langflow.interface.toolkits.base import toolkits_creator
@ -48,7 +47,6 @@ def build_langchain_types_dict(): # sourcery skip: dict-assign-update-to-union
documentloader_creator,
textsplitter_creator,
# utility_creator,
output_parser_creator,
retriever_creator,
]

View file

@ -106,7 +106,7 @@ def set_langchain_cache(settings):
if cache_type := os.getenv("LANGFLOW_LANGCHAIN_CACHE"):
try:
cache_class = import_class(f"langchain.cache.{cache_type or settings.LANGCHAIN_CACHE}")
cache_class = import_class(f"langchain_community.cache.{cache_type or settings.LANGCHAIN_CACHE}")
logger.debug(f"Setting up LLM caching with {cache_class.__name__}")
set_llm_cache(cache_class())

View file

@ -1,5 +1,5 @@
import warnings
from typing import Optional, Union
from typing import List, Optional, Union
from loguru import logger
@ -60,7 +60,7 @@ def get_messages(
return records
def add_messages(records: Union[list[Record], Record]):
def add_messages(records: Union[list[Record], Record], flow_id: Optional[str] = None):
"""
Add a message to the monitor service.
"""
@ -76,7 +76,8 @@ def add_messages(records: Union[list[Record], Record]):
messages: list[MessageModel] = []
for record in records:
messages.append(MessageModel.from_record(record))
record.timestamp = monitor_service.get_timestamp()
messages.append(MessageModel.from_record(record, flow_id=flow_id))
for message in messages:
try:
@ -107,8 +108,24 @@ def store_message(
session_id: Optional[str] = None,
sender: Optional[str] = None,
sender_name: Optional[str] = None,
) -> list[Record]:
flow_id: Optional[str] = None,
) -> List[Record]:
"""
Stores a message in the memory.
Args:
message (Union[str, Record]): The message to be stored. It can be either a string or a Record object.
session_id (Optional[str]): The session ID associated with the message.
sender (Optional[str]): The sender ID associated with the message.
sender_name (Optional[str]): The name of the sender associated with the message.
flow_id (Optional[str]): The flow ID associated with the message. When running from the CustomComponent you can access this using `self.graph.flow_id`.
Returns:
List[Record]: A list of records containing the stored message.
Raises:
ValueError: If any of the required parameters (session_id, sender, sender_name) is not provided.
"""
if not message:
warnings.warn("No message provided.")
return []
@ -135,4 +152,4 @@ def store_message(
},
)
return add_messages([record])
return add_messages([record], flow_id=flow_id)

View file

@ -82,6 +82,7 @@ def run_flow_from_json(
env_file: Optional[str] = None,
cache: Optional[str] = None,
disable_logs: Optional[bool] = True,
fallback_to_env_vars: bool = False,
) -> List[RunOutputs]:
"""
Run a flow from a JSON file or dictionary.
@ -98,6 +99,7 @@ def run_flow_from_json(
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if not found. Defaults to False.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
@ -127,5 +129,6 @@ def run_flow_from_json(
input_type=input_type,
output_type=output_type,
output_component=output_component,
fallback_to_env_vars=fallback_to_env_vars,
)
return result

View file

@ -175,6 +175,7 @@ def run_graph(
input_value: str,
input_type: str,
output_type: str,
fallback_to_env_vars: bool = False,
output_component: Optional[str] = None,
) -> List[RunOutputs]:
"""
@ -218,6 +219,7 @@ def run_graph(
outputs or [],
stream=False,
session_id="",
fallback_to_env_vars=fallback_to_env_vars,
)
return run_outputs

View file

@ -1,10 +1,10 @@
import copy
import json
from typing import Literal, Optional, cast
from langchain_core.documents import Document
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from pydantic import BaseModel, model_validator
from langchain_core.messages import HumanMessage, AIMessage
class Record(BaseModel):
@ -163,23 +163,15 @@ class Record(BaseModel):
# Create a new Record object with a deep copy of the data dictionary
return Record(data=copy.deepcopy(self.data, memo), text_key=self.text_key, default_value=self.default_value)
def __str__(self) -> str:
"""
Returns a string representation of the Record, including text and data.
"""
# Assuming a method to dump model data as JSON string exists.
# If it doesn't, you might need to implement it or use json.dumps() directly.
# build the string considering all keys in the data dictionary
prefix = "Record("
suffix = ")"
text = f"text_key={self.text_key}, "
text += ", ".join([f"{k}={v}" for k, v in self.data.items()])
return prefix + text + suffix
# check which attributes the Record has by checking the keys in the data dictionary
def __dir__(self):
return super().__dir__() + list(self.data.keys())
def __str__(self) -> str:
# return a JSON string representation of the Record attributes
return json.dumps(self.data)
INPUT_FIELD_NAME = "input_value"

View file

@ -1,21 +1,20 @@
from datetime import datetime, timedelta, timezone
from typing import Annotated, Coroutine, Optional, Union
from uuid import UUID
import warnings
from cryptography.fernet import Fernet
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery, OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session
from starlette.websockets import WebSocket
from langflow.services.database.models.api_key.crud import check_key
from langflow.services.database.models.api_key.model import ApiKey
from langflow.services.database.models.user.crud import (
get_user_by_id,
get_user_by_username,
update_user_last_login_at,
)
from langflow.services.database.models.user.crud import get_user_by_id, get_user_by_username, update_user_last_login_at
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_session, get_settings_service
@ -111,11 +110,15 @@ async def get_current_user_by_jwt(
raise credentials_exception
try:
payload = jwt.decode(
token,
settings_service.auth_settings.SECRET_KEY.get_secret_value(),
algorithms=[settings_service.auth_settings.ALGORITHM],
)
# Ignore warning about datetime.utcnow
with warnings.catch_warnings():
warnings.simplefilter("ignore")
payload = jwt.decode(
token,
settings_service.auth_settings.SECRET_KEY.get_secret_value(),
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
if expires := payload.get("exp", None):
@ -285,11 +288,14 @@ def create_refresh_token(refresh_token: str, db: Session = Depends(get_session))
settings_service = get_settings_service()
try:
payload = jwt.decode(
refresh_token,
settings_service.auth_settings.SECRET_KEY.get_secret_value(),
algorithms=[settings_service.auth_settings.ALGORITHM],
)
# Ignore warning about datetime.utcnow
with warnings.catch_warnings():
warnings.simplefilter("ignore")
payload = jwt.decode(
refresh_token,
settings_service.auth_settings.SECRET_KEY.get_secret_value(),
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
@ -349,3 +355,4 @@ def decrypt_api_key(encrypted_api_key: str, settings_service=Depends(get_setting
encoded_bytes = encrypted_api_key
decrypted_key = fernet.decrypt(encoded_bytes).decode()
return decrypted_key
return decrypted_key

View file

@ -1,6 +1,7 @@
from .api_key import ApiKey
from .flow import Flow
from .folder import Folder
from .user import User
from .variable import Variable
__all__ = ["Flow", "User", "ApiKey", "Variable"]
__all__ = ["Flow", "User", "ApiKey", "Variable", "Folder"]

View file

@ -1,7 +1,7 @@
# Path: src/backend/langflow/services/database/models/flow/model.py
import warnings
from datetime import datetime
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, Optional
from uuid import UUID, uuid4
@ -13,6 +13,7 @@ from sqlmodel import JSON, Column, Field, Relationship, SQLModel
from langflow.schema.schema import Record
if TYPE_CHECKING:
from langflow.services.database.models.folder import Folder
from langflow.services.database.models.user import User
@ -23,8 +24,8 @@ class FlowBase(SQLModel):
icon_bg_color: Optional[str] = Field(default=None, nullable=True)
data: Optional[Dict] = Field(default=None, nullable=True)
is_component: Optional[bool] = Field(default=False, nullable=True)
updated_at: Optional[datetime] = Field(default_factory=datetime.utcnow, nullable=True)
folder: Optional[str] = Field(default=None, nullable=True)
updated_at: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=True)
folder_id: Optional[UUID] = Field(default=None, nullable=True)
@field_validator("icon_bg_color")
def validate_icon_bg_color(cls, v):
@ -112,6 +113,8 @@ class Flow(FlowBase, table=True):
data: Optional[Dict] = Field(default=None, sa_column=Column(JSON))
user_id: Optional[UUID] = Field(index=True, foreign_key="user.id", nullable=True)
user: "User" = Relationship(back_populates="flows")
folder_id: Optional[UUID] = Field(default=None, foreign_key="folder.id", nullable=True, index=True)
folder: Optional["Folder"] = Relationship(back_populates="flows")
def to_record(self):
serialized = self.model_dump()
@ -128,14 +131,17 @@ class Flow(FlowBase, table=True):
class FlowCreate(FlowBase):
user_id: Optional[UUID] = None
folder_id: Optional[UUID] = None
class FlowRead(FlowBase):
id: UUID
user_id: Optional[UUID] = Field()
folder_id: Optional[UUID] = Field()
class FlowUpdate(SQLModel):
name: Optional[str] = None
description: Optional[str] = None
data: Optional[Dict] = None
folder_id: Optional[UUID] = None

View file

@ -0,0 +1,3 @@
from .model import Folder, FolderCreate, FolderRead, FolderUpdate
__all__ = ["Folder", "FolderCreate", "FolderRead", "FolderUpdate"]

View file

@ -0,0 +1,2 @@
DEFAULT_FOLDER_DESCRIPTION = "Manage your personal projects. Download and upload entire collections."
DEFAULT_FOLDER_NAME = "My Projects"

View file

@ -0,0 +1,55 @@
from typing import TYPE_CHECKING, List, Optional
from uuid import UUID, uuid4
from sqlmodel import Field, Relationship, SQLModel
from langflow.services.database.models.flow.model import FlowRead
if TYPE_CHECKING:
from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.user.model import User
class FolderBase(SQLModel):
    """Shared fields for all folder schemas."""

    name: str = Field(index=True)
    description: Optional[str] = Field(default=None)


class Folder(FolderBase, table=True):
    """Database table: a user-owned folder that groups flows.

    Folders form a tree via ``parent_id``; deleting a folder cascades the
    deletion to the flows it contains.
    """

    id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
    parent_id: Optional[UUID] = Field(default=None, foreign_key="folder.id")
    # Self-referential tree: remote_side pins the "one" side of the relation.
    parent: Optional["Folder"] = Relationship(
        back_populates="children",
        sa_relationship_kwargs=dict(remote_side="Folder.id"),
    )
    children: List["Folder"] = Relationship(back_populates="parent")
    user_id: Optional[UUID] = Field(default=None, foreign_key="user.id")
    user: "User" = Relationship(back_populates="folders")
    # Flows are owned by the folder: removing the folder removes its flows.
    flows: List["Flow"] = Relationship(
        back_populates="folder", sa_relationship_kwargs={"cascade": "all, delete, delete-orphan"}
    )


class FolderCreate(FolderBase):
    """Payload for creating a folder, optionally seeding it with existing items."""

    components_list: Optional[List[UUID]] = None
    flows_list: Optional[List[UUID]] = None


class FolderRead(FolderBase):
    """Read schema: folder identity plus its parent link."""

    id: UUID
    parent_id: Optional[UUID] = Field()


class FolderReadWithFlows(FolderBase):
    """Read schema that also embeds the folder's flows."""

    id: UUID
    parent_id: Optional[UUID] = Field()
    flows: List["FlowRead"] = Field(default=[])


class FolderUpdate(SQLModel):
    """Partial-update payload; every field is optional."""

    name: Optional[str] = None
    description: Optional[str] = None
    parent_id: Optional[UUID] = None
    components: Optional[List[UUID]] = None
    flows: Optional[List[UUID]] = None

View file

@ -0,0 +1,26 @@
from typing import TYPE_CHECKING
from uuid import UUID
from langflow.services.database.models.flow.model import Flow
from sqlmodel import Session, select, update
from .constants import DEFAULT_FOLDER_DESCRIPTION, DEFAULT_FOLDER_NAME
from .model import Folder
if TYPE_CHECKING:
from langflow.services.database.models.user.model import User
def create_default_folder_if_it_doesnt_exist(session: Session, user_id: UUID):
    """Ensure the user owns at least one folder, seeding the default one.

    When the user has no folders yet, create the default "My Projects" folder
    and adopt every folderless flow owned by the user into it. No-op when any
    folder already exists for the user.
    """
    existing = session.exec(select(Folder).where(Folder.user_id == user_id)).first()
    if existing is not None:
        return None
    default_folder = Folder(name=DEFAULT_FOLDER_NAME, user_id=user_id, description=DEFAULT_FOLDER_DESCRIPTION)
    session.add(default_folder)
    session.commit()
    session.refresh(default_folder)
    # Move the user's orphaned flows (no folder assigned) into the new default.
    session.exec(
        update(Flow)
        .where((Flow.folder_id == None) & (Flow.user_id == user_id))  # noqa: E711 — SQLAlchemy NULL test
        .values(folder_id=default_folder.id)
    )
    session.commit()
    return None

View file

@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from uuid import UUID, uuid4
@ -8,6 +8,7 @@ if TYPE_CHECKING:
from langflow.services.database.models.api_key import ApiKey
from langflow.services.database.models.variable import Variable
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.folder import Folder
class User(SQLModel, table=True):
@ -17,8 +18,8 @@ class User(SQLModel, table=True):
profile_image: Optional[str] = Field(default=None, nullable=True)
is_active: bool = Field(default=False)
is_superuser: bool = Field(default=False)
create_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
create_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
last_login_at: Optional[datetime] = Field(default=None, nullable=True)
api_keys: list["ApiKey"] = Relationship(
back_populates="user",
@ -30,6 +31,10 @@ class User(SQLModel, table=True):
back_populates="user",
sa_relationship_kwargs={"cascade": "delete"},
)
folders: list["Folder"] = Relationship(
back_populates="user",
sa_relationship_kwargs={"cascade": "delete"},
)
class UserCreate(SQLModel):

View file

@ -2,15 +2,16 @@ import json
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional
from pydantic import BaseModel, Field, field_serializer, validator
from pydantic import BaseModel, Field, field_serializer, field_validator
if TYPE_CHECKING:
from langflow.schema import Record
class TransactionModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
index: Optional[int] = Field(default=None)
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
flow_id: str
source: str
target: str
target_args: dict
@ -22,15 +23,53 @@ class TransactionModel(BaseModel):
populate_by_name = True
# validate target_args in case it is a JSON
@validator("target_args", pre=True)
@field_validator("target_args", mode="before")
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@field_serializer("target_args")
def serialize_target_args(v):
if isinstance(v, dict):
return json.dumps(v)
return v
class TransactionModelResponse(BaseModel):
index: Optional[int] = Field(default=None)
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
flow_id: str
source: str
target: str
target_args: dict
status: str
error: Optional[str] = None
class Config:
from_attributes = True
populate_by_name = True
# validate target_args in case it is a JSON
@field_validator("target_args", mode="before")
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@field_validator("index", mode="before")
def validate_id(cls, v):
if isinstance(v, float):
try:
return int(v)
except ValueError:
return None
return v
class MessageModel(BaseModel):
id: Optional[int] = Field(default=None, alias="id")
index: Optional[int] = Field(default=None)
flow_id: Optional[str] = Field(default=None, alias="flow_id")
timestamp: datetime = Field(default_factory=datetime.now)
sender: str
sender_name: str
@ -42,14 +81,14 @@ class MessageModel(BaseModel):
from_attributes = True
populate_by_name = True
@validator("artifacts", pre=True)
@field_validator("artifacts", mode="before")
def validate_target_args(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@classmethod
def from_record(cls, record: "Record"):
def from_record(cls, record: "Record", flow_id: Optional[str] = None):
# first check if the record has all the required fields
if not record.data or ("sender" not in record.data and "sender_name" not in record.data):
raise ValueError("The record does not have the required fields 'sender' and 'sender_name' in the data.")
@ -59,9 +98,30 @@ class MessageModel(BaseModel):
message=record.text,
session_id=record.session_id,
artifacts=record.artifacts or {},
timestamp=record.timestamp,
flow_id=flow_id,
)
class MessageModelResponse(MessageModel):
index: Optional[int] = Field(default=None)
@field_validator("artifacts", mode="before")
def serialize_artifacts(v):
if isinstance(v, str):
return json.loads(v)
return v
@field_validator("index", mode="before")
def validate_id(cls, v):
if isinstance(v, float):
try:
return int(v)
except ValueError:
return None
return v
class VertexBuildModel(BaseModel):
index: Optional[int] = Field(default=None, alias="index", exclude=True)
id: Optional[str] = Field(default=None, alias="id")
@ -86,9 +146,11 @@ class VertexBuildModel(BaseModel):
elif isinstance(value, list) and all(isinstance(i, BaseModel) for i in value):
v[key] = [i.model_dump() for i in value]
return json.dumps(v)
elif isinstance(v, BaseModel):
return v.model_dump_json()
return v
@validator("params", pre=True)
@field_validator("params", mode="before")
def validate_params(cls, v):
if isinstance(v, str):
try:
@ -103,16 +165,18 @@ class VertexBuildModel(BaseModel):
return json.dumps([i.model_dump() for i in v])
return v
@validator("data", pre=True)
@field_validator("data", mode="before")
def validate_data(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@validator("artifacts", pre=True)
@field_validator("artifacts", mode="before")
def validate_artifacts(cls, v):
if isinstance(v, str):
return json.loads(v)
elif isinstance(v, BaseModel):
return v.model_dump()
return v

View file

@ -69,7 +69,7 @@ class MonitorService(Service):
valid: Optional[bool] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT id, flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds"
query = "SELECT index,flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds"
conditions = []
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
@ -109,6 +109,7 @@ class MonitorService(Service):
def get_messages(
self,
flow_id: Optional[str] = None,
sender: Optional[str] = None,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
@ -116,7 +117,7 @@ class MonitorService(Service):
order: Optional[str] = "DESC",
limit: Optional[int] = None,
):
query = "SELECT sender_name, sender, session_id, message, artifacts, timestamp FROM messages"
query = "SELECT index, flow_id, sender_name, sender, session_id, message, artifacts, timestamp FROM messages"
conditions = []
if sender:
conditions.append(f"sender = '{sender}'")
@ -124,6 +125,8 @@ class MonitorService(Service):
conditions.append(f"sender_name = '{sender_name}'")
if session_id:
conditions.append(f"session_id = '{session_id}'")
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
@ -146,8 +149,9 @@ class MonitorService(Service):
target: Optional[str] = None,
status: Optional[str] = None,
order_by: Optional[str] = "timestamp",
flow_id: Optional[str] = None,
):
query = "SELECT source, target, target_args, status, error, timestamp FROM transactions"
query = "SELECT index,flow_id, source, target, target_args, status, error, timestamp FROM transactions"
conditions = []
if source:
conditions.append(f"source = '{source}'")
@ -155,6 +159,8 @@ class MonitorService(Service):
conditions.append(f"target = '{target}'")
if status:
conditions.append(f"status = '{status}'")
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)

View file

@ -61,7 +61,7 @@ def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, mode
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if "id" in desired_schema.keys():
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
@ -91,7 +91,7 @@ def add_row_to_table(
columns = ", ".join(keys)
values_placeholders = ", ".join(["?" for _ in keys])
values = list(validated_dict.values())
values = [validated_dict[key] for key in keys]
# Create the insert statement
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
@ -104,7 +104,7 @@ def add_row_to_table(
column_error_message = ""
for key, value in validated_dict.items():
logger.error(f"{key}: {type(value)}")
if value in str(e):
if str(value) in str(e):
column_error_message = f"Column: {key} Value: {value} Error: {e}"
if column_error_message:
@ -119,6 +119,7 @@ async def log_message(
message: str,
session_id: str,
artifacts: Optional[dict] = None,
flow_id: Optional[str] = None,
):
try:
from langflow.graph.vertex.base import Vertex
@ -134,6 +135,7 @@ async def log_message(
"artifacts": artifacts or {},
"session_id": session_id,
"timestamp": monitor_service.get_timestamp(),
"flow_id": flow_id,
}
monitor_service.add_row(table_name="messages", data=row)
except Exception as e:

View file

@ -7,13 +7,12 @@ from typing import Any, List, Optional, Tuple, Type
import orjson
import yaml
from langflow.services.settings.constants import VARIABLES_TO_GET_FROM_ENVIRONMENT
from loguru import logger
from pydantic import field_validator, validator
from pydantic import field_validator
from pydantic.fields import FieldInfo
from pydantic_settings import BaseSettings, EnvSettingsSource, PydanticBaseSettingsSource, SettingsConfigDict
from langflow.services.settings.constants import VARIABLES_TO_GET_FROM_ENVIRONMENT
# BASE_COMPONENTS_PATH = str(Path(__file__).parent / "components")
BASE_COMPONENTS_PATH = str(Path(__file__).parent.parent.parent / "components")
@ -72,11 +71,14 @@ class Settings(BaseSettings):
TOOLKITS: dict = {}
TEXTSPLITTERS: dict = {}
UTILITIES: dict = {}
OUTPUT_PARSERS: dict = {}
CUSTOM_COMPONENTS: dict = {}
# Define the default LANGFLOW_DIR
CONFIG_DIR: Optional[str] = None
# Define if langflow db should be saved in config dir or
# in the langflow directory
SAVE_DB_IN_CONFIG_DIR: bool = False
"""Define if langflow database should be saved in LANGFLOW_CONFIG_DIR or in the langflow directory (i.e. in the package directory)."""
DEV: bool = False
DATABASE_URL: Optional[str] = None
@ -109,12 +111,16 @@ class Settings(BaseSettings):
CELERY_ENABLED: bool = False
fallback_to_env_var: bool = True
"""If set to True, Global Variables set in the UI will fallback to a environment variable
with the same name in case Langflow fails to retrieve the variable value."""
store_environment_variables: bool = True
"""Whether to store environment variables as Global Variables in the database."""
variables_to_get_from_environment: list[str] = VARIABLES_TO_GET_FROM_ENVIRONMENT
"""List of environment variables to get from the environment and store in the database."""
@validator("CONFIG_DIR", pre=True, allow_reuse=True)
@field_validator("CONFIG_DIR", mode="before")
def set_langflow_dir(cls, value):
if not value:
from platformdirs import user_cache_dir
@ -137,8 +143,8 @@ class Settings(BaseSettings):
return str(value)
@validator("DATABASE_URL", pre=True)
def set_database_url(cls, value, values):
@field_validator("DATABASE_URL", mode="before")
def set_database_url(cls, value, info):
if not value:
logger.debug("No database_url provided, trying LANGFLOW_DATABASE_URL env variable")
if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"):
@ -149,29 +155,36 @@ class Settings(BaseSettings):
# Originally, we used sqlite:///./langflow.db
# so we need to migrate to the new format
# if there is a database in that location
if not values["CONFIG_DIR"]:
if not info.data["CONFIG_DIR"]:
raise ValueError("CONFIG_DIR not set, please set it or provide a DATABASE_URL")
from langflow.version import is_pre_release # type: ignore
if info.data["SAVE_DB_IN_CONFIG_DIR"]:
database_dir = info.data["CONFIG_DIR"]
logger.debug(f"Saving database to CONFIG_DIR: {database_dir}")
else:
database_dir = Path(__file__).parent.parent.parent.resolve()
logger.debug(f"Saving database to langflow directory: {database_dir}")
pre_db_file_name = "langflow-pre.db"
db_file_name = "langflow.db"
new_pre_path = f"{values['CONFIG_DIR']}/{pre_db_file_name}"
new_path = f"{values['CONFIG_DIR']}/{db_file_name}"
new_pre_path = f"{database_dir}/{pre_db_file_name}"
new_path = f"{database_dir}/{db_file_name}"
final_path = None
if is_pre_release:
if Path(new_pre_path).exists():
final_path = new_pre_path
elif Path(new_path).exists():
elif Path(new_path).exists() and info.data["SAVE_DB_IN_CONFIG_DIR"]:
# We need to copy the current db to the new location
logger.debug("Copying existing database to new location")
copy2(new_path, new_pre_path)
logger.debug(f"Copied existing database to {new_pre_path}")
elif Path(f"./{db_file_name}").exists():
elif Path(f"./{db_file_name}").exists() and info.data["SAVE_DB_IN_CONFIG_DIR"]:
logger.debug("Copying existing database to new location")
copy2(f"./{db_file_name}", new_pre_path)
logger.debug(f"Copied existing database to {new_pre_path}")
else:
logger.debug(f"Database already exists at {new_pre_path}, using it")
logger.debug(f"Creating new database at {new_pre_path}")
final_path = new_pre_path
else:
if Path(new_path).exists():
@ -241,7 +254,6 @@ class Settings(BaseSettings):
self.VECTORSTORES = new_settings.VECTORSTORES or {}
self.DOCUMENTLOADERS = new_settings.DOCUMENTLOADERS or {}
self.RETRIEVERS = new_settings.RETRIEVERS or {}
self.OUTPUT_PARSERS = new_settings.OUTPUT_PARSERS or {}
self.CUSTOM_COMPONENTS = new_settings.CUSTOM_COMPONENTS or {}
self.COMPONENTS_PATH = new_settings.COMPONENTS_PATH or []
self.DEV = dev
@ -313,3 +325,6 @@ def load_settings_from_yaml(file_path: str) -> Settings:
logger.debug(f"Loading {len(settings_dict[key])} {key} from {file_path}")
return Settings(**settings_dict)
return Settings(**settings_dict)
return Settings(**settings_dict)
return Settings(**settings_dict)

Some files were not shown because too many files have changed in this diff Show more