feat(worker.py): add worker module to handle background tasks in LangFlow backend

🔧 chore(worker.py): refactor process_graph_cached function to improve readability and maintainability
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-13 19:25:59 -03:00
commit 304e824ef7

View file

@ -0,0 +1,62 @@
from langflow.core.celery_app import celery_app
from typing import Any, Dict, Optional
@celery_app.task(acks_late=True)
def test_celery(word: str) -> str:
return f"test task return {word}"
@celery_app.task(acks_late=True)
def process_graph_cached(
data_graph: Dict[str, Any], inputs: Optional[dict] = None, clear_cache=False
):
"""
Process graph by extracting input variables and replacing ZeroShotPrompt
with PromptTemplate,then run the graph and return the result and thought.
"""
from langflow.interface.run import build_sorted_vertices_with_caching
from langflow.processing.process import get_result_and_thought
from langchain.chains.base import Chain
from langchain.vectorstores.base import VectorStore
from langflow.utils.logger import logger
# Load langchain object
if clear_cache:
build_sorted_vertices_with_caching.clear_cache()
logger.debug("Cleared cache")
langchain_object, artifacts = build_sorted_vertices_with_caching(data_graph)
logger.debug("Loaded LangChain object")
if inputs is None:
inputs = {}
# Add artifacts to inputs
# artifacts can be documents loaded when building
# the flow
for (
key,
value,
) in artifacts.items():
if key not in inputs or not inputs[key]:
inputs[key] = value
if langchain_object is None:
# Raise user facing error
raise ValueError(
"There was an error loading the langchain_object. Please, check all the nodes and try again."
)
# Generate result and thought
if isinstance(langchain_object, Chain):
if inputs is None:
raise ValueError("Inputs must be provided for a Chain")
logger.debug("Generating result and thought")
result = get_result_and_thought(langchain_object, inputs)
logger.debug("Generated result and thought")
elif isinstance(langchain_object, VectorStore):
result = langchain_object.search(**inputs)
else:
raise ValueError(
f"Unknown langchain_object type: {type(langchain_object).__name__}"
)
return result