diff --git a/src/backend/base/langflow/components/agents/JsonAgent.py b/src/backend/base/langflow/components/agents/JsonAgent.py index 56bdacf14..d05cd09bf 100644 --- a/src/backend/base/langflow/components/agents/JsonAgent.py +++ b/src/backend/base/langflow/components/agents/JsonAgent.py @@ -22,7 +22,8 @@ class JsonAgentComponent(LCAgentComponent): def build_agent(self) -> AgentExecutor: if self.path.endswith("yaml") or self.path.endswith("yml"): - yaml_dict = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader) + with open(self.path, "r") as file: + yaml_dict = yaml.load(file, Loader=yaml.FullLoader) spec = JsonSpec(dict_=yaml_dict) else: spec = JsonSpec.from_file(Path(self.path)) diff --git a/src/backend/base/langflow/components/agents/OpenAPIAgent.py b/src/backend/base/langflow/components/agents/OpenAPIAgent.py index 798057fdb..e1972b9ed 100644 --- a/src/backend/base/langflow/components/agents/OpenAPIAgent.py +++ b/src/backend/base/langflow/components/agents/OpenAPIAgent.py @@ -24,7 +24,8 @@ class OpenAPIAgentComponent(LCAgentComponent): def build_agent(self) -> AgentExecutor: if self.path.endswith("yaml") or self.path.endswith("yml"): - yaml_dict = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader) + with open(self.path, "r") as file: + yaml_dict = yaml.load(file, Loader=yaml.FullLoader) spec = JsonSpec(dict_=yaml_dict) else: spec = JsonSpec.from_file(Path(self.path)) diff --git a/src/backend/base/langflow/load/utils.py b/src/backend/base/langflow/load/utils.py index 4012bc934..8050ba51e 100644 --- a/src/backend/base/langflow/load/utils.py +++ b/src/backend/base/langflow/load/utils.py @@ -20,11 +20,12 @@ def upload(file_path, host, flow_id): """ try: url = f"{host}/api/v1/upload/{flow_id}" - response = httpx.post(url, files={"file": open(file_path, "rb")}) - if response.status_code == 200: - return response.json() - else: - raise Exception(f"Error uploading file: {response.status_code}") + with open(file_path, "rb") as file: + response = httpx.post(url, files={"file": file}) + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Error uploading file: {response.status_code}") except Exception as e: raise Exception(f"Error uploading file: {e}") diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index b14a85ba9..22de47dac 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -169,59 +169,59 @@ class DatabaseService(Service): # which is a buffer # I don't want to output anything # subprocess.DEVNULL is an int - buffer = open(self.script_location / "alembic.log", "w") - alembic_cfg = Config(stdout=buffer) - # alembic_cfg.attributes["connection"] = session - alembic_cfg.set_main_option("script_location", str(self.script_location)) - alembic_cfg.set_main_option("sqlalchemy.url", self.database_url.replace("%", "%%")) + with open(self.script_location / "alembic.log", "w") as buffer: + alembic_cfg = Config(stdout=buffer) + # alembic_cfg.attributes["connection"] = session + alembic_cfg.set_main_option("script_location", str(self.script_location)) + alembic_cfg.set_main_option("sqlalchemy.url", self.database_url.replace("%", "%%")) - should_initialize_alembic = False - with Session(self.engine) as session: - # If the table does not exist it throws an error - # so we need to catch it - try: - session.exec(text("SELECT * FROM alembic_version")) - except Exception: - logger.info("Alembic not initialized") - should_initialize_alembic = True + should_initialize_alembic = False + with Session(self.engine) as session: + # If the table does not exist it throws an error + # so we need to catch it + try: + session.exec(text("SELECT * FROM alembic_version")) + except Exception: + logger.info("Alembic not initialized") + should_initialize_alembic = True + + if should_initialize_alembic: + try: + self.init_alembic(alembic_cfg) + except Exception as exc: + logger.error(f"Error initializing alembic: {exc}") + raise RuntimeError("Error initializing alembic") from exc + else: + logger.info("Alembic already initialized") + + logger.info(f"Running DB migrations in {self.script_location}") - if should_initialize_alembic: try: - self.init_alembic(alembic_cfg) + buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n") + command.check(alembic_cfg) except Exception as exc: - logger.error(f"Error initializing alembic: {exc}") - raise RuntimeError("Error initializing alembic") from exc - else: - logger.info("Alembic already initialized") + if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)): + command.upgrade(alembic_cfg, "head") + time.sleep(3) - logger.info(f"Running DB migrations in {self.script_location}") + try: + buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n") + command.check(alembic_cfg) + except util.exc.AutogenerateDiffsDetected as exc: + logger.error(f"AutogenerateDiffsDetected: {exc}") + if not fix: + raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") + try: + migrate_messages_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating messages from monitor service to database: {exc}") + try: + migrate_transactions_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating transactions from monitor service to database: {exc}") - try: - buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n") - command.check(alembic_cfg) - except Exception as exc: - if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)): - command.upgrade(alembic_cfg, "head") - time.sleep(3) - - try: - buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n") - command.check(alembic_cfg) - except util.exc.AutogenerateDiffsDetected as exc: - logger.error(f"AutogenerateDiffsDetected: {exc}") - if not fix: - raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") - try: - migrate_messages_from_monitor_service_to_database(session) - except Exception as exc: - logger.error(f"Error migrating messages from monitor service to database: {exc}") - try: - migrate_transactions_from_monitor_service_to_database(session) - except Exception as exc: - logger.error(f"Error migrating transactions from monitor service to database: {exc}") - - if fix: - self.try_downgrade_upgrade_until_success(alembic_cfg) + if fix: + self.try_downgrade_upgrade_until_success(alembic_cfg) def try_downgrade_upgrade_until_success(self, alembic_cfg, retries=5): # Try -1 then head, if it fails, try -2 then head, etc.