Merge branch 'dev' into chain_loader

commit ed3acfd15f
Author: Ibis Prevedello
Date:   2023-04-03 16:41:05 -03:00

9 changed files with 184 additions and 61 deletions


@@ -1,4 +1,3 @@
-import logging
 import multiprocessing
 import platform
 from pathlib import Path
@@ -8,8 +7,9 @@ from fastapi.staticfiles import StaticFiles
 from langflow.main import create_app
 from langflow.settings import settings
+from langflow.utils.logger import configure, logger
-logger = logging.getLogger(__name__)
 app = typer.Typer()
 def get_number_of_workers(workers=None):
@@ -24,14 +24,21 @@ def update_settings(config: str):
     settings.update_from_yaml(config)
 @app.command()
 def serve(
-    host: str = "127.0.0.1",
-    workers: int = 1,
-    timeout: int = 60,
-    port: int = 7860,
-    config: str = "config.yaml",
-    log_level: str = "info",
+    host: str = typer.Option("127.0.0.1", help="Host to bind the server to."),
+    workers: int = typer.Option(1, help="Number of worker processes."),
+    timeout: int = typer.Option(60, help="Worker timeout in seconds."),
+    port: int = typer.Option(7860, help="Port to listen on."),
+    config: str = typer.Option("config.yaml", help="Path to the configuration file."),
+    log_level: str = typer.Option("info", help="Logging level."),
+    log_file: Path = typer.Option("logs/langflow.log", help="Path to the log file."),
 ):
+    """
+    Run the Langflow server.
+    """
+    configure(log_level=log_level, log_file=log_file)
     update_settings(config)
     app = create_app()
     # get the directory of the current file
@@ -52,7 +59,7 @@ def serve(
     if platform.system() in ["Darwin", "Windows"]:
         # Run using uvicorn on MacOS and Windows
         # Windows doesn't support gunicorn
-        # MacOS requires a env variable to be set to use gunicorn
+        # MacOS requires an env variable to be set to use gunicorn
         import uvicorn
         uvicorn.run(app, host=host, port=port, log_level=log_level)
@@ -63,7 +70,7 @@ def serve(
 def main():
-    typer.run(serve)
+    app()
 if __name__ == "__main__":
     main()
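Note on the CLI change above: moving the defaults into typer.Option gives every flag a --help description for free. A minimal standalone sketch of the same pattern (the names demo_app and serve here are illustrative, not Langflow's actual module):

import typer

demo_app = typer.Typer()

@demo_app.command()
def serve(
    host: str = typer.Option("127.0.0.1", help="Host to bind the server to."),
    port: int = typer.Option(7860, help="Port to listen on."),
):
    typer.echo(f"Would serve on {host}:{port}")

if __name__ == "__main__":
    demo_app()  # `python demo.py --help` now documents each flag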


@@ -10,7 +10,7 @@ import dill  # type: ignore
 PREFIX = "langflow_cache"
-def clear_old_cache_files(max_cache_size: int = 10):
+def clear_old_cache_files(max_cache_size: int = 3):
     cache_dir = Path(tempfile.gettempdir())
     cache_files = list(cache_dir.glob(f"{PREFIX}_*.dill"))
@@ -24,23 +24,45 @@ def clear_old_cache_files(max_cache_size: int = 10):
         os.remove(cache_file)
-def remove_position_info(node):
-    node.pop("position", None)
+def filter_json(json_data):
+    filtered_data = json_data.copy()
+    # Remove 'viewport' and 'chatHistory' keys
+    if "viewport" in filtered_data:
+        del filtered_data["viewport"]
+    if "chatHistory" in filtered_data:
+        del filtered_data["chatHistory"]
+    # Filter nodes
+    if "nodes" in filtered_data:
+        for node in filtered_data["nodes"]:
+            if "position" in node:
+                del node["position"]
+            if "positionAbsolute" in node:
+                del node["positionAbsolute"]
+            if "selected" in node:
+                del node["selected"]
+            if "dragging" in node:
+                del node["dragging"]
+    return filtered_data
 def compute_hash(graph_data):
-    for node in graph_data["nodes"]:
-        remove_position_info(node)
+    graph_data = filter_json(graph_data)
     cleaned_graph_json = json.dumps(graph_data, sort_keys=True)
     return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest()
-def save_cache(hash_val, chat_data):
+def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool):
     cache_path = Path(tempfile.gettempdir()) / f"{PREFIX}_{hash_val}.dill"
     with cache_path.open("wb") as cache_file:
         dill.dump(chat_data, cache_file)
+    if clean_old_cache_files:
+        clear_old_cache_files()
 def load_cache(hash_val):
     cache_path = Path(tempfile.gettempdir()) / f"{PREFIX}_{hash_val}.dill"
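The practical effect of filter_json is that purely cosmetic editor state (viewport, node positions, selection, drag state, chat history) no longer changes the cache key: two flows that differ only in layout hash to the same value and hit the same cache file. A small self-contained check of that idea, using the same canonical-JSON-plus-SHA-256 recipe as compute_hash above (node data is made up):

import hashlib
import json

def demo_hash(graph_data: dict) -> str:
    # Canonical JSON, then SHA-256 -- same recipe as compute_hash.
    return hashlib.sha256(
        json.dumps(graph_data, sort_keys=True).encode("utf-8")
    ).hexdigest()

a = {"nodes": [{"id": "llm-1"}], "edges": []}
b = {"nodes": [{"id": "llm-1", "position": {"x": 10, "y": 20}}], "edges": []}

for node in b["nodes"]:  # what the position filtering boils down to
    node.pop("position", None)

assert demo_hash(a) == demo_hash(b)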


@@ -3,7 +3,6 @@
 # - Defer prompts building to the last moment or when they have all the tools
 # - Build each inner agent first, then build the outer agent
-import logging
 import types
 from copy import deepcopy
 from typing import Any, Dict, List
@@ -12,8 +11,7 @@ from langflow.graph.constants import DIRECT_TYPES
 from langflow.graph.utils import load_file
 from langflow.interface import loading
 from langflow.interface.listing import ALL_TYPES_DICT
-logger = logging.getLogger(__name__)
+from langflow.utils.logger import logger
 class Node:


@@ -108,7 +108,13 @@ class InitializeAgent(AgentExecutor):
     def initialize(
         cls, llm: BaseLLM, tools: List[Tool], agent: str, memory: BaseChatMemory
     ):
-        return initialize_agent(tools=tools, llm=llm, agent=agent, memory=memory)
+        return initialize_agent(
+            tools=tools,
+            llm=llm,
+            agent=agent,
+            memory=memory,
+            return_intermediate_steps=True,
+        )
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
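With return_intermediate_steps=True, calling the executor returns a dict instead of a bare string, carrying the (action, observation) pairs alongside the final answer; that is what the chat-processing changes later in this commit consume. Shape only, with made-up values (a real call needs an LLM and tools):

# Illustrative output shape of an AgentExecutor call:
output = {
    "output": "The answer is 42.",
    "intermediate_steps": [
        # (AgentAction(tool=..., tool_input=..., log=...), observation) pairs
    ],
}
steps = output.get("intermediate_steps", [])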


@@ -1,7 +1,15 @@
 from typing import Any
 ## LLM
-from langchain import chains, document_loaders, embeddings, llms, memory, requests, vectorstores
+from langchain import (
+    chains,
+    document_loaders,
+    embeddings,
+    llms,
+    memory,
+    requests,
+    vectorstores,
+)
 from langchain.agents import agent_toolkits
 from langchain.chat_models import ChatOpenAI


@@ -71,9 +71,10 @@ def load_flow_from_json(path: str):
     data_graph = flow_graph["data"]
     nodes = data_graph["nodes"]
     # Substitute ZeroShotPrompt with PromptTemplate
-    nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
+    # nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
     # Add input variables
-    nodes = payload.extract_input_variables(nodes)
+    # nodes = payload.extract_input_variables(nodes)
     # Nodes, edges and root node
     edges = data_graph["edges"]
     graph = Graph(nodes, edges)
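From the caller's side nothing changes; assuming a flow exported from the Langflow UI, usage would still look roughly like this (import path and return value are assumptions based on the project's docs of the time; flow.json is a placeholder):

from langflow import load_flow_from_json

flow = load_flow_from_json("flow.json")  # assumed to return the built langchain object
print(flow("Hello, what can you do?"))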


@@ -2,34 +2,45 @@ import contextlib
 import io
-import logging
 import re
-from typing import Any, Dict
+from typing import Any, Dict, List, Tuple
 from langflow.cache.utils import compute_hash, load_cache, save_cache
 from langflow.graph.graph import Graph
 from langflow.interface import loading
 from langflow.utils import payload
-logger = logging.getLogger(__name__)
+from langflow.utils.logger import logger
-def load_langchain_object(data_graph):
+def load_langchain_object(data_graph, is_first_message=False):
     """
     Load langchain object from cache if it exists, otherwise build it.
     """
     computed_hash = compute_hash(data_graph)
-    # Load langchain_object from cache if it exists
-    langchain_object = load_cache(computed_hash)
-    if langchain_object is None:
-        nodes = data_graph["nodes"]
-        # Add input variables
-        nodes = payload.extract_input_variables(nodes)
-        # Nodes, edges and root node
-        edges = data_graph["edges"]
-        graph = Graph(nodes, edges)
-        langchain_object = graph.build()
+    if is_first_message:
+        langchain_object = build_langchain_object(data_graph)
+    else:
+        logger.debug("Loading langchain object from cache")
+        langchain_object = load_cache(computed_hash)
     return computed_hash, langchain_object
+def build_langchain_object(data_graph):
+    """
+    Build langchain object from data_graph.
+    """
+    logger.debug("Building langchain object")
+    nodes = data_graph["nodes"]
+    # Add input variables
+    # nodes = payload.extract_input_variables(nodes)
+    # Nodes, edges and root node
+    edges = data_graph["edges"]
+    graph = Graph(nodes, edges)
+    return graph.build()
 def process_graph(data_graph: Dict[str, Any]):
     """
     Process graph by extracting input variables and replacing ZeroShotPrompt
@@ -38,7 +49,10 @@ def process_graph(data_graph: Dict[str, Any]):
     # Load langchain object
     logger.debug("Loading langchain object")
     message = data_graph.pop("message", "")
-    computed_hash, langchain_object = load_langchain_object(data_graph)
+    is_first_message = len(data_graph.get("chatHistory", [])) == 0
+    computed_hash, langchain_object = load_langchain_object(
+        data_graph, is_first_message
+    )
     logger.debug("Loaded langchain object")
     # Generate result and thought
@@ -50,7 +64,7 @@ def process_graph(data_graph: Dict[str, Any]):
     # We have to save it here because if the
     # memory is updated we need to keep the new values
     logger.debug("Saving langchain object to cache")
-    save_cache(computed_hash, langchain_object)
+    save_cache(computed_hash, langchain_object, is_first_message)
     logger.debug("Saved langchain object to cache")
     return {
         "result": str(result),
@@ -62,27 +76,39 @@ def process_graph(data_graph: Dict[str, Any]):
 def get_result_and_thought_using_graph(loaded_langchain, message: str):
     """Get result and thought from extracted json"""
-    loaded_langchain.verbose = True
     try:
+        loaded_langchain.verbose = True
         with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
-            chat_input = {}
+            chat_input = None
             for key in loaded_langchain.input_keys:
-                if key == "chat_history":
-                    if hasattr(loaded_langchain, "memory"):
-                        loaded_langchain.memory.memory_key = "chat_history"
+                if key == "chat_history" and hasattr(loaded_langchain, "memory"):
+                    loaded_langchain.memory.memory_key = "chat_history"
                 else:
-                    chat_input[key] = message
+                    chat_input = {key: message}
-            if hasattr(loaded_langchain, "run"):
-                loaded_langchain = loaded_langchain.run
-            result = loaded_langchain(**chat_input)
+            if hasattr(loaded_langchain, "return_intermediate_steps"):
+                # https://github.com/hwchase17/langchain/issues/2068
+                loaded_langchain.return_intermediate_steps = False
+            try:
+                output = loaded_langchain(chat_input)
+            except ValueError as exc:
+                logger.debug("Error: %s", str(exc))
+                output = loaded_langchain.run(chat_input)
+            intermediate_steps = (
+                output.get("intermediate_steps", []) if isinstance(output, dict) else []
+            )
             result = (
-                result.get(loaded_langchain.output_keys[0])
-                if isinstance(result, dict)
-                else result
+                output.get(loaded_langchain.output_keys[0])
+                if isinstance(output, dict)
+                else output
             )
-            thought = output_buffer.getvalue()
+            if intermediate_steps:
+                thought = format_intermediate_steps(intermediate_steps)
+            else:
+                thought = output_buffer.getvalue()
     except Exception as exc:
         raise ValueError(f"Error: {str(exc)}") from exc
@@ -96,15 +122,40 @@ def get_result_and_thought(extracted_json: Dict[str, Any], message: str):
             config=extracted_json
         )
         with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
-            result = loaded_langchain(message)
-            result = (
-                result.get(loaded_langchain.output_keys[0])
-                if isinstance(result, dict)
-                else result
+            output = loaded_langchain(message)
+            intermediate_steps = (
+                output.get("intermediate_steps", []) if isinstance(output, dict) else []
             )
-            thought = output_buffer.getvalue()
+            result = (
+                output.get(loaded_langchain.output_keys[0])
+                if isinstance(output, dict)
+                else output
+            )
+            if intermediate_steps:
+                thought = format_intermediate_steps(intermediate_steps)
+            else:
+                thought = output_buffer.getvalue()
     except Exception as e:
         result = f"Error: {str(e)}"
         thought = ""
     return result, thought
+def format_intermediate_steps(intermediate_steps):
+    formatted_chain = "> Entering new AgentExecutor chain...\n"
+    for step in intermediate_steps:
+        action = step[0]
+        observation = step[1]
+        formatted_chain += (
+            f" {action.log}\nAction: {action.tool}\nAction Input: {action.tool_input}\n"
+        )
+        formatted_chain += f"Observation: {observation}\n"
+    final_answer = f"Final Answer: {observation}\n"
+    formatted_chain += f"Thought: I now know the final answer\n{final_answer}\n"
+    formatted_chain += "> Finished chain.\n"
+    return formatted_chain
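For reference, format_intermediate_steps mirrors LangChain's verbose console trace. A tiny demo of the formatter above, using a stand-in action object (real steps carry langchain AgentAction instances):

from types import SimpleNamespace

step = (
    SimpleNamespace(tool="Calculator", tool_input="6 * 7",
                    log="I should multiply the numbers."),
    "42",
)
print(format_intermediate_steps([step]))
# > Entering new AgentExecutor chain...
#  I should multiply the numbers.
# Action: Calculator
# Action Input: 6 * 7
# Observation: 42
# Thought: I now know the final answer
# Final Answer: 42
#
# > Finished chain.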


@@ -23,7 +23,7 @@ def get_type_list():
     return all_types
-def build_langchain_types_dict():
+def build_langchain_types_dict():  # sourcery skip: dict-assign-update-to-union
     """Build a dictionary of all langchain types"""
     all_types = {}


@@ -0,0 +1,30 @@
+import logging
+from pathlib import Path
+
+from rich.logging import RichHandler
+
+logger = logging.getLogger("langflow")
+
+
+def configure(log_level: str = "INFO", log_file: Path = None):  # type: ignore
+    log_format = "%(asctime)s - %(levelname)s - %(message)s"
+    log_level_value = getattr(logging, log_level.upper(), logging.INFO)
+
+    logging.basicConfig(
+        level=log_level_value,
+        format=log_format,
+        datefmt="[%X]",
+        handlers=[RichHandler(rich_tracebacks=True)],
+    )
+
+    if log_file:
+        log_file = Path(log_file)
+        log_file.parent.mkdir(parents=True, exist_ok=True)
+        file_handler = logging.FileHandler(log_file)
+        file_handler.setFormatter(logging.Formatter(log_format))
+        logger.addHandler(file_handler)
+
+    logger.info(f"Logger set up with log level: {log_level_value}({log_level})")
+
+    if log_file:
+        logger.info(f"Log file: {log_file}")
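Typical usage of the new helper, matching how the serve command wires it up earlier in this commit (the log path is just an example, not a fixed default):

from pathlib import Path
from langflow.utils.logger import configure, logger

configure(log_level="debug", log_file=Path("logs/langflow.log"))
logger.debug("Console (Rich) and file handlers are now active.")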