🐛 fix(base.py): remove unnecessary import statement for celery.result.AsyncResult

🔀 refactor(base.py): refactor the build method in Vertex class to improve readability and remove redundant code
 feat(base.py): add get_task method to Vertex class to retrieve the task from celery using task_id
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-17 10:15:38 -03:00
commit 24344ee0fa

View file

@ -73,6 +73,13 @@ class Vertex:
self.base_type = base_type
break
def get_task(self):
# using the task_id, get the task from celery
# and return it
from celery.result import AsyncResult
return AsyncResult(self.task_id)
def _build_params(self):
# sourcery skip: merge-list-append, remove-redundant-if
# Some params are required, some are optional
@ -179,16 +186,9 @@ class Vertex:
if self._built:
return self._built_object
# Check if there's a task_id, which means it was sent to a Celery worker
try:
from celery.result import AsyncResult
except ImportError:
# If Celery is not installed, just build the vertex locally
return self.build()
if self.is_task and self.task_id is not None:
result = AsyncResult(self.task_id).get(
timeout=timeout
) # Blocking until result is ready or timeout
task = self.get_task()
result = task.get(timeout=timeout)
if result is not None: # If result is ready
self._update_built_object_and_artifacts(result)
return self._built_object
@ -197,7 +197,8 @@ class Vertex:
pass
# If there's no task_id, build the vertex locally
return self.build()
self.build()
return self._built_object
def _build_node_and_update_params(self, key, node):
"""