From 228f938cd8371ab5ecd5aa2d5b68622ad40ab03b Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Tue, 6 Jun 2023 10:05:46 -0300
Subject: [PATCH] 🔨 refactor(types.py): move extract_input_variables_from_prompt
 import to interface.utils module
 🔨 refactor(custom.py, loading.py, prompts/custom.py, run.py): update import
 statements to use extract_input_variables_from_prompt from interface.utils
 module
 🔨 refactor(run.py): remove unused imports and functions
 🔨 refactor(utils.py): add type hinting to extract_input_variables_from_prompt
 function and remove unused imports
 The extract_input_variables_from_prompt function has been moved to the
 interface.utils module to improve code organization. The import statements in
 the affected modules have been updated to reflect this change. Unused imports
 and functions have been removed from the run.py module. Type hinting has been
 added to the extract_input_variables_from_prompt function in the
 interface.utils module.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

🚀 feat(processing): add processing module with get_result_and_steps and
fix_memory_inputs functions

The processing module was added to the project with two functions:
get_result_and_steps and fix_memory_inputs. The get_result_and_steps function
extracts the result and thought from a LangChain object and returns them. The
fix_memory_inputs function checks whether a LangChain object has a memory
attribute and whether that memory key exists in the object's input variables.
If not, it gets a possible new memory key using the get_memory_key function
and updates the memory keys using the update_memory_keys function.
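A minimal usage sketch of the new module (illustrative only: "flow.json"
stands in for any flow exported from the Langflow UI, and the message is
arbitrary):

    import json

    from langflow.processing.process import process_graph_cached

    # The exported flow file wraps the graph payload in a "data" key
    # containing the "nodes" and "edges" lists the Graph expects.
    with open("flow.json", "r", encoding="utf-8") as f:
        data_graph = json.load(f)["data"]

    # Returns {"result": ..., "thought": ...}; an empty chatHistory marks
    # the first message, which clears the memoized build cache.
    response = process_graph_cached(data_graph, "Hello, how are you?")
    print(response["result"])
    print(response["thought"])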
---
 src/backend/langflow/graph/vertex/types.py   |   3 +-
 .../langflow/interface/chains/custom.py      |   2 +-
 src/backend/langflow/interface/loading.py    |  33 ---
 .../langflow/interface/prompts/custom.py     |   2 +-
 src/backend/langflow/interface/run.py        | 191 +-----------------
 src/backend/langflow/interface/utils.py      |   6 +
 src/backend/langflow/processing/__init__.py  |   0
 src/backend/langflow/processing/base.py      |  55 +++++
 src/backend/langflow/processing/process.py   | 172 ++++++++++++++++
 9 files changed, 238 insertions(+), 226 deletions(-)
 create mode 100644 src/backend/langflow/processing/__init__.py
 create mode 100644 src/backend/langflow/processing/base.py
 create mode 100644 src/backend/langflow/processing/process.py

diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py
index b81e72439..4eb20f416 100644
--- a/src/backend/langflow/graph/vertex/types.py
+++ b/src/backend/langflow/graph/vertex/types.py
@@ -1,7 +1,8 @@
 from typing import Any, Dict, List, Optional, Union

 from langflow.graph.vertex.base import Vertex
-from langflow.graph.utils import extract_input_variables_from_prompt, flatten_list
+from langflow.graph.utils import flatten_list
+from langflow.interface.utils import extract_input_variables_from_prompt


 class AgentVertex(Vertex):
diff --git a/src/backend/langflow/interface/chains/custom.py b/src/backend/langflow/interface/chains/custom.py
index cb76a53c8..ba4ba8b62 100644
--- a/src/backend/langflow/interface/chains/custom.py
+++ b/src/backend/langflow/interface/chains/custom.py
@@ -5,7 +5,7 @@
 from langchain.memory.buffer import ConversationBufferMemory
 from langchain.schema import BaseMemory
 from pydantic import Field, root_validator
-from langflow.graph.utils import extract_input_variables_from_prompt
+from langflow.interface.utils import extract_input_variables_from_prompt

 DEFAULT_SUFFIX = """"
 Current conversation:
diff --git a/src/backend/langflow/interface/loading.py b/src/backend/langflow/interface/loading.py
index 16a7b186c..eb4623f5a 100644
--- a/src/backend/langflow/interface/loading.py
+++ b/src/backend/langflow/interface/loading.py
@@ -12,7 +12,6 @@ from langchain.agents.load_tools import (
     _LLM_TOOLS,
 )
 from langchain.agents.loading import load_agent_from_config
-from langflow.graph import Graph
 from langchain.agents.tools import Tool
 from langchain.base_language import BaseLanguageModel
 from langchain.callbacks.base import BaseCallbackManager
@@ -22,7 +21,6 @@ from pydantic import ValidationError

 from langflow.interface.agents.custom import CUSTOM_AGENTS
 from langflow.interface.importing.utils import get_function, import_by_type
-from langflow.interface.run import fix_memory_inputs
 from langflow.interface.toolkits.base import toolkits_creator
 from langflow.interface.types import get_type_list
 from langflow.interface.utils import load_file_into_dict
@@ -163,37 +161,6 @@ def instantiate_utility(node_type, class_object, params):
     return class_object(**params)


-def load_flow_from_json(path: str, build=True):
-    """Load flow from json file"""
-    # This is done to avoid circular imports
-
-    with open(path, "r", encoding="utf-8") as f:
-        flow_graph = json.load(f)
-    data_graph = flow_graph["data"]
-    nodes = data_graph["nodes"]
-    # Substitute ZeroShotPrompt with PromptTemplate
-    # nodes = replace_zero_shot_prompt_with_prompt_template(nodes)
-    # Add input variables
-    # nodes = payload.extract_input_variables(nodes)
-
-    # Nodes, edges and root node
-    edges = data_graph["edges"]
-    graph = Graph(nodes, edges)
-    if build:
-        langchain_object = graph.build()
-        if hasattr(langchain_object, "verbose"):
-            langchain_object.verbose = True
-
-        if hasattr(langchain_object, "return_intermediate_steps"):
-            # https://github.com/hwchase17/langchain/issues/2068
-            # Deactivating until we have a frontend solution
-            # to display intermediate steps
-            langchain_object.return_intermediate_steps = False
-        fix_memory_inputs(langchain_object)
-        return langchain_object
-    return graph
-
-
 def replace_zero_shot_prompt_with_prompt_template(nodes):
     """Replace ZeroShotPrompt with PromptTemplate"""
     for node in nodes:
diff --git a/src/backend/langflow/interface/prompts/custom.py b/src/backend/langflow/interface/prompts/custom.py
index b1dbef370..286210271 100644
--- a/src/backend/langflow/interface/prompts/custom.py
+++ b/src/backend/langflow/interface/prompts/custom.py
@@ -3,7 +3,7 @@ from typing import Dict, List, Optional, Type
 from langchain.prompts import PromptTemplate
 from pydantic import root_validator

-from langflow.graph.utils import extract_input_variables_from_prompt
+from langflow.interface.utils import extract_input_variables_from_prompt

 # Steps to create a BaseCustomPrompt:
 # 1. Create a prompt template that ends with:
diff --git a/src/backend/langflow/interface/run.py b/src/backend/langflow/interface/run.py
index c2483416f..89f71fd8b 100644
--- a/src/backend/langflow/interface/run.py
+++ b/src/backend/langflow/interface/run.py
@@ -1,10 +1,3 @@
-import contextlib
-import io
-from typing import Any, Dict, List, Tuple
-
-from langchain.schema import AgentAction
-
-from langflow.api.callback import AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler  # type: ignore
 from langflow.cache.base import compute_dict_hash, load_cache, memoize_dict
 from langflow.graph import Graph
 from langflow.utils.logger import logger
@@ -24,15 +17,6 @@ def load_langchain_object(data_graph, is_first_message=False):
     return computed_hash, langchain_object


-def load_or_build_langchain_object(data_graph, is_first_message=False):
-    """
-    Load langchain object from cache if it exists, otherwise build it.
-    """
-    if is_first_message:
-        build_langchain_object_with_caching.clear_cache()
-    return build_langchain_object_with_caching(data_graph)
-
-
 @memoize_dict(maxsize=10)
 def build_langchain_object_with_caching(data_graph):
     """
@@ -40,16 +24,10 @@

     logger.debug("Building langchain object")

-    graph = build_graph(data_graph)
+    graph = Graph.from_payload(data_graph)
     return graph.build()


-def build_graph(data_graph):
-    nodes = data_graph["nodes"]
-    edges = data_graph["edges"]
-    return Graph(nodes, edges)
-
-
 def build_langchain_object(data_graph):
     """
     Build langchain object from data_graph.
@@ -66,29 +44,6 @@
     return graph.build()


-def process_graph_cached(data_graph: Dict[str, Any], message: str):
-    """
-    Process graph by extracting input variables and replacing ZeroShotPrompt
-    with PromptTemplate, then run the graph and return the result and thought.
-    """
-    # Load langchain object
-    is_first_message = len(data_graph.get("chatHistory", [])) == 0
-    langchain_object = load_or_build_langchain_object(data_graph, is_first_message)
-    logger.debug("Loaded langchain object")
-
-    if langchain_object is None:
-        # Raise user facing error
-        raise ValueError(
-            "There was an error loading the langchain_object. Please, check all the nodes and try again."
-        )
-
-    # Generate result and thought
-    logger.debug("Generating result and thought")
-    result, thought = get_result_and_thought(langchain_object, message)
-    logger.debug("Generated result and thought")
-    return {"result": str(result), "thought": thought.strip()}
-
-
 def get_memory_key(langchain_object):
     """
     Given a LangChain object, this function retrieves the current memory key from the object's memory attribute.
@@ -124,147 +79,3 @@ def update_memory_keys(langchain_object, possible_new_mem_key):
     langchain_object.memory.input_key = input_key
     langchain_object.memory.output_key = output_key
     langchain_object.memory.memory_key = possible_new_mem_key
-
-
-def fix_memory_inputs(langchain_object):
-    """
-    Given a LangChain object, this function checks if it has a memory attribute and if that memory key exists in the
-    object's input variables. If so, it does nothing. Otherwise, it gets a possible new memory key using the
-    get_memory_key function and updates the memory keys using the update_memory_keys function.
-    """
-    if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
-        try:
-            if langchain_object.memory.memory_key in langchain_object.input_variables:
-                return
-        except AttributeError:
-            input_variables = (
-                langchain_object.prompt.input_variables
-                if hasattr(langchain_object, "prompt")
-                else langchain_object.input_keys
-            )
-            if langchain_object.memory.memory_key in input_variables:
-                return
-
-        possible_new_mem_key = get_memory_key(langchain_object)
-        if possible_new_mem_key is not None:
-            update_memory_keys(langchain_object, possible_new_mem_key)
-
-
-async def get_result_and_steps(langchain_object, message: str, **kwargs):
-    """Get result and thought from extracted json"""
-
-    try:
-        if hasattr(langchain_object, "verbose"):
-            langchain_object.verbose = True
-        chat_input = None
-        memory_key = ""
-        if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
-            memory_key = langchain_object.memory.memory_key
-
-        if hasattr(langchain_object, "input_keys"):
-            for key in langchain_object.input_keys:
-                if key not in [memory_key, "chat_history"]:
-                    chat_input = {key: message}
-        else:
-            chat_input = message  # type: ignore
-
-        if hasattr(langchain_object, "return_intermediate_steps"):
-            # https://github.com/hwchase17/langchain/issues/2068
-            # Deactivating until we have a frontend solution
-            # to display intermediate steps
-            langchain_object.return_intermediate_steps = True
-
-        fix_memory_inputs(langchain_object)
-        try:
-            async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
-            output = await langchain_object.acall(chat_input, callbacks=async_callbacks)
-        except Exception as exc:
-            # make the error message more informative
-            logger.debug(f"Error: {str(exc)}")
-            sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
-            output = langchain_object(chat_input, callbacks=sync_callbacks)
-
-        intermediate_steps = (
-            output.get("intermediate_steps", []) if isinstance(output, dict) else []
-        )
-
-        result = (
-            output.get(langchain_object.output_keys[0])
-            if isinstance(output, dict)
-            else output
-        )
-        thought = format_actions(intermediate_steps) if intermediate_steps else ""
-    except Exception as exc:
-        raise ValueError(f"Error: {str(exc)}") from exc
-    return result, thought
-
-
-def get_result_and_thought(langchain_object, message: str):
-    """Get result and thought from extracted json"""
-    try:
-        if hasattr(langchain_object, "verbose"):
-            langchain_object.verbose = True
-        chat_input = None
-        memory_key = ""
-        if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
-            memory_key = langchain_object.memory.memory_key
-
-        if hasattr(langchain_object, "input_keys"):
-            for key in langchain_object.input_keys:
-                if key not in [memory_key, "chat_history"]:
-                    chat_input = {key: message}
-        else:
-            chat_input = message  # type: ignore
-
-        if hasattr(langchain_object, "return_intermediate_steps"):
-            # https://github.com/hwchase17/langchain/issues/2068
-            # Deactivating until we have a frontend solution
-            # to display intermediate steps
-            langchain_object.return_intermediate_steps = False
-
-        fix_memory_inputs(langchain_object)
-
-        with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
-            try:
-                # if hasattr(langchain_object, "acall"):
-                #     output = await langchain_object.acall(chat_input)
-                # else:
-                output = langchain_object(chat_input)
-            except ValueError as exc:
-                # make the error message more informative
-                logger.debug(f"Error: {str(exc)}")
-                output = langchain_object.run(chat_input)
-
-            intermediate_steps = (
-                output.get("intermediate_steps", []) if isinstance(output, dict) else []
-            )
-
-            result = (
-                output.get(langchain_object.output_keys[0])
-                if isinstance(output, dict)
-                else output
-            )
-            if intermediate_steps:
-                thought = format_actions(intermediate_steps)
-            else:
-                thought = output_buffer.getvalue()
-
-    except Exception as exc:
-        raise ValueError(f"Error: {str(exc)}") from exc
-    return result, thought
-
-
-def format_actions(actions: List[Tuple[AgentAction, str]]) -> str:
-    """Format a list of (AgentAction, answer) tuples into a string."""
-    output = []
-    for action, answer in actions:
-        log = action.log
-        tool = action.tool
-        tool_input = action.tool_input
-        output.append(f"Log: {log}")
-        if "Action" not in log and "Action Input" not in log:
-            output.append(f"Tool: {tool}")
-            output.append(f"Tool Input: {tool_input}")
-        output.append(f"Answer: {answer}")
-        output.append("")  # Add a blank line
-    return "\n".join(output)
diff --git a/src/backend/langflow/interface/utils.py b/src/backend/langflow/interface/utils.py
index 2b7c5acd1..32c605654 100644
--- a/src/backend/langflow/interface/utils.py
+++ b/src/backend/langflow/interface/utils.py
@@ -2,6 +2,7 @@ import base64
 import json
 import os
 from io import BytesIO
+import re

 import yaml
 from langchain.base_language import BaseLanguageModel
@@ -48,3 +49,8 @@ def try_setting_streaming_options(langchain_object, websocket):
                 llm.streaming = True

     return langchain_object
+
+
+def extract_input_variables_from_prompt(prompt: str) -> list[str]:
+    """Extract input variables from prompt."""
+    return re.findall(r"{(.*?)}", prompt)
diff --git a/src/backend/langflow/processing/__init__.py b/src/backend/langflow/processing/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py
new file mode 100644
index 000000000..97b0d5be0
--- /dev/null
+++ b/src/backend/langflow/processing/base.py
@@ -0,0 +1,55 @@
+from langflow.api.v1.callback import (
+    AsyncStreamingLLMCallbackHandler,
+    StreamingLLMCallbackHandler,
+)
+from langflow.processing.process import fix_memory_inputs, format_actions
+from langflow.utils.logger import logger
+
+
+async def get_result_and_steps(langchain_object, message: str, **kwargs):
+    """Get result and thought from extracted json"""
+
+    try:
+        if hasattr(langchain_object, "verbose"):
+            langchain_object.verbose = True
+        chat_input = None
+        memory_key = ""
+        if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
+            memory_key = langchain_object.memory.memory_key
+
+        if hasattr(langchain_object, "input_keys"):
+            for key in langchain_object.input_keys:
+                if key not in [memory_key, "chat_history"]:
+                    chat_input = {key: message}
+        else:
+            chat_input = message  # type: ignore
+
+        if hasattr(langchain_object, "return_intermediate_steps"):
+            # https://github.com/hwchase17/langchain/issues/2068
+            # Deactivating until we have a frontend solution
+            # to display intermediate steps
+            langchain_object.return_intermediate_steps = True
+
+        fix_memory_inputs(langchain_object)
+        try:
+            async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
+            output = await langchain_object.acall(chat_input, callbacks=async_callbacks)
+        except Exception as exc:
+            # make the error message more informative
+            logger.debug(f"Error: {str(exc)}")
+            sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
+            output = langchain_object(chat_input, callbacks=sync_callbacks)
+
+        intermediate_steps = (
+            output.get("intermediate_steps", []) if isinstance(output, dict) else []
+        )
+
+        result = (
+            output.get(langchain_object.output_keys[0])
+            if isinstance(output, dict)
+            else output
+        )
+        thought = format_actions(intermediate_steps) if intermediate_steps else ""
+    except Exception as exc:
+        raise ValueError(f"Error: {str(exc)}") from exc
+    return result, thought
diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py
new file mode 100644
index 000000000..3b8852e00
--- /dev/null
+++ b/src/backend/langflow/processing/process.py
@@ -0,0 +1,172 @@
+import contextlib
+import io
+from langchain.schema import AgentAction
+import json
+from langflow.interface.run import (
+    build_langchain_object_with_caching,
+    get_memory_key,
+    update_memory_keys,
+)
+from langflow.utils.logger import logger
+from langflow.graph import Graph
+
+
+from typing import Any, Dict, List, Tuple
+
+
+def fix_memory_inputs(langchain_object):
+    """
+    Given a LangChain object, this function checks if it has a memory attribute and if that memory key exists in the
+    object's input variables. If so, it does nothing. Otherwise, it gets a possible new memory key using the
+    get_memory_key function and updates the memory keys using the update_memory_keys function.
+ """ + if hasattr(langchain_object, "memory") and langchain_object.memory is not None: + try: + if langchain_object.memory.memory_key in langchain_object.input_variables: + return + except AttributeError: + input_variables = ( + langchain_object.prompt.input_variables + if hasattr(langchain_object, "prompt") + else langchain_object.input_keys + ) + if langchain_object.memory.memory_key in input_variables: + return + + possible_new_mem_key = get_memory_key(langchain_object) + if possible_new_mem_key is not None: + update_memory_keys(langchain_object, possible_new_mem_key) + + +def format_actions(actions: List[Tuple[AgentAction, str]]) -> str: + """Format a list of (AgentAction, answer) tuples into a string.""" + output = [] + for action, answer in actions: + log = action.log + tool = action.tool + tool_input = action.tool_input + output.append(f"Log: {log}") + if "Action" not in log and "Action Input" not in log: + output.append(f"Tool: {tool}") + output.append(f"Tool Input: {tool_input}") + output.append(f"Answer: {answer}") + output.append("") # Add a blank line + return "\n".join(output) + + +def get_result_and_thought(langchain_object, message: str): + """Get result and thought from extracted json""" + try: + if hasattr(langchain_object, "verbose"): + langchain_object.verbose = True + chat_input = None + memory_key = "" + if hasattr(langchain_object, "memory") and langchain_object.memory is not None: + memory_key = langchain_object.memory.memory_key + + if hasattr(langchain_object, "input_keys"): + for key in langchain_object.input_keys: + if key not in [memory_key, "chat_history"]: + chat_input = {key: message} + else: + chat_input = message # type: ignore + + if hasattr(langchain_object, "return_intermediate_steps"): + # https://github.com/hwchase17/langchain/issues/2068 + # Deactivating until we have a frontend solution + # to display intermediate steps + langchain_object.return_intermediate_steps = False + + fix_memory_inputs(langchain_object) + + with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer): + try: + # if hasattr(langchain_object, "acall"): + # output = await langchain_object.acall(chat_input) + # else: + output = langchain_object(chat_input) + except ValueError as exc: + # make the error message more informative + logger.debug(f"Error: {str(exc)}") + output = langchain_object.run(chat_input) + + intermediate_steps = ( + output.get("intermediate_steps", []) if isinstance(output, dict) else [] + ) + + result = ( + output.get(langchain_object.output_keys[0]) + if isinstance(output, dict) + else output + ) + if intermediate_steps: + thought = format_actions(intermediate_steps) + else: + thought = output_buffer.getvalue() + + except Exception as exc: + raise ValueError(f"Error: {str(exc)}") from exc + return result, thought + + +def load_or_build_langchain_object(data_graph, is_first_message=False): + """ + Load langchain object from cache if it exists, otherwise build it. + """ + if is_first_message: + build_langchain_object_with_caching.clear_cache() + return build_langchain_object_with_caching(data_graph) + + +def process_graph_cached(data_graph: Dict[str, Any], message: str): + """ + Process graph by extracting input variables and replacing ZeroShotPrompt + with PromptTemplate,then run the graph and return the result and thought. 
+ """ + # Load langchain object + is_first_message = len(data_graph.get("chatHistory", [])) == 0 + langchain_object = load_or_build_langchain_object(data_graph, is_first_message) + logger.debug("Loaded langchain object") + + if langchain_object is None: + # Raise user facing error + raise ValueError( + "There was an error loading the langchain_object. Please, check all the nodes and try again." + ) + + # Generate result and thought + logger.debug("Generating result and thought") + result, thought = get_result_and_thought(langchain_object, message) + logger.debug("Generated result and thought") + return {"result": str(result), "thought": thought.strip()} + + +def load_flow_from_json(path: str, build=True): + """Load flow from json file""" + # This is done to avoid circular imports + + with open(path, "r", encoding="utf-8") as f: + flow_graph = json.load(f) + data_graph = flow_graph["data"] + nodes = data_graph["nodes"] + # Substitute ZeroShotPrompt with PromptTemplate + # nodes = replace_zero_shot_prompt_with_prompt_template(nodes) + # Add input variables + # nodes = payload.extract_input_variables(nodes) + + # Nodes, edges and root node + edges = data_graph["edges"] + graph = Graph(nodes, edges) + if build: + langchain_object = graph.build() + if hasattr(langchain_object, "verbose"): + langchain_object.verbose = True + + if hasattr(langchain_object, "return_intermediate_steps"): + # https://github.com/hwchase17/langchain/issues/2068 + # Deactivating until we have a frontend solution + # to display intermediate steps + langchain_object.return_intermediate_steps = False + fix_memory_inputs(langchain_object) + return langchain_object + return graph