S3 Tracker#

S3Tracker uses AWS S3 as the backend.

Declare Your Own Tracker#

You have to implement the get_bucket_key() method to define where to store the tracker data.

[1]:
import shutil
import random
import dataclasses

from abstract_tracker.api import logger, S3Tracker, TaskLockedError

from rich import print as rprint

bucket = "my-bucket"

@dataclasses.dataclass
class MyTracker(S3Tracker):
    @classmethod
    def get_bucket_key(self, id):
        """
        The S3 object of the tracker file.
        """
        return bucket, f"{id}.json"

    @classmethod
    def get_s3_client(cls):
        return boto3.client("s3")


class TaskError(Exception):
    pass


# define some dummy task
def run_good_task():
    logger.info("run good task")


def run_bad_task():
    logger.info("run bad task")
    raise TaskError("task failed")

Setup AWS Mock (Or you can use a real one)#

[4]:
import moto
import boto3

mock_sts = moto.mock_sts()
mock_s3 = moto.mock_s3()
mock_sts.start()
mock_s3.start()

print(boto3.client("sts").get_caller_identity()["Account"]) # 123456789012 is a dummy, in-memory AWS account
s3_client = boto3.client("s3")
_ = s3_client.create_bucket(Bucket=bucket)
123456789012

Create a new Tracker#

[5]:
# try to load the existing tracker from file backend, if it doesn't exist, then it will be None
tracker = MyTracker.load(id=1)
if tracker is None: # if not exist, create a new tracker with pending status
    tracker = MyTracker.new(id=1)
rprint(tracker)
MyTracker(
    id=1,
    status=0,
    attempts=0,
    create_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
    data={},
    errors={}
)

Use Context Manager to Manage Lock and Status Automatically#

[6]:
with tracker.start(verbose=True):
    run_good_task()
+----- ⏱ ⏩ start task(id=1, status=0 (pending), attempts=1) -------------------+
| set status = 10 (âŗ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| âŗ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏚ī¸ end task(id=1 status=40)) -----------------------------------------+
[7]:
tracker = MyTracker.load(id=1)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker(
    id=1,
    status=40,
    attempts=1,
    create_time=datetime.datetime(2024, 1, 9, 5, 23, 20, 524127, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 5, 23, 48, 48796, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(2024, 1, 9, 5, 23, 47, 991701, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(2024, 1, 9, 5, 38, 47, 991701, tzinfo=datetime.timezone.utc),
    data={},
    errors={}
)
tracker.status_name = 'succeeded'
[8]:
# test on another task (id=2)
tracker = MyTracker.new(id=2)
[9]:
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=0 (pending), attempts=1) -------------------+
| set status = 10 (âŗ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| âŗ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏚ī¸ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError                                 Traceback (most recent call last)
Cell In[9], line 2
      1 with tracker.start(verbose=True):
----> 2     run_bad_task()

Cell In[1], line 36, in run_bad_task()
     34 def run_bad_task():
     35     logger.info("run bad task")
---> 36     raise TaskError("task failed")

TaskError: task failed
[10]:
tracker = MyTracker.load(id=2)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker(
    id=2,
    status=20,
    attempts=1,
    create_time=datetime.datetime(2024, 1, 9, 5, 23, 57, 309818, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 5, 24, 3, 974032, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(2024, 1, 9, 5, 24, 3, 931506, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(2024, 1, 9, 5, 39, 3, 931506, tzinfo=datetime.timezone.utc),
    data={},
    errors={
        'error': "TaskError('task failed')",
        'traceback': 'Traceback (most recent call last):\n  File 
"/Users/sanhehu/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py", line 370, in start\n    yield 
self\n  File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_9160/1894269443.py", line 2, in <module>\n
run_bad_task()\n  File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_9160/51047758.py", line 36, in 
run_bad_task\n    raise TaskError("task failed")\nTaskError: task failed\n'
    }
)
tracker.status_name = 'failed'
[ ]: