File Tracker#

FileTracker uses a local JSON file as the backend: each tracker is stored in its own file.

Declare Your Own Tracker#

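You have to implement the get_path() method to define where the tracker data is stored. Overriding get_expire() and get_max_attempts() is optional; if you don't, the defaults (900 seconds and 3 attempts) are used.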

[1]:
import shutil
import random
import dataclasses
from pathlib import Path

from abstract_tracker.api import logger, FileTracker, TaskLockedError

from rich import print as rprint

dir_data = Path.cwd() / "data"  # folder under the working directory that holds the tracker JSON files

@dataclasses.dataclass
class MyTracker(FileTracker):
    """Demo tracker that keeps each task's state in ``<cwd>/data/<id>.json``."""

    @classmethod
    def get_path(cls, id) -> Path:
        """Return the JSON file that stores the tracker with the given ``id``."""
        return dir_data / f"{id}.json"

    @classmethod
    def get_expire(cls) -> int:
        """
        Return how many seconds a lock stays valid for this tracker.

        When this is not overridden, the default of 900 (15 minutes) is used.
        """
        return 10

    @classmethod
    def get_max_attempts(cls) -> int:
        """
        Return how many attempts are allowed before the task is exhausted.

        When this is not overridden, the default of 3 is used.
        """
        return 3


class TaskError(Exception):
    """Raised by the dummy failing task to simulate a job that errors out."""


# Dummy tasks for the demo. This one always succeeds: it only writes one log line.
def run_good_task():
    logger.info("run good task")


def run_bad_task():  # dummy task that always fails: logs one line, then raises TaskError
    logger.info("run bad task")
    raise TaskError("task failed")
[2]:
# Start from scratch: delete tracker files left over from a previous run
# (ignore_errors=True tolerates a missing folder), then recreate the empty folder.
shutil.rmtree(dir_data, ignore_errors=True)
dir_data.mkdir(parents=True, exist_ok=True)

Create a new Tracker#

[3]:
# Try to load the tracker with id=1 from the file backend; load() returns None
# when no tracker has been stored for that id yet.
tracker = MyTracker.load(id=1)
if tracker is None:  # nothing stored yet, so create a new tracker in pending status
    tracker = MyTracker.new(id=1)
rprint(tracker)
MyTracker(
    id=1,
    status=0,
    attempts=0,
    create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
    data={},
    errors={}
)

Use Context Manager to Manage Lock and Status Automatically#

[4]:
# start() is a context manager. As the log below shows, it locks the task and sets
# it to in_progress on entry, then marks it succeeded and unlocks it on a clean exit.
with tracker.start(verbose=True):
    run_good_task()
+----- ⏱ ⏩ start task(id=1, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=1 status=40)) -----------------------------------------+
[5]:
# Reload the tracker from its file to confirm that the new state was persisted.
tracker = MyTracker.load(id=1)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker(
    id=1,
    status=40,
    attempts=1,
    create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 162125, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 160961, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 17, 160961, tzinfo=datetime.timezone.utc),
    data={},
    errors={}
)
tracker.status_name = 'succeeded'

Automatically Log Error Message#

[6]:
# Use a separate tracker (id=2) for the failing task, so task 1 stays untouched.
tracker = MyTracker.new(id=2)
[7]:
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError                                 Traceback (most recent call last)
Cell In[7], line 2
      1 with tracker.start(verbose=True):
----> 2     run_bad_task()

Cell In[1], line 51, in run_bad_task()
     49 def run_bad_task():
     50     logger.info("run bad task")
---> 51     raise TaskError("task failed")

TaskError: task failed
[8]:
# Reload task 2 from its file: the exception and its traceback were saved in ``errors``.
tracker = MyTracker.load(id=2)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker(
    id=2,
    status=20,
    attempts=1,
    create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 172294, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 315313, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 313203, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 17, 313203, tzinfo=datetime.timezone.utc),
    data={},
    errors={
        'error': "TaskError('task failed')",
        'traceback': 'Traceback (most recent call last):\n  File 
"/Users/sanhehu/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py", line 370, in start\n    yield 
self\n  File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_7135/1894269443.py", line 2, in <module>\n
run_bad_task()\n  File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_7135/2741605091.py", line 51, in
run_bad_task\n    raise TaskError("task failed")\nTaskError: task failed\n'
    }
)
tracker.status_name = 'failed'

Automatically Set Status as Exhausted When Max Attempts Is Reached#

[9]:
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=2) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError                                 Traceback (most recent call last)
Cell In[9], line 2
      1 with tracker.start(verbose=True):
----> 2     run_bad_task()

Cell In[1], line 51, in run_bad_task()
     49 def run_bad_task():
     50     logger.info("run bad task")
---> 51     raise TaskError("task failed")

TaskError: task failed
[10]:
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=3) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed 3 times already, set status = 30 (🚫 ignored) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError                                 Traceback (most recent call last)
Cell In[10], line 2
      1 with tracker.start(verbose=True):
----> 2     run_bad_task()

Cell In[1], line 51, in run_bad_task()
     49 def run_bad_task():
     50     logger.info("run bad task")
---> 51     raise TaskError("task failed")

TaskError: task failed
[11]:
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=30 (exhausted), attempts=4) ----------------+
| the task is 🚫 exhausted, do nothing!
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+
---------------------------------------------------------------------------
NoMoreRetryError                          Traceback (most recent call last)
Cell In[11], line 1
----> 1 with tracker.start(verbose=True):
      2     run_bad_task()

File ~/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py:113, in _GeneratorContextManager.__enter__(self)
    111 del self.args, self.kwds, self.func
    112 try:
--> 113     return next(self.gen)
    114 except StopIteration:
    115     raise RuntimeError("generator didn't yield") from None

File ~/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py:351, in BaseTracker.start(self, lock, now, expire, ignore_lock, force_execution, verbose)
    349         logger.info(f"the task is 🚫 exhausted, do nothing!")
    350         logger.ruler(f"⏰ ⏹️ end task(id={self.id!r} status={self.status!r}))")
--> 351         raise NoMoreRetryError(
    352             f"Already tried {self.attempts} times, No more retry for task {self.id}."
    353         )
    355 if self.status == status_enum.ignored.value:  # pragma: no cover
    356     if force_execution is False:

NoMoreRetryError: Already tried 3 times, No more retry for task 2.

Automatically Set Concurrency Lock To Prevent Double Consumption#

[12]:
# Worker 1 creates the tracker and takes the lock by entering start().
tracker1 = MyTracker.new(id=3)
with tracker1.start(verbose=True):
    # Meanwhile, another worker loads the same tracker data from the file backend ...
    tracker2 = MyTracker.load(id=3)
    with logger.nested():  # only makes the log output nicer (nested under worker 1's)
        try:
            # ... and tries to run the task while worker 1 still holds the lock.
            with tracker2.start(verbose=True):
                run_good_task()  # worker 2 never gets here: the task is locked
        except TaskLockedError:
            # Expected outcome for worker 2; there is nothing to handle, so the
            # exception is deliberately dropped (and not bound to a name).
            pass
    run_good_task()  # worker 1 can then finish the task
# Reload from the file backend to show the final, persisted state.
tracker = MyTracker.load(id=3)
rprint(tracker)
print(f"{tracker.status_name = }")
+----- ⏱ ⏩ start task(id=3, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ +----- ⏱ ⏩ start task(id=3, status=10 (in_progress), attempts=2) ----------+
| ⏳ | 🔓 the task is locked, do nothing!
| ⏳ +----- ⏰ ⏹️ end task(id=3 status=10)) -------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=3 status=40)) -----------------------------------------+
MyTracker(
    id=3,
    status=40,
    attempts=1,
    create_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 928509, tzinfo=datetime.timezone.utc),
    update_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 936543, tzinfo=datetime.timezone.utc),
    lock=None,
    lock_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 932794, tzinfo=datetime.timezone.utc),
    lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 18, 932794, tzinfo=datetime.timezone.utc),
    data={},
    errors={}
)
tracker.status_name = 'succeeded'