🔧 chore(__main__.py): remove unused imports and functions to improve code cleanliness and maintainability

🔧 chore(main.py): update import statement to use get_number_of_workers from __main__ module
🔧 chore(util.py): remove unused imports and functions to improve code cleanliness and maintainability
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-28 18:09:26 -03:00
commit acd661f629
3 changed files with 30 additions and 34 deletions

View file

@ -4,9 +4,8 @@ import httpx
from langflow.services.database.utils import session_getter
from langflow.services.manager import initialize_services, initialize_settings_manager
from langflow.services.utils import get_db_manager, get_settings_manager
from langflow.utils.util import get_number_of_workers
from langflow.utils.util import display_results
from multiprocess import Process # type: ignore
from multiprocess import Process, cpu_count # type: ignore
import platform
from pathlib import Path
from typing import Optional
@ -14,6 +13,7 @@ import socket
from rich.panel import Panel
from rich import box
from rich import print as rprint
from rich.table import Table
import typer
from langflow.main import setup_app
from langflow.utils.logger import configure, logger
@ -27,6 +27,32 @@ console = Console()
app = typer.Typer()
def get_number_of_workers(workers=None):
if workers == -1 or workers is None:
workers = (cpu_count() * 2) + 1
logger.debug(f"Number of workers: {workers}")
return workers
def display_results(results):
"""
Display the results of the migration.
"""
for table_results in results:
table = Table(title=f"Migration {table_results.table_name}")
table.add_column("Name")
table.add_column("Type")
table.add_column("Status")
for result in table_results.results:
status = "Success" if result.success else "Failure"
color = "green" if result.success else "red"
table.add_row(result.name, result.type, f"[{color}]{status}[/{color}]")
console.print(table)
console.print() # Print a new line
def update_settings(
config: str,
cache: str,

View file

@ -89,7 +89,7 @@ def setup_app(
if __name__ == "__main__":
import uvicorn
from langflow.utils.util import get_number_of_workers
from langflow.__main__ import get_number_of_workers
configure()
uvicorn.run(

View file

@ -5,13 +5,9 @@ from functools import wraps
from typing import Optional, Dict, Any, Union
from docstring_parser import parse
from langflow.__main__ import console # type: ignore
from langflow.template.frontend_node.constants import FORCE_SHOW_FIELDS
from langflow.utils import constants
from langflow.utils.logger import logger
from multiprocess import cpu_count # type: ignore
from rich.table import Table # type: ignore
def build_template_from_function(
@ -460,29 +456,3 @@ def add_options_to_field(
value["options"] = options_map[class_name]
value["list"] = True
value["value"] = options_map[class_name][0]
def get_number_of_workers(workers=None):
if workers == -1 or workers is None:
workers = (cpu_count() * 2) + 1
logger.debug(f"Number of workers: {workers}")
return workers
def display_results(results):
"""
Display the results of the migration.
"""
for table_results in results:
table = Table(title=f"Migration {table_results.table_name}")
table.add_column("Name")
table.add_column("Type")
table.add_column("Status")
for result in table_results.results:
status = "Success" if result.success else "Failure"
color = "green" if result.success else "red"
table.add_row(result.name, result.type, f"[{color}]{status}[/{color}]")
console.print(table)
console.print() # Print a new line