🐛 fix(flows.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library

🐛 fix(schemas.py): change json.dumps to orjson_dumps for improved performance and compatibility with orjson library
🐛 fix(utils.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
🐛 fix(loading.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
🐛 fix(utils.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
🐛 fix(vector_store.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
🐛 fix(types.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
🐛 fix(process.py): change json.loads to orjson.loads for improved performance and compatibility with orjson library
 feat(server.ts): change port variable case from lowercase port to uppercase PORT to improve semantics
 feat(server.ts): add support for process.env.PORT environment variable to be able to run app on a configurable port

🔧 fix(base.py): import orjson instead of json to improve performance and compatibility
🔧 fix(frontend_node/llms.py): use orjson_dumps instead of json.dumps to improve performance and compatibility
🔧 fix(frontend_node/utilities.py): use orjson_dumps instead of json.dumps to improve performance and compatibility
🔧 fix(test_cache.py): import orjson and use orjson_dumps instead of json.dumps to improve performance and compatibility

🔧 fix(test_database.py): import correct json encoder and decoder functions to fix import errors
🔧 fix(test_database.py): replace json.dumps and json.loads with orjson_dumps and orjson.loads for better performance and compatibility
🔧 fix(test_loading.py): remove unused import statement
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-22 11:04:41 -03:00
commit d0aa3261f1
18 changed files with 291 additions and 55 deletions

View file

@ -1,5 +1,6 @@
from typing import List
from uuid import UUID
from fastapi.encoders import jsonable_encoder
from langflow.settings import settings
from langflow.api.utils import remove_api_keys
from langflow.api.v1.schemas import FlowListCreate, FlowListRead
@ -11,12 +12,11 @@ from langflow.database.models.flow import (
FlowUpdate,
)
from langflow.database.base import get_session
import orjson
from sqlmodel import Session, select
from fastapi import APIRouter, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi import File, UploadFile
import json
# build router
router = APIRouter(prefix="/flows", tags=["Flows"])
@ -105,7 +105,7 @@ async def upload_file(
):
"""Upload flows from a file."""
contents = await file.read()
data = json.loads(contents)
data = orjson.loads(contents)
if "flows" in data:
flow_list = FlowListCreate(**data)
else:

View file

@ -1,9 +1,9 @@
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from langflow.database.models.base import orjson_dumps
from langflow.database.models.flow import FlowCreate, FlowRead
from pydantic import BaseModel, Field, validator
import json
class BuildStatus(Enum):
@ -115,7 +115,7 @@ class StreamData(BaseModel):
data: dict
def __str__(self) -> str:
return f"event: {self.event}\ndata: {json.dumps(self.data)}\n\n"
return f"event: {self.event}\ndata: {orjson_dumps(self.data)}\n\n"
class CustomComponentCode(BaseModel):

View file

@ -2,13 +2,13 @@ import base64
import contextlib
import functools
import hashlib
import json
import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
from appdirs import user_cache_dir
from langflow.database.models.base import orjson_dumps
CACHE: Dict[str, Any] = {}
@ -76,7 +76,8 @@ def clear_old_cache_files(max_cache_size: int = 3):
def compute_dict_hash(graph_data):
graph_data = filter_json(graph_data)
cleaned_graph_json = json.dumps(graph_data, sort_keys=True)
cleaned_graph_json = orjson_dumps(graph_data, sort_keys=True)
return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest()

View file

@ -9,7 +9,6 @@ from langflow.utils.logger import logger
import asyncio
import json
from typing import Any, Dict, List
from langflow.cache.flow import InMemoryCache
@ -186,7 +185,7 @@ class ChatManager:
while True:
json_payload = await websocket.receive_json()
try:
payload = json.loads(json_payload)
payload = orjson.loads(json_payload)
except TypeError:
payload = json_payload
if "clear_history" in payload:

View file

@ -2,9 +2,20 @@ from sqlmodel import SQLModel
import orjson
def orjson_dumps(v, *, default=None, sort_keys=False, indent_2=True):
    """Serialize *v* to a JSON string with orjson.

    Mirrors the parts of the ``json.dumps`` interface this project uses:
    ``default`` is an optional fallback serializer for non-native types,
    ``sort_keys`` sorts dict keys, and ``indent_2`` (on by default)
    pretty-prints with a two-space indent.

    orjson.dumps returns bytes; we decode so callers get the same ``str``
    that json.dumps would produce.
    """
    # orjson options are integer flags masked together, e.g.
    # option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2. Starting from 0
    # (no options) lets us OR in each requested flag without the
    # None-vs-int branching the original needed.
    option = 0
    if sort_keys:
        option |= orjson.OPT_SORT_KEYS
    if indent_2:
        option |= orjson.OPT_INDENT_2
    # default=None is orjson's own default, so one call covers both the
    # "default provided" and "no default" paths.
    return orjson.dumps(v, default=default, option=option).decode()
class SQLModelSerializable(SQLModel):

View file

@ -1,7 +1,6 @@
from langchain import LLMChain
from langchain.agents import AgentExecutor, ZeroShotAgent
from langchain.agents.agent_toolkits.json.prompt import JSON_PREFIX, JSON_SUFFIX
from langchain.agents.agent_toolkits.json.toolkit import JsonToolkit
from langchain.agents.agent_toolkits.json.prompt import JSON_SUFFIX
from langchain.agents.mrkl.prompt import FORMAT_INSTRUCTIONS
from langchain.base_language import BaseLanguageModel

View file

@ -1,4 +1,4 @@
import json
import orjson
from typing import Any, Callable, Dict, Sequence, Type
from langchain.agents import agent as agent_module
@ -66,7 +66,7 @@ def convert_kwargs(params):
for key in kwargs_keys:
if isinstance(params[key], str):
try:
params[key] = json.loads(params[key])
params[key] = orjson.loads(params[key])
except json.JSONDecodeError:
# if the string is not a valid json string, we will
# remove the key from the params
@ -306,7 +306,7 @@ def instantiate_documentloader(class_object: Type[BaseLoader], params: Dict):
metadata = params.pop("metadata", None)
if metadata and isinstance(metadata, str):
try:
metadata = json.loads(metadata)
metadata = orjson.loads(metadata)
except json.JSONDecodeError as exc:
raise ValueError(
"The metadata you provided is not a valid JSON string."

View file

@ -1,5 +1,7 @@
import contextlib
import json
from langflow.database.models.base import orjson_dumps
import orjson
from typing import Any, Dict, List
from langchain.agents import ZeroShotAgent
@ -95,9 +97,11 @@ def format_content(variable):
def try_to_load_json(content):
    """Best-effort JSON normalization of *content*.

    If *content* is a valid JSON string it is parsed: a decoded list is
    flattened to a comma-separated string, any other decoded value is
    re-serialized with orjson_dumps. Invalid JSON is returned unchanged.
    """
    # Suppress orjson's own decode error rather than json.JSONDecodeError:
    # orjson.loads raises orjson.JSONDecodeError (a json.JSONDecodeError
    # subclass), and this avoids depending on the stdlib json import.
    with contextlib.suppress(orjson.JSONDecodeError):
        content = orjson.loads(content)
        if isinstance(content, list):
            content = ",".join(str(item) for item in content)
        else:
            content = orjson_dumps(content)
    return content

View file

@ -1,4 +1,3 @@
import json
from typing import Any, Callable, Dict, Type
from langchain.vectorstores import (
Pinecone,
@ -92,7 +91,7 @@ def initialize_weaviate(class_object: Type[Weaviate], params: dict):
import weaviate # type: ignore
client_kwargs_json = params.get("client_kwargs", "{}")
client_kwargs = json.loads(client_kwargs_json)
client_kwargs = orjson.loads(client_kwargs_json)
client_params = {
"url": params.get("weaviate_url"),
}

View file

@ -190,17 +190,16 @@ def build_frontend_node(custom_component: CustomComponent):
def update_attributes(frontend_node, template_config):
    """Copy recognized attributes from *template_config* onto *frontend_node*.

    Only keys present in template_config are written, so attributes the
    template omits keep their existing value on the frontend node.
    """
    # The recognized attribute names; extend this list to support more.
    attributes = [
        "display_name",
        "description",
        "beta",
        "documentation",
        "output_types",
    ]
    for attribute in attributes:
        if attribute in template_config:
            frontend_node[attribute] = template_config[attribute]
def build_field_config(custom_component: CustomComponent):

View file

@ -1,6 +1,6 @@
import json
from pathlib import Path
from langchain.schema import AgentAction
import json
from langflow.interface.run import (
build_sorted_vertices_with_caching,
get_memory_key,

View file

@ -0,0 +1,222 @@
import contextlib
import orjson
import os
from shutil import copy2
from typing import Optional, List
from pathlib import Path
import yaml
from pydantic import BaseSettings, root_validator, validator
from langflow.utils.logger import logger
# BASE_COMPONENTS_PATH = str(Path(__file__).parent / "components")
BASE_COMPONENTS_PATH = str(Path(__file__).parent.parent.parent / "components")
class Settings(BaseSettings):
    """Application settings.

    Values come from (in pydantic BaseSettings order) direct construction,
    environment variables prefixed with ``LANGFLOW_``, and the defaults
    below; they can also be replaced wholesale from a YAML file via
    update_from_yaml or patched individually via update_settings.
    """

    # Component registries, keyed by component name; populated from YAML.
    CHAINS: dict = {}
    AGENTS: dict = {}
    PROMPTS: dict = {}
    LLMS: dict = {}
    TOOLS: dict = {}
    MEMORIES: dict = {}
    EMBEDDINGS: dict = {}
    VECTORSTORES: dict = {}
    DOCUMENTLOADERS: dict = {}
    WRAPPERS: dict = {}
    RETRIEVERS: dict = {}
    TOOLKITS: dict = {}
    TEXTSPLITTERS: dict = {}
    UTILITIES: dict = {}
    OUTPUT_PARSERS: dict = {}
    CUSTOM_COMPONENTS: dict = {}

    # Directory holding config/database files; defaults to the user cache dir.
    CONFIG_DIR: Optional[str] = None

    DEV: bool = False
    DATABASE_URL: Optional[str] = None
    CACHE: str = "InMemoryCache"
    REMOVE_API_KEYS: bool = False
    COMPONENTS_PATH: List[str] = []

    @validator("CONFIG_DIR", pre=True, allow_reuse=True)
    def set_langflow_dir(cls, value):
        """Default CONFIG_DIR to the per-user cache dir and ensure it exists."""
        if not value:
            import appdirs

            # Define the app name and author
            app_name = "langflow"
            app_author = "logspace"

            # Get the cache directory for the application
            cache_dir = appdirs.user_cache_dir(app_name, app_author)

            # Create a .langflow directory inside the cache directory
            value = Path(cache_dir)
            value.mkdir(parents=True, exist_ok=True)

        if isinstance(value, str):
            value = Path(value)
        if not value.exists():
            value.mkdir(parents=True, exist_ok=True)
        return str(value)

    @validator("DATABASE_URL", pre=True)
    def set_database_url(cls, value, values):
        """Resolve DATABASE_URL from the env or fall back to a sqlite file in CONFIG_DIR."""
        if not value:
            logger.debug(
                "No database_url provided, trying LANGFLOW_DATABASE_URL env variable"
            )
            if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"):
                value = langflow_database_url
                logger.debug("Using LANGFLOW_DATABASE_URL env variable.")
            else:
                logger.debug("No DATABASE_URL env variable, using sqlite database")
                # Originally, we used sqlite:///./langflow.db
                # so we need to migrate to the new format
                # if there is a database in that location
                if not values["CONFIG_DIR"]:
                    raise ValueError(
                        "CONFIG_DIR not set, please set it or provide a DATABASE_URL"
                    )
                new_path = f"{values['CONFIG_DIR']}/langflow.db"
                if Path("./langflow.db").exists():
                    if Path(new_path).exists():
                        logger.debug(f"Database already exists at {new_path}, using it")
                    else:
                        try:
                            logger.debug("Copying existing database to new location")
                            copy2("./langflow.db", new_path)
                            logger.debug(f"Copied existing database to {new_path}")
                        except Exception:
                            logger.error("Failed to copy database, using default path")
                            new_path = "./langflow.db"
                value = f"sqlite:///{new_path}"
        return value

    @validator("COMPONENTS_PATH", pre=True)
    def set_components_path(cls, value):
        """Merge LANGFLOW_COMPONENTS_PATH and the bundled default path into the list."""
        if os.getenv("LANGFLOW_COMPONENTS_PATH"):
            logger.debug("Adding LANGFLOW_COMPONENTS_PATH to components_path")
            langflow_component_path = os.getenv("LANGFLOW_COMPONENTS_PATH")
            if (
                Path(langflow_component_path).exists()
                and langflow_component_path not in value
            ):
                if isinstance(langflow_component_path, list):
                    # NOTE(review): os.getenv returns a str, so this list
                    # branch looks unreachable; kept for compatibility.
                    for path in langflow_component_path:
                        if path not in value:
                            value.append(path)
                    logger.debug(
                        f"Extending {langflow_component_path} to components_path"
                    )
                elif langflow_component_path not in value:
                    value.append(langflow_component_path)
                    logger.debug(
                        f"Appending {langflow_component_path} to components_path"
                    )

        if not value:
            value = [BASE_COMPONENTS_PATH]
            logger.debug("Setting default components path to components_path")
        elif BASE_COMPONENTS_PATH not in value:
            value.append(BASE_COMPONENTS_PATH)
            logger.debug("Adding default components path to components_path")

        logger.debug(f"Components path: {value}")
        return value

    class Config:
        validate_assignment = True
        extra = "ignore"
        env_prefix = "LANGFLOW_"

    @root_validator(allow_reuse=True)
    def validate_lists(cls, values):
        """Default falsy fields (empty registries) to an empty list.

        FIX: the original compared against "dev" (lower case) while field
        names here are upper case, so DEV=False was clobbered with [].
        Other falsy scalars (e.g. REMOVE_API_KEYS=False) are still reset,
        preserving the original behavior for them.
        """
        for key, value in values.items():
            if key != "DEV" and not value:
                values[key] = []
        return values

    def update_from_yaml(self, file_path: str, dev: bool = False):
        """Replace the component registries with the contents of a YAML file."""
        new_settings = load_settings_from_yaml(file_path)
        self.CHAINS = new_settings.CHAINS or {}
        self.AGENTS = new_settings.AGENTS or {}
        self.PROMPTS = new_settings.PROMPTS or {}
        self.LLMS = new_settings.LLMS or {}
        self.TOOLS = new_settings.TOOLS or {}
        self.MEMORIES = new_settings.MEMORIES or {}
        self.WRAPPERS = new_settings.WRAPPERS or {}
        self.TOOLKITS = new_settings.TOOLKITS or {}
        self.TEXTSPLITTERS = new_settings.TEXTSPLITTERS or {}
        self.UTILITIES = new_settings.UTILITIES or {}
        self.EMBEDDINGS = new_settings.EMBEDDINGS or {}
        self.VECTORSTORES = new_settings.VECTORSTORES or {}
        self.DOCUMENTLOADERS = new_settings.DOCUMENTLOADERS or {}
        self.RETRIEVERS = new_settings.RETRIEVERS or {}
        self.OUTPUT_PARSERS = new_settings.OUTPUT_PARSERS or {}
        self.CUSTOM_COMPONENTS = new_settings.CUSTOM_COMPONENTS or {}
        self.COMPONENTS_PATH = new_settings.COMPONENTS_PATH or []
        self.DEV = dev

    def update_settings(self, **kwargs):
        """Update individual settings fields by name.

        Unknown keys are ignored (and logged); list-valued fields are
        extended (deduplicated) rather than replaced, with Path values
        coerced to str.
        """
        logger.debug("Updating settings")
        for key, value in kwargs.items():
            # value may contain sensitive information, so we don't want to log it
            if not hasattr(self, key):
                logger.debug(f"Key {key} not found in settings")
                continue
            logger.debug(f"Updating {key}")
            if isinstance(getattr(self, key), list):
                # value might be a '[something]' string
                # FIX: this module only imports orjson, so suppressing
                # json.decoder.JSONDecodeError raised NameError here;
                # orjson.loads raises orjson.JSONDecodeError.
                with contextlib.suppress(orjson.JSONDecodeError):
                    value = orjson.loads(str(value))
                if isinstance(value, list):
                    for item in value:
                        if isinstance(item, Path):
                            item = str(item)
                        if item not in getattr(self, key):
                            getattr(self, key).append(item)
                    logger.debug(f"Extended {key}")
                else:
                    if isinstance(value, Path):
                        value = str(value)
                    if value not in getattr(self, key):
                        getattr(self, key).append(value)
                    logger.debug(f"Appended {key}")
            else:
                setattr(self, key, value)
                logger.debug(f"Updated {key}")
            logger.debug(f"{key}: {getattr(self, key)}")
def save_settings_to_yaml(settings: Settings, file_path: str):
    """Serialize *settings* to a YAML file at *file_path* (overwriting it)."""
    with open(file_path, "w") as f:
        yaml.dump(settings.dict(), f)
def load_settings_from_yaml(file_path: str) -> Settings:
    """Load a Settings instance from a YAML file.

    A bare file name (no "/" separator) is resolved relative to this
    module's directory. Raises KeyError when the YAML contains a key that
    is not a Settings field.
    """
    # Check if a string is a valid path or a file name
    # NOTE(review): this only checks "/", so Windows-style paths with "\\"
    # would be treated as bare file names — confirm intended platforms.
    if "/" not in file_path:
        # Get current path
        current_path = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(current_path, file_path)

    with open(file_path, "r") as f:
        settings_dict = yaml.safe_load(f)

    # Settings field names are upper case; normalize the YAML keys.
    settings_dict = {k.upper(): v for k, v in settings_dict.items()}
    for key, value in settings_dict.items():
        if key not in Settings.__fields__.keys():
            raise KeyError(f"Key {key} not found in settings")
        # FIX: an empty YAML section parses as None, on which len() raised
        # TypeError; report 0 entries instead of crashing.
        count = len(value) if value else 0
        logger.debug(f"Loading {count} {key} from {file_path}")

    return Settings(**settings_dict)

View file

@ -1,5 +1,5 @@
import contextlib
import json
import orjson
import os
from typing import Optional, List
from pathlib import Path
@ -126,7 +126,7 @@ class Settings(BaseSettings):
if isinstance(getattr(self, key), list):
# value might be a '[something]' string
with contextlib.suppress(json.decoder.JSONDecodeError):
value = json.loads(str(value))
value = orjson.loads(str(value))
if isinstance(value, list):
for item in value:
if isinstance(item, Path):

View file

@ -1,5 +1,5 @@
import json
from typing import Optional
from langflow.database.models.base import orjson_dumps
from langflow.template.field.base import TemplateField
from langflow.template.frontend_node.base import FrontendNode
@ -89,7 +89,7 @@ class LLMFrontendNode(FrontendNode):
if field.name == "config":
field.show = True
field.advanced = True
field.value = json.dumps(CTRANSFORMERS_DEFAULT_CONFIG, indent=2)
field.value = orjson_dumps(CTRANSFORMERS_DEFAULT_CONFIG, indent_2=True)
@staticmethod
def format_field(field: TemplateField, name: Optional[str] = None) -> None:

View file

@ -1,6 +1,6 @@
import ast
import json
from typing import Optional
from langflow.database.models.base import orjson_dumps
from langflow.template.field.base import TemplateField
from langflow.template.frontend_node.base import FrontendNode
@ -22,4 +22,4 @@ class UtilitiesFrontendNode(FrontendNode):
if isinstance(field.value, dict):
field.field_type = "code"
field.value = json.dumps(field.value, indent=4)
field.value = orjson_dumps(field.value)

View file

@ -1,4 +1,6 @@
import json
from langflow.database.models.base import orjson_dumps
import orjson
from langflow.graph import Graph
import pytest
@ -63,9 +65,9 @@ def test_cache_size_limit(basic_data_graph):
nodes = modified_data_graph["nodes"]
node_id = nodes[0]["id"]
# Now we replace all instances of node_id with a new id in the json
json_string = json.dumps(modified_data_graph)
json_string = orjson_dumps(modified_data_graph)
modified_json_string = json_string.replace(node_id, f"{node_id}_{i}")
modified_data_graph_new_id = json.loads(modified_json_string)
modified_data_graph_new_id = orjson.loads(modified_json_string)
build_langchain_object_with_caching(modified_data_graph_new_id)
assert len(build_langchain_object_with_caching.cache) == 10

View file

@ -1,11 +1,12 @@
import json
from fastapi.encoders import jsonable_encoder
from langflow.database.models.base import orjson_dumps
import orjson
import pytest
from uuid import UUID, uuid4
from sqlalchemy.orm import Session
from fastapi.testclient import TestClient
from fastapi.encoders import jsonable_encoder
from langflow.api.v1.schemas import FlowListCreate
from langflow.database.models.flow import Flow, FlowCreate, FlowUpdate
@ -23,7 +24,7 @@ def json_style():
# color: str = Field(index=True)
# emoji: str = Field(index=False)
# flow_id: UUID = Field(default=None, foreign_key="flow.id")
return json.dumps(
return orjson_dumps(
{
"color": "red",
"emoji": "👍",
@ -32,7 +33,7 @@ def json_style():
def test_create_flow(client: TestClient, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
flow = FlowCreate(name="Test Flow", description="description", data=data)
response = client.post("api/v1/flows/", json=flow.dict())
@ -48,7 +49,7 @@ def test_create_flow(client: TestClient, json_flow: str):
def test_read_flows(client: TestClient, json_flow: str):
flow_data = json.loads(json_flow)
flow_data = orjson.loads(json_flow)
data = flow_data["data"]
flow = FlowCreate(name="Test Flow", description="description", data=data)
response = client.post("api/v1/flows/", json=flow.dict())
@ -89,7 +90,7 @@ def test_read_flows(client: TestClient, json_flow: str):
def test_read_flow(client: TestClient, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
flow = FlowCreate(name="Test Flow", description="description", data=data)
response = client.post("api/v1/flows/", json=flow.dict())
@ -115,7 +116,7 @@ def test_read_flow(client: TestClient, json_flow: str):
def test_update_flow(client: TestClient, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
flow = FlowCreate(name="Test Flow", description="description", data=data)
@ -136,7 +137,7 @@ def test_update_flow(client: TestClient, json_flow: str):
def test_delete_flow(client: TestClient, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
flow = FlowCreate(name="Test Flow", description="description", data=data)
response = client.post("api/v1/flows/", json=flow.dict())
@ -147,7 +148,7 @@ def test_delete_flow(client: TestClient, json_flow: str):
def test_create_flows(client: TestClient, session: Session, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
# Create test data
flow_list = FlowListCreate(
@ -172,7 +173,7 @@ def test_create_flows(client: TestClient, session: Session, json_flow: str):
def test_upload_file(client: TestClient, session: Session, json_flow: str):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
# Create test data
flow_list = FlowListCreate(
@ -181,7 +182,7 @@ def test_upload_file(client: TestClient, session: Session, json_flow: str):
FlowCreate(name="Flow 2", description="description", data=data),
]
)
file_contents = json.dumps(flow_list.dict())
file_contents = orjson_dumps(flow_list.dict())
response = client.post(
"api/v1/flows/upload/",
files={"file": ("examples.json", file_contents, "application/json")},
@ -200,7 +201,7 @@ def test_upload_file(client: TestClient, session: Session, json_flow: str):
def test_download_file(client: TestClient, session: Session, json_flow):
flow = json.loads(json_flow)
flow = orjson.loads(json_flow)
data = flow["data"]
# Create test data
flow_list = FlowListCreate(
@ -241,7 +242,7 @@ def test_get_nonexistent_flow(client: TestClient):
def test_update_flow_idempotency(client: TestClient, json_flow: str):
flow_data = json.loads(json_flow)
flow_data = orjson.loads(json_flow)
data = flow_data["data"]
flow_data = FlowCreate(name="Test Flow", description="description", data=data)
response = client.post("api/v1/flows/", json=flow_data.dict())
@ -253,7 +254,7 @@ def test_update_flow_idempotency(client: TestClient, json_flow: str):
def test_update_nonexistent_flow(client: TestClient, json_flow: str):
flow_data = json.loads(json_flow)
flow_data = orjson.loads(json_flow)
data = flow_data["data"]
uuid = uuid4()
updated_flow = FlowCreate(

View file

@ -1,5 +1,4 @@
import json
import pytest
from langchain.chains.base import Chain
from langflow.processing.process import load_flow_from_json