feat: clean messages table after run in flow_runner (#8773)

* feat: flow_runner cleanup messages table

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes

* fix lint

* lint

* fix lint

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Bar Nuri 2025-07-01 14:49:03 +03:00 committed by GitHub
commit 087fa34a57
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 29 additions and 44 deletions

View file

@ -26,9 +26,9 @@
"id": "reactflow__edge-ChatInput-et7o5{œdataTypeœ:œChatInputœ,œidœ:œChatInput-et7o5œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-Prompt-V3tlJ{œfieldNameœ:œquestionœ,œidœ:œPrompt-V3tlJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}",
"selected": false,
"source": "ChatInput-et7o5",
"sourceHandle": "{œdataTypeœ:œChatInputœ,œidœ:œChatInput-et7o5œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-et7o5œ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}",
"target": "Prompt-V3tlJ",
"targetHandle": "{œfieldNameœ:œquestionœ,œidœ:œPrompt-V3tlJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}"
"targetHandle": "{œfieldNameœ: œquestionœ, œidœ: œPrompt-V3tlJœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}"
},
{
"animated": false,
@ -55,9 +55,9 @@
"id": "reactflow__edge-parser-WUXPk{œdataTypeœ:œparserœ,œidœ:œparser-WUXPkœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}-Prompt-V3tlJ{œfieldNameœ:œcontextœ,œidœ:œPrompt-V3tlJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}",
"selected": false,
"source": "parser-WUXPk",
"sourceHandle": "{œdataTypeœ:œparserœ,œidœ:œparser-WUXPkœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œparserœ, œidœ: œparser-WUXPkœ, œnameœ: œparsed_textœ, œoutput_typesœ: [œMessageœ]}",
"target": "Prompt-V3tlJ",
"targetHandle": "{œfieldNameœ:œcontextœ,œidœ:œPrompt-V3tlJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}"
"targetHandle": "{œfieldNameœ: œcontextœ, œidœ: œPrompt-V3tlJœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}"
},
{
"animated": false,
@ -83,9 +83,9 @@
"id": "reactflow__edge-OpenAIEmbeddings-oFtHy{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-oFtHyœ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}-AstraDB-W6NB4{œfieldNameœ:œembedding_modelœ,œidœ:œAstraDB-W6NB4œ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}",
"selected": false,
"source": "OpenAIEmbeddings-oFtHy",
"sourceHandle": "{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-oFtHyœ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}",
"sourceHandle": "{œdataTypeœ: œOpenAIEmbeddingsœ, œidœ: œOpenAIEmbeddings-oFtHyœ, œnameœ: œembeddingsœ, œoutput_typesœ: [œEmbeddingsœ]}",
"target": "AstraDB-W6NB4",
"targetHandle": "{œfieldNameœ:œembedding_modelœ,œidœ:œAstraDB-W6NB4œ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}"
"targetHandle": "{œfieldNameœ: œembedding_modelœ, œidœ: œAstraDB-W6NB4œ, œinputTypesœ: [œEmbeddingsœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -111,9 +111,9 @@
"id": "reactflow__edge-OpenAIEmbeddings-v0rcw{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-v0rcwœ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}-AstraDB-JsRrT{œfieldNameœ:œembedding_modelœ,œidœ:œAstraDB-JsRrTœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}",
"selected": false,
"source": "OpenAIEmbeddings-v0rcw",
"sourceHandle": "{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-v0rcwœ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}",
"sourceHandle": "{œdataTypeœ: œOpenAIEmbeddingsœ, œidœ: œOpenAIEmbeddings-v0rcwœ, œnameœ: œembeddingsœ, œoutput_typesœ: [œEmbeddingsœ]}",
"target": "AstraDB-JsRrT",
"targetHandle": "{œfieldNameœ:œembedding_modelœ,œidœ:œAstraDB-JsRrTœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}"
"targetHandle": "{œfieldNameœ: œembedding_modelœ, œidœ: œAstraDB-JsRrTœ, œinputTypesœ: [œEmbeddingsœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -139,9 +139,9 @@
"id": "reactflow__edge-ChatInput-et7o5{œdataTypeœ:œChatInputœ,œidœ:œChatInput-et7o5œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-AstraDB-JsRrT{œfieldNameœ:œsearch_queryœ,œidœ:œAstraDB-JsRrTœ,œinputTypesœ:[œMessageœ],œtypeœ:œqueryœ}",
"selected": false,
"source": "ChatInput-et7o5",
"sourceHandle": "{œdataTypeœ:œChatInputœ,œidœ:œChatInput-et7o5œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-et7o5œ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}",
"target": "AstraDB-JsRrT",
"targetHandle": "{œfieldNameœ:œsearch_queryœ,œidœ:œAstraDB-JsRrTœ,œinputTypesœ:[œMessageœ],œtypeœ:œqueryœ}"
"targetHandle": "{œfieldNameœ: œsearch_queryœ, œidœ: œAstraDB-JsRrTœ, œinputTypesœ: [œMessageœ], œtypeœ: œqueryœ}"
},
{
"animated": false,
@ -168,9 +168,9 @@
"id": "reactflow__edge-AstraDB-JsRrT{œdataTypeœ:œAstraDBœ,œidœ:œAstraDB-JsRrTœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-parser-WUXPk{œfieldNameœ:œinput_dataœ,œidœ:œparser-WUXPkœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}",
"selected": false,
"source": "AstraDB-JsRrT",
"sourceHandle": "{œdataTypeœ:œAstraDBœ,œidœ:œAstraDB-JsRrTœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}",
"sourceHandle": "{œdataTypeœ: œAstraDBœ, œidœ: œAstraDB-JsRrTœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}",
"target": "parser-WUXPk",
"targetHandle": "{œfieldNameœ:œinput_dataœ,œidœ:œparser-WUXPkœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}"
"targetHandle": "{œfieldNameœ: œinput_dataœ, œidœ: œparser-WUXPkœ, œinputTypesœ: [œDataFrameœ, œDataœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -197,9 +197,9 @@
"id": "reactflow__edge-SplitText-6H5cD{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-6H5cDœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-AstraDB-W6NB4{œfieldNameœ:œingest_dataœ,œidœ:œAstraDB-W6NB4œ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}",
"selected": false,
"source": "SplitText-6H5cD",
"sourceHandle": "{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-6H5cDœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}",
"sourceHandle": "{œdataTypeœ: œSplitTextœ, œidœ: œSplitText-6H5cDœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}",
"target": "AstraDB-W6NB4",
"targetHandle": "{œfieldNameœ:œingest_dataœ,œidœ:œAstraDB-W6NB4œ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}"
"targetHandle": "{œfieldNameœ: œingest_dataœ, œidœ: œAstraDB-W6NB4œ, œinputTypesœ: [œDataœ, œDataFrameœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -227,9 +227,9 @@
"id": "reactflow__edge-File-vusZ2{œdataTypeœ:œFileœ,œidœ:œFile-vusZ2œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-6H5cD{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-6H5cDœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}",
"selected": false,
"source": "File-vusZ2",
"sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-vusZ2œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œFileœ, œidœ: œFile-vusZ2œ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}",
"target": "SplitText-6H5cD",
"targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-6H5cDœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}"
"targetHandle": "{œfieldNameœ: œdata_inputsœ, œidœ: œSplitText-6H5cDœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -255,9 +255,9 @@
"id": "reactflow__edge-Prompt-V3tlJ{œdataTypeœ:œPromptœ,œidœ:œPrompt-V3tlJœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-LanguageModelComponent-1uhUK{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-1uhUKœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}",
"selected": false,
"source": "Prompt-V3tlJ",
"sourceHandle": "{œdataTypeœ:œPromptœ,œidœ:œPrompt-V3tlJœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œPromptœ, œidœ: œPrompt-V3tlJœ, œnameœ: œpromptœ, œoutput_typesœ: [œMessageœ]}",
"target": "LanguageModelComponent-1uhUK",
"targetHandle": "{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-1uhUKœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}"
"targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œLanguageModelComponent-1uhUKœ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}"
},
{
"animated": false,
@ -285,9 +285,9 @@
"id": "reactflow__edge-LanguageModelComponent-1uhUK{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-1uhUKœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}-ChatOutput-ZaYDW{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-ZaYDWœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}",
"selected": false,
"source": "LanguageModelComponent-1uhUK",
"sourceHandle": "{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-1uhUKœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}",
"sourceHandle": "{œdataTypeœ: œLanguageModelComponentœ, œidœ: œLanguageModelComponent-1uhUKœ, œnameœ: œtext_outputœ, œoutput_typesœ: [œMessageœ]}",
"target": "ChatOutput-ZaYDW",
"targetHandle": "{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-ZaYDWœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}"
"targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-ZaYDWœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œstrœ}"
}
],
"nodes": [
@ -2701,8 +2701,6 @@
"group_outputs": false,
"method": "search_documents",
"name": "search_results",
"options": null,
"required_inputs": null,
"selected": "Data",
"tool_mode": true,
"types": [
@ -2717,8 +2715,6 @@
"group_outputs": false,
"method": "as_dataframe",
"name": "dataframe",
"options": null,
"required_inputs": null,
"selected": "DataFrame",
"tool_mode": true,
"types": [
@ -2734,8 +2730,6 @@
"hidden": true,
"method": "as_vector_store",
"name": "vectorstoreconnection",
"options": null,
"required_inputs": null,
"selected": "VectorStore",
"tool_mode": true,
"types": [
@ -3071,9 +3065,7 @@
"dynamic": false,
"info": "The Database name for the Astra DB instance.",
"name": "database_name",
"options": [
"us-east-2"
],
"options": [],
"options_metadata": [
{
"api_endpoint": "https://5b8bb22c-4a38-4f0a-865c-a18ed7590bd1-us-east-2.apps.astra.datastax.com",
@ -3482,8 +3474,6 @@
"group_outputs": false,
"method": "search_documents",
"name": "search_results",
"options": null,
"required_inputs": null,
"selected": "Data",
"tool_mode": true,
"types": [
@ -3498,8 +3488,6 @@
"group_outputs": false,
"method": "as_dataframe",
"name": "dataframe",
"options": null,
"required_inputs": null,
"selected": "DataFrame",
"tool_mode": true,
"types": [
@ -3515,8 +3503,6 @@
"hidden": true,
"method": "as_vector_store",
"name": "vectorstoreconnection",
"options": null,
"required_inputs": null,
"selected": "VectorStore",
"tool_mode": true,
"types": [
@ -3851,9 +3837,7 @@
"dynamic": false,
"info": "The Database name for the Astra DB instance.",
"name": "database_name",
"options": [
"us-east-2"
],
"options": [],
"options_metadata": [
{
"api_endpoint": "https://5b8bb22c-4a38-4f0a-865c-a18ed7590bd1-us-east-2.apps.astra.datastax.com",

View file

@ -5,7 +5,7 @@ from uuid import UUID, uuid4
from aiofile import async_open
from loguru import logger
from sqlmodel import delete, text
from sqlmodel import delete, select, text
from langflow.api.utils import cascade_delete_flow
from langflow.graph import Graph
@ -17,9 +17,7 @@ from langflow.services.auth.utils import (
get_password_hash,
)
from langflow.services.cache.service import AsyncBaseCacheService
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.user import User
from langflow.services.database.models.variable import Variable
from langflow.services.database.models import Flow, User, Variable
from langflow.services.database.utils import initialize_database
from langflow.services.deps import get_cache_service, get_storage_service, session_scope
from langflow.utils.util import update_settings
@ -228,9 +226,12 @@ class LangflowRunnerExperimental:
@staticmethod
async def clear_user_state(user_id: str):
async with session_scope() as session:
await session.exec(delete(Flow).where(Flow.user_id == user_id))
await session.exec(delete(User).where(User.id == user_id))
flows = await session.exec(select(Flow.id).where(Flow.user_id == user_id))
flow_ids: list[UUID] = [fid for fid in flows.scalars().all() if fid is not None]
for flow_id in flow_ids:
await cascade_delete_flow(session, flow_id)
await session.exec(delete(Variable).where(Variable.user_id == user_id))
await session.exec(delete(User).where(User.id == user_id))
async def init_db_if_needed(self):
if not await self.database_exists_check() and self.should_initialize_db: