S3 Tracker
`S3Tracker` uses AWS S3 as the backend.
Declare Your Own Tracker
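You have to implement the `get_bucket_key()` classmethod, which returns the S3 bucket and key where the tracker data is stored.
This example also overrides `get_s3_client()` to control how the boto3 S3 client is created.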
[1]:
import dataclasses
import random
import shutil

import boto3
from rich import print as rprint

from abstract_tracker.api import logger, S3Tracker, TaskLockedError
# Name of the S3 bucket that stores the tracker JSON files.
bucket = "my-bucket"


@dataclasses.dataclass
class MyTracker(S3Tracker):
    """Task tracker stored as one JSON object per task id in S3."""

    @classmethod
    def get_bucket_key(cls, id):
        """
        Return the ``(bucket, key)`` of the S3 object holding the tracker file.

        :param id: the task id; the object key is ``"{id}.json"``.
        :return: a ``(bucket_name, object_key)`` tuple.
        """
        return bucket, f"{id}.json"

    @classmethod
    def get_s3_client(cls):
        """
        Return the boto3 S3 client used to read and write tracker files.

        A new client is created from the default boto3 session on each call.
        ``boto3`` is imported with the other imports of this cell, so this also
        works against a real AWS account when the mock cell is skipped.
        """
        return boto3.client("s3")
class TaskError(Exception):
    """Error raised by the dummy failing task in this example."""
# Two dummy tasks: one that always finishes normally, one that always fails.
def run_good_task():
    """Log a message and return normally (simulated success)."""
    logger.info("run good task")


def run_bad_task():
    """Log a message, then raise ``TaskError`` (simulated failure)."""
    logger.info("run bad task")
    failure = TaskError("task failed")
    raise failure
Set Up an AWS Mock (or Use a Real AWS Account)
[4]:
import os

import boto3
import moto

# Pin a default region unless one is already set in the environment, so the
# clients below can be created even on a machine with no AWS configuration.
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")

# NOTE: moto.mock_sts() / moto.mock_s3() are the per-service mocks of moto < 5;
# moto 5 replaced them with a single moto.mock_aws().
mock_sts = moto.mock_sts()
mock_s3 = moto.mock_s3()
mock_sts.start()
mock_s3.start()

# 123456789012 is moto's dummy, in-memory AWS account id.
print(boto3.client("sts").get_caller_identity()["Account"])

s3_client = boto3.client("s3")
# S3 only accepts a bucket without a LocationConstraint in us-east-1; any other
# region must be passed explicitly or create_bucket is rejected.
region = s3_client.meta.region_name
create_kwargs = (
    {}
    if region in (None, "us-east-1")
    else {"CreateBucketConfiguration": {"LocationConstraint": region}}
)
_ = s3_client.create_bucket(Bucket=bucket, **create_kwargs)
123456789012
Create a New Tracker
[5]:
# Load tracker id=1 from the S3 backend. ``load()`` returns None when no tracker
# file exists yet; in that case create a new tracker in the pending status.
tracker = MyTracker.load(id=1)
tracker = MyTracker.new(id=1) if tracker is None else tracker
rprint(tracker)
MyTracker( id=1, status=0, attempts=0, create_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), data={}, errors={} )
Use the Context Manager to Manage the Lock and Status Automatically
[6]:
# On entry, start() sets status=10 (in_progress) and locks the task. When the
# body finishes without error, it sets status=40 (succeeded) and unlocks the
# task. verbose=True prints the boxed progress log.
with tracker.start(verbose=True):
    run_good_task()
+----- ⹠⊠start task(id=1, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔒 lock the task.
| +----- start task logging ---------------------------------------------------+
| âŗ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔓 unlock the task.
+----- ⏰ ⏹️ end task(id=1 status=40)) -----------------------------------------+
[7]:
# Reload tracker id=1 from the S3 backend and inspect its stored state.
tracker = MyTracker.load(id=1)
rprint(tracker)
print(f"tracker.status_name = {tracker.status_name!r}")
MyTracker( id=1, status=40, attempts=1, create_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 5, 23, 48, 48796, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(2024, 1, 9, 5, 23, 47, 991701, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(2024, 1, 9, 5, 38, 47, 991701, tzinfo=datetime.timezone.utc), data={}, errors={} )
tracker.status_name = 'succeeded'
[8]:
# test on another task (id=2)
# new() builds a fresh tracker in the pending status (status=0) without first
# trying to load an existing one.
tracker = MyTracker.new(id=2)
[9]:
# run_bad_task() raises TaskError. start() records the failure (status=20, with
# the error and traceback stored in ``errors``), unlocks the task, and then
# re-raises. Catch that expected error so the notebook still runs end-to-end
# (e.g. "Run All") and the following cell can inspect the failed tracker.
try:
    with tracker.start(verbose=True):
        run_bad_task()
except TaskError as e:
    print(f"caught expected error: {e!r}")
+----- ⹠⊠start task(id=2, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔒 lock the task.
| +----- start task logging ---------------------------------------------------+
| âŗ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔓 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError Traceback (most recent call last)
Cell In[9], line 2
1 with tracker.start(verbose=True):
----> 2 run_bad_task()
Cell In[1], line 36, in run_bad_task()
34 def run_bad_task():
35 logger.info("run bad task")
---> 36 raise TaskError("task failed")
TaskError: task failed
[10]:
# Reload tracker id=2 from the S3 backend and inspect the recorded failure.
tracker = MyTracker.load(id=2)
rprint(tracker)
print(f"tracker.status_name = {tracker.status_name!r}")
MyTracker( id=2, status=20, attempts=1, create_time=datetime.datetime(2024, 1, 9, 5, 23, 57, 309818, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 5, 24, 3, 974032, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(2024, 1, 9, 5, 24, 3, 931506, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(2024, 1, 9, 5, 39, 3, 931506, tzinfo=datetime.timezone.utc), data={}, errors={ 'error': "TaskError('task failed')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py", line 370, in start\n yield self\n File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_9160/1894269443.py", line 2, in <module>\n run_bad_task()\n File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_9160/51047758.py", line 36, in run_bad_task\n raise TaskError("task failed")\nTaskError: task failed\n' } )
tracker.status_name = 'failed'
[ ]: