fix: add a lock check on starter project initialization (#8631)
* Adds a lock check on starter project initialization * [autofix.ci] apply automated fixes * ruff fix for exception * [autofix.ci] apply automated fixes * Adds a lock check on starter project initialization * [autofix.ci] apply automated fixes * fix: Improve logging for lock acquisition failures in starter project creation * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
parent
c05327d773
commit
360c2395fb
4 changed files with 411 additions and 423 deletions
|
|
@ -124,7 +124,9 @@ dependencies = [
|
|||
"cleanlab-tlm>=1.1.2",
|
||||
'gassist>=0.0.1; sys_platform == "win32"',
|
||||
"twelvelabs>=0.4.7",
|
||||
"filelock>=3.18.0",
|
||||
"docling>=2.36.1",
|
||||
"filelock>=3.18.0"
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
|
|
|
|||
|
|
@ -26,9 +26,9 @@
|
|||
"id": "xy-edge__Memory-hMFmY{œdataTypeœ:œMemoryœ,œidœ:œMemory-hMFmYœ,œnameœ:œmessages_textœ,œoutput_typesœ:[œMessageœ]}-Prompt-CixsJ{œfieldNameœ:œmemoryœ,œidœ:œPrompt-CixsJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}",
|
||||
"selected": false,
|
||||
"source": "Memory-hMFmY",
|
||||
"sourceHandle": "{œdataTypeœ:œMemoryœ,œidœ:œMemory-hMFmYœ,œnameœ:œmessages_textœ,œoutput_typesœ:[œMessageœ]}",
|
||||
"sourceHandle": "{œdataTypeœ: œMemoryœ, œidœ: œMemory-hMFmYœ, œnameœ: œmessages_textœ, œoutput_typesœ: [œMessageœ]}",
|
||||
"target": "Prompt-CixsJ",
|
||||
"targetHandle": "{œfieldNameœ:œmemoryœ,œidœ:œPrompt-CixsJœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}"
|
||||
"targetHandle": "{œfieldNameœ: œmemoryœ, œidœ: œPrompt-CixsJœ, œinputTypesœ: [œMessageœ, œTextœ], œtypeœ: œstrœ}"
|
||||
},
|
||||
{
|
||||
"animated": false,
|
||||
|
|
@ -54,9 +54,9 @@
|
|||
"id": "xy-edge__Prompt-CixsJ{œdataTypeœ:œPromptœ,œidœ:œPrompt-CixsJœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-LanguageModelComponent-vvfAj{œfieldNameœ:œsystem_messageœ,œidœ:œLanguageModelComponent-vvfAjœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}",
|
||||
"selected": false,
|
||||
"source": "Prompt-CixsJ",
|
||||
"sourceHandle": "{œdataTypeœ:œPromptœ,œidœ:œPrompt-CixsJœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}",
|
||||
"sourceHandle": "{œdataTypeœ: œPromptœ, œidœ: œPrompt-CixsJœ, œnameœ: œpromptœ, œoutput_typesœ: [œMessageœ]}",
|
||||
"target": "LanguageModelComponent-vvfAj",
|
||||
"targetHandle": "{œfieldNameœ:œsystem_messageœ,œidœ:œLanguageModelComponent-vvfAjœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}"
|
||||
"targetHandle": "{œfieldNameœ: œsystem_messageœ, œidœ: œLanguageModelComponent-vvfAjœ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}"
|
||||
},
|
||||
{
|
||||
"animated": false,
|
||||
|
|
@ -82,9 +82,9 @@
|
|||
"id": "xy-edge__ChatInput-5t3wq{œdataTypeœ:œChatInputœ,œidœ:œChatInput-5t3wqœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-LanguageModelComponent-vvfAj{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-vvfAjœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}",
|
||||
"selected": false,
|
||||
"source": "ChatInput-5t3wq",
|
||||
"sourceHandle": "{œdataTypeœ:œChatInputœ,œidœ:œChatInput-5t3wqœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}",
|
||||
"sourceHandle": "{œdataTypeœ: œChatInputœ, œidœ: œChatInput-5t3wqœ, œnameœ: œmessageœ, œoutput_typesœ: [œMessageœ]}",
|
||||
"target": "LanguageModelComponent-vvfAj",
|
||||
"targetHandle": "{œfieldNameœ:œinput_valueœ,œidœ:œLanguageModelComponent-vvfAjœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}"
|
||||
"targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œLanguageModelComponent-vvfAjœ, œinputTypesœ: [œMessageœ], œtypeœ: œstrœ}"
|
||||
},
|
||||
{
|
||||
"animated": false,
|
||||
|
|
@ -112,9 +112,9 @@
|
|||
"id": "xy-edge__LanguageModelComponent-vvfAj{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-vvfAjœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}-ChatOutput-CwxcD{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-CwxcDœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}",
|
||||
"selected": false,
|
||||
"source": "LanguageModelComponent-vvfAj",
|
||||
"sourceHandle": "{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-vvfAjœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}",
|
||||
"sourceHandle": "{œdataTypeœ: œLanguageModelComponentœ, œidœ: œLanguageModelComponent-vvfAjœ, œnameœ: œtext_outputœ, œoutput_typesœ: [œMessageœ]}",
|
||||
"target": "ChatOutput-CwxcD",
|
||||
"targetHandle": "{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-CwxcDœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œstrœ}"
|
||||
"targetHandle": "{œfieldNameœ: œinput_valueœ, œidœ: œChatOutput-CwxcDœ, œinputTypesœ: [œDataœ, œDataFrameœ, œMessageœ], œtypeœ: œstrœ}"
|
||||
}
|
||||
],
|
||||
"nodes": [
|
||||
|
|
@ -1006,6 +1006,7 @@
|
|||
"group_outputs": false,
|
||||
"method": "retrieve_messages_dataframe",
|
||||
"name": "dataframe",
|
||||
"selected": null,
|
||||
"tool_mode": true,
|
||||
"types": [
|
||||
"DataFrame"
|
||||
|
|
|
|||
|
|
@ -154,10 +154,31 @@ def get_lifespan(*, fix_migration=False, version=None):
|
|||
all_types_dict = await get_and_cache_all_types_dict(get_settings_service())
|
||||
logger.debug(f"Types cached in {asyncio.get_event_loop().time() - current_time:.2f}s")
|
||||
|
||||
# Use file-based lock to prevent multiple workers from creating duplicate starter projects concurrently.
|
||||
# Note that it's still possible that one worker may complete this task, release the lock,
|
||||
# then another worker pick it up, but the operation is idempotent so worst case it duplicates
|
||||
# the initialization work.
|
||||
current_time = asyncio.get_event_loop().time()
|
||||
logger.debug("Creating/updating starter projects")
|
||||
await create_or_update_starter_projects(all_types_dict)
|
||||
logger.debug(f"Starter projects updated in {asyncio.get_event_loop().time() - current_time:.2f}s")
|
||||
import tempfile
|
||||
|
||||
from filelock import FileLock
|
||||
|
||||
lock_file = Path(tempfile.gettempdir()) / "langflow_starter_projects.lock"
|
||||
lock = FileLock(lock_file, timeout=1)
|
||||
try:
|
||||
with lock:
|
||||
await create_or_update_starter_projects(all_types_dict)
|
||||
logger.debug(
|
||||
f"Starter projects created/updated in {asyncio.get_event_loop().time() - current_time:.2f}s"
|
||||
)
|
||||
except TimeoutError:
|
||||
# Another process has the lock
|
||||
logger.debug("Another worker is creating starter projects, skipping")
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning(
|
||||
f"Failed to acquire lock for starter projects: {e}. Starter projects may not be created or updated."
|
||||
)
|
||||
|
||||
current_time = asyncio.get_event_loop().time()
|
||||
logger.debug("Starting telemetry service")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue