Merge branch 'zustand/io/migration' into better_io

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-03-03 14:32:04 -03:00
commit 390ccb892a
343 changed files with 8981 additions and 6790 deletions

View file

@ -109,7 +109,11 @@ def version_callback(value: bool):
@app.callback()
def main_entry_point(
version: bool = typer.Option(
None, "--version", callback=version_callback, is_eager=True, help="Show the version and exit."
None,
"--version",
callback=version_callback,
is_eager=True,
help="Show the version and exit.",
),
):
"""

View file

@ -63,7 +63,7 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne
# This is the path to the db in the root of the project.
# When the user runs the Langflow the database url will
# be set dinamically.
sqlalchemy.url = sqlite:///../../../langflow.db
sqlalchemy.url = sqlite:///./langflow.db
[post_write_hooks]
@ -98,7 +98,7 @@ handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
level = DEBUG
handlers =
qualname = alembic

View file

@ -1,10 +1,11 @@
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from loguru import logger
from sqlalchemy import engine_from_config, pool
from langflow.services.database.models import * # noqa
from langflow.services.database.service import SQLModel
# this is the Alembic Config object, which provides
@ -40,7 +41,8 @@ def run_migrations_offline() -> None:
script output.
"""
url = config.get_main_option("sqlalchemy.url")
url = os.getenv("LANGFLOW_DATABASE_URL")
url = url or config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
@ -60,12 +62,17 @@ def run_migrations_online() -> None:
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
from langflow.services.deps import get_db_service
try:
connectable = get_db_service().engine
except Exception as e:
logger.error(f"Error getting database engine: {e}")
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata, render_as_batch=True

View file

@ -10,6 +10,7 @@ from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
from sqlalchemy.engine.reflection import Inspector
${imports if imports else ""}
# revision identifiers, used by Alembic.
@ -20,8 +21,12 @@ depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
${upgrades if upgrades else "pass"}
def downgrade() -> None:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
${downgrades if downgrades else "pass"}

View file

@ -5,28 +5,43 @@ Revises: 1ef9c4f3765d
Create Date: 2023-12-13 18:55:52.587360
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = '006b3990db50'
down_revision: Union[str, None] = '1ef9c4f3765d'
revision: str = "006b3990db50"
down_revision: Union[str, None] = "1ef9c4f3765d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
api_key_constraints = inspector.get_unique_constraints("apikey")
flow_constraints = inspector.get_unique_constraints("flow")
user_constraints = inspector.get_unique_constraints("user")
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.create_unique_constraint('uq_apikey_id', ['id'])
if not any(
constraint["name"] == "uq_apikey_id" for constraint in api_key_constraints
):
with op.batch_alter_table("apikey", schema=None) as batch_op:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.create_unique_constraint('uq_flow_id', ['id'])
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.create_unique_constraint('uq_user_id', ['id'])
batch_op.create_unique_constraint("uq_apikey_id", ["id"])
if not any(
constraint["name"] == "uq_flow_id" for constraint in flow_constraints
):
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.create_unique_constraint("uq_flow_id", ["id"])
if not any(
constraint["name"] == "uq_user_id" for constraint in user_constraints
):
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.create_unique_constraint("uq_user_id", ["id"])
except Exception as e:
print(e)
pass
@ -36,15 +51,24 @@ def upgrade() -> None:
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
api_key_constraints = inspector.get_unique_constraints("apikey")
flow_constraints = inspector.get_unique_constraints("flow")
user_constraints = inspector.get_unique_constraints("user")
try:
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.drop_constraint('uq_user_id', type_='unique')
if any(
constraint["name"] == "uq_apikey_id" for constraint in api_key_constraints
):
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.drop_constraint("uq_user_id", type_="unique")
if any(constraint["name"] == "uq_flow_id" for constraint in flow_constraints):
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.drop_constraint("uq_flow_id", type_="unique")
if any(constraint["name"] == "uq_user_id" for constraint in user_constraints):
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.drop_constraint('uq_flow_id', type_='unique')
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.drop_constraint('uq_apikey_id', type_='unique')
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.drop_constraint("uq_apikey_id", type_="unique")
except Exception as e:
print(e)
pass

View file

@ -5,67 +5,25 @@ Revises: 006b3990db50
Create Date: 2024-01-17 10:32:56.686287
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '0b8757876a7c'
down_revision: Union[str, None] = '006b3990db50'
revision: str = "0b8757876a7c"
down_revision: Union[str, None] = "006b3990db50"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_apikey_api_key'), ['api_key'], unique=True)
batch_op.create_index(batch_op.f('ix_apikey_name'), ['name'], unique=False)
batch_op.create_index(batch_op.f('ix_apikey_user_id'), ['user_id'], unique=False)
except Exception as e:
print(e)
pass
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_flow_description'), ['description'], unique=False)
batch_op.create_index(batch_op.f('ix_flow_name'), ['name'], unique=False)
batch_op.create_index(batch_op.f('ix_flow_user_id'), ['user_id'], unique=False)
except Exception as e:
print(e)
pass
pass
try:
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_user_username'), ['username'], unique=True)
except Exception as e:
print(e)
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('user', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_user_username'))
except Exception as e:
print(e)
pass
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_flow_user_id'))
batch_op.drop_index(batch_op.f('ix_flow_name'))
batch_op.drop_index(batch_op.f('ix_flow_description'))
except Exception as e:
print(e)
pass
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_apikey_user_id'))
batch_op.drop_index(batch_op.f('ix_apikey_name'))
batch_op.drop_index(batch_op.f('ix_apikey_api_key'))
except Exception as e:
print(e)
pass
# ### end Alembic commands ###
pass
# ### end Alembic commands ###

View file

@ -6,6 +6,7 @@ Revises: fd531f8868b1
Create Date: 2023-12-04 15:00:27.968998
"""
from typing import Sequence, Union
import sqlalchemy as sa
@ -13,8 +14,8 @@ import sqlmodel
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '1ef9c4f3765d'
down_revision: Union[str, None] = 'fd531f8868b1'
revision: str = "1ef9c4f3765d"
down_revision: Union[str, None] = "fd531f8868b1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@ -22,10 +23,10 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.alter_column('name',
existing_type=sqlmodel.sql.sqltypes.AutoString(),
nullable=True)
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column(
"name", existing_type=sqlmodel.sql.sqltypes.AutoString(), nullable=True
)
except Exception as e:
pass
# ### end Alembic commands ###
@ -34,10 +35,8 @@ def upgrade() -> None:
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.alter_column('name',
existing_type=sa.VARCHAR(),
nullable=False)
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False)
except Exception as e:
pass
# ### end Alembic commands ###

View file

@ -5,6 +5,7 @@ Revises:
Create Date: 2023-08-27 19:49:02.681355
"""
from typing import Sequence, Union
import sqlalchemy as sa
@ -33,7 +34,9 @@ def upgrade() -> None:
if "ix_flowstyle_flow_id" in [
index["name"] for index in inspector.get_indexes("flowstyle")
]:
op.drop_index("ix_flowstyle_flow_id", table_name="flowstyle")
op.drop_index(
"ix_flowstyle_flow_id", table_name="flowstyle", if_exists=True
)
existing_indices_flow = []
existing_fks_flow = []
@ -80,8 +83,7 @@ def upgrade() -> None:
sa.Column("api_key", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
["user_id"], ["user.id"], name="fk_apikey_user_id_user"
),
sa.PrimaryKeyConstraint("id", name="pk_apikey"),
sa.UniqueConstraint("id", name="uq_apikey_id"),
@ -103,8 +105,7 @@ def upgrade() -> None:
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
["user_id"], ["user.id"], name="fk_flow_user_id_user"
),
sa.PrimaryKeyConstraint("id", name="pk_flow"),
sa.UniqueConstraint("id", name="uq_flow_id"),
@ -151,21 +152,21 @@ def downgrade() -> None:
existing_tables = inspector.get_table_names()
if "flow" in existing_tables:
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_flow_user_id"))
batch_op.drop_index(batch_op.f("ix_flow_name"))
batch_op.drop_index(batch_op.f("ix_flow_description"))
batch_op.drop_index(batch_op.f("ix_flow_user_id"), if_exists=True)
batch_op.drop_index(batch_op.f("ix_flow_name"), if_exists=True)
batch_op.drop_index(batch_op.f("ix_flow_description"), if_exists=True)
op.drop_table("flow")
if "apikey" in existing_tables:
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_apikey_user_id"))
batch_op.drop_index(batch_op.f("ix_apikey_name"))
batch_op.drop_index(batch_op.f("ix_apikey_api_key"))
batch_op.drop_index(batch_op.f("ix_apikey_user_id"), if_exists=True)
batch_op.drop_index(batch_op.f("ix_apikey_name"), if_exists=True)
batch_op.drop_index(batch_op.f("ix_apikey_api_key"), if_exists=True)
op.drop_table("apikey")
if "user" in existing_tables:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_user_username"))
batch_op.drop_index(batch_op.f("ix_user_username"), if_exists=True)
op.drop_table("user")

View file

