diff --git a/src/backend/langflow/__main__.py b/src/backend/langflow/__main__.py
index 030a41ed0..312285a68 100644
--- a/src/backend/langflow/__main__.py
+++ b/src/backend/langflow/__main__.py
@@ -1,4 +1,3 @@
-import logging
 import multiprocessing
 import platform
 from pathlib import Path
@@ -8,8 +7,9 @@
 from fastapi.staticfiles import StaticFiles
 from langflow.main import create_app
 from langflow.settings import settings
+from langflow.utils.logger import configure, logger

-logger = logging.getLogger(__name__)
+app = typer.Typer()


 def get_number_of_workers(workers=None):
@@ -24,14 +24,21 @@ def update_settings(config: str):
     settings.update_from_yaml(config)


+@app.command()
 def serve(
-    host: str = "127.0.0.1",
-    workers: int = 1,
-    timeout: int = 60,
-    port: int = 7860,
-    config: str = "config.yaml",
-    log_level: str = "info",
+    host: str = typer.Option("127.0.0.1", help="Host to bind the server to."),
+    workers: int = typer.Option(1, help="Number of worker processes."),
+    timeout: int = typer.Option(60, help="Worker timeout in seconds."),
+    port: int = typer.Option(7860, help="Port to listen on."),
+    config: str = typer.Option("config.yaml", help="Path to the configuration file."),
+    log_level: str = typer.Option("info", help="Logging level."),
+    log_file: Path = typer.Option("logs/langflow.log", help="Path to the log file."),
 ):
+    """
+    Run the Langflow server.
+    """
+
+    configure(log_level=log_level, log_file=log_file)
     update_settings(config)
     app = create_app()
     # get the directory of the current file
@@ -52,7 +59,7 @@ def serve(
     if platform.system() in ["Darwin", "Windows"]:
         # Run using uvicorn on MacOS and Windows
         # Windows doesn't support gunicorn
-        # MacOS requires a env variable to be set to use gunicorn
+        # MacOS requires an env variable to be set to use gunicorn
         import uvicorn

         uvicorn.run(app, host=host, port=port, log_level=log_level)
@@ -63,7 +70,7 @@ def serve(


 def main():
-    typer.run(serve)
+    app()


 if __name__ == "__main__":
diff --git a/src/backend/langflow/cache/utils.py b/src/backend/langflow/cache/utils.py
index 514d991e5..3c416f4d7 100644
--- a/src/backend/langflow/cache/utils.py
+++ b/src/backend/langflow/cache/utils.py
@@ -10,7 +10,7 @@ import dill  # type: ignore
 PREFIX = "langflow_cache"


-def clear_old_cache_files(max_cache_size: int = 10):
+def clear_old_cache_files(max_cache_size: int = 3):
     cache_dir = Path(tempfile.gettempdir())
     cache_files = list(cache_dir.glob(f"{PREFIX}_*.dill"))

@@ -24,23 +24,45 @@
             os.remove(cache_file)


-def remove_position_info(node):
-    node.pop("position", None)
+def filter_json(json_data):
+    filtered_data = json_data.copy()
+
+    # Remove 'viewport' and 'chatHistory' keys
+    filtered_data.pop("viewport", None)
+    filtered_data.pop("chatHistory", None)
+
+    # Remove UI-only keys from each node; copy the nodes first so the
+    # caller's graph data is not mutated
+    if "nodes" in filtered_data:
+        filtered_data["nodes"] = [dict(node) for node in filtered_data["nodes"]]
+        for node in filtered_data["nodes"]:
+            node.pop("position", None)
+            node.pop("positionAbsolute", None)
+            node.pop("selected", None)
+            node.pop("dragging", None)
+
+    return filtered_data


 def compute_hash(graph_data):
-    for node in graph_data["nodes"]:
-        remove_position_info(node)
+    graph_data = filter_json(graph_data)
     cleaned_graph_json = json.dumps(graph_data, sort_keys=True)
     return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest()


-def save_cache(hash_val, chat_data):
+def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool):
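
Note: a minimal sketch of the cache-key behaviour this enables (the payload
below is hypothetical; `compute_hash` and `filter_json` are the helpers added
above). Two flows that differ only in UI state now map to the same cache entry:

    from langflow.cache.utils import compute_hash

    graph_a = {"nodes": [{"id": "node-1", "position": {"x": 0, "y": 0}}], "edges": []}
    graph_b = {"nodes": [{"id": "node-1", "position": {"x": 250, "y": 80}}], "edges": []}

    # 'position', 'viewport', 'selected', etc. are stripped before hashing
    assert compute_hash(graph_a) == compute_hash(graph_b)
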
     cache_path = Path(tempfile.gettempdir()) / f"{PREFIX}_{hash_val}.dill"
     with cache_path.open("wb") as cache_file:
         dill.dump(chat_data, cache_file)

+    if clean_old_cache_files:
+        clear_old_cache_files()
+

 def load_cache(hash_val):
     cache_path = Path(tempfile.gettempdir()) / f"{PREFIX}_{hash_val}.dill"
diff --git a/src/backend/langflow/graph/base.py b/src/backend/langflow/graph/base.py
index ded74ee91..837dd4735 100644
--- a/src/backend/langflow/graph/base.py
+++ b/src/backend/langflow/graph/base.py
@@ -3,7 +3,6 @@
 # - Defer prompts building to the last moment or when they have all the tools
 # - Build each inner agent first, then build the outer agent

-import logging
 import types
 from copy import deepcopy
 from typing import Any, Dict, List
@@ -12,8 +11,7 @@ from langflow.graph.constants import DIRECT_TYPES
 from langflow.graph.utils import load_file
 from langflow.interface import loading
 from langflow.interface.listing import ALL_TYPES_DICT
-
-logger = logging.getLogger(__name__)
+from langflow.utils.logger import logger


 class Node:
diff --git a/src/backend/langflow/interface/agents/custom.py b/src/backend/langflow/interface/agents/custom.py
index 64930b28a..ad9f6c918 100644
--- a/src/backend/langflow/interface/agents/custom.py
+++ b/src/backend/langflow/interface/agents/custom.py
@@ -108,7 +108,13 @@ class InitializeAgent(AgentExecutor):
     def initialize(
         cls, llm: BaseLLM, tools: List[Tool], agent: str, memory: BaseChatMemory
     ):
-        return initialize_agent(tools=tools, llm=llm, agent=agent, memory=memory)
+        return initialize_agent(
+            tools=tools,
+            llm=llm,
+            agent=agent,
+            memory=memory,
+            return_intermediate_steps=True,
+        )

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
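
Note: with `return_intermediate_steps=True`, calling the executor returns a
dict instead of a bare string. A rough sketch, assuming an executor built by
`InitializeAgent.initialize` above (the key names are LangChain's):

    output = agent_executor({"input": "What is 2 + 2?"})
    answer = output["output"]
    # list of (AgentAction, observation) tuples; consumed by
    # format_intermediate_steps() in interface/run.py below
    steps = output["intermediate_steps"]
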
diff --git a/src/backend/langflow/interface/custom_lists.py b/src/backend/langflow/interface/custom_lists.py
index e813a5830..746c58325 100644
--- a/src/backend/langflow/interface/custom_lists.py
+++ b/src/backend/langflow/interface/custom_lists.py
@@ -1,7 +1,15 @@
 from typing import Any

 ## LLM
-from langchain import chains, document_loaders, embeddings, llms, memory, requests, vectorstores
+from langchain import (
+    chains,
+    document_loaders,
+    embeddings,
+    llms,
+    memory,
+    requests,
+    vectorstores,
+)
 from langchain.agents import agent_toolkits
 from langchain.chat_models import ChatOpenAI
diff --git a/src/backend/langflow/interface/loading.py b/src/backend/langflow/interface/loading.py
index 98506f73d..aca826502 100644
--- a/src/backend/langflow/interface/loading.py
+++ b/src/backend/langflow/interface/loading.py
@@ -71,9 +71,10 @@ def load_flow_from_json(path: str):
     data_graph = flow_graph["data"]
     nodes = data_graph["nodes"]
     # Substitute ZeroShotPrompt with PromptTemplate
-    nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
+    # nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
     # Add input variables
-    nodes = payload.extract_input_variables(nodes)
+    # nodes = payload.extract_input_variables(nodes)
+    # Nodes, edges and root node
     edges = data_graph["edges"]
     graph = Graph(nodes, edges)
diff --git a/src/backend/langflow/interface/run.py b/src/backend/langflow/interface/run.py
index 33183fb73..7c375471a 100644
--- a/src/backend/langflow/interface/run.py
+++ b/src/backend/langflow/interface/run.py
@@ -2,34 +2,45 @@
 import contextlib
 import io
-import logging
 import re
-from typing import Any, Dict
+from typing import Any, Dict, List, Tuple

 from langflow.cache.utils import compute_hash, load_cache, save_cache
 from langflow.graph.graph import Graph
 from langflow.interface import loading
 from langflow.utils import payload
-
-logger = logging.getLogger(__name__)
+from langflow.utils.logger import logger


-def load_langchain_object(data_graph):
+def load_langchain_object(data_graph, is_first_message=False):
+    """
+    Load the langchain object from the cache if it exists, otherwise build it.
+    """
     computed_hash = compute_hash(data_graph)
-
-    # Load langchain_object from cache if it exists
-    langchain_object = load_cache(computed_hash)
-    if langchain_object is None:
-        nodes = data_graph["nodes"]
-        # Add input variables
-        nodes = payload.extract_input_variables(nodes)
-        # Nodes, edges and root node
-        edges = data_graph["edges"]
-        graph = Graph(nodes, edges)
-
-        langchain_object = graph.build()
+    if is_first_message:
+        langchain_object = build_langchain_object(data_graph)
+    else:
+        logger.debug("Loading langchain object from cache")
+        langchain_object = load_cache(computed_hash)
+        if langchain_object is None:
+            # Cache miss (e.g. the cache file was cleaned up): rebuild
+            langchain_object = build_langchain_object(data_graph)

     return computed_hash, langchain_object


+def build_langchain_object(data_graph):
+    """
+    Build the langchain object from data_graph.
+    """
+    logger.debug("Building langchain object")
+    nodes = data_graph["nodes"]
+    # Add input variables
+    # nodes = payload.extract_input_variables(nodes)
+    # Nodes, edges and root node
+    edges = data_graph["edges"]
+    graph = Graph(nodes, edges)
+
+    return graph.build()
+
+
 def process_graph(data_graph: Dict[str, Any]):
     """
     Process graph by extracting input variables and replacing ZeroShotPrompt
@@ -38,7 +49,10 @@ def process_graph(data_graph: Dict[str, Any]):
     # Load langchain object
     logger.debug("Loading langchain object")
     message = data_graph.pop("message", "")
-    computed_hash, langchain_object = load_langchain_object(data_graph)
+    is_first_message = len(data_graph.get("chatHistory", [])) == 0
+    computed_hash, langchain_object = load_langchain_object(
+        data_graph, is_first_message
+    )
     logger.debug("Loaded langchain object")

     # Generate result and thought
@@ -50,7 +64,7 @@ def process_graph(data_graph: Dict[str, Any]):
     # We have to save it here because if the
     # memory is updated we need to keep the new values
     logger.debug("Saving langchain object to cache")
-    save_cache(computed_hash, langchain_object)
+    save_cache(computed_hash, langchain_object, is_first_message)
     logger.debug("Saved langchain object to cache")
     return {
         "result": str(result),
@@ -62,27 +76,39 @@


 def get_result_and_thought_using_graph(loaded_langchain, message: str):
     """Get result and thought from extracted json"""
-    loaded_langchain.verbose = True
     try:
+        loaded_langchain.verbose = True
         with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
             chat_input = {}
             for key in loaded_langchain.input_keys:
-                if key == "chat_history":
-                    if hasattr(loaded_langchain, "memory"):
-                        loaded_langchain.memory.memory_key = "chat_history"
+                if key == "chat_history" and hasattr(loaded_langchain, "memory"):
+                    loaded_langchain.memory.memory_key = "chat_history"
                 else:
                     chat_input[key] = message

-            if hasattr(loaded_langchain, "run"):
-                loaded_langchain = loaded_langchain.run
-            result = loaded_langchain(**chat_input)
+            if hasattr(loaded_langchain, "return_intermediate_steps"):
+                # https://github.com/hwchase17/langchain/issues/2068
+                # `run` supports only a single output key, so turn
+                # intermediate steps off before the fallback below
+                loaded_langchain.return_intermediate_steps = False
+
+            try:
+                output = loaded_langchain(chat_input)
+            except ValueError as exc:
+                logger.debug("Error: %s", str(exc))
+                output = loaded_langchain.run(chat_input)
+
+            intermediate_steps = (
+                output.get("intermediate_steps", []) if isinstance(output, dict) else []
+            )
             result = (
-                result.get(loaded_langchain.output_keys[0])
-                if isinstance(result, dict)
-                else result
+                output.get(loaded_langchain.output_keys[0])
+                if isinstance(output, dict)
+                else output
             )
-            thought = output_buffer.getvalue()
+            if intermediate_steps:
+                thought = format_intermediate_steps(intermediate_steps)
+            else:
+                thought = output_buffer.getvalue()

     except Exception as exc:
         raise ValueError(f"Error: {str(exc)}") from exc
@@ -96,15 +122,40 @@ def get_result_and_thought(extracted_json: Dict[str, Any], message: str):
         config=extracted_json
     )
     with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
-        result = loaded_langchain(message)
-        result = (
-            result.get(loaded_langchain.output_keys[0])
-            if isinstance(result, dict)
-            else result
+        output = loaded_langchain(message)
+        intermediate_steps = (
+            output.get("intermediate_steps", []) if isinstance(output, dict) else []
         )
-        thought = output_buffer.getvalue()
+        result = (
+            output.get(loaded_langchain.output_keys[0])
+            if isinstance(output, dict)
+            else output
+        )
+
+        if intermediate_steps:
+            thought = format_intermediate_steps(intermediate_steps)
+        else:
+            thought = output_buffer.getvalue()
     except Exception as e:
         result = f"Error: {str(e)}"
         thought = ""

     return result, thought
+
+
+def format_intermediate_steps(intermediate_steps: List[Tuple[Any, str]]) -> str:
+    formatted_chain = "> Entering new AgentExecutor chain...\n"
+    observation = ""
+    for action, observation in intermediate_steps:
+        formatted_chain += (
+            f" {action.log}\nAction: {action.tool}\nAction Input: {action.tool_input}\n"
+        )
+        formatted_chain += f"Observation: {observation}\n"
+
+    # The last observation is treated as the final answer
+    final_answer = f"Final Answer: {observation}\n"
+    formatted_chain += f"Thought: I now know the final answer\n{final_answer}\n"
+    formatted_chain += "> Finished chain.\n"
+
+    return formatted_chain
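
Note: for a single (action, observation) step, the "thought" string produced by
`format_intermediate_steps` above looks roughly like this (tool name and values
are illustrative, including the shape of `action.log`):

    > Entering new AgentExecutor chain...
     I should use the calculator
    Action: Calculator
    Action Input: 2 + 2
    Observation: 4
    Thought: I now know the final answer
    Final Answer: 4

    > Finished chain.
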
diff --git a/src/backend/langflow/interface/types.py b/src/backend/langflow/interface/types.py
index 307d430f0..bf3cea372 100644
--- a/src/backend/langflow/interface/types.py
+++ b/src/backend/langflow/interface/types.py
@@ -23,7 +23,7 @@ def get_type_list():
     return all_types


-def build_langchain_types_dict():
+def build_langchain_types_dict():  # sourcery skip: dict-assign-update-to-union
     """Build a dictionary of all langchain types"""
     all_types = {}
diff --git a/src/backend/langflow/utils/logger.py b/src/backend/langflow/utils/logger.py
new file mode 100644
index 000000000..b70a451d4
--- /dev/null
+++ b/src/backend/langflow/utils/logger.py
@@ -0,0 +1,30 @@
+import logging
+from pathlib import Path
+from typing import Optional
+
+from rich.logging import RichHandler
+
+logger = logging.getLogger("langflow")
+
+
+def configure(log_level: str = "INFO", log_file: Optional[Path] = None):
+    log_format = "%(asctime)s - %(levelname)s - %(message)s"
+    log_level_value = getattr(logging, log_level.upper(), logging.INFO)
+
+    logging.basicConfig(
+        level=log_level_value,
+        format=log_format,
+        datefmt="[%X]",
+        handlers=[RichHandler(rich_tracebacks=True)],
+    )
+
+    if log_file:
+        log_file = Path(log_file)
+        log_file.parent.mkdir(parents=True, exist_ok=True)
+
+        file_handler = logging.FileHandler(log_file)
+        file_handler.setFormatter(logging.Formatter(log_format))
+        logger.addHandler(file_handler)
+
+    logger.info(f"Logger set up with log level: {log_level_value} ({log_level})")
+    if log_file:
+        logger.info(f"Log file: {log_file}")
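
Note: a minimal sketch of the new logging setup in isolation (the path is an
example; `configure` and `logger` are defined above):

    from pathlib import Path

    from langflow.utils.logger import configure, logger

    configure(log_level="debug", log_file=Path("logs/langflow.log"))
    logger.debug("Goes to the Rich console handler and to logs/langflow.log")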