This commit is contained in:
Cristhian Zanforlin Lousa 2023-09-27 14:35:53 -03:00
commit b4ea2478e8
29 changed files with 494 additions and 181 deletions

View file

@ -20,7 +20,7 @@ coverage:
tests:
@make install_backend
poetry run pytest tests -n auto
poetry run pytest tests
format:
poetry run black .

View file

@ -1,4 +1,5 @@
version: "3.8"
services:
proxy:
ports:

View file

@ -1,85 +0,0 @@
# Docker Compose overrides for local development:
# exposes service ports directly and runs the Traefik dashboard in insecure mode.
version: "3.8"

services:
  proxy:
    ports:
      - "80:80"
      - "8090:8080"
    command:
      # Enable Docker in Traefik, so that it reads labels from Docker services
      - --providers.docker
      # Add a constraint to only use services with the label for this stack
      # from the env var TRAEFIK_TAG
      - --providers.docker.constraints=Label(`traefik.constraint-label-stack`, `${TRAEFIK_TAG?Variable not set}`)
      # Do not expose all Docker services, only the ones explicitly exposed
      - --providers.docker.exposedbydefault=false
      # Disable Docker Swarm mode for local development
      # - --providers.docker.swarmmode
      # Enable the access log, with HTTP requests
      - --accesslog
      # Enable the Traefik log, for configurations and errors
      - --log
      # Enable the Dashboard and API
      - --api
      # Enable the Dashboard and API in insecure mode for local development
      - --api.insecure=true
    labels:
      - traefik.enable=true
      - traefik.http.routers.${STACK_NAME?Variable not set}-traefik-public-http.rule=Host(`${DOMAIN?Variable not set}`)
      - traefik.http.services.${STACK_NAME?Variable not set}-traefik-public.loadbalancer.server.port=80

  result_backend:
    ports:
      - "6379:6379"

  pgadmin:
    ports:
      - "5050:5050"

  flower:
    ports:
      - "5555:5555"

  backend:
    labels:
      - traefik.enable=true
      - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
      - traefik.http.routers.${STACK_NAME?Variable not set}-backend-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`)
      - traefik.http.services.${STACK_NAME?Variable not set}-backend.loadbalancer.server.port=7860

  frontend:
    labels:
      - traefik.enable=true
      - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
      - traefik.http.routers.${STACK_NAME?Variable not set}-frontend-http.rule=PathPrefix(`/`)
      - traefik.http.services.${STACK_NAME?Variable not set}-frontend.loadbalancer.server.port=80

  celeryworker:
    labels:
      - traefik.enable=true
      - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
      - traefik.http.routers.${STACK_NAME?Variable not set}-celeryworker-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`)
      - traefik.http.services.${STACK_NAME?Variable not set}-celeryworker.loadbalancer.server.port=7860

  tests:
    extends:
      file: docker-compose.yml
      service: backend
    env_file:
      - .env
    build:
      context: ../
      dockerfile: base.Dockerfile
    command: pytest -vv
    healthcheck:
      test: "exit 0"
    # Override deploy labels to avoid conflicts with the backend service
    labels:
      - traefik.enable=true
      - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
      - traefik.http.routers.${STACK_NAME?Variable not set}-tests-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`)
      - traefik.http.services.${STACK_NAME?Variable not set}-tests.loadbalancer.server.port=7861

networks:
  traefik-public:
    # For local dev, don't expect an external Traefik network
    external: false

View file

@ -0,0 +1,277 @@
# Docker Compose stack for deploying Langflow behind a Traefik reverse proxy,
# with Postgres, Redis, RabbitMQ, Celery workers, Flower, Prometheus and Grafana.
version: "3.8"

services:
  proxy:
    image: traefik:v3.0
    env_file:
      - .env
    networks:
      - ${TRAEFIK_PUBLIC_NETWORK?Variable not set}
      - default
    volumes:
      # Traefik reads container labels through the Docker socket
      - /var/run/docker.sock:/var/run/docker.sock
    command:
      # Enable Docker in Traefik, so that it reads labels from Docker services
      - --providers.docker
      # Add a constraint to only use services with the label for this stack
      # from the env var TRAEFIK_TAG
      - --providers.docker.constraints=Label(`traefik.constraint-label-stack`, `${TRAEFIK_TAG?Variable not set}`)
      # Do not expose all Docker services, only the ones explicitly exposed
      - --providers.docker.exposedbydefault=false
      # Enable the access log, with HTTP requests
      - --accesslog
      # Enable the Traefik log, for configurations and errors
      - --log
      # Enable the Dashboard and API
      - --api
    deploy:
      placement:
        constraints:
          - node.role == manager
      labels:
        # Enable Traefik for this service, to make it available in the public network
        - traefik.enable=true
        # Use the traefik-public network (declared below)
        - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set}
        # Use the custom label "traefik.constraint-label=traefik-public"
        # This public Traefik will only use services with this label
        - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set}
        # traefik-http set up only to use the middleware to redirect to https
        - traefik.http.middlewares.${STACK_NAME?Variable not set}-https-redirect.redirectscheme.scheme=https
        - traefik.http.middlewares.${STACK_NAME?Variable not set}-https-redirect.redirectscheme.permanent=true
        # Handle host with and without "www" to redirect to only one of them
        # Uses environment variable DOMAIN
        # To disable www redirection remove the Host() you want to discard, here and
        # below for HTTPS
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.rule=Host(`${DOMAIN?Variable not set}`) || Host(`www.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.entrypoints=http
        # traefik-https the actual router using HTTPS
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.rule=Host(`${DOMAIN?Variable not set}`) || Host(`www.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.entrypoints=https
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.tls=true
        # Use the "le" (Let's Encrypt) resolver created below
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.tls.certresolver=le
        # Define the port inside of the Docker service to use
        - traefik.http.services.${STACK_NAME?Variable not set}-proxy.loadbalancer.server.port=80
        # Handle domain with and without "www" to redirect to only one
        # To disable www redirection remove the next line
        - traefik.http.middlewares.${STACK_NAME?Variable not set}-www-redirect.redirectregex.regex=^https?://(www.)?(${DOMAIN?Variable not set})/(.*)
        # Redirect a domain with www to non-www
        # To disable it remove the next line
        - traefik.http.middlewares.${STACK_NAME?Variable not set}-www-redirect.redirectregex.replacement=https://${DOMAIN?Variable not set}/$${3}
        # Redirect a domain without www to www
        # To enable it remove the previous line and uncomment the next
        # - traefik.http.middlewares.${STACK_NAME}-www-redirect.redirectregex.replacement=https://www.${DOMAIN}/$${3}
        # Middleware to redirect www, to disable it remove the next line
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.middlewares=${STACK_NAME?Variable not set}-www-redirect
        # Middleware to redirect www, and redirect HTTP to HTTPS
        # to disable www redirection remove the section: ${STACK_NAME?Variable not set}-www-redirect,
        - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.middlewares=${STACK_NAME?Variable not set}-www-redirect,${STACK_NAME?Variable not set}-https-redirect

  # Anchor so celeryworker/flower can reuse this definition via "<<: *backend"
  backend: &backend
    image: "ogabrielluiz/langflow:latest"
    build:
      context: ../
      dockerfile: base.Dockerfile
    depends_on:
      - db
      - broker
      - result_backend
    env_file:
      - .env
    volumes:
      - ../:/app
      # The startup script must exist at this relative path on the host
      - ./startup-backend.sh:/startup-backend.sh
    command: /startup-backend.sh
    healthcheck:
      test: "exit 0"
    deploy:
      labels:
        - traefik.enable=true
        - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-backend-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`)
        - traefik.http.services.${STACK_NAME?Variable not set}-backend.loadbalancer.server.port=7860

  db:
    image: postgres:15.4
    volumes:
      - app-db-data:/var/lib/postgresql/data/pgdata
    environment:
      - PGDATA=/var/lib/postgresql/data/pgdata
    deploy:
      placement:
        constraints:
          # Pin the database to the node that holds its data volume
          - node.labels.app-db-data == true
    healthcheck:
      test: "exit 0"
    env_file:
      - .env

  pgadmin:
    image: dpage/pgadmin4
    networks:
      - ${TRAEFIK_PUBLIC_NETWORK?Variable not set}
      - default
    volumes:
      - pgadmin-data:/var/lib/pgadmin
    env_file:
      - .env
    deploy:
      labels:
        - traefik.enable=true
        - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set}
        - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.rule=Host(`pgadmin.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.entrypoints=http
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.middlewares=${STACK_NAME?Variable not set}-https-redirect
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.rule=Host(`pgadmin.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.entrypoints=https
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.tls=true
        - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.tls.certresolver=le
        - traefik.http.services.${STACK_NAME?Variable not set}-pgadmin.loadbalancer.server.port=5050

  result_backend:
    image: redis:6.2.5
    env_file:
      - .env
    # ports:
    #   - "6379:6379"
    healthcheck:
      test: "exit 0"

  celeryworker:
    <<: *backend
    env_file:
      - .env
    build:
      context: ../
      dockerfile: base.Dockerfile
    command: celery -A langflow.worker.celery_app worker --loglevel=INFO --concurrency=1 -n lf-worker@%h
    healthcheck:
      test: "exit 0"
    deploy:
      replicas: 1

  flower:
    <<: *backend
    env_file:
      - .env
    networks:
      - default
    build:
      context: ../
      dockerfile: base.Dockerfile
    environment:
      - FLOWER_PORT=5555
    command: /bin/sh -c "celery -A langflow.worker.celery_app --broker=${BROKER_URL?Variable not set} flower --port=5555"
    deploy:
      labels:
        - traefik.enable=true
        - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set}
        - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.rule=Host(`flower.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.entrypoints=http
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.middlewares=${STACK_NAME?Variable not set}-https-redirect
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.rule=Host(`flower.${DOMAIN?Variable not set}`)
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.entrypoints=https
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.tls=true
        - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.tls.certresolver=le
        - traefik.http.services.${STACK_NAME?Variable not set}-flower.loadbalancer.server.port=5555

  frontend:
    image: "ogabrielluiz/langflow_frontend:latest"
    env_file:
      - .env
    # user: your-non-root-user
    build:
      context: ../src/frontend
      dockerfile: Dockerfile
      args:
        - BACKEND_URL=http://backend:7860
    restart: on-failure
    deploy:
      labels:
        - traefik.enable=true
        - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-frontend-http.rule=PathPrefix(`/`)
        - traefik.http.services.${STACK_NAME?Variable not set}-frontend.loadbalancer.server.port=80

  broker:
    # RabbitMQ with the management console
    image: rabbitmq:3-management
    environment:
      - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER:-admin}
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS:-admin}
    volumes:
      # NOTE(review): the same named volume is mounted at two paths; confirm
      # /etc/rabbitmq and /var/lib/rabbitmq are really meant to share storage
      - rabbitmq_data:/etc/rabbitmq/
      - rabbitmq_data:/var/lib/rabbitmq/
      - rabbitmq_log:/var/log/rabbitmq/
    ports:
      # Quoted so YAML never misreads "host:container" as a number
      - "5672:5672"
      - "15672:15672"

  prometheus:
    image: prom/prometheus:v2.37.9
    env_file:
      - .env
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
    # ports:
    #   - "9090:9090"
    healthcheck:
      test: "exit 0"
    deploy:
      labels:
        - traefik.enable=true
        - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-prometheus-http.rule=PathPrefix(`/metrics`)
        - traefik.http.services.${STACK_NAME?Variable not set}-prometheus.loadbalancer.server.port=9090

  grafana:
    image: grafana/grafana:8.2.6
    env_file:
      - .env
    # ports:
    #   - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    deploy:
      labels:
        - traefik.enable=true
        - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
        - traefik.http.routers.${STACK_NAME?Variable not set}-grafana-http.rule=PathPrefix(`/grafana`)
        - traefik.http.services.${STACK_NAME?Variable not set}-grafana.loadbalancer.server.port=3000

  tests:
    extends:
      file: docker-compose.yml
      service: backend
    env_file:
      - .env
    build:
      context: ../
      dockerfile: base.Dockerfile
    command: pytest -vv
    healthcheck:
      test: "exit 0"
    # Override deploy labels to avoid conflicts with the backend service
    labels:
      - traefik.enable=true
      - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set}
      - traefik.http.routers.${STACK_NAME?Variable not set}-tests-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`)
      - traefik.http.services.${STACK_NAME?Variable not set}-tests.loadbalancer.server.port=7861

