diff --git a/Makefile b/Makefile index 32198730f..1fbd7403c 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ coverage: tests: @make install_backend - poetry run pytest tests -n auto + poetry run pytest tests format: poetry run black . diff --git a/deploy/docker-compose.override.yml b/deploy/docker-compose.override.yml index e5c0d9dd7..1992916fb 100644 --- a/deploy/docker-compose.override.yml +++ b/deploy/docker-compose.override.yml @@ -1,4 +1,5 @@ version: "3.8" + services: proxy: ports: diff --git a/deploy/docker-compose.test.yml b/deploy/docker-compose.test.yml deleted file mode 100644 index 829aae883..000000000 --- a/deploy/docker-compose.test.yml +++ /dev/null @@ -1,85 +0,0 @@ -version: "3.8" -services: - proxy: - ports: - - "80:80" - - "8090:8080" - command: - # Enable Docker in Traefik, so that it reads labels from Docker services - - --providers.docker - # Add a constraint to only use services with the label for this stack - # from the env var TRAEFIK_TAG - - --providers.docker.constraints=Label(`traefik.constraint-label-stack`, `${TRAEFIK_TAG?Variable not set}`) - # Do not expose all Docker services, only the ones explicitly exposed - - --providers.docker.exposedbydefault=false - # Disable Docker Swarm mode for local development - # - --providers.docker.swarmmode - # Enable the access log, with HTTP requests - - --accesslog - # Enable the Traefik log, for configurations and errors - - --log - # Enable the Dashboard and API - - --api - # Enable the Dashboard and API in insecure mode for local development - - --api.insecure=true - labels: - - traefik.enable=true - - traefik.http.routers.${STACK_NAME?Variable not set}-traefik-public-http.rule=Host(`${DOMAIN?Variable not set}`) - - traefik.http.services.${STACK_NAME?Variable not set}-traefik-public.loadbalancer.server.port=80 - - result_backend: - ports: - - "6379:6379" - - pgadmin: - ports: - - "5050:5050" - - flower: - ports: - - "5555:5555" - - backend: - labels: - - traefik.enable=true - - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} - - traefik.http.routers.${STACK_NAME?Variable not set}-backend-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`) - - traefik.http.services.${STACK_NAME?Variable not set}-backend.loadbalancer.server.port=7860 - - frontend: - labels: - - traefik.enable=true - - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} - - traefik.http.routers.${STACK_NAME?Variable not set}-frontend-http.rule=PathPrefix(`/`) - - traefik.http.services.${STACK_NAME?Variable not set}-frontend.loadbalancer.server.port=80 - - celeryworker: - labels: - - traefik.enable=true - - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} - - traefik.http.routers.${STACK_NAME?Variable not set}-celeryworker-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`) - - traefik.http.services.${STACK_NAME?Variable not set}-celeryworker.loadbalancer.server.port=7860 - - tests: - extends: - file: docker-compose.yml - service: backend - env_file: - - .env - build: - context: ../ - dockerfile: base.Dockerfile - command: pytest -vv - healthcheck: - test: "exit 0" - # override deploy labels to avoid conflicts with the backend service - labels: - - traefik.enable=true - - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} - - traefik.http.routers.${STACK_NAME?Variable not set}-tests-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`) - - traefik.http.services.${STACK_NAME?Variable not set}-tests.loadbalancer.server.port=7861 - -networks: - traefik-public: - # For local dev, don't expect an external Traefik network - external: false diff --git a/deploy/docker-compose.with_tests.yml b/deploy/docker-compose.with_tests.yml new file mode 100644 index 000000000..c16c14197 --- /dev/null +++ b/deploy/docker-compose.with_tests.yml @@ -0,0 +1,277 @@ +version: "3.8" + +services: + proxy: + image: traefik:v3.0 + env_file: + - .env + networks: + - ${TRAEFIK_PUBLIC_NETWORK?Variable not set} + - default + volumes: + - /var/run/docker.sock:/var/run/docker.sock + command: + # Enable Docker in Traefik, so that it reads labels from Docker services + - --providers.docker + # Add a constraint to only use services with the label for this stack + # from the env var TRAEFIK_TAG + - --providers.docker.constraints=Label(`traefik.constraint-label-stack`, `${TRAEFIK_TAG?Variable not set}`) + # Do not expose all Docker services, only the ones explicitly exposed + - --providers.docker.exposedbydefault=false + # Enable the access log, with HTTP requests + - --accesslog + # Enable the Traefik log, for configurations and errors + - --log + # Enable the Dashboard and API + - --api + deploy: + placement: + constraints: + - node.role == manager + labels: + # Enable Traefik for this service, to make it available in the public network + - traefik.enable=true + # Use the traefik-public network (declared below) + - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set} + # Use the custom label "traefik.constraint-label=traefik-public" + # This public Traefik will only use services with this label + - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set} + # traefik-http set up only to use the middleware to redirect to https + - traefik.http.middlewares.${STACK_NAME?Variable not set}-https-redirect.redirectscheme.scheme=https + - traefik.http.middlewares.${STACK_NAME?Variable not set}-https-redirect.redirectscheme.permanent=true + # Handle host with and without "www" to redirect to only one of them + # Uses environment variable DOMAIN + # To disable www redirection remove the Host() you want to discard, here and + # below for HTTPS + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.rule=Host(`${DOMAIN?Variable not set}`) || Host(`www.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.entrypoints=http + # traefik-https the actual router using HTTPS + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.rule=Host(`${DOMAIN?Variable not set}`) || Host(`www.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.entrypoints=https + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.tls=true + # Use the "le" (Let's Encrypt) resolver created below + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.tls.certresolver=le + # Define the port inside of the Docker service to use + - traefik.http.services.${STACK_NAME?Variable not set}-proxy.loadbalancer.server.port=80 + # Handle domain with and without "www" to redirect to only one + # To disable www redirection remove the next line + - traefik.http.middlewares.${STACK_NAME?Variable not set}-www-redirect.redirectregex.regex=^https?://(www.)?(${DOMAIN?Variable not set})/(.*) + # Redirect a domain with www to non-www + # To disable it remove the next line + - traefik.http.middlewares.${STACK_NAME?Variable not set}-www-redirect.redirectregex.replacement=https://${DOMAIN?Variable not set}/$${3} + # Redirect a domain without www to www + # To enable it remove the previous line and uncomment the next + # - traefik.http.middlewares.${STACK_NAME}-www-redirect.redirectregex.replacement=https://www.${DOMAIN}/$${3} + # Middleware to redirect www, to disable it remove the next line + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-https.middlewares=${STACK_NAME?Variable not set}-www-redirect + # Middleware to redirect www, and redirect HTTP to HTTPS + # to disable www redirection remove the section: ${STACK_NAME?Variable not set}-www-redirect, + - traefik.http.routers.${STACK_NAME?Variable not set}-proxy-http.middlewares=${STACK_NAME?Variable not set}-www-redirect,${STACK_NAME?Variable not set}-https-redirect + + backend: &backend + image: "ogabrielluiz/langflow:latest" + build: + context: ../ + dockerfile: base.Dockerfile + depends_on: + - db + - broker + - result_backend + env_file: + - .env + volumes: + - ../:/app + - ./startup-backend.sh:/startup-backend.sh # Ensure the paths match + command: /startup-backend.sh # Fixed the path + healthcheck: + test: "exit 0" + deploy: + labels: + - traefik.enable=true + - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-backend-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`) + - traefik.http.services.${STACK_NAME?Variable not set}-backend.loadbalancer.server.port=7860 + + db: + image: postgres:15.4 + volumes: + - app-db-data:/var/lib/postgresql/data/pgdata + environment: + - PGDATA=/var/lib/postgresql/data/pgdata + deploy: + placement: + constraints: + - node.labels.app-db-data == true + healthcheck: + test: "exit 0" + env_file: + - .env + + pgadmin: + image: dpage/pgadmin4 + networks: + - ${TRAEFIK_PUBLIC_NETWORK?Variable not set} + - default + volumes: + - pgadmin-data:/var/lib/pgadmin + env_file: + - .env + deploy: + labels: + - traefik.enable=true + - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set} + - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.rule=Host(`pgadmin.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.entrypoints=http + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-http.middlewares=${STACK_NAME?Variable not set}-https-redirect + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.rule=Host(`pgadmin.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.entrypoints=https + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.tls=true + - traefik.http.routers.${STACK_NAME?Variable not set}-pgadmin-https.tls.certresolver=le + - traefik.http.services.${STACK_NAME?Variable not set}-pgadmin.loadbalancer.server.port=5050 + + result_backend: + image: redis:6.2.5 + env_file: + - .env + # ports: + # - 6379:6379 + healthcheck: + test: "exit 0" + + celeryworker: + <<: *backend + env_file: + - .env + build: + context: ../ + dockerfile: base.Dockerfile + command: celery -A langflow.worker.celery_app worker --loglevel=INFO --concurrency=1 -n lf-worker@%h + healthcheck: + test: "exit 0" + deploy: + replicas: 1 + + flower: + <<: *backend + env_file: + - .env + networks: + - default + build: + context: ../ + dockerfile: base.Dockerfile + environment: + - FLOWER_PORT=5555 + + command: /bin/sh -c "celery -A langflow.worker.celery_app --broker=${BROKER_URL?Variable not set} flower --port=5555" + deploy: + labels: + - traefik.enable=true + - traefik.docker.network=${TRAEFIK_PUBLIC_NETWORK?Variable not set} + - traefik.constraint-label=${TRAEFIK_PUBLIC_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.rule=Host(`flower.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.entrypoints=http + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-http.middlewares=${STACK_NAME?Variable not set}-https-redirect + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.rule=Host(`flower.${DOMAIN?Variable not set}`) + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.entrypoints=https + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.tls=true + - traefik.http.routers.${STACK_NAME?Variable not set}-flower-https.tls.certresolver=le + - traefik.http.services.${STACK_NAME?Variable not set}-flower.loadbalancer.server.port=5555 + + frontend: + image: "ogabrielluiz/langflow_frontend:latest" + env_file: + - .env + # user: your-non-root-user + build: + context: ../src/frontend + dockerfile: Dockerfile + args: + - BACKEND_URL=http://backend:7860 + restart: on-failure + deploy: + labels: + - traefik.enable=true + - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-frontend-http.rule=PathPrefix(`/`) + - traefik.http.services.${STACK_NAME?Variable not set}-frontend.loadbalancer.server.port=80 + + broker: + # RabbitMQ management console + image: rabbitmq:3-management + environment: + - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER:-admin} + - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS:-admin} + volumes: + - rabbitmq_data:/etc/rabbitmq/ + - rabbitmq_data:/var/lib/rabbitmq/ + - rabbitmq_log:/var/log/rabbitmq/ + ports: + - 5672:5672 + - 15672:15672 + + prometheus: + image: prom/prometheus:v2.37.9 + env_file: + - .env + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + command: + - "--config.file=/etc/prometheus/prometheus.yml" + # ports: + # - 9090:9090 + healthcheck: + test: "exit 0" + deploy: + labels: + - traefik.enable=true + - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-prometheus-http.rule=PathPrefix(`/metrics`) + - traefik.http.services.${STACK_NAME?Variable not set}-prometheus.loadbalancer.server.port=9090 + + grafana: + image: grafana/grafana:8.2.6 + env_file: + - .env + # ports: + # - 3000:3000 + volumes: + - grafana_data:/var/lib/grafana + deploy: + labels: + - traefik.enable=true + - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-grafana-http.rule=PathPrefix(`/grafana`) + - traefik.http.services.${STACK_NAME?Variable not set}-grafana.loadbalancer.server.port=3000 + + tests: + extends: + file: docker-compose.yml + service: backend + env_file: + - .env + build: + context: ../ + dockerfile: base.Dockerfile + command: pytest -vv + healthcheck: + test: "exit 0" + # override deploy labels to avoid conflicts with the backend service + labels: + - traefik.enable=true + - traefik.constraint-label-stack=${TRAEFIK_TAG?Variable not set} + - traefik.http.routers.${STACK_NAME?Variable not set}-tests-http.rule=PathPrefix(`/api/v1`) || PathPrefix(`/docs`) || PathPrefix(`/health`) + - traefik.http.services.${STACK_NAME?Variable not set}-tests.loadbalancer.server.port=7861 + +volumes: + grafana_data: + app-db-data: + rabbitmq_data: + rabbitmq_log: + pgadmin-data: + +networks: + traefik-public: + # Allow setting it to false for testing + external: false # ${TRAEFIK_PUBLIC_NETWORK_IS_EXTERNAL-true} diff --git a/docs/docs/guidelines/async-api.mdx b/docs/docs/guidelines/async-api.mdx index 582a5e1c8..c5473812e 100644 --- a/docs/docs/guidelines/async-api.mdx +++ b/docs/docs/guidelines/async-api.mdx @@ -27,13 +27,29 @@ curl -X POST \ -d '{"inputs": {"text": ""}, "tweaks": {}, "sync": false}' ``` +Response: + +```json +{ + "result": { + "output": "..." + }, + "task": { + "id": "...", + "href": "api/v1/task/" + }, + "session_id": "...", + "backend": "..." // celery or anyio +} +``` + ## Checking Task Status -You can check the status of an asynchronous task by making a GET request to the `/task/{task_id}/status` endpoint. +You can check the status of an asynchronous task by making a GET request to the `/task/{task_id}` endpoint. ```bash curl -X GET \ - http://localhost:3000/api/v1/task//status \ + http://localhost:3000/api/v1/task/ \ -H 'x-api-key: ' ``` diff --git a/docs/docs/guides/async-tasks.mdx b/docs/docs/guides/async-tasks.mdx index 5cccc8675..e865fe4b6 100644 --- a/docs/docs/guides/async-tasks.mdx +++ b/docs/docs/guides/async-tasks.mdx @@ -40,5 +40,5 @@ This will set up the following containers: To run the tests for the Async API, you can run the following command: ```bash -docker compose -f docker-compose.test.yml up --exit-code-from tests tests result_backend broker celeryworker db --build +docker compose -f docker-compose.with_tests.yml up --exit-code-from tests tests result_backend broker celeryworker db --build ``` diff --git a/src/backend/langflow/__main__.py b/src/backend/langflow/__main__.py index 6701d27bc..f42f44a9e 100644 --- a/src/backend/langflow/__main__.py +++ b/src/backend/langflow/__main__.py @@ -23,7 +23,7 @@ from rich.table import Table console = Console() -app = typer.Typer() +app = typer.Typer(no_args_is_help=True) def get_number_of_workers(workers=None): @@ -141,7 +141,7 @@ def run( ), ): """ - Run the Langflow server. + Run the Langflow. """ # override env variables with .env file if env_file: @@ -299,7 +299,14 @@ def superuser( password: str = typer.Option( ..., prompt=True, hide_input=True, help="Password for the superuser." ), + log_level: str = typer.Option( + "critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL" + ), ): + """ + Create a superuser. + """ + configure(log_level=log_level) initialize_services() db_service = get_db_service() with session_getter(db_service) as session: @@ -321,7 +328,10 @@ def superuser( @app.command() -def migration(test: bool = typer.Option(False, help="Run migrations in test mode.")): +def migration(test: bool = typer.Option(True, help="Run migrations in test mode.")): + """ + Run or test migrations. + """ initialize_services() db_service = get_db_service() if not test: diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index a7a50a39f..c25c4bde2 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -13,6 +13,7 @@ from langflow.api.v1.schemas import BuildStatus, BuiltResponse, InitResponse, St from langflow.graph.graph.base import Graph from langflow.services.auth.utils import get_current_active_user, get_current_user +from langflow.services.cache.utils import update_build_status from loguru import logger from langflow.services.getters import get_chat_service, get_session, get_cache_service from sqlmodel import Session @@ -157,7 +158,7 @@ async def stream_build( graph = Graph.from_payload(graph_data) number_of_nodes = len(graph.nodes) - cache_service[flow_id]["status"] = BuildStatus.IN_PROGRESS + update_build_status(cache_service, flow_id, BuildStatus.IN_PROGRESS) for i, vertex in enumerate(graph.generator_build(), 1): try: @@ -184,7 +185,7 @@ async def stream_build( logger.exception(exc) params = str(exc) valid = False - cache_service[flow_id]["status"] = BuildStatus.FAILURE + update_build_status(cache_service, flow_id, BuildStatus.FAILURE) response = { "valid": valid, @@ -211,11 +212,12 @@ async def stream_build( chat_service.set_cache(flow_id, langchain_object) # We need to reset the chat history chat_service.chat_history.empty_history(flow_id) - cache_service[flow_id]["status"] = BuildStatus.SUCCESS + update_build_status(cache_service, flow_id, BuildStatus.SUCCESS) except Exception as exc: logger.exception(exc) logger.error("Error while building the flow: %s", exc) - cache_service[flow_id]["status"] = BuildStatus.FAILURE + + update_build_status(cache_service, flow_id, BuildStatus.FAILURE) yield str(StreamData(event="error", data={"error": str(exc)})) finally: yield str(StreamData(event="message", data=final_response)) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index ff5e0541e..864467f9b 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -20,6 +20,7 @@ from langflow.interface.custom.custom_component import CustomComponent from langflow.api.v1.schemas import ( ProcessResponse, + TaskResponse, TaskStatusResponse, UploadFileResponse, CustomComponentCode, @@ -145,9 +146,15 @@ async def process_flow( session_id, ) task_result = task.status + + if task_id: + task_response = TaskResponse(id=task_id, href=f"api/v1/task/{task_id}") + else: + task_response = None + return ProcessResponse( result=task_result, - id=task_id, + task=task_response, session_id=session_id, backend=str(type(task_service.backend)), ) @@ -173,7 +180,7 @@ async def process_flow( raise HTTPException(status_code=500, detail=str(e)) from e -@router.get("/task/{task_id}/status", response_model=TaskStatusResponse) +@router.get("/task/{task_id}", response_model=TaskStatusResponse) async def get_task_status(task_id: str): task_service = get_task_service() task = task_service.get_task(task_id) diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 519bc534e..9c6ac6d60 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -47,11 +47,18 @@ class UpdateTemplateRequest(BaseModel): template: dict +class TaskResponse(BaseModel): + """Task response schema.""" + + id: Optional[str] = Field(None) + href: Optional[str] = Field(None) + + class ProcessResponse(BaseModel): """Process response schema.""" result: Any - id: Optional[str] = None + task: Optional[TaskResponse] = None session_id: Optional[str] = None backend: Optional[str] = None diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 3877e59b1..51fa8d815 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -185,9 +185,10 @@ class Vertex: # Load the type in value.get('suffixes') using # what is inside value.get('content') # value.get('value') is the file name - file_path = value.get("file_path") - - params[key] = file_path + if file_path := value.get("file_path"): + params[key] = file_path + else: + raise ValueError(f"File path not found for {self.vertex_type}") elif value.get("type") in DIRECT_TYPES and params.get(key) is None: if value.get("type") == "code": try: diff --git a/src/backend/langflow/interface/custom/custom_component.py b/src/backend/langflow/interface/custom/custom_component.py index b0c4f8752..ccc0b08b2 100644 --- a/src/backend/langflow/interface/custom/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component.py @@ -102,7 +102,10 @@ class CustomComponent(Component, extra=Extra.allow): status_code=400, detail={ "error": "Type hint Error", - "traceback": "Prompt type is not supported in the build method. Try using PromptTemplate instead.", + "traceback": ( + "Prompt type is not supported in the build method." + " Try using PromptTemplate instead." + ), }, ) return args diff --git a/src/backend/langflow/interface/importing/utils.py b/src/backend/langflow/interface/importing/utils.py index d07222dd1..000eed205 100644 --- a/src/backend/langflow/interface/importing/utils.py +++ b/src/backend/langflow/interface/importing/utils.py @@ -144,6 +144,8 @@ def import_chain(chain: str) -> Type[Chain]: if chain in CUSTOM_CHAINS: return CUSTOM_CHAINS[chain] + if chain == "SQLDatabaseChain": + return import_class("langchain_experimental.sql.SQLDatabaseChain") return import_class(f"langchain.chains.{chain}") diff --git a/src/backend/langflow/services/cache/manager.py b/src/backend/langflow/services/cache/manager.py index 19f5034cf..da76a2b5c 100644 --- a/src/backend/langflow/services/cache/manager.py +++ b/src/backend/langflow/services/cache/manager.py @@ -74,13 +74,17 @@ class InMemoryCache(BaseCacheService, Service): ): # Move the key to the end to make it recently used self._cache.move_to_end(key) - unpickled = pickle.loads(item["value"]) - return unpickled + # Check if the value is pickled + if isinstance(item["value"], bytes): + value = pickle.loads(item["value"]) + else: + value = item["value"] + return value else: self.delete(key) return None - def set(self, key, value): + def set(self, key, value, pickle=False): """ Add an item to the cache. @@ -98,8 +102,10 @@ class InMemoryCache(BaseCacheService, Service): # Remove least recently used item self._cache.popitem(last=False) # pickle locally to mimic Redis - pickled = pickle.dumps(value) - self._cache[key] = {"value": pickled, "time": time.time()} + if pickle: + value = pickle.dumps(value) + + self._cache[key] = {"value": value, "time": time.time()} def upsert(self, key, value): """ diff --git a/src/backend/langflow/services/cache/utils.py b/src/backend/langflow/services/cache/utils.py index 83cde2240..bd6b4fb0a 100644 --- a/src/backend/langflow/services/cache/utils.py +++ b/src/backend/langflow/services/cache/utils.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Dict from appdirs import user_cache_dir from fastapi import UploadFile +from langflow.api.v1.schemas import BuildStatus from langflow.services.database.models.base import orjson_dumps if TYPE_CHECKING: @@ -202,3 +203,11 @@ def save_uploaded_file(file: UploadFile, folder_name): new_file.write(chunk) return file_path + + +def update_build_status(cache_service, flow_id: str, status: BuildStatus): + cached_flow = cache_service[flow_id] + if cached_flow is None: + raise ValueError(f"Flow {flow_id} not found in cache") + cached_flow["status"] = status + cache_service[flow_id] = cached_flow diff --git a/src/backend/langflow/services/database/manager.py b/src/backend/langflow/services/database/manager.py index ca7f34d10..3a388f694 100644 --- a/src/backend/langflow/services/database/manager.py +++ b/src/backend/langflow/services/database/manager.py @@ -94,9 +94,7 @@ class DatabaseService(Service): return True def run_migrations(self): - logger.info( - f"Running DB migrations in {self.script_location} on {self.database_url}" - ) + logger.info(f"Running DB migrations in {self.script_location}") alembic_cfg = Config() alembic_cfg.set_main_option("script_location", str(self.script_location)) alembic_cfg.set_main_option("sqlalchemy.url", self.database_url) diff --git a/src/backend/langflow/services/task/manager.py b/src/backend/langflow/services/task/manager.py index ab74d916e..422b34faa 100644 --- a/src/backend/langflow/services/task/manager.py +++ b/src/backend/langflow/services/task/manager.py @@ -1,4 +1,5 @@ from typing import Any, Callable, Coroutine, Union +from langflow.utils.logger import configure from loguru import logger from langflow.services.base import Service from langflow.services.task.backends.anyio import AnyIOBackend @@ -7,18 +8,19 @@ from langflow.services.task.utils import get_celery_worker_status def check_celery_availability(): - from langflow.worker import celery_app - try: + from langflow.worker import celery_app + status = get_celery_worker_status(celery_app) logger.debug(f"Celery status: {status}") - except Exception as e: - logger.error(f"An error occurred: {e}") + except Exception as exc: + logger.debug(f"Celery not available: {exc}") status = {"availability": None} return status try: + configure() status = check_celery_availability() USE_CELERY = status.get("availability") is not None diff --git a/src/backend/langflow/template/frontend_node/documentloaders.py b/src/backend/langflow/template/frontend_node/documentloaders.py index 8118593f3..0be2ebe98 100644 --- a/src/backend/langflow/template/frontend_node/documentloaders.py +++ b/src/backend/langflow/template/frontend_node/documentloaders.py @@ -171,7 +171,7 @@ class DocumentLoaderFrontNode(FrontendNode): self.template.add_field( TemplateField( field_type="dict", - required=True, + required=False, show=True, name="metadata", value={}, diff --git a/src/backend/langflow/utils/logger.py b/src/backend/langflow/utils/logger.py index 1f616486b..b08621410 100644 --- a/src/backend/langflow/utils/logger.py +++ b/src/backend/langflow/utils/logger.py @@ -2,12 +2,42 @@ from typing import Optional from loguru import logger from pathlib import Path from rich.logging import RichHandler +import os +import orjson +import appdirs -def configure(log_level: str = "DEBUG", log_file: Optional[Path] = None): - log_format = "{time:HH:mm:ss} - {level: <8} - {message}" +VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + + +def serialize(record): + subset = { + "timestamp": record["time"].timestamp(), + "message": record["message"], + "level": record["level"].name, + "module": record["module"], + } + return orjson.dumps(subset) + + +def patching(record): + record["extra"]["serialized"] = serialize(record) + + +def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None): + if os.getenv("LANGFLOW_LOG_LEVEL") in VALID_LOG_LEVELS and log_level is None: + log_level = os.getenv("LANGFLOW_LOG_LEVEL") + if log_level is None: + log_level = "INFO" + # Human-readable + log_format = ( + "{time:YYYY-MM-DD HH:mm:ss} - " + "{level: <8} - {module} - {message}" + ) + + # log_format = log_format_dev if log_level.upper() == "DEBUG" else log_format_prod logger.remove() # Remove default handlers - + logger.patch(patching) # Configure loguru to use RichHandler logger.configure( handlers=[ @@ -19,17 +49,21 @@ def configure(log_level: str = "DEBUG", log_file: Optional[Path] = None): ] ) - if log_file: - log_file = Path(log_file) - log_file.parent.mkdir(parents=True, exist_ok=True) + if not log_file: + cache_dir = Path(appdirs.user_cache_dir("langflow")) + log_file = cache_dir / "langflow.log" - logger.add( - sink=str(log_file), - level=log_level.upper(), - format=log_format, - rotation="10 MB", # Log rotation based on file size - ) + log_file = Path(log_file) + log_file.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"Logger set up with log level: {log_level}") + logger.add( + sink=str(log_file), + level=log_level.upper(), + format=log_format, + rotation="10 MB", # Log rotation based on file size + serialize=True, + ) + + logger.debug(f"Logger set up with log level: {log_level}") if log_file: logger.info(f"Log file: {log_file}") diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index 5f3a99d47..524b54ead 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -107,7 +107,6 @@ export default function GenericNode({ setValidationStatus(null); } }, [sseData, data.id]); - return ( <> diff --git a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx index 66bc0d371..ec15b6404 100644 --- a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx +++ b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx @@ -37,7 +37,10 @@ export default function BuildTrigger({ if (isBuilding) { return; } - const errors = validateNodes(reactFlowInstance!); + const errors = validateNodes( + reactFlowInstance!.getNodes(), + reactFlowInstance!.getEdges() + ); if (errors.length > 0) { setErrorData({ title: "Oops! Looks like you missed something", diff --git a/src/frontend/src/modals/EditNodeModal/index.tsx b/src/frontend/src/modals/EditNodeModal/index.tsx index 02753cc44..6d6c5cfda 100644 --- a/src/frontend/src/modals/EditNodeModal/index.tsx +++ b/src/frontend/src/modals/EditNodeModal/index.tsx @@ -256,7 +256,7 @@ const EditNodeModal = forwardRef( ) : myData.current.node?.template[templateParam] .type === "dict" ? ( -
+
!prev); updateNodeInternals(data.id); } - if(event.includes("disabled")){ + if (event.includes("disabled")) { return; } }; @@ -156,8 +156,20 @@ export default function NodeToolbarComponent({ - -
+ +