@ -5,34 +5,44 @@ Revises: 7d2162acc8b2
Create Date: 2023-11-24 10:45:38.465302
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = '2ac71eb9c3ae'
down_revision: Union[str, None] = '7d2162acc8b2'
revision: str = "2ac71eb9c3ae"
down_revision: Union[str, None] = "7d2162acc8b2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
tables = inspector.get_table_names()
try:
op.create_table('credential',
sa.Column('name', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('value', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('provider', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('user_id', sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column('id', sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
)
if "credential" not in tables:
op.create_table(
"credential",
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("value", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column(
"provider", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
except Exception as e:
print(e)
pass
# ### end Alembic commands ###
@ -40,7 +50,7 @@ def upgrade() -> None:
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
op.drop_table('credential')
op.drop_table("credential")
except Exception as e:
print(e)
pass

View file

@ -5,6 +5,7 @@ Revises: 260dbcc8b680
Create Date: 2023-09-08 07:36:13.387318
"""
from typing import Sequence, Union
import sqlalchemy as sa
@ -21,29 +22,36 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
if "user" in inspector.get_table_names() and "profile_image" not in [
column["name"] for column in inspector.get_columns("user")
]:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"profile_image", sqlmodel.sql.sqltypes.AutoString(), nullable=True
try:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
if "user" in inspector.get_table_names() and "profile_image" not in [
column["name"] for column in inspector.get_columns("user")
]:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"profile_image",
sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
)
)
)
except Exception as e:
print(e)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
if "user" in inspector.get_table_names() and "profile_image" in [
column["name"] for column in inspector.get_columns("user")
]:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.drop_column("profile_image")
try:
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
if "user" in inspector.get_table_names() and "profile_image" in [
column["name"] for column in inspector.get_columns("user")
]:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.drop_column("profile_image")
except Exception as e:
print(e)
# ### end Alembic commands ###

View file

@ -5,12 +5,13 @@ Revises: eb5866d51fd2
Create Date: 2023-10-18 23:08:57.744906
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from loguru import logger
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = "7843803a87b5"
@ -21,19 +22,26 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
flow_columns = [column["name"] for column in inspector.get_columns("flow")]
user_columns = [column["name"] for column in inspector.get_columns("user")]
try:
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.add_column(sa.Column("is_component", sa.Boolean(), nullable=True))
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"store_api_key", sqlmodel.AutoString(), nullable=True
if "is_component" not in flow_columns:
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.add_column(
sa.Column("is_component", sa.Boolean(), nullable=True)
)
)
except Exception as e:
logger.exception(e)
pass
try:
if "store_api_key" not in user_columns:
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.add_column(
sa.Column("store_api_key", sqlmodel.AutoString(), nullable=True)
)
except Exception as e:
pass
# ### end Alembic commands ###

View file

@ -5,88 +5,74 @@ Revises: f5ee9749d1a6
Create Date: 2023-11-21 20:56:53.998781
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = '7d2162acc8b2'
down_revision: Union[str, None] = 'f5ee9749d1a6'
revision: str = "7d2162acc8b2"
down_revision: Union[str, None] = "f5ee9749d1a6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
api_key_columns = [column["name"] for column in inspector.get_columns("apikey")]
flow_columns = [column["name"] for column in inspector.get_columns("flow")]
try:
with op.batch_alter_table('component', schema=None) as batch_op:
batch_op.drop_index('ix_component_frontend_node_id')
batch_op.drop_index('ix_component_name')
op.drop_table('component')
op.drop_table('flowstyle')
if "name" in api_key_columns:
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column(
"name", existing_type=sa.VARCHAR(), nullable=False
)
except Exception as e:
print(e)
pass
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.alter_column('name',
existing_type=sa.VARCHAR(),
nullable=False)
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.add_column(sa.Column('updated_at', sa.DateTime(), nullable=True))
batch_op.add_column(sa.Column('folder', sqlmodel.sql.sqltypes.AutoString(), nullable=True))
pass
try:
with op.batch_alter_table("flow", schema=None) as batch_op:
if "updated_at" not in flow_columns:
batch_op.add_column(
sa.Column("updated_at", sa.DateTime(), nullable=True)
)
if "folder" not in flow_columns:
batch_op.add_column(
sa.Column(
"folder", sqlmodel.sql.sqltypes.AutoString(), nullable=True
)
)
except Exception as e:
print(e)
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.drop_column('folder')
batch_op.drop_column('updated_at')
with op.batch_alter_table("flow", schema=None) as batch_op:
batch_op.drop_column("folder")
batch_op.drop_column("updated_at")
except Exception as e:
print(e)
pass
try:
with op.batch_alter_table('apikey', schema=None) as batch_op:
batch_op.alter_column('name',
existing_type=sa.VARCHAR(),
nullable=True)
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True)
except Exception as e:
print(e)
pass
try:
op.create_table('flowstyle',
sa.Column('color', sa.VARCHAR(), nullable=False),
sa.Column('emoji', sa.VARCHAR(), nullable=False),
sa.Column('flow_id', sa.CHAR(length=32), nullable=True),
sa.Column('id', sa.CHAR(length=32), nullable=False),
sa.ForeignKeyConstraint(['flow_id'], ['flow.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id')
)
op.create_table('component',
sa.Column('id', sa.CHAR(length=32), nullable=False),
sa.Column('frontend_node_id', sa.CHAR(length=32), nullable=False),
sa.Column('name', sa.VARCHAR(), nullable=False),
sa.Column('description', sa.VARCHAR(), nullable=True),
sa.Column('python_code', sa.VARCHAR(), nullable=True),
sa.Column('return_type', sa.VARCHAR(), nullable=True),
sa.Column('is_disabled', sa.BOOLEAN(), nullable=False),
sa.Column('is_read_only', sa.BOOLEAN(), nullable=False),
sa.Column('create_at', sa.DATETIME(), nullable=False),
sa.Column('update_at', sa.DATETIME(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('component', schema=None) as batch_op:
batch_op.create_index('ix_component_name', ['name'], unique=False)
batch_op.create_index('ix_component_frontend_node_id', ['frontend_node_id'], unique=False)
except Exception as e:
print(e)
pass
# ### end Alembic commands ###

View file

@ -5,55 +5,105 @@ Revises: 0b8757876a7c
Create Date: 2024-01-26 13:31:14.797548
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = 'b2fa308044b5'
down_revision: Union[str, None] = '0b8757876a7c'
revision: str = "b2fa308044b5"
down_revision: Union[str, None] = "0b8757876a7c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
tables = inspector.get_table_names()
# ### commands auto generated by Alembic - please adjust! ###
try:
op.drop_table('flowstyle')
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.add_column(sa.Column('is_component', sa.Boolean(), nullable=True))
batch_op.add_column(sa.Column('updated_at', sa.DateTime(), nullable=True))
batch_op.add_column(sa.Column('folder', sqlmodel.sql.sqltypes.AutoString(), nullable=True))
batch_op.add_column(sa.Column('user_id', sqlmodel.sql.sqltypes.GUID(), nullable=True))
batch_op.create_index(batch_op.f('ix_flow_user_id'), ['user_id'], unique=False)
batch_op.create_foreign_key('fk_flow_user_id_user', 'user', ['user_id'], ['id'])
if "flowstyle" in tables:
op.drop_table("flowstyle")
with op.batch_alter_table("flow", schema=None) as batch_op:
flow_columns = [column["name"] for column in inspector.get_columns("flow")]
if "is_component" not in flow_columns:
batch_op.add_column(
sa.Column("is_component", sa.Boolean(), nullable=True)
)
if "updated_at" not in flow_columns:
batch_op.add_column(
sa.Column("updated_at", sa.DateTime(), nullable=True)
)
if "folder" not in flow_columns:
batch_op.add_column(
sa.Column(
"folder", sqlmodel.sql.sqltypes.AutoString(), nullable=True
)
)
if "user_id" not in flow_columns:
batch_op.add_column(
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=True)
)
indices = inspector.get_indexes("flow")
indices_names = [index["name"] for index in indices]
if "ix_flow_user_id" not in indices_names:
batch_op.create_index(
batch_op.f("ix_flow_user_id"), ["user_id"], unique=False
)
if "fk_flow_user_id_user" not in indices_names:
batch_op.create_foreign_key(
"fk_flow_user_id_user", "user", ["user_id"], ["id"]
)
except Exception:
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.drop_constraint('fk_flow_user_id_user', type_='foreignkey')
batch_op.drop_index(batch_op.f('ix_flow_user_id'))
batch_op.drop_column('user_id')
batch_op.drop_column('folder')
batch_op.drop_column('updated_at')
batch_op.drop_column('is_component')
# Re-create the dropped table 'flowstyle' if it was previously dropped in upgrade
if "flowstyle" not in inspector.get_table_names():
op.create_table(
"flowstyle",
sa.Column("color", sa.String(), nullable=False),
sa.Column("emoji", sa.String(), nullable=False),
sa.Column("flow_id", sqlmodel.sql.sqltypes.GUID(), nullable=True),
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
sa.ForeignKeyConstraint(["flow_id"], ["flow.id"]),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("id"),
)
op.create_table('flowstyle',
sa.Column('color', sa.VARCHAR(), nullable=False),
sa.Column('emoji', sa.VARCHAR(), nullable=False),
sa.Column('flow_id', sa.CHAR(length=32), nullable=True),
sa.Column('id', sa.CHAR(length=32), nullable=False),
sa.ForeignKeyConstraint(['flow_id'], ['flow.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id')
)
except Exception:
pass
# ### end Alembic commands ###
with op.batch_alter_table("flow", schema=None) as batch_op:
# Check and remove newly added columns and constraints in upgrade
flow_columns = [column["name"] for column in inspector.get_columns("flow")]
if "user_id" in flow_columns:
batch_op.drop_column("user_id")
if "folder" in flow_columns:
batch_op.drop_column("folder")
if "updated_at" in flow_columns:
batch_op.drop_column("updated_at")
if "is_component" in flow_columns:
batch_op.drop_column("is_component")
indices = inspector.get_indexes("flow")
indices_names = [index["name"] for index in indices]
if "ix_flow_user_id" in indices_names:
batch_op.drop_index("ix_flow_user_id")
# Assuming fk_flow_user_id_user is a foreign key constraint's name, not an index
constraints = inspector.get_foreign_keys("flow")
constraint_names = [constraint["name"] for constraint in constraints]
if "fk_flow_user_id_user" in constraint_names:
batch_op.drop_constraint("fk_flow_user_id_user", type_="foreignkey")
except Exception as e:
# It's generally a good idea to log the exception or handle it in a way other than a bare pass
print(f"Error during downgrade: {e}")

View file

@ -5,46 +5,68 @@ Revises: b2fa308044b5
Create Date: 2024-01-26 13:34:14.496769
"""
from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = 'bc2f01c40e4a'
down_revision: Union[str, None] = 'b2fa308044b5'
revision: str = "bc2f01c40e4a"
down_revision: Union[str, None] = "b2fa308044b5"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.add_column(sa.Column('is_component', sa.Boolean(), nullable=True))
batch_op.add_column(sa.Column('updated_at', sa.DateTime(), nullable=True))
batch_op.add_column(sa.Column('folder', sqlmodel.sql.sqltypes.AutoString(), nullable=True))
batch_op.add_column(sa.Column('user_id', sqlmodel.sql.sqltypes.GUID(), nullable=True))
batch_op.create_index(batch_op.f('ix_flow_user_id'), ['user_id'], unique=False)
batch_op.create_foreign_key('flow_user_id_fkey'
, 'user', ['user_id'], ['id'])
except Exception:
pass
# ### end Alembic commands ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
flow_columns = {column["name"] for column in inspector.get_columns("flow")}
flow_indexes = {index["name"] for index in inspector.get_indexes("flow")}
flow_fks = {fk["name"] for fk in inspector.get_foreign_keys("flow")}
with op.batch_alter_table("flow", schema=None) as batch_op:
if "is_component" not in flow_columns:
batch_op.add_column(sa.Column("is_component", sa.Boolean(), nullable=True))
if "updated_at" not in flow_columns:
batch_op.add_column(sa.Column("updated_at", sa.DateTime(), nullable=True))
if "folder" not in flow_columns:
batch_op.add_column(
sa.Column("folder", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
)
if "user_id" not in flow_columns:
batch_op.add_column(
sa.Column("user_id", sqlmodel.sql.sqltypes.GUID(), nullable=True)
)
if "ix_flow_user_id" not in flow_indexes:
batch_op.create_index(
batch_op.f("ix_flow_user_id"), ["user_id"], unique=False
)
if "flow_user_id_fkey" not in flow_fks:
batch_op.create_foreign_key(
"flow_user_id_fkey", "user", ["user_id"], ["id"]
)
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table('flow', schema=None) as batch_op:
batch_op.drop_constraint('flow_user_id_fkey', type_='foreignkey')
batch_op.drop_index(batch_op.f('ix_flow_user_id'))
batch_op.drop_column('user_id')
batch_op.drop_column('folder')
batch_op.drop_column('updated_at')
batch_op.drop_column('is_component')
except Exception:
pass
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
flow_columns = {column["name"] for column in inspector.get_columns("flow")}
flow_indexes = {index["name"] for index in inspector.get_indexes("flow")}
flow_fks = {fk["name"] for fk in inspector.get_foreign_keys("flow")}
# ### end Alembic commands ###
with op.batch_alter_table("flow", schema=None) as batch_op:
if "flow_user_id_fkey" in flow_fks:
batch_op.drop_constraint("flow_user_id_fkey", type_="foreignkey")
if "ix_flow_user_id" in flow_indexes:
batch_op.drop_index(batch_op.f("ix_flow_user_id"))
if "user_id" in flow_columns:
batch_op.drop_column("user_id")
if "folder" in flow_columns:
batch_op.drop_column("folder")
if "updated_at" in flow_columns:
batch_op.drop_column("updated_at")
if "is_component" in flow_columns:
batch_op.drop_column("is_component")

View file

@ -5,11 +5,10 @@ Revises: 67cc006d50bf
Create Date: 2023-10-04 10:18:25.640458
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy import exc
# revision identifiers, used by Alembic.
revision: str = "eb5866d51fd2"
@ -21,70 +20,12 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
connection = op.get_bind()
try:
op.drop_table("flowstyle")
with op.batch_alter_table("component", schema=None) as batch_op:
batch_op.drop_index("ix_component_frontend_node_id")
batch_op.drop_index("ix_component_name")
except exc.SQLAlchemyError:
# connection.execute(text("ROLLBACK"))
pass
except Exception as e:
print(e)
pass
try:
op.drop_table("component")
except exc.SQLAlchemyError:
# connection.execute(text("ROLLBACK"))
pass
except Exception as e:
print(e)
pass
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
op.create_table(
"component",
sa.Column("id", sa.CHAR(length=32), nullable=False),
sa.Column("frontend_node_id", sa.CHAR(length=32), nullable=False),
sa.Column("name", sa.VARCHAR(), nullable=False),
sa.Column("description", sa.VARCHAR(), nullable=True),
sa.Column("python_code", sa.VARCHAR(), nullable=True),
sa.Column("return_type", sa.VARCHAR(), nullable=True),
sa.Column("is_disabled", sa.BOOLEAN(), nullable=False),
sa.Column("is_read_only", sa.BOOLEAN(), nullable=False),
sa.Column("create_at", sa.DATETIME(), nullable=False),
sa.Column("update_at", sa.DATETIME(), nullable=False),
sa.PrimaryKeyConstraint("id", name="pk_component"),
)
with op.batch_alter_table("component", schema=None) as batch_op:
batch_op.create_index("ix_component_name", ["name"], unique=False)
batch_op.create_index(
"ix_component_frontend_node_id", ["frontend_node_id"], unique=False
)
except Exception as e:
print(e)
pass
try:
op.create_table(
"flowstyle",
sa.Column("color", sa.VARCHAR(), nullable=False),
sa.Column("emoji", sa.VARCHAR(), nullable=False),
sa.Column("flow_id", sa.CHAR(length=32), nullable=True),
sa.Column("id", sa.CHAR(length=32), nullable=False),
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
),
sa.PrimaryKeyConstraint("id", name="pk_flowstyle"),
sa.UniqueConstraint("id", name="uq_flowstyle_id"),
)
except Exception as e:
print(e)
pass
pass
# ### end Alembic commands ###

View file

@ -5,6 +5,7 @@ Revises: 7843803a87b5
Create Date: 2023-10-18 23:12:27.297016
"""
from typing import Sequence, Union
import sqlalchemy as sa

View file

@ -5,22 +5,35 @@ Revises: 2ac71eb9c3ae
Create Date: 2023-11-24 15:07:37.566516
"""
from typing import Sequence, Union
from typing import Optional, Sequence, Union
from alembic import op
from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision: str = 'fd531f8868b1'
down_revision: Union[str, None] = '2ac71eb9c3ae'
revision: str = "fd531f8868b1"
down_revision: Union[str, None] = "2ac71eb9c3ae"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
tables = inspector.get_table_names()
foreign_keys_names = []
if "credential" in tables:
foreign_keys = inspector.get_foreign_keys("credential")
foreign_keys_names = [fk["name"] for fk in foreign_keys]
try:
with op.batch_alter_table('credential', schema=None) as batch_op:
batch_op.create_foreign_key("fk_credential_user_id", 'user', ['user_id'], ['id'])
if "credential" in tables and "fk_credential_user_id" not in foreign_keys_names:
with op.batch_alter_table("credential", schema=None) as batch_op:
batch_op.create_foreign_key(
"fk_credential_user_id", "user", ["user_id"], ["id"]
)
except Exception as e:
print(e)
pass
@ -30,9 +43,17 @@ def upgrade() -> None:
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
tables = inspector.get_table_names()
foreign_keys_names: list[Optional[str]] = []
if "credential" in tables:
foreign_keys = inspector.get_foreign_keys("credential")
foreign_keys_names = [fk["name"] for fk in foreign_keys]
try:
with op.batch_alter_table('credential', schema=None) as batch_op:
batch_op.drop_constraint("fk_credential_user_id", type_='foreignkey')
if "credential" in tables and "fk_credential_user_id" in foreign_keys_names:
with op.batch_alter_table("credential", schema=None) as batch_op:
batch_op.drop_constraint("fk_credential_user_id", type_="foreignkey")
except Exception as e:
print(e)
pass

View file

@ -3,9 +3,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, List, Optional
from fastapi import HTTPException
from langchain_core.documents import Document
from platformdirs import user_cache_dir
from pydantic import BaseModel
from sqlmodel import Session
from langflow.graph.graph.base import Graph
@ -199,20 +197,6 @@ def format_elapsed_time(elapsed_time: float) -> str:
return f"{minutes} {minutes_unit}, {seconds} {seconds_unit}"
def serialize_field(value):
"""Unified serialization function for handling both BaseModel and Document types,
including handling lists of these types."""
if isinstance(value, (list, tuple)):
return [serialize_field(v) for v in value]
elif isinstance(value, Document):
return value.to_json()
elif isinstance(value, BaseModel):
return value.model_dump()
elif isinstance(value, str):
return {"result": value}
return value
def build_and_cache_graph(
flow_id: str,
session: Session,
@ -220,13 +204,37 @@ def build_and_cache_graph(
graph: Optional[Graph] = None,
):
"""Build and cache the graph."""
flow: Flow = session.get(Flow, flow_id)
flow: Optional[Flow] = session.get(Flow, flow_id)
if not flow or not flow.data:
raise ValueError("Invalid flow ID")
other_graph = Graph.from_payload(flow.data)
other_graph = Graph.from_payload(flow.data, flow_id)
if graph is None:
graph = other_graph
else:
graph = graph.update(other_graph)
chat_service.set_cache(flow_id, graph)
return graph
def format_syntax_error_message(exc: SyntaxError) -> str:
"""Format a SyntaxError message for returning to the frontend."""
if exc.text is None:
return f"Syntax error in code. Error on line {exc.lineno}"
return f"Syntax error in code. Error on line {exc.lineno}: {exc.text.strip()}"
def get_causing_exception(exc: BaseException) -> BaseException:
"""Get the causing exception from an exception."""
if hasattr(exc, "__cause__") and exc.__cause__:
return get_causing_exception(exc.__cause__)
return exc
def format_exception_message(exc: Exception) -> str:
"""Format an exception message for returning to the frontend."""
# We need to check if the __cause__ is a SyntaxError
# If it is, we need to return the message of the SyntaxError
causing_exception = get_causing_exception(exc)
if isinstance(causing_exception, SyntaxError):
return format_syntax_error_message(causing_exception)
return str(exc)

View file

@ -68,8 +68,6 @@ INVALID_CHARACTERS = {
")",
"[",
"]",
"{",
"}",
}
INVALID_NAMES = {
@ -88,73 +86,110 @@ def validate_prompt(template: str):
# Check if there are invalid characters in the input_variables
input_variables = check_input_variables(input_variables)
if any(var in INVALID_NAMES for var in input_variables):
raise ValueError(f"Invalid input variables. None of the variables can be named {', '.join(input_variables)}. ")
raise ValueError(
f"Invalid input variables. None of the variables can be named {', '.join(input_variables)}. "
)
try:
PromptTemplate(template=template, input_variables=input_variables)
except Exception as exc:
raise ValueError(str(exc)) from exc
raise ValueError(f"Invalid prompt: {exc}") from exc
return input_variables
def check_input_variables(input_variables: list):
def is_json_like(var):
if var.startswith("{{") and var.endswith("}}"):
# If it is a double brance variable
# we don't want to validate any of its content
return True
# the above doesn't work on all cases because the json string can be multiline
# or indented which can add \n or spaces at the start or end of the string
# test_case_3 new_var == '\n{{\n "test": "hello",\n "text": "world"\n}}\n'
# what we can do is to remove the \n and spaces from the start and end of the string
# and then check if the string starts with {{ and ends with }}
var = var.strip()
var = var.replace("\n", "")
var = var.replace(" ", "")
# Now it should be a valid json string
return var.startswith("{{") and var.endswith("}}")
def fix_variable(var, invalid_chars, wrong_variables):
if not var:
return var, invalid_chars, wrong_variables
new_var = var
# Handle variables starting with a number
if var[0].isdigit():
invalid_chars.append(var[0])
new_var, invalid_chars, wrong_variables = fix_variable(
var[1:], invalid_chars, wrong_variables
)
# Temporarily replace {{ and }} to avoid treating them as invalid
new_var = new_var.replace("{{", "ᴛᴇᴍᴘᴏᴘᴇɴ").replace("}}", "ᴛᴇᴍᴘʟsᴇ")
# Remove invalid characters
for char in new_var:
if char in INVALID_CHARACTERS:
invalid_chars.append(char)
new_var = new_var.replace(char, "")
if var not in wrong_variables: # Avoid duplicating entries
wrong_variables.append(var)
# Restore {{ and }}
new_var = new_var.replace("ᴛᴇᴍᴘᴏᴘᴇɴ", "{{").replace("ᴛᴇᴍᴘʟsᴇ", "}}")
return new_var, invalid_chars, wrong_variables
def check_variable(var, invalid_chars, wrong_variables, empty_variables):
if any(char in invalid_chars for char in var):
wrong_variables.append(var)
elif var == "":
empty_variables.append(var)
return wrong_variables, empty_variables
def check_for_errors(
input_variables, fixed_variables, wrong_variables, empty_variables
):
if any(var for var in input_variables if var not in fixed_variables):
error_message = (
f"Error: Input variables contain invalid characters or formats. \n"
f"Invalid variables: {', '.join(wrong_variables)}.\n"
f"Empty variables: {', '.join(empty_variables)}. \n"
f"Fixed variables: {', '.join(fixed_variables)}."
)
raise ValueError(error_message)
def check_input_variables(input_variables):
invalid_chars = []
fixed_variables = []
wrong_variables = []
empty_variables = []
for variable in input_variables:
new_var = variable
variables_to_check = []
# if variable is empty, then we should add that to the wrong variables
if not variable:
empty_variables.append(variable)
for var in input_variables:
# First, let's check if the variable is a JSON string
# because if it is, it won't be considered a variable
# and we don't need to validate it
if is_json_like(var):
continue
# if variable starts with a number we should add that to the invalid chars
# and wrong variables
if variable[0].isdigit():
invalid_chars.append(variable[0])
new_var = new_var.replace(variable[0], "")
wrong_variables.append(variable)
else:
for char in INVALID_CHARACTERS:
if char in variable:
invalid_chars.append(char)
new_var = new_var.replace(char, "")
wrong_variables.append(variable)
fixed_variables.append(new_var)
# If any of the input_variables is not in the fixed_variables, then it means that
# there are invalid characters in the input_variables
if any(var not in fixed_variables for var in input_variables):
error_message = build_error_message(
input_variables,
invalid_chars,
wrong_variables,
fixed_variables,
empty_variables,
new_var, wrong_variables, empty_variables = fix_variable(
var, invalid_chars, wrong_variables
)
raise ValueError(error_message)
return input_variables
wrong_variables, empty_variables = check_variable(
var, INVALID_CHARACTERS, wrong_variables, empty_variables
)
fixed_variables.append(new_var)
variables_to_check.append(var)
check_for_errors(
variables_to_check, fixed_variables, wrong_variables, empty_variables
)
def build_error_message(input_variables, invalid_chars, wrong_variables, fixed_variables, empty_variables):
input_variables_str = ", ".join([f"'{var}'" for var in input_variables])
error_string = f"Invalid input variables: {input_variables_str}. "
if wrong_variables and invalid_chars:
# fix the wrong variables replacing invalid chars and find them in the fixed variables
error_string_vars = "You can fix them by replacing the invalid characters: "
wvars = wrong_variables.copy()
for i, wrong_var in enumerate(wvars):
for char in invalid_chars:
wrong_var = wrong_var.replace(char, "")
if wrong_var in fixed_variables:
error_string_vars += f"'{wrong_variables[i]}' -> '{wrong_var}'"
error_string += error_string_vars
elif empty_variables:
error_string += f" There are {len(empty_variables)} empty variable{'s' if len(empty_variables) > 1 else ''}."
elif len(set(fixed_variables)) != len(fixed_variables):
error_string += "There are duplicate variables."
return error_string
return fixed_variables

View file

@ -1,128 +1,31 @@
import asyncio
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import UUID
from langchain.schema import AgentAction, AgentFinish
from langchain_core.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langflow.api.v1.schemas import ChatResponse, PromptResponse
from langflow.services.deps import get_chat_service
from langflow.utils.util import remove_ansi_escape_codes
from langchain_core.callbacks.base import AsyncCallbackHandler
from loguru import logger
from langflow.api.v1.schemas import ChatResponse, PromptResponse
from langflow.services.deps import get_chat_service, get_socket_service
from langflow.utils.util import remove_ansi_escape_codes
if TYPE_CHECKING:
from langflow.services.socket.service import SocketIOService
class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, client_id: str = None):
self.chat_service = get_chat_service()
self.client_id = client_id
self.websocket = self.chat_service.active_connections[self.client_id]
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
await self.websocket.send_json(resp.model_dump())
async def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
"""Run when tool starts running."""
resp = ChatResponse(
message="",
type="stream",
intermediate_steps=f"Tool input: {input_str}",
)
await self.websocket.send_json(resp.model_dump())
async def on_tool_end(self, output: str, **kwargs: Any) -> Any:
"""Run when tool ends running."""
observation_prefix = kwargs.get("observation_prefix", "Tool output: ")
split_output = output.split()
first_word = split_output[0]
rest_of_output = split_output[1:]
# Create a formatted message.
intermediate_steps = f"{observation_prefix}{first_word}"
# Create a ChatResponse instance.
resp = ChatResponse(
message="",
type="stream",
intermediate_steps=intermediate_steps,
)
rest_of_resps = [
ChatResponse(
message="",
type="stream",
intermediate_steps=f"{word}",
)
for word in rest_of_output
]
resps = [resp] + rest_of_resps
# Try to send the response, handle potential errors.
try:
# This is to emulate the stream of tokens
for resp in resps:
await self.websocket.send_json(resp.model_dump())
except Exception as exc:
logger.error(f"Error sending response: {exc}")
async def on_tool_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when tool errors."""
async def on_text(self, text: str, **kwargs: Any) -> Any:
"""Run on arbitrary text."""
# This runs when first sending the prompt
# to the LLM, adding it will send the final prompt
# to the frontend
if "Prompt after formatting" in text:
text = text.replace("Prompt after formatting:\n", "")
text = remove_ansi_escape_codes(text)
resp = PromptResponse(
prompt=text,
)
await self.websocket.send_json(resp.model_dump())
self.chat_service.chat_history.add_message(self.client_id, resp)
async def on_agent_action(self, action: AgentAction, **kwargs: Any):
log = f"Thought: {action.log}"
# if there are line breaks, split them and send them
# as separate messages
if "\n" in log:
logs = log.split("\n")
for log in logs:
resp = ChatResponse(message="", type="stream", intermediate_steps=log)
await self.websocket.send_json(resp.model_dump())
else:
resp = ChatResponse(message="", type="stream", intermediate_steps=log)
await self.websocket.send_json(resp.model_dump())
async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
"""Run on agent end."""
resp = ChatResponse(
message="",
type="stream",
intermediate_steps=finish.log,
)
await self.websocket.send_json(resp.model_dump())
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
@property
def ignore_chain(self) -> bool:
"""Whether to ignore chain callbacks."""
return False
def __init__(self, session_id: str):
self.chat_service = get_chat_service()
self.client_id = session_id
self.socketio_service: "SocketIOService" = self.chat_service.socketio_service
self.socketio_service: "SocketIOService" = get_socket_service()
self.sid = session_id
# self.socketio_service = self.chat_service.active_connections[self.client_id]
@ -130,7 +33,9 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
async def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
async def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
"""Run when tool starts running."""
resp = ChatResponse(
message="",
@ -168,7 +73,9 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
try:
# This is to emulate the stream of tokens
for resp in resps:
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
await self.socketio_service.emit_token(
to=self.sid, data=resp.model_dump()
)
except Exception as exc:
logger.error(f"Error sending response: {exc}")
@ -194,8 +101,9 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
resp = PromptResponse(
prompt=text,
)
await self.socketio_service.emit_message(to=self.sid, data=resp.model_dump())
self.chat_service.chat_history.add_message(self.client_id, resp)
await self.socketio_service.emit_message(
to=self.sid, data=resp.model_dump()
)
async def on_agent_action(self, action: AgentAction, **kwargs: Any):
log = f"Thought: {action.log}"
@ -205,7 +113,9 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
logs = log.split("\n")
for log in logs:
resp = ChatResponse(message="", type="stream", intermediate_steps=log)
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
await self.socketio_service.emit_token(
to=self.sid, data=resp.model_dump()
)
else:
resp = ChatResponse(message="", type="stream", intermediate_steps=log)
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
@ -218,19 +128,3 @@ class AsyncStreamingLLMCallbackHandleSIO(AsyncCallbackHandler):
intermediate_steps=finish.log,
)
await self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
class StreamingLLMCallbackHandler(BaseCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, client_id: str):
self.chat_service = get_chat_service()
self.client_id = client_id
self.socketio_service = self.chat_service.active_connections[self.client_id]
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
loop = asyncio.get_event_loop()
coroutine = self.socketio_service.emit_token(to=self.sid, data=resp.model_dump())
asyncio.run_coroutine_threadsafe(coroutine, loop)

View file

@ -1,72 +1,35 @@
import time
from typing import Optional
import uuid
from typing import TYPE_CHECKING, Annotated, Optional
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
HTTPException,
WebSocket,
WebSocketException,
status,
)
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger
from sqlmodel import Session
from langflow.api.utils import build_and_cache_graph, format_elapsed_time
from langflow.api.utils import (
build_and_cache_graph,
format_elapsed_time,
format_exception_message,
)
from langflow.api.v1.schemas import (
ResultData,
InputValueRequest,
ResultDataResponse,
StreamData,
VertexBuildResponse,
VerticesOrderResponse,
)
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.types import RoutingVertex
from langflow.services.auth.utils import (
get_current_active_user,
get_current_user_for_websocket,
)
from langflow.services.auth.utils import get_current_active_user
from langflow.services.chat.service import ChatService
from langflow.services.deps import get_chat_service, get_session
from langflow.services.deps import get_chat_service, get_session, get_session_service
from langflow.services.monitor.utils import log_vertex_build
if TYPE_CHECKING:
from langflow.graph.vertex.types import ChatVertex
from langflow.services.session.service import SessionService
router = APIRouter(tags=["Chat"])
@router.websocket("/chat/{client_id}")
async def chat(
client_id: str,
websocket: WebSocket,
db: Session = Depends(get_session),
chat_service: "ChatService" = Depends(get_chat_service),
):
"""Websocket endpoint for chat."""
try:
user = await get_current_user_for_websocket(websocket, db)
await websocket.accept()
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
elif not user.is_active:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
if client_id in chat_service.cache_service:
await chat_service.handle_websocket(client_id, websocket)
else:
# We accept the connection but close it immediately
# if the flow is not built yet
message = "Please, build the flow before sending messages"
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=message)
except WebSocketException as exc:
logger.error(f"Websocket exrror: {exc}")
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
except Exception as exc:
logger.error(f"Error in chat websocket: {exc}")
messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)
if "Could not validate credentials" in str(exc):
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized")
else:
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
async def try_running_celery_task(vertex, user_id):
# Try running the task in celery
# and set the task_id to the local vertex
@ -95,7 +58,7 @@ async def get_vertices(
# First, we need to check if the flow_id is in the cache
graph = None
if cache := chat_service.get_cache(flow_id):
graph: Graph = cache.get("result")
graph = cache.get("result")
graph = build_and_cache_graph(flow_id, session, chat_service, graph)
if stop_component_id:
try:
@ -109,7 +72,9 @@ async def get_vertices(
# Now vertices is a list of lists
# We need to get the id of each vertex
# and return the same structure but only with the ids
return VerticesOrderResponse(ids=vertices)
run_id = uuid.uuid4()
graph.set_run_id(run_id)
return VerticesOrderResponse(ids=vertices, run_id=run_id)
except Exception as exc:
logger.error(f"Error checking build status: {exc}")
@ -122,87 +87,169 @@ async def build_vertex(
flow_id: str,
vertex_id: str,
background_tasks: BackgroundTasks,
inputs: Annotated[Optional[InputValueRequest], Body(embed=True)] = None,
chat_service: "ChatService" = Depends(get_chat_service),
current_user=Depends(get_current_active_user),
):
"""Build a vertex instead of the entire graph."""
{"inputs": {"input_value": "some value"}}
start_time = time.perf_counter()
next_vertices_ids = []
try:
start_time = time.perf_counter()
cache = chat_service.get_cache(flow_id)
if not cache:
# If there's no cache
logger.warning(f"No cache found for {flow_id}. Building graph starting at {vertex_id}")
graph = build_and_cache_graph(flow_id=flow_id, session=next(get_session()), chat_service=chat_service)
logger.warning(
f"No cache found for {flow_id}. Building graph starting at {vertex_id}"
)
graph = build_and_cache_graph(
flow_id=flow_id, session=next(get_session()), chat_service=chat_service
)
else:
graph = cache.get("result")
result_dict = {}
result_data_response = ResultDataResponse(results={})
duration = ""
vertex = graph.get_vertex(vertex_id)
try:
if not vertex.pinned or not vertex._built:
await vertex.build(user_id=current_user.id)
params = vertex._built_object_repr()
valid = True
result_dict = vertex.get_built_result()
# We need to set the artifacts to pass information
# to the frontend
vertex.set_artifacts()
artifacts = vertex.artifacts
result_dict = ResultData(
results=result_dict,
artifacts=artifacts,
)
vertex.set_result(result_dict)
elif vertex.result is not None:
inputs_dict = inputs.model_dump() if inputs else {}
await vertex.build(user_id=current_user.id, inputs=inputs_dict)
if vertex.result is not None:
params = vertex._built_object_repr()
valid = True
result_dict = vertex.result
artifacts = vertex.artifacts
else:
raise ValueError(f"No result found for vertex {vertex_id}")
if isinstance(vertex, RoutingVertex):
if vertex._built_object is True:
next_vertex_id = next(graph.next_vertex_to_build())
else:
next_vertex_id = None
chat_service.set_cache(flow_id, graph)
result_data_response = ResultDataResponse(**result_dict.model_dump())
except Exception as exc:
params = str(exc)
logger.exception(f"Error building vertex: {exc}")
params = format_exception_message(exc)
valid = False
result_dict = ResultData(results={})
result_data_response = ResultDataResponse(results={})
artifacts = {}
# If there's an error building the vertex
# we need to clear the cache
chat_service.clear_cache(flow_id)
# Log the vertex build
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id,
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_dict,
artifacts=artifacts,
)
if not vertex.will_stream:
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id,
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_data_response,
artifacts=artifacts,
)
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict.duration = duration
result_dict.timedelta = timedelta
result_data_response.duration = duration
result_data_response.timedelta = timedelta
vertex.add_build_time(timedelta)
inactive_vertices = None
if graph.inactive_vertices:
inactive_vertices = list(graph.inactive_vertices)
graph.reset_inactive_vertices()
chat_service.set_cache(flow_id, graph)
return VertexBuildResponse(
next_vertex_id=next_vertex_id,
build_response = VertexBuildResponse(
next_vertices_ids=next_vertices_ids,
inactive_vertices=inactive_vertices,
valid=valid,
params=params,
id=vertex.id,
data=result_dict,
data=result_data_response,
)
return build_response
except Exception as exc:
logger.error(f"Error building vertex: {exc}")
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
# Now onto an endpoint that is an SSE endpoint
# it will receive a component_id and a flow_id
#
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
flow_id: str,
vertex_id: str,
session_id: Optional[str] = None,
chat_service: "ChatService" = Depends(get_chat_service),
session_service: "SessionService" = Depends(get_session_service),
):
"""Build a vertex instead of the entire graph."""
try:
async def stream_vertex():
try:
if not session_id:
cache = chat_service.get_cache(flow_id)
if not cache:
# If there's no cache
raise ValueError(f"No cache found for {flow_id}.")
else:
graph = cache.get("result")
else:
session_data = await session_service.load_session(
session_id, flow_id=flow_id
)
graph, artifacts = session_data if session_data else (None, None)
if not graph:
raise ValueError(f"No graph found for {flow_id}.")
vertex: "ChatVertex" = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
raise ValueError(f"Vertex {vertex_id} does not support streaming")
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
elif not vertex.pinned or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
raise ValueError(f"No result found for vertex {vertex_id}")
except Exception as exc:
logger.error(f"Error building vertex: {exc}")
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
logger.debug("Closing stream")
yield str(StreamData(event="close", data={"message": "Stream closed"}))
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
except Exception as exc:
raise HTTPException(status_code=500, detail="Error building vertex") from exc

View file

@ -3,112 +3,39 @@ from typing import Annotated, Any, List, Optional, Union
import sqlalchemy as sa
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status
from loguru import logger
from sqlmodel import Session, select
from langflow.api.utils import update_frontend_node_with_template_values
from langflow.api.v1.schemas import (
CustomComponentCode,
PreloadResponse,
InputValueRequest,
ProcessResponse,
TaskResponse,
RunResponse,
TaskStatusResponse,
UploadFileResponse,
)
from langflow.interface.custom.custom_component import CustomComponent
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.interface.custom.utils import build_custom_component_template
from langflow.processing.process import build_graph_and_generate_result, process_graph_cached, process_tweaks
from langflow.processing.process import process_tweaks, run_graph
from langflow.services.auth.utils import api_key_security, get_current_active_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.user.model import User
from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service
from langflow.services.deps import (
get_session,
get_session_service,
get_settings_service,
get_task_service,
)
from langflow.services.session.service import SessionService
from loguru import logger
from sqlmodel import select
try:
from langflow.worker import process_graph_cached_task
except ImportError:
def process_graph_cached_task(*args, **kwargs):
raise NotImplementedError("Celery is not installed")
from langflow.services.task.service import TaskService
from sqlmodel import Session
# build router
router = APIRouter(tags=["Base"])
async def process_graph_data(
graph_data: dict,
inputs: Optional[Union[List[dict], dict]] = None,
tweaks: Optional[dict] = None,
clear_cache: bool = False,
session_id: Optional[str] = None,
task_service: "TaskService" = Depends(get_task_service),
sync: bool = True,
):
task_result: Any = None
task_status = None
if tweaks:
try:
graph_data = process_tweaks(graph_data, tweaks)
except Exception as exc:
logger.error(f"Error processing tweaks: {exc}")
if sync:
result = await process_graph_cached(
graph_data,
inputs,
clear_cache,
session_id,
)
task_id = str(id(result))
if isinstance(result, dict) and "result" in result:
task_result = result["result"]
session_id = result["session_id"]
elif hasattr(result, "result") and hasattr(result, "session_id"):
task_result = result.result
session_id = result.session_id
else:
task_result = result
else:
logger.warning(
"This is an experimental feature and may not work as expected."
"Please report any issues to our GitHub repository."
)
if session_id is None:
# Generate a session ID
session_id = get_session_service().generate_key(session_id=session_id, data_graph=graph_data)
task_id, task = await task_service.launch_task(
process_graph_cached_task if task_service.use_celery else process_graph_cached,
graph_data,
inputs,
clear_cache,
session_id,
)
task_status = task.status
if task.status == "FAILURE":
logger.error(f"Task {task_id} failed: {task.traceback}")
task_result = str(task._exception)
else:
task_result = task.result
if task_id:
task_response = TaskResponse(id=task_id, href=f"api/v1/task/{task_id}")
else:
task_response = None
return ProcessResponse(
result=task_result,
status=task_status,
task=task_response,
session_id=session_id,
backend=task_service.backend_name,
)
@router.get("/all", dependencies=[Depends(get_current_active_user)])
def get_all(
settings_service=Depends(get_settings_service),
@ -117,84 +44,91 @@ def get_all(
logger.debug("Building langchain types dict")
try:
return get_all_types_dict(settings_service)
all_types_dict = get_all_types_dict(settings_service)
return all_types_dict
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
@router.post("/process/json", response_model=ProcessResponse)
async def process_json(
session: Annotated[Session, Depends(get_session)],
data: dict,
inputs: Optional[dict] = None,
tweaks: Optional[dict] = None,
clear_cache: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
task_service: "TaskService" = Depends(get_task_service),
sync: Annotated[bool, Body(embed=True)] = True, # noqa: F821
):
try:
return await process_graph_data(
graph_data=data,
inputs=inputs,
tweaks=tweaks,
clear_cache=clear_cache,
session_id=session_id,
task_service=task_service,
sync=sync,
)
except Exception as exc:
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
# Endpoint to preload a graph
@router.post("/process/preload/{flow_id}", response_model=PreloadResponse)
async def preload_flow(
@router.post(
"/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True
)
async def run_flow_with_caching(
session: Annotated[Session, Depends(get_session)],
flow_id: str,
session_id: Optional[str] = None,
session_service: SessionService = Depends(get_session_service),
inputs: Optional[InputValueRequest] = None,
tweaks: Optional[dict] = None,
stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
api_key_user: User = Depends(api_key_security),
clear_session: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_service: SessionService = Depends(get_session_service),
):
try:
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
if clear_session:
session_service.clear_session(session_id)
# Check if the session exists
session_data = await session_service.load_session(session_id)
# Session data is a tuple of (graph, artifacts)
# or (None, None) if the session is empty
if isinstance(session_data, tuple):
graph, artifacts = session_data
is_clear = graph is None and artifacts is None
else:
is_clear = session_data is None
return PreloadResponse(session_id=session_id, is_clear=is_clear)
if inputs is not None:
input_values_dict: dict[str, Union[str, list[str]]] = inputs.model_dump()
else:
if session_id is None:
session_id = flow_id
flow = session.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first()
input_values_dict = {}
if session_id:
session_data = await session_service.load_session(
session_id, flow_id=flow_id
)
graph, artifacts = session_data if session_data else (None, None)
task_result: Any = None
if not graph:
raise ValueError("Graph not found in the session")
task_result, session_id = await run_graph(
graph=graph,
flow_id=flow_id,
session_id=session_id,
inputs=input_values_dict,
artifacts=artifacts,
session_service=session_service,
stream=stream,
)
else:
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
flow = session.exec(
select(Flow)
.where(Flow.id == flow_id)
.where(Flow.user_id == api_key_user.id)
).first()
if flow is None:
raise ValueError(f"Flow {flow_id} not found")
if flow.data is None:
raise ValueError(f"Flow {flow_id} has no data")
graph_data = flow.data
session_service.clear_session(session_id)
# Load the graph using SessionService
session_data = await session_service.load_session(session_id, graph_data)
graph, artifacts = session_data if session_data else (None, None)
if not graph:
raise ValueError("Graph not found in the session")
_ = await graph.build()
session_service.update_session(session_id, (graph, artifacts))
return PreloadResponse(session_id=session_id)
except Exception as exc:
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
graph_data = process_tweaks(graph_data, tweaks or {})
task_result, session_id = await run_graph(
graph=graph_data,
flow_id=flow_id,
session_id=session_id,
inputs=input_values_dict,
artifacts={},
session_service=session_service,
stream=stream,
)
return RunResponse(outputs=task_result, session_id=session_id)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)
) from exc
except ValueError as exc:
if f"Flow {flow_id} not found" in str(exc):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)
) from exc
else:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)
) from exc
@router.post(
@ -221,84 +155,16 @@ async def process(
"""
Endpoint to process an input with a given flow_id.
"""
try:
if session_id:
session_data = await session_service.load_session(session_id)
graph, artifacts = session_data if session_data else (None, None)
task_result: Any = None
task_status = None
task_id = None
if not graph:
raise ValueError("Graph not found in the session")
result = await build_graph_and_generate_result(
graph=graph,
inputs=inputs,
artifacts=artifacts,
session_id=session_id,
session_service=session_service,
)
task_id = str(id(result))
if isinstance(result, dict) and "result" in result:
task_result = result["result"]
session_id = result["session_id"]
elif hasattr(result, "result") and hasattr(result, "session_id"):
task_result = result.result
session_id = result.session_id
else:
task_result = result
if task_id:
task_response = TaskResponse(id=task_id, href=f"api/v1/task/{task_id}")
else:
task_response = None
return ProcessResponse(
result=task_result,
status=task_status,
task=task_response,
session_id=session_id,
backend=task_service.backend_name,
)
else:
if api_key_user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key",
)
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
flow = session.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first()
if flow is None:
raise ValueError(f"Flow {flow_id} not found")
if flow.data is None:
raise ValueError(f"Flow {flow_id} has no data")
graph_data = flow.data
return await process_graph_data(
graph_data=graph_data,
inputs=inputs,
tweaks=tweaks,
clear_cache=clear_cache,
session_id=session_id,
task_service=task_service,
sync=sync,
)
except sa.exc.StatementError as exc:
# StatementError('(builtins.ValueError) badly formed hexadecimal UUID string')
if "badly formed hexadecimal UUID string" in str(exc):
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ValueError as exc:
if f"Flow {flow_id} not found" in str(exc):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
else:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
except Exception as e:
# Log stack trace
logger.exception(e)
raise HTTPException(status_code=500, detail=str(e)) from e
# Raise a depreciation warning
logger.warning(
"The /process endpoint is deprecated and will be removed in a future version. "
"Please use /run instead."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The /process endpoint is deprecated and will be removed in a future version. "
"Please use /run instead.",
)
@router.get("/task/{task_id}", response_model=TaskStatusResponse)
@ -364,12 +230,16 @@ async def custom_component(
built_frontend_node = build_custom_component_template(component, user_id=user.id)
built_frontend_node = update_frontend_node_with_template_values(built_frontend_node, raw_code.frontend_node)
built_frontend_node = update_frontend_node_with_template_values(
built_frontend_node, raw_code.frontend_node
)
return built_frontend_node
@router.post("/custom_component/reload", status_code=HTTPStatus.OK)
async def reload_custom_component(path: str, user: User = Depends(get_current_active_user)):
async def reload_custom_component(
path: str, user: User = Depends(get_current_active_user)
):
from langflow.interface.custom.utils import build_custom_component_template
try:
@ -391,6 +261,8 @@ async def custom_component_update(
):
component = CustomComponent(code=raw_code.code)
component_node = build_custom_component_template(component, user_id=user.id, update_field=raw_code.field)
component_node = build_custom_component_template(
component, user_id=user.id, update_field=raw_code.field
)
# Update the field
return component_node

View file

@ -4,9 +4,8 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field, field_serializer, field_validator
from pydantic import BaseModel, Field, field_validator, model_serializer
from langflow.api.utils import serialize_field
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import FlowCreate, FlowRead
@ -22,26 +21,6 @@ class BuildStatus(Enum):
IN_PROGRESS = "in_progress"
class GraphData(BaseModel):
"""Data inside the exported flow."""
nodes: List[Dict[str, Any]]
edges: List[Dict[str, Any]]
class ExportedFlow(BaseModel):
"""Exported flow from Langflow."""
description: str
name: str
id: str
data: GraphData
class InputRequest(BaseModel):
input: dict
class TweaksRequest(BaseModel):
tweaks: Optional[Dict[str, Dict[str, str]]] = Field(default_factory=dict)
@ -67,6 +46,26 @@ class ProcessResponse(BaseModel):
backend: Optional[str] = None
class RunResponse(BaseModel):
"""Run response schema."""
outputs: Optional[List[Any]] = None
session_id: Optional[str] = None
@model_serializer(mode="wrap")
def serialize(self, handler):
# Serialize all the outputs if they are base models
if self.outputs:
serialized_outputs = []
for output in self.outputs:
if isinstance(output, BaseModel):
serialized_outputs.append(output.model_dump(exclude_none=True))
else:
serialized_outputs.append(output)
self.outputs = serialized_outputs
return handler(self)
class PreloadResponse(BaseModel):
"""Preload response schema."""
@ -74,9 +73,6 @@ class PreloadResponse(BaseModel):
is_clear: Optional[bool] = None
# TaskStatusResponse(
# status=task.status, result=task.result if task.ready() else None
# )
class TaskStatusResponse(BaseModel):
"""Task status response schema."""
@ -162,7 +158,9 @@ class StreamData(BaseModel):
data: dict
def __str__(self) -> str:
return f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
return (
f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n"
)
class CustomComponentCode(BaseModel):
@ -219,28 +217,24 @@ class ApiKeyCreateRequest(BaseModel):
class VerticesOrderResponse(BaseModel):
ids: List[List[str]]
run_id: UUID
class ResultData(BaseModel):
class ResultDataResponse(BaseModel):
results: Optional[Any] = Field(default_factory=dict)
artifacts: Optional[Any] = Field(default_factory=dict)
timedelta: Optional[float] = None
duration: Optional[str] = None
@field_serializer("results")
def serialize_results(self, value):
if isinstance(value, dict):
return {key: serialize_field(val) for key, val in value.items()}
return serialize_field(value)
class VertexBuildResponse(BaseModel):
id: Optional[str] = None
next_vertex_id: Optional[str] = None
next_verteices_ids: Optional[List[str]] = None
inactive_vertices: Optional[List[str]] = None
valid: bool
params: Optional[str]
"""JSON string of the params."""
data: ResultData
data: ResultDataResponse
"""Mapping of vertex ids to result dict containing the param name and result value."""
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
"""Timestamp of the build."""
@ -248,3 +242,7 @@ class VertexBuildResponse(BaseModel):
class VerticesBuiltResponse(BaseModel):
vertices: List[VertexBuildResponse]
class InputValueRequest(BaseModel):
input_value: str

View file

@ -16,6 +16,7 @@ from langflow.field_typing.range_spec import RangeSpec
class ConversationalAgent(CustomComponent):
display_name: str = "OpenAI Conversational Agent"
description: str = "Conversational Agent that can use OpenAI's function calling API"
icon = "OpenAI"
def build_config(self):
openai_function_models = [

View file

@ -1,9 +1,9 @@
from typing import Callable, Optional, Union
from typing import Optional
from langchain.chains import ConversationChain
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, BaseMemory, Chain, Text
from langflow.field_typing import BaseLanguageModel, BaseMemory, Text
class ConversationChainComponent(CustomComponent):
@ -23,10 +23,10 @@ class ConversationChainComponent(CustomComponent):
def build(
self,
inputs: str,
input_value: Text,
llm: BaseLanguageModel,
memory: Optional[BaseMemory] = None,
) -> Union[Chain, Callable, Text]:
) -> Text:
if memory is None:
chain = ConversationChain(llm=llm)
else:

View file

@ -1,8 +1,14 @@
from typing import Callable, Optional, Union
from typing import Optional
from langchain.chains import LLMChain
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, BaseMemory, BasePromptTemplate, Chain, Text
from langflow.field_typing import (
BaseLanguageModel,
BaseMemory,
BasePromptTemplate,
Text,
)
class LLMChainComponent(CustomComponent):
@ -22,5 +28,10 @@ class LLMChainComponent(CustomComponent):
prompt: BasePromptTemplate,
llm: BaseLanguageModel,
memory: Optional[BaseMemory] = None,
) -> Union[Chain, Callable, Text]:
return LLMChain(prompt=prompt, llm=llm, memory=memory)
) -> Text:
runnable = LLMChain(prompt=prompt, llm=llm, memory=memory)
result_dict = runnable.invoke({})
output_key = runnable.output_key
result = result_dict[output_key]
self.status = result
return result

View file

@ -1,8 +1,7 @@
from typing import Callable, Union
from langchain.chains import LLMCheckerChain
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, Chain
from langflow.field_typing import BaseLanguageModel, Text
class LLMCheckerChainComponent(CustomComponent):
@ -17,6 +16,12 @@ class LLMCheckerChainComponent(CustomComponent):
def build(
self,
input_value: Text,
llm: BaseLanguageModel,
) -> Union[Chain, Callable]:
return LLMCheckerChain.from_llm(llm=llm)
) -> Text:
chain = LLMCheckerChain.from_llm(llm=llm)
response = chain.invoke({chain.input_key: input_value})
result = response.get(chain.output_key, "")
result_str = Text(result)
self.status = result_str
return result_str

View file

@ -1,9 +1,9 @@
from typing import Callable, Optional, Union
from typing import Optional
from langchain.chains import LLMChain, LLMMathChain
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, BaseMemory, Chain
from langflow.field_typing import BaseLanguageModel, BaseMemory, Text
class LLMMathChainComponent(CustomComponent):
@ -22,10 +22,22 @@ class LLMMathChainComponent(CustomComponent):
def build(
self,
input_value: Text,
llm: BaseLanguageModel,
llm_chain: LLMChain,
input_key: str = "question",
output_key: str = "answer",
memory: Optional[BaseMemory] = None,
) -> Union[LLMMathChain, Callable, Chain]:
return LLMMathChain(llm=llm, llm_chain=llm_chain, input_key=input_key, output_key=output_key, memory=memory)
) -> Text:
chain = LLMMathChain(
llm=llm,
llm_chain=llm_chain,
input_key=input_key,
output_key=output_key,
memory=memory,
)
response = chain.invoke({input_key: input_value})
result = response.get(output_key)
result_str = Text(result)
self.status = result_str
return result_str

View file

@ -1,32 +0,0 @@
from langchain.llms.base import BaseLLM
from langchain.prompts import PromptTemplate
from langchain_core.messages import BaseMessage
from langflow import CustomComponent
from langflow.field_typing import Text
class PromptRunner(CustomComponent):
display_name: str = "Prompt Runner"
description: str = "Run a Chain with the given PromptTemplate"
beta: bool = True
field_config = {
"llm": {"display_name": "LLM"},
"prompt": {
"display_name": "Prompt Template",
"info": "Make sure the prompt has all variables filled.",
},
"code": {"show": False},
}
def build(self, llm: BaseLLM, prompt: PromptTemplate, inputs: dict = {}) -> Text:
chain = prompt | llm
# The input is an empty dict because the prompt is already filled
result_message: BaseMessage = chain.invoke(input=inputs)
if hasattr(result_message, "content"):
result: str = result_message.content
elif isinstance(result_message, str):
result = result_message
else:
result = str(result_message)
self.repr_value = result
return result

View file

@ -1,8 +1,9 @@
from typing import Callable, Optional, Union
from typing import Optional
from langchain.chains.combine_documents.base import BaseCombineDocumentsChain
from langchain.chains.retrieval_qa.base import BaseRetrievalQA, RetrievalQA
from langchain.chains.retrieval_qa.base import RetrievalQA
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.field_typing import BaseMemory, BaseRetriever, Text
@ -19,19 +20,22 @@ class RetrievalQAComponent(CustomComponent):
"input_key": {"display_name": "Input Key", "advanced": True},
"output_key": {"display_name": "Output Key", "advanced": True},
"return_source_documents": {"display_name": "Return Source Documents"},
"inputs": {"display_name": "Input", "input_types": ["Text", "Document"]},
"input_value": {
"display_name": "Input",
"input_types": ["Text", "Document"],
},
}
def build(
self,
combine_documents_chain: BaseCombineDocumentsChain,
retriever: BaseRetriever,
inputs: str = "",
input_value: str = "",
memory: Optional[BaseMemory] = None,
input_key: str = "query",
output_key: str = "result",
return_source_documents: bool = True,
) -> Union[BaseRetrievalQA, Callable, Text]:
) -> Text:
runnable = RetrievalQA(
combine_documents_chain=combine_documents_chain,
retriever=retriever,
@ -40,11 +44,19 @@ class RetrievalQAComponent(CustomComponent):
output_key=output_key,
return_source_documents=return_source_documents,
)
if isinstance(inputs, Document):
inputs = inputs.page_content
if isinstance(input_value, Document):
input_value = input_value.page_content
self.status = runnable
result = runnable.invoke({input_key: inputs})
result = runnable.invoke({input_key: input_value})
result = result.content if hasattr(result, "content") else result
# Result is a dict with keys "query", "result" and "source_documents"
# for now we just return the result
return result.get("result")
records = self.to_records(result.get("source_documents"))
references_str = ""
if return_source_documents:
references_str = self.create_references_from_records(records)
result_str = result.get("result", "")
final_result = "\n".join([Text(result_str), references_str])
self.status = final_result
return final_result # OK

View file

@ -1,11 +1,10 @@
from typing import Optional
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.chains.qa_with_sources.base import BaseQAWithSourcesChain
from langchain.chains.combine_documents.base import BaseCombineDocumentsChain
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, BaseMemory, BaseRetriever
from langflow.field_typing import BaseLanguageModel, BaseMemory, BaseRetriever, Text
class RetrievalQAWithSourcesChainComponent(CustomComponent):
@ -18,25 +17,42 @@ class RetrievalQAWithSourcesChainComponent(CustomComponent):
"chain_type": {
"display_name": "Chain Type",
"options": ["stuff", "map_reduce", "map_rerank", "refine"],
"info": "The type of chain to use to combined Documents.",
},
"memory": {"display_name": "Memory"},
"return_source_documents": {"display_name": "Return Source Documents"},
"retriever": {"display_name": "Retriever"},
}
def build(
self,
input_value: Text,
retriever: BaseRetriever,
llm: BaseLanguageModel,
combine_documents_chain: BaseCombineDocumentsChain,
chain_type: str,
memory: Optional[BaseMemory] = None,
return_source_documents: Optional[bool] = True,
) -> BaseQAWithSourcesChain:
return RetrievalQAWithSourcesChain.from_chain_type(
) -> Text:
runnable = RetrievalQAWithSourcesChain.from_chain_type(
llm=llm,
chain_type=chain_type,
combine_documents_chain=combine_documents_chain,
memory=memory,
return_source_documents=return_source_documents,
retriever=retriever,
)
if isinstance(input_value, Document):
input_value = input_value.page_content
self.status = runnable
input_key = runnable.input_keys[0]
result = runnable.invoke({input_key: input_value})
result = result.content if hasattr(result, "content") else result
# Result is a dict with keys "query", "result" and "source_documents"
# for now we just return the result
records = self.to_records(result.get("source_documents"))
references_str = ""
if return_source_documents:
references_str = self.create_references_from_records(records)
result_str = Text(result.get("answer", ""))
final_result = "\n".join([result_str, references_str])
self.status = final_result
return final_result

View file

@ -1,25 +0,0 @@
from langflow import CustomComponent
from typing import Callable, Union
from langflow.field_typing import BasePromptTemplate, BaseLanguageModel, Chain
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_experimental.sql.base import SQLDatabaseChain
class SQLDatabaseChainComponent(CustomComponent):
display_name = "SQLDatabaseChain"
description = ""
def build_config(self):
return {
"db": {"display_name": "Database"},
"llm": {"display_name": "LLM"},
"prompt": {"display_name": "Prompt"},
}
def build(
self,
db: SQLDatabase,
llm: BaseLanguageModel,
prompt: BasePromptTemplate,
) -> Union[Chain, Callable, SQLDatabaseChain]:
return SQLDatabaseChain.from_llm(llm=llm, db=db, prompt=prompt)

View file

@ -0,0 +1,57 @@
from typing import Optional
from langchain.chains import create_sql_query_chain
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import Runnable
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel, Text
class SQLGeneratorComponent(CustomComponent):
display_name = "Natural Language to SQL"
description = "Generate SQL from natural language."
def build_config(self):
return {
"db": {"display_name": "Database"},
"llm": {"display_name": "LLM"},
"prompt": {
"display_name": "Prompt",
"info": "The prompt must contain `{question}`.",
},
"top_k": {
"display_name": "Top K",
"info": "The number of results per select statement to return. If 0, no limit.",
},
}
def build(
self,
input_value: Text,
db: SQLDatabase,
llm: BaseLanguageModel,
top_k: int = 5,
prompt: Optional[Text] = None,
) -> Text:
if prompt:
prompt_template = PromptTemplate.from_template(template=prompt)
else:
prompt_template = None
if top_k < 1:
raise ValueError("Top K must be greater than 0.")
if not prompt_template:
sql_query_chain = create_sql_query_chain(llm=llm, db=db, k=top_k)
else:
# Check if {question} is in the prompt
if "{question}" not in prompt_template.template or "question" not in prompt_template.input_variables:
raise ValueError("Prompt must contain `{question}` to be used with Natural Language to SQL.")
sql_query_chain = create_sql_query_chain(llm=llm, db=db, prompt=prompt_template, k=top_k)
query_writer: Runnable = sql_query_chain | {"query": lambda x: x.replace("SQLQuery:", "").strip()}
response = query_writer.invoke({"question": input_value})
query = response.get("query")
self.status = query
return query

View file

@ -1,42 +0,0 @@
from typing import Any, Dict, List
from langchain.docstore.document import Document
from langchain.document_loaders.directory import DirectoryLoader
from langflow import CustomComponent
class DirectoryLoaderComponent(CustomComponent):
display_name = "DirectoryLoader"
description = "Load from a directory."
def build_config(self) -> Dict[str, Any]:
return {
"glob": {"display_name": "Glob Pattern", "value": "**/*.txt"},
"load_hidden": {"display_name": "Load Hidden Files", "value": False, "advanced": True},
"max_concurrency": {"display_name": "Max Concurrency", "value": 10, "advanced": True},
"metadata": {"display_name": "Metadata", "value": {}},
"path": {"display_name": "Local Directory"},
"recursive": {"display_name": "Recursive", "value": True, "advanced": True},
"silent_errors": {"display_name": "Silent Errors", "value": False, "advanced": True},
"use_multithreading": {"display_name": "Use Multithreading", "value": True, "advanced": True},
}
def build(
self,
glob: str,
path: str,
max_concurrency: int = 2,
load_hidden: bool = False,
recursive: bool = True,
silent_errors: bool = False,
use_multithreading: bool = True,
) -> List[Document]:
return DirectoryLoader(
glob=glob,
path=path,
load_hidden=load_hidden,
max_concurrency=max_concurrency,
recursive=recursive,
silent_errors=silent_errors,
use_multithreading=use_multithreading,
).load()

View file

@ -1,5 +1,7 @@
from langchain_core.documents import Document
from typing import List
from langflow import CustomComponent
from langflow.schema import Record
from langflow.utils.constants import LOADERS_INFO
@ -9,7 +11,9 @@ class FileLoaderComponent(CustomComponent):
beta = True
def build_config(self):
loader_options = ["Automatic"] + [loader_info["name"] for loader_info in LOADERS_INFO]
loader_options = ["Automatic"] + [
loader_info["name"] for loader_info in LOADERS_INFO
]
file_types = []
suffixes = []
@ -38,6 +42,7 @@ class FileLoaderComponent(CustomComponent):
"srt",
"eml",
"md",
"mdx",
"pptx",
"docx",
],
@ -55,6 +60,7 @@ class FileLoaderComponent(CustomComponent):
".srt",
".eml",
".md",
".mdx",
".pptx",
".docx",
],
@ -71,10 +77,10 @@ class FileLoaderComponent(CustomComponent):
"code": {"show": False},
}
def build(self, file_path: str, loader: str) -> Document:
def build(self, file_path: str, loader: str) -> List[Record]:
file_type = file_path.split(".")[-1]
# Mapeie o nome do loader selecionado para suas informações
# Map the loader to the correct loader class
selected_loader_info = None
for loader_info in LOADERS_INFO:
if loader_info["name"] == loader:
@ -85,7 +91,7 @@ class FileLoaderComponent(CustomComponent):
raise ValueError(f"Loader {loader} not found in the loader info list")
if loader == "Automatic":
# Determine o loader automaticamente com base na extensão do arquivo
# Determine the loader based on the file type
default_loader_info = None
for info in LOADERS_INFO:
if "defaultFor" in info and file_type in info["defaultFor"]:
@ -99,15 +105,20 @@ class FileLoaderComponent(CustomComponent):
if isinstance(selected_loader_info, dict):
loader_import: str = selected_loader_info["import"]
else:
raise ValueError(f"Loader info for {loader} is not a dict\nLoader info:\n{selected_loader_info}")
raise ValueError(
f"Loader info for {loader} is not a dict\nLoader info:\n{selected_loader_info}"
)
module_name, class_name = loader_import.rsplit(".", 1)
try:
# Importe o loader dinamicamente
# Import the loader class
loader_module = __import__(module_name, fromlist=[class_name])
loader_instance = getattr(loader_module, class_name)
except ImportError as e:
raise ValueError(f"Loader {loader} could not be imported\nLoader info:\n{selected_loader_info}") from e
raise ValueError(
f"Loader {loader} could not be imported\nLoader info:\n{selected_loader_info}"
) from e
result = loader_instance(file_path=file_path)
return result.load()
docs = result.load()
return self.to_records(docs)

View file

@ -0,0 +1,161 @@
from concurrent import futures
from pathlib import Path
from typing import Any, Dict, List, Optional, Text
from langflow import CustomComponent
from langflow.schema import Record
class GatherRecordsComponent(CustomComponent):
display_name = "Gather Records"
description = "Gather records from a directory."
def build_config(self) -> Dict[str, Any]:
return {
"path": {"display_name": "Path"},
"types": {
"display_name": "Types",
"info": "File types to load. Leave empty to load all types.",
},
"depth": {"display_name": "Depth", "info": "Depth to search for files."},
"max_concurrency": {"display_name": "Max Concurrency", "advanced": True},
"load_hidden": {
"display_name": "Load Hidden",
"advanced": True,
"info": "If true, hidden files will be loaded.",
},
"recursive": {
"display_name": "Recursive",
"advanced": True,
"info": "If true, the search will be recursive.",
},
"silent_errors": {
"display_name": "Silent Errors",
"advanced": True,
"info": "If true, errors will not raise an exception.",
},
"use_multithreading": {
"display_name": "Use Multithreading",
"advanced": True,
},
}
def is_hidden(self, path: Path) -> bool:
return path.name.startswith(".")
def retrieve_file_paths(
self,
path: str,
types: List[str],
load_hidden: bool,
recursive: bool,
depth: int,
) -> List[str]:
path_obj = Path(path)
if not path_obj.exists() or not path_obj.is_dir():
raise ValueError(f"Path {path} must exist and be a directory.")
def match_types(p: Path) -> bool:
return any(p.suffix == f".{t}" for t in types) if types else True
def is_not_hidden(p: Path) -> bool:
return not self.is_hidden(p) or load_hidden
def walk_level(directory: Path, max_depth: int):
directory = directory.resolve()
prefix_length = len(directory.parts)
for p in directory.rglob("*" if recursive else "[!.]*"):
if len(p.parts) - prefix_length <= max_depth:
yield p
glob = "**/*" if recursive else "*"
paths = walk_level(path_obj, depth) if depth else path_obj.glob(glob)
file_paths = [
Text(p)
for p in paths
if p.is_file() and match_types(p) and is_not_hidden(p)
]
return file_paths
def parse_file_to_record(
self, file_path: str, silent_errors: bool
) -> Optional[Record]:
# Use the partition function to load the file
from unstructured.partition.auto import partition # type: ignore
try:
elements = partition(file_path)
except Exception as e:
if not silent_errors:
raise ValueError(f"Error loading file {file_path}: {e}") from e
return None
# Create a Record
text = "\n\n".join([Text(el) for el in elements])
metadata = elements.metadata if hasattr(elements, "metadata") else {}
metadata["file_path"] = file_path
record = Record(text=text, data=metadata)
return record
def get_elements(
self,
file_paths: List[str],
silent_errors: bool,
max_concurrency: int,
use_multithreading: bool,
) -> List[Optional[Record]]:
if use_multithreading:
records = self.parallel_load_records(
file_paths, silent_errors, max_concurrency
)
else:
records = [
self.parse_file_to_record(file_path, silent_errors)
for file_path in file_paths
]
records = list(filter(None, records))
return records
def parallel_load_records(
self, file_paths: List[str], silent_errors: bool, max_concurrency: int
) -> List[Optional[Record]]:
with futures.ThreadPoolExecutor(max_workers=max_concurrency) as executor:
loaded_files = executor.map(
lambda file_path: self.parse_file_to_record(file_path, silent_errors),
file_paths,
)
# loaded_files is an iterator, so we need to convert it to a list
return list(loaded_files)
def build(
self,
path: str,
types: Optional[List[str]] = None,
depth: int = 0,
max_concurrency: int = 2,
load_hidden: bool = False,
recursive: bool = True,
silent_errors: bool = False,
use_multithreading: bool = True,
) -> List[Optional[Record]]:
if types is None:
types = []
resolved_path = self.resolve_path(path)
file_paths = self.retrieve_file_paths(
resolved_path, types, load_hidden, recursive, depth
)
loaded_records = []
if use_multithreading:
loaded_records = self.parallel_load_records(
file_paths, silent_errors, max_concurrency
)
else:
loaded_records = [
self.parse_file_to_record(file_path, silent_errors)
for file_path in file_paths
]
loaded_records = list(filter(None, loaded_records))
self.status = loaded_records
return loaded_records

View file

@ -1,7 +1,9 @@
from typing import Optional
from langchain.embeddings import BedrockEmbeddings
from langchain.embeddings.base import Embeddings
from langchain_community.embeddings import BedrockEmbeddings
from langflow import CustomComponent

View file

@ -9,6 +9,7 @@ class AzureOpenAIEmbeddingsComponent(CustomComponent):
description: str = "Embeddings model from Azure OpenAI."
documentation: str = "https://python.langchain.com/docs/integrations/text_embedding/azureopenai"
beta = False
icon = "Azure"
API_VERSION_OPTIONS = [
"2022-12-01",

View file

@ -9,6 +9,7 @@ class HuggingFaceEmbeddingsComponent(CustomComponent):
documentation = (
"https://python.langchain.com/docs/modules/data_connection/text_embedding/integrations/sentence_transformers"
)
icon = "HuggingFace"
def build_config(self):
return {

View file

@ -9,6 +9,7 @@ class HuggingFaceInferenceAPIEmbeddingsComponent(CustomComponent):
display_name = "HuggingFaceInferenceAPIEmbeddings"
description = "HuggingFace sentence_transformers embedding models, API version."
documentation = "https://github.com/huggingface/text-embeddings-inference"
icon = "HuggingFace"
def build_config(self):
return {

View file

@ -1,5 +1,5 @@
from langflow import CustomComponent
from langchain.embeddings import VertexAIEmbeddings
from langchain_community.embeddings import VertexAIEmbeddings
from typing import Optional, List
@ -9,17 +9,45 @@ class VertexAIEmbeddingsComponent(CustomComponent):
def build_config(self):
return {
"credentials": {"display_name": "Credentials", "value": "", "file_types": [".json"], "field_type": "file"},
"instance": {"display_name": "instance", "advanced": True, "field_type": "dict"},
"location": {"display_name": "Location", "value": "us-central1", "advanced": True},
"credentials": {
"display_name": "Credentials",
"value": "",
"file_types": [".json"],
"field_type": "file",
},
"instance": {
"display_name": "instance",
"advanced": True,
"field_type": "dict",
},
"location": {
"display_name": "Location",
"value": "us-central1",
"advanced": True,
},
"max_output_tokens": {"display_name": "Max Output Tokens", "value": 128},
"max_retries": {"display_name": "Max Retries", "value": 6, "advanced": True},
"model_name": {"display_name": "Model Name", "value": "textembedding-gecko"},
"max_retries": {
"display_name": "Max Retries",
"value": 6,
"advanced": True,
},
"model_name": {
"display_name": "Model Name",
"value": "textembedding-gecko",
},
"n": {"display_name": "N", "value": 1, "advanced": True},
"project": {"display_name": "Project", "advanced": True},
"request_parallelism": {"display_name": "Request Parallelism", "value": 5, "advanced": True},
"request_parallelism": {
"display_name": "Request Parallelism",
"value": 5,
"advanced": True,
},
"stop": {"display_name": "Stop", "advanced": True},
"streaming": {"display_name": "Streaming", "value": False, "advanced": True},
"streaming": {
"display_name": "Streaming",
"value": False,
"advanced": True,
},
"temperature": {"display_name": "Temperature", "value": 0.0},
"top_k": {"display_name": "Top K", "value": 40, "advanced": True},
"top_p": {"display_name": "Top P", "value": 0.95, "advanced": True},

View file

@ -1,61 +1,26 @@
from typing import Optional, Union
from langflow import CustomComponent
from langflow.components.io.base.chat import ChatComponent
from langflow.field_typing import Text
from langflow.schema import Record
class ChatInput(CustomComponent):
class ChatInput(ChatComponent):
display_name = "Chat Input"
description = "Used to get user input from the chat."
def build_config(self):
return {
"message": {
"input_types": ["Text"],
"display_name": "Message",
"multiline": True,
},
"sender": {
"options": ["Machine", "User"],
"display_name": "Sender Type",
},
"sender_name": {"display_name": "Sender Name"},
"session_id": {
"display_name": "Session ID",
"info": "Session ID of the chat history.",
},
"as_record": {
"display_name": "As Record",
"info": "If true, the message will be returned as a Record.",
},
}
def build(
self,
sender: Optional[str] = "User",
sender_name: Optional[str] = "User",
message: Optional[str] = None,
as_record: Optional[bool] = False,
input_value: Optional[str] = None,
session_id: Optional[str] = None,
return_record: Optional[bool] = False,
) -> Union[Text, Record]:
self.status = message
if as_record:
if isinstance(message, Record):
# Update the data of the record
message.data["sender"] = sender
message.data["sender_name"] = sender_name
message.data["session_id"] = session_id
return message
return Record(
text=message,
data={
"sender": sender,
"sender_name": sender_name,
"session_id": session_id,
},
)
if not message:
message = ""
self.status = message
return message
return super().build(
sender=sender,
sender_name=sender_name,
input_value=input_value,
session_id=session_id,
return_record=return_record,
)

View file

@ -1,65 +1,26 @@
from typing import Optional, Union
from langflow import CustomComponent
from langflow.components.io.base.chat import ChatComponent
from langflow.field_typing import Text
from langflow.schema import Record
class ChatOutput(CustomComponent):
class ChatOutput(ChatComponent):
display_name = "Chat Output"
description = "Used to send a message to the chat."
field_config = {
"code": {
"show": True,
}
}
def build_config(self):
return {
"message": {"input_types": ["Text"], "display_name": "Message"},
"sender": {
"options": ["Machine", "User"],
"display_name": "Sender Type",
},
"sender_name": {"display_name": "Sender Name"},
"session_id": {
"display_name": "Session ID",
"info": "Session ID of the chat history.",
"input_types": ["Text"],
},
"as_record": {
"display_name": "As Record",
"info": "If true, the message will be returned as a Record.",
},
}
def build(
self,
sender: Optional[str] = "Machine",
sender_name: Optional[str] = "AI",
input_value: Optional[str] = None,
session_id: Optional[str] = None,
message: Optional[str] = None,
as_record: Optional[bool] = False,
return_record: Optional[bool] = False,
) -> Union[Text, Record]:
self.status = message
if as_record:
if isinstance(message, Record):
# Update the data of the record
message.data["sender"] = sender
message.data["sender_name"] = sender_name
message.data["session_id"] = session_id
return message
return Record(
text=message,
data={
"sender": sender,
"sender_name": sender_name,
"session_id": session_id,
},
)
if not message:
message = ""
self.status = message
return message
return super().build(
sender=sender,
sender_name=sender_name,
input_value=input_value,
session_id=session_id,
return_record=return_record,
)

View file

@ -1,71 +0,0 @@
from typing import List, Optional
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.memory import add_messages
from langflow.schema import Record
class StoreMessages(CustomComponent):
display_name = "Store Messages"
description = "Used to store messages."
def build_config(self):
return {
"records": {
"display_name": "Records",
"info": "The list of records to store. Each record should contain the keys 'sender', 'sender_name', and 'session_id'.",
},
"texts": {
"display_name": "Texts",
"info": "The list of texts to store. If records is not provided, texts must be provided.",
},
"session_id": {
"display_name": "Session ID",
"info": "The session ID to store.",
},
"sender": {
"display_name": "Sender",
"info": "The sender to store.",
},
"sender_name": {
"display_name": "Sender Name",
"info": "The sender name to store.",
},
}
def build(
self,
records: Optional[List[Record]] = None,
texts: Optional[List[Text]] = None,
session_id: Optional[str] = None,
sender: Optional[str] = None,
sender_name: Optional[str] = None,
) -> List[Record]:
# Records is the main way to store messages
# If records is not provided, we can use texts
# but we need to create the records from the texts
# and the other parameters
if not texts and not records:
raise ValueError("Either texts or records must be provided.")
if not records:
records = []
if not session_id or not sender or not sender_name:
raise ValueError("If passing texts, session_id, sender, and sender_name must be provided.")
for text in texts:
record = Record(
text=text,
data={
"session_id": session_id,
"sender": sender,
"sender_name": sender_name,
},
)
records.append(record)
elif isinstance(records, Record):
records = [records]
self.status = records
records = add_messages(records)
return records

View file

@ -1,22 +1,12 @@
from typing import Optional
from langflow import CustomComponent
from langflow.components.io.base.text import TextComponent
from langflow.field_typing import Text
class TextInput(CustomComponent):
class TextInput(TextComponent):
display_name = "Text Input"
description = "Used to pass text input to the next component."
field_config = {
"code": {
"show": False,
},
"value": {"display_name": "Value"},
}
def build(self, value: Optional[str] = "") -> Text:
self.status = value
if not value:
value = ""
return value
def build(self, input_value: Optional[str] = "") -> Text:
return super().build(input_value=input_value)

View file

@ -0,0 +1,16 @@
from typing import Optional
from langflow.components.io.base.text import TextComponent
from langflow.field_typing import Text
class TextOutput(TextComponent):
display_name = "Text Output"
description = "Used to pass text output to the next component."
field_config = {
"input_value": {"display_name": "Value"},
}
def build(self, input_value: Optional[Text] = "") -> Text:
return super().build(input_value=input_value)

View file

@ -0,0 +1,107 @@
import warnings
from typing import Optional, Union
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.memory import add_messages
from langflow.schema import Record
class ChatComponent(CustomComponent):
display_name = "Chat Component"
description = "Use as base for chat components."
def build_config(self):
return {
"input_value": {
"input_types": ["Text"],
"display_name": "Message",
"multiline": True,
},
"sender": {
"options": ["Machine", "User"],
"display_name": "Sender Type",
},
"sender_name": {"display_name": "Sender Name"},
"session_id": {
"display_name": "Session ID",
"info": "If provided, the message will be stored in the memory.",
},
"return_record": {
"display_name": "Return Record",
"info": "Return the message as a record containing the sender, sender_name, and session_id.",
},
}
def store_message(
self,
message: Union[str, Text, Record],
session_id: Optional[str] = None,
sender: Optional[str] = None,
sender_name: Optional[str] = None,
) -> list[Record]:
if not message:
warnings.warn("No message provided.")
return []
if not session_id or not sender or not sender_name:
raise ValueError(
"All of session_id, sender, and sender_name must be provided."
)
if isinstance(message, Record):
record = message
record.data.update(
{
"session_id": session_id,
"sender": sender,
"sender_name": sender_name,
}
)
else:
record = Record(
text=message,
data={
"session_id": session_id,
"sender": sender,
"sender_name": sender_name,
},
)
self.status = record
records = add_messages([record])
return records[0]
def build(
self,
sender: Optional[str] = "User",
sender_name: Optional[str] = "User",
input_value: Optional[str] = None,
session_id: Optional[str] = None,
return_record: Optional[bool] = False,
) -> Union[Text, Record]:
input_value_record: Optional[Record] = None
if return_record:
if isinstance(input_value, Record):
# Update the data of the record
input_value.data["sender"] = sender
input_value.data["sender_name"] = sender_name
input_value.data["session_id"] = session_id
else:
input_value_record = Record(
text=input_value,
data={
"sender": sender,
"sender_name": sender_name,
"session_id": session_id,
},
)
if not input_value:
input_value = ""
if return_record and input_value_record:
result: Union[Text, Record] = input_value_record
else:
result = input_value
self.status = result
if session_id:
self.store_message(result, session_id, sender, sender_name)
return result

View file

@ -0,0 +1,19 @@
from typing import Optional
from langflow import CustomComponent
from langflow.field_typing import Text
class TextComponent(CustomComponent):
display_name = "Text Component"
description = "Used to pass text to the next component."
field_config = {
"input_value": {"display_name": "Value", "multiline": True},
}
def build(self, input_value: Optional[str] = "") -> Text:
self.status = input_value
if not input_value:
input_value = ""
return input_value

View file

@ -12,14 +12,10 @@ class MessageHistoryComponent(CustomComponent):
def build_config(self):
return {
"sender": {
"options": ["Machine", "User"],
"options": ["Machine", "User", "Machine and User"],
"display_name": "Sender Type",
},
"sender_name": {"display_name": "Sender Name"},
"file_path": {
"display_name": "File Path",
"info": "Path of the local JSON file to store the messages. It should be a unique path for each chat history.",
},
"n_messages": {
"display_name": "Number of Messages",
"info": "Number of messages to retrieve.",
@ -38,6 +34,8 @@ class MessageHistoryComponent(CustomComponent):
session_id: Optional[str] = None,
n_messages: int = 5,
) -> List[Record]:
if sender == "Machine and User":
sender = None
messages = get_messages(
sender=sender,
sender_name=sender_name,

View file

@ -1,13 +1,16 @@
from typing import Optional
from langchain.llms.base import BaseLLM
from langchain.llms.bedrock import Bedrock
from langchain_community.llms.bedrock import Bedrock
from langflow import CustomComponent
class AmazonBedrockComponent(CustomComponent):
display_name: str = "Amazon Bedrock"
description: str = "LLM model from Amazon Bedrock."
icon = "Amazon"
def build_config(self):
return {

View file

@ -10,6 +10,7 @@ from langflow import CustomComponent
class AnthropicLLM(CustomComponent):
display_name: str = "AnthropicLLM"
description: str = "Anthropic Chat&Completion large language models."
icon = "Anthropic"
def build_config(self):
return {

View file

@ -10,6 +10,7 @@ from langflow.field_typing import BaseLanguageModel, NestedDict
class AnthropicComponent(CustomComponent):
display_name = "Anthropic"
description = "Anthropic large language models."
icon = "Anthropic"
def build_config(self):
return {

View file

@ -1,14 +1,17 @@
from typing import Optional
from langflow import CustomComponent
from langchain.llms.base import BaseLanguageModel
from langchain_community.chat_models.azure_openai import AzureChatOpenAI
from langflow import CustomComponent
class AzureChatOpenAIComponent(CustomComponent):
class AzureChatOpenAISpecsComponent(CustomComponent):
display_name: str = "AzureChatOpenAI"
description: str = "LLM model from Azure OpenAI."
documentation: str = "https://python.langchain.com/docs/integrations/llms/azure_openai"
beta = False
icon = "Azure"
AZURE_OPENAI_MODELS = [
"gpt-35-turbo",

View file

@ -9,6 +9,7 @@ class ChatAnthropicComponent(CustomComponent):
display_name = "ChatAnthropic"
description = "`Anthropic` chat large language models."
documentation = "https://python.langchain.com/docs/modules/model_io/models/chat/integrations/anthropic"
icon = "Anthropic"
def build_config(self):
return {

View file

@ -1,6 +1,7 @@
from typing import Any, Callable, Dict, Optional, Union
from langchain_community.chat_models.litellm import ChatLiteLLM, ChatLiteLLMException
from langflow import CustomComponent
from langflow.field_typing import BaseLanguageModel
@ -137,7 +138,14 @@ class ChatLiteLLMComponent(CustomComponent):
"OpenRouter": "openrouter_api_key",
}
# Set the API key based on the provider
kwarg = {provider_map[provider]: api_key}
api_keys: dict[str, Optional[str]] = {v: None for v in provider_map.values()}
if variable_name := provider_map.get(provider):
api_keys[variable_name] = api_key
else:
raise ChatLiteLLMException(
f"Provider {provider} is not supported. Supported providers are: {', '.join(provider_map.keys())}"
)
LLM = ChatLiteLLM(
model=model,
@ -150,6 +158,11 @@ class ChatLiteLLMComponent(CustomComponent):
n=n,
max_tokens=max_tokens,
max_retries=max_retries,
**kwarg,
openai_api_key=api_keys["openai_api_key"],
azure_api_key=api_keys["azure_api_key"],
anthropic_api_key=api_keys["anthropic_api_key"],
replicate_api_key=api_keys["replicate_api_key"],
cohere_api_key=api_keys["cohere_api_key"],
openrouter_api_key=api_keys["openrouter_api_key"],
)
return LLM

View file

@ -9,6 +9,7 @@ from langflow.field_typing import BaseLanguageModel, NestedDict
class ChatOpenAIComponent(CustomComponent):
display_name = "ChatOpenAI"
description = "`OpenAI` Chat large language models API."
icon = "OpenAI"
def build_config(self):
return {

View file

@ -10,6 +10,7 @@ from langflow.field_typing import BaseLanguageModel
class ChatVertexAIComponent(CustomComponent):
display_name = "ChatVertexAI"
description = "`Vertex AI` Chat large language models API."
icon = "VertexAI"
def build_config(self):
return {

View file

@ -7,6 +7,7 @@ class CohereComponent(CustomComponent):
display_name = "Cohere"
description = "Cohere large language models."
documentation = "https://python.langchain.com/docs/modules/model_io/models/llms/integrations/cohere"
icon = "Cohere"
def build_config(self):
return {

View file

@ -10,6 +10,7 @@ class GoogleGenerativeAIComponent(CustomComponent):
display_name: str = "Google Generative AI"
description: str = "A component that uses Google Generative AI to generate text."
documentation: str = "http://docs.langflow.org/components/custom"
icon = "Google"
def build_config(self):
return {

View file

@ -1,12 +1,14 @@
from typing import Optional
from langflow import CustomComponent
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain.llms.base import BaseLLM
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langflow import CustomComponent
class HuggingFaceEndpointsComponent(CustomComponent):
display_name: str = "Hugging Face Inference API"
description: str = "LLM model from Hugging Face Inference API."
icon = "HuggingFace"
def build_config(self):
return {
@ -31,11 +33,11 @@ class HuggingFaceEndpointsComponent(CustomComponent):
model_kwargs: Optional[dict] = None,
) -> BaseLLM:
try:
output = HuggingFaceEndpoint(
output = HuggingFaceEndpoint( # type: ignore
endpoint_url=endpoint_url,
task=task,
huggingfacehub_api_token=huggingfacehub_api_token,
model_kwargs=model_kwargs,
model_kwargs=model_kwargs or {},
)
except Exception as e:
raise ValueError("Could not connect to HuggingFace Endpoints API.") from e

View file

@ -7,6 +7,7 @@ from langchain_community.llms.vertexai import VertexAI
class VertexAIComponent(CustomComponent):
display_name = "VertexAI"
description = "Google Vertex AI large language models"
icon = "VertexAI"
def build_config(self):
return {

View file

@ -1,14 +1,15 @@
from typing import Optional
from langchain_community.chat_models.bedrock import BedrockChat
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
from langflow import CustomComponent
class AmazonBedrockComponent(CustomComponent):
display_name: str = "Amazon Bedrock model"
class AmazonBedrockComponent(LCModelComponent):
display_name: str = "Amazon Bedrock Model"
description: str = "Generate text using LLM model from Amazon Bedrock."
icon = "Amazon"
def build_config(self):
return {
@ -34,12 +35,16 @@ class AmazonBedrockComponent(CustomComponent):
"model_kwargs": {"display_name": "Model Kwargs"},
"cache": {"display_name": "Cache"},
"code": {"advanced": True},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
inputs: str,
input_value: Text,
model_id: str = "anthropic.claude-instant-v1",
credentials_profile_name: Optional[str] = None,
region_name: Optional[str] = None,
@ -47,6 +52,7 @@ class AmazonBedrockComponent(CustomComponent):
endpoint_url: Optional[str] = None,
streaming: bool = False,
cache: Optional[bool] = None,
stream: bool = False,
) -> Text:
try:
output = BedrockChat(
@ -60,7 +66,5 @@ class AmazonBedrockComponent(CustomComponent):
) # type: ignore
except Exception as e:
raise ValueError("Could not connect to AmazonBedrock API.") from e
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -2,15 +2,15 @@ from typing import Optional
from langchain_community.chat_models.anthropic import ChatAnthropic
from pydantic.v1 import SecretStr
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
from langflow import CustomComponent
class AnthropicLLM(CustomComponent):
display_name: str = "Anthropic model"
class AnthropicLLM(LCModelComponent):
display_name: str = "AnthropicModel"
description: str = "Generate text using Anthropic Chat&Completion large language models."
icon = "Anthropic"
def build_config(self):
return {
@ -48,17 +48,22 @@ class AnthropicLLM(CustomComponent):
"info": "Endpoint of the Anthropic API. Defaults to 'https://api.anthropic.com' if not specified.",
},
"code": {"show": False},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
model: str,
inputs: str,
input_value: Text,
anthropic_api_key: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
api_endpoint: Optional[str] = None,
stream: bool = False,
) -> Text:
# Set default API endpoint if not provided
if not api_endpoint:
@ -67,14 +72,12 @@ class AnthropicLLM(CustomComponent):
try:
output = ChatAnthropic(
model_name=model,
anthropic_api_key=SecretStr(anthropic_api_key) if anthropic_api_key else None,
anthropic_api_key=(SecretStr(anthropic_api_key) if anthropic_api_key else None),
max_tokens_to_sample=max_tokens, # type: ignore
temperature=temperature,
anthropic_api_url=api_endpoint,
)
except Exception as e:
raise ValueError("Could not connect to Anthropic API.") from e
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -2,15 +2,18 @@ from typing import Optional
from langchain.llms.base import BaseLanguageModel
from langchain_openai import AzureChatOpenAI
from pydantic.v1 import SecretStr
from langflow import CustomComponent
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class AzureChatOpenAIComponent(CustomComponent):
display_name: str = "AzureOpenAI model"
class AzureChatOpenAIComponent(LCModelComponent):
display_name: str = "AzureOpenAI Model"
description: str = "Generate text using LLM model from Azure OpenAI."
documentation: str = "https://python.langchain.com/docs/integrations/llms/azure_openai"
beta = False
icon = "Azure"
AZURE_OPENAI_MODELS = [
"gpt-35-turbo",
@ -71,33 +74,37 @@ class AzureChatOpenAIComponent(CustomComponent):
"info": "Maximum number of tokens to generate.",
},
"code": {"show": False},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
model: str,
azure_endpoint: str,
inputs: str,
input_value: Text,
azure_deployment: str,
api_key: str,
api_version: str,
temperature: float = 0.7,
max_tokens: Optional[int] = 1000,
stream: bool = False,
) -> BaseLanguageModel:
secret_api_key = SecretStr(api_key)
try:
output = AzureChatOpenAI(
model=model,
azure_endpoint=azure_endpoint,
azure_deployment=azure_deployment,
api_version=api_version,
api_key=api_key,
api_key=secret_api_key,
temperature=temperature,
max_tokens=max_tokens,
)
except Exception as e:
raise ValueError("Could not connect to AzureOpenAI API.") from e
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -3,16 +3,17 @@ from typing import Optional
from langchain_community.chat_models.baidu_qianfan_endpoint import QianfanChatEndpoint
from pydantic.v1 import SecretStr
from langflow import CustomComponent
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class QianfanChatEndpointComponent(CustomComponent):
class QianfanChatEndpointComponent(LCModelComponent):
display_name: str = "QianfanChat Model"
description: str = (
"Generate text using Baidu Qianfan chat models. Get more detail from "
"https://python.langchain.com/docs/integrations/chat/baidu_qianfan_endpoint."
)
icon = "BaiduQianfan"
def build_config(self):
return {
@ -68,12 +69,16 @@ class QianfanChatEndpointComponent(CustomComponent):
"info": "Endpoint of the Qianfan LLM, required if custom model used.",
},
"code": {"show": False},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
inputs: str,
input_value: Text,
model: str = "ERNIE-Bot-turbo",
qianfan_ak: Optional[str] = None,
qianfan_sk: Optional[str] = None,
@ -81,6 +86,7 @@ class QianfanChatEndpointComponent(CustomComponent):
temperature: Optional[float] = None,
penalty_score: Optional[float] = None,
endpoint: Optional[str] = None,
stream: bool = False,
) -> Text:
try:
output = QianfanChatEndpoint( # type: ignore
@ -94,7 +100,5 @@ class QianfanChatEndpointComponent(CustomComponent):
)
except Exception as e:
raise ValueError("Could not connect to Baidu Qianfan API.") from e
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,13 +1,13 @@
from typing import Dict, Optional
from langchain_community.llms.ctransformers import CTransformers
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
from langflow import CustomComponent
class CTransformersComponent(CustomComponent):
display_name = "CTransformers model"
class CTransformersComponent(LCModelComponent):
display_name = "CTransformersModel"
description = "Generate text using CTransformers LLM models"
documentation = "https://python.langchain.com/docs/modules/model_io/models/llms/integrations/ctransformers"
@ -28,12 +28,28 @@ class CTransformersComponent(CustomComponent):
"field_type": "dict",
"value": '{"top_k":40,"top_p":0.95,"temperature":0.8,"repetition_penalty":1.1,"last_n_tokens":64,"seed":-1,"max_new_tokens":256,"stop":"","stream":"False","reset":"True","batch_size":8,"threads":-1,"context_length":-1,"gpu_layers":0}',
},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(self, model: str, model_file: str, inputs: str, model_type: str, config: Optional[Dict] = None) -> Text:
output = CTransformers(model=model, model_file=model_file, model_type=model_type, config=config)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
def build(
self,
model: str,
model_file: str,
input_value: Text,
model_type: str,
stream: bool = False,
config: Optional[Dict] = None,
) -> Text:
output = CTransformers(
client=None,
model=model,
model_file=model_file,
model_type=model_type,
config=config, # noqa
)
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,30 +0,0 @@
from langchain_community.chat_models.cohere import ChatCohere
from langflow import CustomComponent
from langflow.field_typing import Text
class CohereComponent(CustomComponent):
display_name = "Cohere model"
description = "Generate text using Cohere large language models."
documentation = "https://python.langchain.com/docs/modules/model_io/models/llms/integrations/cohere"
def build_config(self):
return {
"cohere_api_key": {"display_name": "Cohere API Key", "type": "password", "password": True},
"max_tokens": {"display_name": "Max Tokens", "default": 256, "type": "int", "show": True},
"temperature": {"display_name": "Temperature", "default": 0.75, "type": "float", "show": True},
"inputs": {"display_name": "Input"},
}
def build(
self,
cohere_api_key: str,
inputs: str,
max_tokens: int = 256,
temperature: float = 0.75,
) -> Text:
output = ChatCohere(cohere_api_key=cohere_api_key, max_tokens=max_tokens, temperature=temperature)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result

View file

@ -0,0 +1,51 @@
from langchain_community.chat_models.cohere import ChatCohere
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class CohereComponent(LCModelComponent):
display_name = "CohereModel"
description = "Generate text using Cohere large language models."
documentation = "https://python.langchain.com/docs/modules/model_io/models/llms/integrations/cohere"
icon = "Cohere"
def build_config(self):
return {
"cohere_api_key": {
"display_name": "Cohere API Key",
"type": "password",
"password": True,
},
"max_tokens": {
"display_name": "Max Tokens",
"default": 256,
"type": "int",
"show": True,
},
"temperature": {
"display_name": "Temperature",
"default": 0.75,
"type": "float",
"show": True,
},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
cohere_api_key: str,
input_value: Text,
temperature: float = 0.75,
stream: bool = False,
) -> Text:
output = ChatCohere( # type: ignore
cohere_api_key=cohere_api_key,
temperature=temperature,
)
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,16 +1,17 @@
from typing import Optional
from langchain_google_genai import ChatGoogleGenerativeAI # type: ignore
from langflow import CustomComponent
from langflow.field_typing import RangeSpec
from pydantic.v1.types import SecretStr
from langflow.field_typing import Text
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic.v1 import SecretStr
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import RangeSpec, Text
class GoogleGenerativeAIComponent(CustomComponent):
display_name: str = "Google Generative AI model"
class GoogleGenerativeAIComponent(LCModelComponent):
display_name: str = "Google Generative AIModel"
description: str = "Generate text using Google Generative AI to generate text."
documentation: str = "http://docs.langflow.org/components/custom"
icon = "GoogleGenerativeAI"
icon = "Google"
def build_config(self):
return {
@ -50,19 +51,24 @@ class GoogleGenerativeAIComponent(CustomComponent):
"code": {
"advanced": True,
},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input", "info": "The input to the model."},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
google_api_key: str,
model: str,
inputs: str,
input_value: Text,
max_output_tokens: Optional[int] = None,
temperature: float = 0.1,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
n: Optional[int] = 1,
stream: bool = False,
) -> Text:
output = ChatGoogleGenerativeAI(
model=model,
@ -73,7 +79,4 @@ class GoogleGenerativeAIComponent(CustomComponent):
n=n or 1,
google_api_key=SecretStr(google_api_key),
)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,13 +1,16 @@
from typing import Optional
from langflow import CustomComponent
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain_community.chat_models.huggingface import ChatHuggingFace
from langchain_community.llms.huggingface_endpoint import HuggingFaceEndpoint
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class HuggingFaceEndpointsComponent(CustomComponent):
class HuggingFaceEndpointsComponent(LCModelComponent):
display_name: str = "Hugging Face Inference API models"
description: str = "Generate text using LLM model from Hugging Face Inference API."
icon = "HuggingFace"
def build_config(self):
return {
@ -22,28 +25,32 @@ class HuggingFaceEndpointsComponent(CustomComponent):
"field_type": "code",
},
"code": {"show": False},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
inputs: str,
input_value: Text,
endpoint_url: str,
model: Optional[str] = None,
task: str = "text2text-generation",
huggingfacehub_api_token: Optional[str] = None,
model_kwargs: Optional[dict] = None,
stream: bool = False,
) -> Text:
try:
llm = HuggingFaceEndpoint(
llm = HuggingFaceEndpoint( # type: ignore
endpoint_url=endpoint_url,
task=task,
huggingfacehub_api_token=huggingfacehub_api_token,
model_kwargs=model_kwargs,
model_kwargs=model_kwargs or {},
model=model or "",
)
except Exception as e:
raise ValueError("Could not connect to HuggingFace Endpoints API.") from e
output = ChatHuggingFace(llm=llm)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,11 +1,13 @@
from typing import Optional, List, Dict, Any
from langflow import CustomComponent
from typing import Any, Dict, List, Optional
from langchain_community.llms.llamacpp import LlamaCpp
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class LlamaCppComponent(CustomComponent):
display_name = "LlamaCpp model"
class LlamaCppComponent(LCModelComponent):
display_name = "LlamaCppModel"
description = "Generate text using llama.cpp model."
documentation = "https://python.langchain.com/docs/modules/model_io/models/llms/integrations/llamacpp"
@ -17,7 +19,10 @@ class LlamaCppComponent(CustomComponent):
"echo": {"display_name": "Echo", "advanced": True},
"f16_kv": {"display_name": "F16 KV", "advanced": True},
"grammar_path": {"display_name": "Grammar Path", "advanced": True},
"last_n_tokens_size": {"display_name": "Last N Tokens Size", "advanced": True},
"last_n_tokens_size": {
"display_name": "Last N Tokens Size",
"advanced": True,
},
"logits_all": {"display_name": "Logits All", "advanced": True},
"logprobs": {"display_name": "Logprobs", "advanced": True},
"lora_base": {"display_name": "Lora Base", "advanced": True},
@ -51,13 +56,17 @@ class LlamaCppComponent(CustomComponent):
"use_mmap": {"display_name": "Use Mmap", "advanced": True},
"verbose": {"display_name": "Verbose", "advanced": True},
"vocab_only": {"display_name": "Vocab Only", "advanced": True},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
model_path: str,
inputs: str,
input_value: Text,
grammar: Optional[str] = None,
cache: Optional[bool] = None,
client: Optional[Any] = None,
@ -92,6 +101,7 @@ class LlamaCppComponent(CustomComponent):
use_mmap: Optional[bool] = True,
verbose: bool = True,
vocab_only: bool = False,
stream: bool = False,
) -> Text:
output = LlamaCpp(
model_path=model_path,
@ -130,7 +140,5 @@ class LlamaCppComponent(CustomComponent):
verbose=verbose,
vocab_only=vocab_only,
)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -3,17 +3,19 @@ from typing import Any, Dict, List, Optional
# from langchain_community.chat_models import ChatOllama
from langchain_community.chat_models import ChatOllama
from langflow.components.models.base.model import LCModelComponent
# from langchain.chat_models import ChatOllama
from langflow import CustomComponent
from langflow.field_typing import Text
# whe When a callback component is added to Langflow, the comment must be uncommented.
# from langchain.callbacks.manager import CallbackManager
class ChatOllamaComponent(CustomComponent):
display_name = "ChatOllama model"
class ChatOllamaComponent(LCModelComponent):
display_name = "ChatOllamaModel"
description = "Generate text using Local LLM for chat with Ollama."
icon = "Ollama"
def build_config(self) -> dict:
return {
@ -164,14 +166,18 @@ class ChatOllamaComponent(CustomComponent):
"info": "Template to use for generating text.",
"advanced": True,
},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
base_url: Optional[str],
model: str,
inputs: str,
input_value: Text,
mirostat: Optional[str],
mirostat_eta: Optional[float] = None,
mirostat_tau: Optional[float] = None,
@ -197,6 +203,7 @@ class ChatOllamaComponent(CustomComponent):
timeout: Optional[int] = None,
top_k: Optional[int] = None,
top_p: Optional[int] = None,
stream: bool = False,
) -> Text:
if not base_url:
base_url = "http://localhost:11434"
@ -250,7 +257,5 @@ class ChatOllamaComponent(CustomComponent):
output = ChatOllama(**llm_params) # type: ignore
except Exception as e:
raise ValueError("Could not initialize Ollama LLM.") from e
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -1,17 +1,20 @@
from typing import Optional
from langchain_openai import ChatOpenAI
from langflow import CustomComponent
from pydantic.v1 import SecretStr
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import NestedDict, Text
class OpenAIModelComponent(CustomComponent):
class OpenAIModelComponent(LCModelComponent):
display_name = "OpenAI Model"
description = "Generates text using OpenAI's models."
icon = "OpenAI"
def build_config(self):
return {
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"max_tokens": {
"display_name": "Max Tokens",
"advanced": False,
@ -56,30 +59,36 @@ class OpenAIModelComponent(CustomComponent):
"required": False,
"value": 0.7,
},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
inputs: Text,
input_value: Text,
max_tokens: Optional[int] = 256,
model_kwargs: NestedDict = {},
model_name: str = "gpt-4-1106-preview",
openai_api_base: Optional[str] = None,
openai_api_key: Optional[str] = None,
temperature: float = 0.7,
stream: bool = False,
) -> Text:
if not openai_api_base:
openai_api_base = "https://api.openai.com/v1"
model = ChatOpenAI(
if openai_api_key:
secret_key = SecretStr(openai_api_key)
else:
secret_key = None
output = ChatOpenAI(
max_tokens=max_tokens,
model_kwargs=model_kwargs,
model=model_name,
base_url=openai_api_base,
api_key=openai_api_key,
api_key=secret_key,
temperature=temperature,
)
message = model.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -2,13 +2,14 @@ from typing import List, Optional
from langchain_core.messages.base import BaseMessage
from langflow import CustomComponent
from langflow.components.models.base.model import LCModelComponent
from langflow.field_typing import Text
class ChatVertexAIComponent(CustomComponent):
display_name = "ChatVertexAI model"
class ChatVertexAIComponent(LCModelComponent):
display_name = "ChatVertexAIModel"
description = "Generate text using Vertex AI Chat large language models API."
icon = "VertexAI"
def build_config(self):
return {
@ -57,12 +58,16 @@ class ChatVertexAIComponent(CustomComponent):
"value": False,
"advanced": True,
},
"inputs": {"display_name": "Input"},
"input_value": {"display_name": "Input"},
"stream": {
"display_name": "Stream",
"info": "Stream the response from the model.",
},
}
def build(
self,
inputs: str,
input_value: Text,
credentials: Optional[str],
project: str,
examples: Optional[List[BaseMessage]] = [],
@ -73,9 +78,10 @@ class ChatVertexAIComponent(CustomComponent):
top_k: int = 40,
top_p: float = 0.95,
verbose: bool = False,
stream: bool = False,
) -> Text:
try:
from langchain_google_vertexai import ChatVertexAI
from langchain_google_vertexai import ChatVertexAI # type: ignore
except ImportError:
raise ImportError(
"To use the ChatVertexAI model, you need to install the langchain-google-vertexai package."
@ -92,7 +98,5 @@ class ChatVertexAIComponent(CustomComponent):
top_p=top_p,
verbose=verbose,
)
message = output.invoke(inputs)
result = message.content if hasattr(message, "content") else message
self.status = result
return result
return self.get_result(output=output, stream=stream, input_value=input_value)

View file

@ -0,0 +1,28 @@
from langchain_core.runnables import Runnable
from langflow import CustomComponent
class LCModelComponent(CustomComponent):
display_name: str = "Model Name"
description: str = "Model Description"
def get_result(self, output: Runnable, stream: bool, input_value: str):
"""
Retrieves the result from the output of a Runnable object.
Args:
output (Runnable): The output object to retrieve the result from.
stream (bool): Indicates whether to use streaming or invocation mode.
input_value (str): The input value to pass to the output object.
Returns:
The result obtained from the output object.
"""
if stream:
result = output.stream(input_value)
else:
message = output.invoke(input_value)
result = message.content if hasattr(message, "content") else message
self.status = result
return result

View file

@ -1,4 +1,5 @@
from langchain_core.prompts import PromptTemplate
from langflow import CustomComponent
from langflow.field_typing import Prompt, TemplateField, Text
@ -19,10 +20,10 @@ class PromptComponent(CustomComponent):
template: Prompt,
**kwargs,
) -> Text:
prompt_template = PromptTemplate.from_template(template)
prompt_template = PromptTemplate.from_template(Text(template))
attributes_to_check = ["text", "page_content"]
for key, value in kwargs.items():
for key, value in kwargs.copy().items():
for attribute in attributes_to_check:
if hasattr(value, attribute):
kwargs[key] = getattr(value, attribute)

View file

@ -1,12 +1,15 @@
from typing import Optional
from langflow import CustomComponent
from langchain.retrievers import AmazonKendraRetriever
from langchain.schema import BaseRetriever
from langchain_community.retrievers import AmazonKendraRetriever
from langflow import CustomComponent
class AmazonKendraRetrieverComponent(CustomComponent):
display_name: str = "Amazon Kendra Retriever"
description: str = "Retriever that uses the Amazon Kendra API."
icon = "Amazon"
def build_config(self):
return {

View file

@ -1,9 +1,11 @@
from typing import Optional
from langflow import CustomComponent
from langchain.retrievers import MetalRetriever
from langchain.schema import BaseRetriever
from langchain_community.retrievers import MetalRetriever
from metal_sdk.metal import Metal # type: ignore
from langflow import CustomComponent
class MetalRetrieverComponent(CustomComponent):
display_name: str = "Metal Retriever"

View file

@ -17,6 +17,7 @@ class VectaraSelfQueryRetriverComponent(CustomComponent):
description: str = "Implementation of Vectara Self Query Retriever"
documentation = "https://python.langchain.com/docs/integrations/retrievers/self_query/vectara_self_query"
beta = True
icon = "Vectara"
field_config = {
"code": {"show": True},

View file

@ -0,0 +1,51 @@
from langflow import CustomComponent
from langchain.schema import Document
from langflow.services.database.models.base import orjson_dumps
from langchain_community.utilities.searchapi import SearchApiAPIWrapper
from typing import Optional
class SearchApi(CustomComponent):
display_name: str = "SearchApi"
description: str = "Real-time search engine results API."
output_types: list[str] = ["Document"]
documentation: str = "https://www.searchapi.io/docs/google"
field_config = {
"engine": {
"display_name": "Engine",
"field_type": "str",
"info": "The search engine to use.",
},
"params": {
"display_name": "Parameters",
"info": "The parameters to send with the request.",
},
"code": {"show": False},
"api_key": {
"display_name": "API Key",
"field_type": "str",
"required": True,
"password": True,
"info": "The API key to use SearchApi.",
},
}
def build(
self,
engine: str,
api_key: str,
params: Optional[dict] = None,
) -> Document:
if params is None:
params = {}
search_api_wrapper = SearchApiAPIWrapper(engine=engine, searchapi_api_key=api_key)
q = params.pop("q", "SearchApi Langflow")
results = search_api_wrapper.results(q, **params)
result = orjson_dumps(results, indent_2=False)
document = Document(page_content=result)
return document

View file

@ -0,0 +1,22 @@
from typing import List
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.schema import Record
class DocumentToRecordComponent(CustomComponent):
display_name = "Documents to Records"
description = "Convert documents to records."
field_config = {
"documents": {"display_name": "Documents"},
}
def build(self, documents: List[Document]) -> List[Record]:
if isinstance(documents, Document):
documents = [documents]
records = [Record.from_document(document) for document in documents]
self.status = records
return records

View file

@ -1,7 +1,8 @@
from typing import Optional
from typing import Optional, Text
import requests
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.services.database.models.base import orjson_dumps
@ -55,7 +56,7 @@ class GetRequest(CustomComponent):
)
except Exception as exc:
return Document(
page_content=str(exc),
page_content=Text(exc),
metadata={"source": url, "headers": headers, "status_code": 500},
)

View file

@ -3,6 +3,7 @@ from typing import Dict
# Assuming the existence of GoogleSerperAPIWrapper class in the serper module
# If this class does not exist, you would need to create it or import the appropriate class from another module
from langchain_community.utilities.google_serper import GoogleSerperAPIWrapper
from langflow import CustomComponent
@ -17,20 +18,23 @@ class GoogleSerperAPIWrapperComponent(CustomComponent):
"show": True,
"multiline": False,
"password": False,
"name": "result_key_for_type",
"advanced": False,
"dynamic": False,
"info": "",
"field_type": "dict",
"list": False,
"value": {"news": "news", "places": "places", "images": "images", "search": "organic"},
"value": {
"news": "news",
"places": "places",
"images": "images",
"search": "organic",
},
},
"serper_api_key": {
"display_name": "Serper API Key",
"show": True,
"multiline": False,
"password": True,
"name": "serper_api_key",
"advanced": False,
"dynamic": False,
"info": "",

View file

@ -1,4 +1,5 @@
import uuid
from typing import Text
from langflow import CustomComponent
@ -9,7 +10,7 @@ class UUIDGeneratorComponent(CustomComponent):
description = "Generates a unique ID."
def generate(self, *args, **kwargs):
return str(uuid.uuid4().hex)
return Text(uuid.uuid4().hex)
def build_config(self):
return {"unique_id": {"display_name": "Value", "value": self.generate}}

View file

@ -1,7 +1,8 @@
from typing import Optional
from typing import Optional, Text
import requests
from langchain_core.documents import Document
from langflow import CustomComponent
from langflow.services.database.models.base import orjson_dumps
@ -47,7 +48,7 @@ class PostRequest(CustomComponent):
)
except Exception as exc:
return Document(
page_content=str(exc),
page_content=Text(exc),
metadata={
"source": url,
"headers": headers,

View file

@ -1,5 +1,6 @@
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.helpers.record import records_to_text
from langflow.schema import Record
@ -27,7 +28,6 @@ class RecordsAsTextComponent(CustomComponent):
if isinstance(records, Record):
records = [records]
formated_records = [template.format(text=record.text, **record.data) for record in records]
result_string = "\n".join(formated_records)
result_string = records_to_text(template, records)
self.status = result_string
return result_string

View file

@ -15,7 +15,7 @@ class RunnableExecComponent(CustomComponent):
"display_name": "Input Key",
"info": "The key to use for the input.",
},
"inputs": {
"input_value": {
"display_name": "Inputs",
"info": "The inputs to pass to the runnable.",
},
@ -32,11 +32,11 @@ class RunnableExecComponent(CustomComponent):
def build(
self,
input_key: str,
inputs: str,
input_value: Text,
runnable: Runnable,
output_key: str = "output",
) -> Text:
result = runnable.invoke({input_key: inputs})
result = runnable.invoke({input_key: input_value})
result = result.get(output_key)
self.status = result
return result

View file

@ -0,0 +1,22 @@
from langchain_experimental.sql.base import SQLDatabase
from langflow import CustomComponent
class SQLDatabaseComponent(CustomComponent):
display_name = "SQLDatabase"
description = "SQL Database"
def build_config(self):
return {
"uri": {"display_name": "URI", "info": "URI to the database."},
}
def clean_up_uri(self, uri: str) -> str:
if uri.startswith("postgresql://"):
uri = uri.replace("postgresql://", "postgres://")
return uri.strip()
def build(self, uri: str) -> SQLDatabase:
uri = self.clean_up_uri(uri)
return SQLDatabase.from_uri(uri)

View file

@ -0,0 +1,56 @@
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langchain_experimental.sql.base import SQLDatabase
from langflow import CustomComponent
from langflow.field_typing import Text
class SQLExecutorComponent(CustomComponent):
display_name = "SQL Executor"
description = "Execute SQL query."
def build_config(self):
return {
"database": {"display_name": "Database"},
"include_columns": {
"display_name": "Include Columns",
"info": "Include columns in the result.",
},
"passthrough": {
"display_name": "Passthrough",
"info": "If an error occurs, return the query instead of raising an exception.",
},
"add_error": {
"display_name": "Add Error",
"info": "Add the error to the result.",
},
}
def build(
self,
query: str,
database: SQLDatabase,
include_columns: bool = False,
passthrough: bool = False,
add_error: bool = False,
) -> Text:
error = None
try:
tool = QuerySQLDataBaseTool(db=database)
result = tool.run(query, include_columns=include_columns)
self.status = result
except Exception as e:
result = Text(e)
self.status = result
if not passthrough:
raise e
error = repr(e)
if add_error and error is not None:
result = f"{result}\n\nError: {error}\n\nQuery: {query}"
elif error is not None:
# Then we won't add the error to the result
# but since we are in passthrough mode, we will return the query
result = query
return result

View file

@ -0,0 +1,38 @@
from typing import Union
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.schema import Record
class SharedState(CustomComponent):
display_name = "Shared State"
description = "A component to share state between components."
def build_config(self):
return {
"name": {"display_name": "Name", "info": "The name of the state."},
"record": {"display_name": "Record", "info": "The record to store."},
"append": {
"display_name": "Append",
"info": "If True, the record will be appended to the state.",
},
}
def build(
self, name: str, record: Union[Text, Record], append: bool = False
) -> Record:
if append:
self.append_state(name, record)
else:
self.update_state(name, record)
state = self.get_state(name)
if not isinstance(state, Record):
if isinstance(state, str):
state = Record(text=state)
elif isinstance(state, dict):
state = Record(data=state)
else:
state = Record(text=str(state))
return state

Some files were not shown because too many files have changed in this diff Show more