volumes:
  grafana_data:
  app-db-data:
  rabbitmq_data:
  rabbitmq_log:
  pgadmin-data:

networks:
  traefik-public:
    # Allow setting it to false for testing
    external: false # ${TRAEFIK_PUBLIC_NETWORK_IS_EXTERNAL-true}

View file

@ -27,13 +27,29 @@ curl -X POST \
-d '{"inputs": {"text": ""}, "tweaks": {}, "sync": false}'
```
Response:
```json
{
"result": {
"output": "..."
},
"task": {
"id": "...",
"href": "api/v1/task/<task_id>"
},
"session_id": "...",
"backend": "..." // celery or anyio
}
```
## Checking Task Status
You can check the status of an asynchronous task by making a GET request to the `/task/{task_id}/status` endpoint.
You can check the status of an asynchronous task by making a GET request to the `/task/{task_id}` endpoint.
```bash
curl -X GET \
http://localhost:3000/api/v1/task/<task_id>/status \
http://localhost:3000/api/v1/task/<task_id> \
-H 'x-api-key: <your_api_key>'
```

View file

@ -40,5 +40,5 @@ This will set up the following containers:
To run the tests for the Async API, you can run the following command:
```bash
docker compose -f docker-compose.test.yml up --exit-code-from tests tests result_backend broker celeryworker db --build
docker compose -f docker-compose.with_tests.yml up --exit-code-from tests tests result_backend broker celeryworker db --build
```

View file

@ -23,7 +23,7 @@ from rich.table import Table
console = Console()
app = typer.Typer()
app = typer.Typer(no_args_is_help=True)
def get_number_of_workers(workers=None):
@ -141,7 +141,7 @@ def run(
),
):
"""
Run the Langflow server.
Run the Langflow.
"""
# override env variables with .env file
if env_file:
@ -299,7 +299,14 @@ def superuser(
password: str = typer.Option(
..., prompt=True, hide_input=True, help="Password for the superuser."
),
log_level: str = typer.Option(
"critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"
),
):
"""
Create a superuser.
"""
configure(log_level=log_level)
initialize_services()
db_service = get_db_service()
with session_getter(db_service) as session:
@ -321,7 +328,10 @@ def superuser(
@app.command()
def migration(test: bool = typer.Option(False, help="Run migrations in test mode.")):
def migration(test: bool = typer.Option(True, help="Run migrations in test mode.")):
"""
Run or test migrations.
"""
initialize_services()
db_service = get_db_service()
if not test:

View file

@ -13,6 +13,7 @@ from langflow.api.v1.schemas import BuildStatus, BuiltResponse, InitResponse, St
from langflow.graph.graph.base import Graph
from langflow.services.auth.utils import get_current_active_user, get_current_user
from langflow.services.cache.utils import update_build_status
from loguru import logger
from langflow.services.getters import get_chat_service, get_session, get_cache_service
from sqlmodel import Session
@ -157,7 +158,7 @@ async def stream_build(
graph = Graph.from_payload(graph_data)
number_of_nodes = len(graph.nodes)
cache_service[flow_id]["status"] = BuildStatus.IN_PROGRESS
update_build_status(cache_service, flow_id, BuildStatus.IN_PROGRESS)
for i, vertex in enumerate(graph.generator_build(), 1):
try:
@ -184,7 +185,7 @@ async def stream_build(
logger.exception(exc)
params = str(exc)
valid = False
cache_service[flow_id]["status"] = BuildStatus.FAILURE
update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
response = {
"valid": valid,
@ -211,11 +212,12 @@ async def stream_build(
chat_service.set_cache(flow_id, langchain_object)
# We need to reset the chat history
chat_service.chat_history.empty_history(flow_id)
cache_service[flow_id]["status"] = BuildStatus.SUCCESS
update_build_status(cache_service, flow_id, BuildStatus.SUCCESS)
except Exception as exc:
logger.exception(exc)
logger.error("Error while building the flow: %s", exc)
cache_service[flow_id]["status"] = BuildStatus.FAILURE
update_build_status(cache_service, flow_id, BuildStatus.FAILURE)
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
yield str(StreamData(event="message", data=final_response))

View file

@ -20,6 +20,7 @@ from langflow.interface.custom.custom_component import CustomComponent
from langflow.api.v1.schemas import (
ProcessResponse,
TaskResponse,
TaskStatusResponse,
UploadFileResponse,
CustomComponentCode,
@ -145,9 +146,15 @@ async def process_flow(
session_id,
)
task_result = task.status
if task_id:
task_response = TaskResponse(id=task_id, href=f"api/v1/task/{task_id}")
else:
task_response = None
return ProcessResponse(
result=task_result,
id=task_id,
task=task_response,
session_id=session_id,
backend=str(type(task_service.backend)),
)
@ -173,7 +180,7 @@ async def process_flow(
raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/task/{task_id}/status", response_model=TaskStatusResponse)
@router.get("/task/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
task_service = get_task_service()
task = task_service.get_task(task_id)

View file

@ -47,11 +47,18 @@ class UpdateTemplateRequest(BaseModel):
template: dict
class TaskResponse(BaseModel):
"""Task response schema."""
id: Optional[str] = Field(None)
href: Optional[str] = Field(None)
class ProcessResponse(BaseModel):
"""Process response schema."""
result: Any
id: Optional[str] = None
task: Optional[TaskResponse] = None
session_id: Optional[str] = None
backend: Optional[str] = None

View file

@ -185,9 +185,10 @@ class Vertex:
# Load the type in value.get('suffixes') using
# what is inside value.get('content')
# value.get('value') is the file name
file_path = value.get("file_path")
params[key] = file_path
if file_path := value.get("file_path"):
params[key] = file_path
else:
raise ValueError(f"File path not found for {self.vertex_type}")
elif value.get("type") in DIRECT_TYPES and params.get(key) is None:
if value.get("type") == "code":
try:

View file

@ -102,7 +102,10 @@ class CustomComponent(Component, extra=Extra.allow):
status_code=400,
detail={
"error": "Type hint Error",
"traceback": "Prompt type is not supported in the build method. Try using PromptTemplate instead.",
"traceback": (
"Prompt type is not supported in the build method."
" Try using PromptTemplate instead."
),
},
)
return args

View file

@ -144,6 +144,8 @@ def import_chain(chain: str) -> Type[Chain]:
if chain in CUSTOM_CHAINS:
return CUSTOM_CHAINS[chain]
if chain == "SQLDatabaseChain":
return import_class("langchain_experimental.sql.SQLDatabaseChain")
return import_class(f"langchain.chains.{chain}")

View file

@ -74,13 +74,17 @@ class InMemoryCache(BaseCacheService, Service):
):
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
unpickled = pickle.loads(item["value"])
return unpickled
# Check if the value is pickled
if isinstance(item["value"], bytes):
value = pickle.loads(item["value"])
else:
value = item["value"]
return value
else:
self.delete(key)
return None
def set(self, key, value):
def set(self, key, value, pickle=False):
"""
Add an item to the cache.
@ -98,8 +102,10 @@ class InMemoryCache(BaseCacheService, Service):
# Remove least recently used item
self._cache.popitem(last=False)
# pickle locally to mimic Redis
pickled = pickle.dumps(value)
self._cache[key] = {"value": pickled, "time": time.time()}
if pickle:
value = pickle.dumps(value)
self._cache[key] = {"value": value, "time": time.time()}
def upsert(self, key, value):
"""

View file

@ -9,6 +9,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict
from appdirs import user_cache_dir
from fastapi import UploadFile
from langflow.api.v1.schemas import BuildStatus
from langflow.services.database.models.base import orjson_dumps
if TYPE_CHECKING:
@ -202,3 +203,11 @@ def save_uploaded_file(file: UploadFile, folder_name):
new_file.write(chunk)
return file_path
def update_build_status(cache_service, flow_id: str, status: BuildStatus):
cached_flow = cache_service[flow_id]
if cached_flow is None:
raise ValueError(f"Flow {flow_id} not found in cache")
cached_flow["status"] = status
cache_service[flow_id] = cached_flow

View file

@ -94,9 +94,7 @@ class DatabaseService(Service):
return True
def run_migrations(self):
logger.info(
f"Running DB migrations in {self.script_location} on {self.database_url}"
)
logger.info(f"Running DB migrations in {self.script_location}")
alembic_cfg = Config()
alembic_cfg.set_main_option("script_location", str(self.script_location))
alembic_cfg.set_main_option("sqlalchemy.url", self.database_url)

View file

@ -1,4 +1,5 @@
from typing import Any, Callable, Coroutine, Union
from langflow.utils.logger import configure
from loguru import logger
from langflow.services.base import Service
from langflow.services.task.backends.anyio import AnyIOBackend
@ -7,18 +8,19 @@ from langflow.services.task.utils import get_celery_worker_status
def check_celery_availability():
from langflow.worker import celery_app
try:
from langflow.worker import celery_app
status = get_celery_worker_status(celery_app)
logger.debug(f"Celery status: {status}")
except Exception as e:
logger.error(f"An error occurred: {e}")
except Exception as exc:
logger.debug(f"Celery not available: {exc}")
status = {"availability": None}
return status
try:
configure()
status = check_celery_availability()
USE_CELERY = status.get("availability") is not None

View file

@ -171,7 +171,7 @@ class DocumentLoaderFrontNode(FrontendNode):
self.template.add_field(
TemplateField(
field_type="dict",
required=True,
required=False,
show=True,
name="metadata",
value={},

View file

@ -2,12 +2,42 @@ from typing import Optional
from loguru import logger
from pathlib import Path
from rich.logging import RichHandler
import os
import orjson
import appdirs
def configure(log_level: str = "DEBUG", log_file: Optional[Path] = None):
log_format = "<green>{time:HH:mm:ss}</green> - <level>{level: <8}</level> - <level>{message}</level>"
VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
def serialize(record):
subset = {
"timestamp": record["time"].timestamp(),
"message": record["message"],
"level": record["level"].name,
"module": record["module"],
}
return orjson.dumps(subset)
def patching(record):
record["extra"]["serialized"] = serialize(record)
def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None):
if os.getenv("LANGFLOW_LOG_LEVEL") in VALID_LOG_LEVELS and log_level is None:
log_level = os.getenv("LANGFLOW_LOG_LEVEL")
if log_level is None:
log_level = "INFO"
# Human-readable
log_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> - <level>"
"{level: <8}</level> - {module} - <level>{message}</level>"
)
# log_format = log_format_dev if log_level.upper() == "DEBUG" else log_format_prod
logger.remove() # Remove default handlers
logger.patch(patching)
# Configure loguru to use RichHandler
logger.configure(
handlers=[
@ -19,17 +49,21 @@ def configure(log_level: str = "DEBUG", log_file: Optional[Path] = None):
]
)
if log_file:
log_file = Path(log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)
if not log_file:
cache_dir = Path(appdirs.user_cache_dir("langflow"))
log_file = cache_dir / "langflow.log"
logger.add(
sink=str(log_file),
level=log_level.upper(),
format=log_format,
rotation="10 MB", # Log rotation based on file size
)
log_file = Path(log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Logger set up with log level: {log_level}")
logger.add(
sink=str(log_file),
level=log_level.upper(),
format=log_format,
rotation="10 MB", # Log rotation based on file size
serialize=True,
)
logger.debug(f"Logger set up with log level: {log_level}")
if log_file:
logger.info(f"Log file: {log_file}")

View file

@ -107,7 +107,6 @@ export default function GenericNode({
setValidationStatus(null);
}
}, [sseData, data.id]);
return (
<>
<NodeToolbar>

View file

@ -37,7 +37,10 @@ export default function BuildTrigger({
if (isBuilding) {
return;
}
const errors = validateNodes(reactFlowInstance!);
const errors = validateNodes(
reactFlowInstance!.getNodes(),
reactFlowInstance!.getEdges()
);
if (errors.length > 0) {
setErrorData({
title: "Oops! Looks like you missed something",

View file

@ -256,7 +256,7 @@ const EditNodeModal = forwardRef(
</div>
) : myData.current.node?.template[templateParam]
.type === "dict" ? (
<div className="mt-2 w-full max-h-48 overflow-auto custom-scroll">
<div className="mt-2 max-h-48 w-full overflow-auto custom-scroll">
<KeypairListComponent
disabled={disabled}
editNode={true}

View file

@ -356,7 +356,10 @@ export default function FormModal({
}, [open]);
function sendMessage(): void {
let nodeValidationErrors = validateNodes(reactFlowInstance!);
let nodeValidationErrors = validateNodes(
reactFlowInstance!.getNodes(),
reactFlowInstance!.getEdges()
);
if (nodeValidationErrors.length === 0) {
setLockChat(true);
let inputs = tabsState[id.current].formKeysData.input_keys;

View file

@ -62,7 +62,7 @@ export default function NodeToolbarComponent({
setShowNode((prev) => !prev);
updateNodeInternals(data.id);
}
if(event.includes("disabled")){
if (event.includes("disabled")) {
return;
}
};
@ -156,8 +156,20 @@ export default function NodeToolbarComponent({
</SelectTrigger>
</ShadTooltip>
<SelectContent>
<SelectItem value={getRandomKeyByssmm() + (nodeLength==0?"disabled":"advanced")}>
<div className={"flex "+(nodeLength==0?"text-muted-foreground":"text-primary")}>
<SelectItem
value={
getRandomKeyByssmm() +
(nodeLength == 0 ? "disabled" : "advanced")
}
>
<div
className={
"flex " +
(nodeLength == 0
? "text-muted-foreground"
: "text-primary")
}
>
<IconComponent
name="Settings2"
className="relative top-0.5 mr-2 h-4 w-4"
@ -182,7 +194,7 @@ export default function NodeToolbarComponent({
<ShadTooltip content="Edit" side="top">
<div>
<button
disabled={nodeLength === 0}
disabled={nodeLength === 0}
onClick={() => setShowModalAdvanced(true)}
className={classNames(
"relative -ml-px inline-flex items-center rounded-r-md bg-background px-2 py-2 text-foreground shadow-md ring-1 ring-inset ring-ring transition-all duration-500 ease-in-out hover:bg-muted focus:z-10" +

View file

@ -41,13 +41,13 @@ export default function HomePage(): JSX.Element {
const dragOver = (e) => {
e.preventDefault();
if(e.dataTransfer.types.some((types) => types === "Files")){
if (e.dataTransfer.types.some((types) => types === "Files")) {
setIsDragging(true);
}
};
const dragEnter = (e) => {
if(e.dataTransfer.types.some((types) => types === "Files")){
if (e.dataTransfer.types.some((types) => types === "Files")) {
setIsDragging(true);
}
e.preventDefault();

View file

@ -134,18 +134,18 @@ export type TooltipComponentType = {
children: ReactElement;
title: string | ReactElement;
placement?:
| "bottom-end"
| "bottom-start"
| "bottom"
| "left-end"
| "left-start"
| "left"
| "right-end"
| "right-start"
| "right"
| "top-end"
| "top-start"
| "top";
| "bottom-end"
| "bottom-start"
| "bottom"
| "left-end"
| "left-start"
| "left"
| "right-end"
| "right-start"
| "right"
| "top-end"
| "top-start"
| "top";
};
export type ProgressBarType = {

View file

@ -189,10 +189,7 @@ export function buildTweaks(flow: FlowType) {
}, {});
}
export function validateNode(
node: NodeType,
reactFlowInstance: ReactFlowInstance
): Array<string> {
export function validateNode(node: NodeType, edges: Edge[]): Array<string> {
if (!node.data?.node?.template || !Object.keys(node.data.node.template)) {
return [
"We've noticed a potential issue with a node in the flow. Please review it and, if necessary, submit a bug report with your exported flow file. Thank you for your help!",
@ -211,13 +208,11 @@ export function validateNode(
(template[t].value === undefined ||
template[t].value === null ||
template[t].value === "") &&
!reactFlowInstance
.getEdges()
.some(
(edge) =>
edge.targetHandle?.split("|")[1] === t &&
edge.targetHandle.split("|")[2] === node.id
)
!edges.some(
(edge) =>
edge.targetHandle?.split("|")[1] === t &&
edge.targetHandle.split("|")[2] === node.id
)
) {
errors.push(
`${type} is missing ${
@ -249,15 +244,13 @@ export function validateNode(
}, [] as string[]);
}
export function validateNodes(reactFlowInstance: ReactFlowInstance) {
if (reactFlowInstance.getNodes().length === 0) {
export function validateNodes(nodes: Node[], edges: Edge[]) {
if (nodes.length === 0) {
return [
"No nodes found in the flow. Please add at least one node to the flow.",
];
}
return reactFlowInstance
.getNodes()
.flatMap((n: NodeType) => validateNode(n, reactFlowInstance));
return nodes.flatMap((n: NodeType) => validateNode(n, edges));
}
export function addVersionToDuplicates(flow: FlowType, flows: FlowType[]) {
@ -311,15 +304,20 @@ export function getConnectedNodes(
return nodes.filter((node) => node.id === targetId || node.id === sourceId);
}
export function convertObjToArray(singleObject) {
export function convertObjToArray(singleObject: object | string) {
if (typeof singleObject === "string") {
singleObject = JSON.parse(singleObject);
}
if (Array.isArray(singleObject)) return singleObject;
let arrConverted: any = [];
for (const key in singleObject) {
if (singleObject.hasOwnProperty(key)) {
const newObj = {};
newObj[key] = singleObject[key];
arrConverted.push(newObj);
let arrConverted: any[] = [];
if (typeof singleObject === "object") {
for (const key in singleObject) {
if (Object.prototype.hasOwnProperty.call(singleObject, key)) {
const newObj = {};
newObj[key] = singleObject[key];
arrConverted.push(newObj);
}
}
}
return arrConverted;

View file

@ -19,7 +19,7 @@ class NameTest(FastHttpUser):
while True:
with self.rest(
"GET",
f"/task/{task_id}/status",
f"/task/{task_id}",
name="task_status",
headers=self.headers,
) as response:

View file

@ -25,10 +25,10 @@ def run_post(client, flow_id, headers, post_data):
# Helper function to poll task status
def poll_task_status(client, headers, task_id, max_attempts=20, sleep_time=1):
def poll_task_status(client, headers, href, max_attempts=20, sleep_time=1):
for _ in range(max_attempts):
task_status_response = client.get(
f"api/v1/task/{task_id}/status",
href,
headers=headers,
)
if (
@ -544,11 +544,15 @@ def test_async_task_processing(distributed_client, added_flow, created_api_key):
assert response.status_code == 200, response.json()
# Extract the task ID from the response
task_id = response.json().get("id")
task = response.json().get("task")
task_id = task.get("id")
task_href = task.get("href")
assert task_id is not None
assert task_href is not None
assert task_href == f"api/v1/task/{task_id}"
# Polling the task status using the helper function
task_status_json = poll_task_status(distributed_client, headers, task_id)
task_status_json = poll_task_status(distributed_client, headers, task_href)
assert task_status_json is not None, "Task did not complete in time"
# Validate that the task completed successfully and the result is as expected
@ -576,11 +580,15 @@ def test_async_task_processing_vector_store(
assert "FAILURE" not in response.json()["result"]
# Extract the task ID from the response
task_id = response.json().get("id")
task = response.json().get("task")
task_id = task.get("id")
task_href = task.get("href")
assert task_id is not None
assert task_href is not None
assert task_href == f"api/v1/task/{task_id}"
# Polling the task status using the helper function
task_status_json = poll_task_status(client, headers, task_id, max_attempts=40)
task_status_json = poll_task_status(client, headers, task_href)
assert task_status_json is not None, "Task did not complete in time"
# Validate that the task completed successfully and the result is as expected