fix: file open leaks (#3067)

This commit is contained in:
Nicolò Boschi 2024-07-30 14:45:42 +02:00 committed by GitHub
commit 747a1848d4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 57 additions and 54 deletions

View file

@ -22,7 +22,8 @@ class JsonAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
if self.path.endswith("yaml") or self.path.endswith("yml"):
yaml_dict = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader)
with open(self.path, "r") as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:
spec = JsonSpec.from_file(Path(self.path))

View file

@ -24,7 +24,8 @@ class OpenAPIAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
if self.path.endswith("yaml") or self.path.endswith("yml"):
yaml_dict = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader)
with open(self.path, "r") as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:
spec = JsonSpec.from_file(Path(self.path))

View file

@ -20,11 +20,12 @@ def upload(file_path, host, flow_id):
"""
try:
url = f"{host}/api/v1/upload/{flow_id}"
response = httpx.post(url, files={"file": open(file_path, "rb")})
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Error uploading file: {response.status_code}")
with open(file_path, "rb") as file:
response = httpx.post(url, files={"file": file})
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Error uploading file: {response.status_code}")
except Exception as e:
raise Exception(f"Error uploading file: {e}")

View file

@ -169,59 +169,59 @@ class DatabaseService(Service):
# which is a buffer
# I don't want to output anything
# subprocess.DEVNULL is an int
buffer = open(self.script_location / "alembic.log", "w")
alembic_cfg = Config(stdout=buffer)
# alembic_cfg.attributes["connection"] = session
alembic_cfg.set_main_option("script_location", str(self.script_location))
alembic_cfg.set_main_option("sqlalchemy.url", self.database_url.replace("%", "%%"))
with open(self.script_location / "alembic.log", "w") as buffer:
alembic_cfg = Config(stdout=buffer)
# alembic_cfg.attributes["connection"] = session
alembic_cfg.set_main_option("script_location", str(self.script_location))
alembic_cfg.set_main_option("sqlalchemy.url", self.database_url.replace("%", "%%"))
should_initialize_alembic = False
with Session(self.engine) as session:
# If the table does not exist it throws an error
# so we need to catch it
try:
session.exec(text("SELECT * FROM alembic_version"))
except Exception:
logger.info("Alembic not initialized")
should_initialize_alembic = True
should_initialize_alembic = False
with Session(self.engine) as session:
# If the table does not exist it throws an error
# so we need to catch it
try:
session.exec(text("SELECT * FROM alembic_version"))
except Exception:
logger.info("Alembic not initialized")
should_initialize_alembic = True
if should_initialize_alembic:
try:
self.init_alembic(alembic_cfg)
except Exception as exc:
logger.error(f"Error initializing alembic: {exc}")
raise RuntimeError("Error initializing alembic") from exc
else:
logger.info("Alembic already initialized")
logger.info(f"Running DB migrations in {self.script_location}")
if should_initialize_alembic:
try:
self.init_alembic(alembic_cfg)
buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n")
command.check(alembic_cfg)
except Exception as exc:
logger.error(f"Error initializing alembic: {exc}")
raise RuntimeError("Error initializing alembic") from exc
else:
logger.info("Alembic already initialized")
if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)):
command.upgrade(alembic_cfg, "head")
time.sleep(3)
logger.info(f"Running DB migrations in {self.script_location}")
try:
buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n")
command.check(alembic_cfg)
except util.exc.AutogenerateDiffsDetected as exc:
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")
try:
buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n")
command.check(alembic_cfg)
except Exception as exc:
if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)):
command.upgrade(alembic_cfg, "head")
time.sleep(3)
try:
buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n")
command.check(alembic_cfg)
except util.exc.AutogenerateDiffsDetected as exc:
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")
if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)
if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)
def try_downgrade_upgrade_until_success(self, alembic_cfg, retries=5):
# Try -1 then head, if it fails, try -2 then head, etc.