refactor: update TelemetryService and change Service teardown methods to be async (#2937)
* refactor: convert teardown_services function to async The `teardown_services` function in `utils.py` has been converted to an asynchronous function to ensure proper handling of asynchronous operations during service teardown. This change improves the overall reliability and performance of the codebase. * refactor: convert teardown method to async Convert the `teardown` method in the `Service` class to an asynchronous function to handle asynchronous operations during service teardown. This change improves the reliability and performance of the codebase. * refactor: convert teardown methods to async in services * feat: add teardown method in TelemetryService Convert the `teardown` method in the `TelemetryService` class to an asynchronous function to handle asynchronous operations during service teardown. This change improves the reliability and performance of the codebase. * refactor: update TelemetryService to use optimized payload in get request The `TelemetryService` class in `service.py` has been updated to use an optimized payload dictionary in the `get` request. This change improves the efficiency and performance of the codebase. * refactor: convert teardown method to async in utils.py
This commit is contained in:
parent
51b85dbc8e
commit
e0a0242d7a
9 changed files with 18 additions and 12 deletions
|
|
@ -99,7 +99,7 @@ def get_lifespan(fix_migration=False, socketio_server=None, version=None):
|
|||
raise
|
||||
# Shutdown message
|
||||
rprint("[bold red]Shutting down Langflow...[/bold red]")
|
||||
teardown_services()
|
||||
await teardown_services()
|
||||
|
||||
return lifespan
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class Service(ABC):
|
|||
}
|
||||
return schema
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
pass
|
||||
|
||||
def set_ready(self):
|
||||
|
|
|
|||
|
|
@ -294,7 +294,7 @@ class DatabaseService(Service):
|
|||
|
||||
logger.debug("Database and tables created successfully")
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
logger.debug("Tearing down database")
|
||||
try:
|
||||
settings_service = get_settings_service()
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import importlib
|
||||
import inspect
|
||||
from typing import TYPE_CHECKING, Dict, Optional
|
||||
|
|
@ -6,7 +7,6 @@ from loguru import logger
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.services.base import Service
|
||||
|
||||
from langflow.services.factory import ServiceFactory
|
||||
from langflow.services.schema import ServiceType
|
||||
|
||||
|
|
@ -93,7 +93,7 @@ class ServiceManager:
|
|||
self.services.pop(service_name, None)
|
||||
self.get(service_name)
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
"""
|
||||
Teardown all the services.
|
||||
"""
|
||||
|
|
@ -102,7 +102,9 @@ class ServiceManager:
|
|||
continue
|
||||
logger.debug(f"Teardown service {service.name}")
|
||||
try:
|
||||
service.teardown()
|
||||
result = service.teardown()
|
||||
if asyncio.iscoroutine(result):
|
||||
await result
|
||||
except Exception as exc:
|
||||
logger.exception(exc)
|
||||
self.services = {}
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ class PluginService(Service):
|
|||
return plugin.get()
|
||||
return None
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
for plugin in self.plugins.values():
|
||||
plugin.teardown()
|
||||
|
||||
|
|
|
|||
|
|
@ -90,6 +90,6 @@ class LocalStorageService(StorageService):
|
|||
else:
|
||||
logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.")
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
"""Perform any cleanup operations when the service is being torn down."""
|
||||
pass # No specific teardown actions required for local
|
||||
|
|
|
|||
|
|
@ -38,5 +38,5 @@ class StorageService(Service):
|
|||
async def delete_file(self, flow_id: str, file_name: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def teardown(self):
|
||||
async def teardown(self):
|
||||
raise NotImplementedError
|
||||
|
|
|
|||
|
|
@ -61,7 +61,8 @@ class TelemetryService(Service):
|
|||
if path:
|
||||
url = f"{url}/{path}"
|
||||
try:
|
||||
response = await self.client.get(url, params=payload.model_dump())
|
||||
payload_dict = payload.model_dump(exclude_none=True, exclude_unset=True)
|
||||
response = await self.client.get(url, params=payload_dict)
|
||||
if response.status_code != 200:
|
||||
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
|
||||
else:
|
||||
|
|
@ -134,3 +135,6 @@ class TelemetryService(Service):
|
|||
await self.client.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping tracing service: {e}")
|
||||
|
||||
async def teardown(self):
|
||||
await self.stop()
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ def teardown_superuser(settings_service, session):
|
|||
raise RuntimeError("Could not remove default superuser.") from exc
|
||||
|
||||
|
||||
def teardown_services():
|
||||
async def teardown_services():
|
||||
"""
|
||||
Teardown all the services.
|
||||
"""
|
||||
|
|
@ -120,7 +120,7 @@ def teardown_services():
|
|||
try:
|
||||
from langflow.services.manager import service_manager
|
||||
|
||||
service_manager.teardown()
|
||||
await service_manager.teardown()
|
||||
except Exception as exc:
|
||||
logger.exception(exc)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue