File Tracker
`FileTracker` uses a local JSON file as its backend.
Declare Your Own Tracker
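You must implement the `get_path()` method to define where the tracker data is stored. You can also override `get_expire()` and `get_max_attempts()` to customize the lock expiration time and the maximum number of attempts.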
[1]:
import shutil
import random
import dataclasses
from pathlib import Path
from abstract_tracker.api import logger, FileTracker, TaskLockedError
from rich import print as rprint
# Directory holding the per-task JSON tracker files, under the current working directory.
dir_data = Path.cwd().joinpath("data")
@dataclasses.dataclass
class MyTracker(FileTracker):
    """
    Demo tracker that stores each task's state in its own JSON file
    (``{id}.json``) under ``dir_data``.

    It also overrides the lock expiration (10 seconds) and the maximum number
    of attempts (3) instead of relying on the defaults.
    """

    @classmethod
    def get_path(cls, id) -> Path:
        """
        The path of the tracker file for the task with the given ``id``.
        """
        return dir_data.joinpath(f"{id}.json")

    @classmethod
    def get_expire(cls) -> int:
        """
        Number of seconds before a lock expires for this tracker.
        If you don't declare this, then it will use default = 900 (15 minutes)
        """
        return 10

    @classmethod
    def get_max_attempts(cls) -> int:
        """
        Maximum number of attempts before this task is considered exhausted.
        If you don't declare this, then it will use default = 3
        """
        return 3
class TaskError(Exception):
    """Exception raised by the dummy failing task to simulate a task error."""
# Define some dummy tasks used throughout this demo.
def run_good_task():
    """Dummy task that always succeeds; it only logs ``run good task``."""
    logger.info("run good task")
def run_bad_task():
    """
    Dummy task that always fails.

    Logs ``run bad task`` and then raises ``TaskError("task failed")``.
    """
    logger.info("run bad task")
    raise TaskError("task failed")
[2]:
# Wipe any tracker files left by a previous run so the demo starts from scratch.
# NOTE(review): this recursively deletes ``<cwd>/data``; do not run the notebook
# from a directory whose ``data`` folder you want to keep.
shutil.rmtree(path=dir_data, ignore_errors=True)
dir_data.mkdir(exist_ok=True, parents=True)
Create a New Tracker
[3]:
# Load task 1's tracker from the file backend. ``load`` returns None when no
# tracker file exists yet; in that case create a new tracker with pending status.
if (tracker := MyTracker.load(id=1)) is None:
    tracker = MyTracker.new(id=1)
rprint(tracker)
MyTracker( id=1, status=0, attempts=0, create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), data={}, errors={} )
Use the Context Manager to Manage Lock and Status Automatically
[4]:
# ``start`` sets the task to in_progress and locks it. When the body finishes
# without an error, the task is set to succeeded and unlocked.
with tracker.start(verbose=True):
    run_good_task()
+----- ⏱ ⏩ start task(id=1, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=1 status=40)) -----------------------------------------+
[5]:
# Reload task 1 from its JSON file to confirm the new status was persisted.
tracker = MyTracker.load(id=1)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker( id=1, status=40, attempts=1, create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 114622, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 162125, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 160961, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 17, 160961, tzinfo=datetime.timezone.utc), data={}, errors={} )
tracker.status_name = 'succeeded'
Automatically Log Error Messages
[6]:
# Test on another task (id=2), which will be run with the failing dummy task.
tracker = MyTracker.new(id=2)
[7]:
# The task raises TaskError. The tracker sets the status to failed, records the
# error and traceback, unlocks the task, and lets the exception propagate, so
# the traceback shown below is expected.
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError Traceback (most recent call last)
Cell In[7], line 2
1 with tracker.start(verbose=True):
----> 2 run_bad_task()
Cell In[1], line 51, in run_bad_task()
49 def run_bad_task():
50 logger.info("run bad task")
---> 51 raise TaskError("task failed")
TaskError: task failed
[8]:
# Reload task 2 to inspect the persisted failure: status, attempts, and the
# ``errors`` dict holding the error repr and traceback.
tracker = MyTracker.load(id=2)
rprint(tracker)
print(f"{tracker.status_name = }")
MyTracker( id=2, status=20, attempts=1, create_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 172294, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 315313, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(2024, 1, 9, 4, 51, 7, 313203, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 17, 313203, tzinfo=datetime.timezone.utc), data={}, errors={ 'error': "TaskError('task failed')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py", line 370, in start\n yield self\n File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_7135/1894269443.py", line 2, in <module>\n run_bad_task()\n File "/var/folders/3y/7t5ll4sn6x76g8rhfqlc36dw0000gn/T/ipykernel_7135/2741605091.py", line 51, in run_bad_task\n raise TaskError("task failed")\nTaskError: task failed\n' } )
tracker.status_name = 'failed'
Automatically Set Status to Exhausted When Max Attempts Are Reached
[9]:
# Second attempt: the task fails again and stays in failed status (attempts=2).
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=2) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed, set status = 20 (❌ failed) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=20)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError Traceback (most recent call last)
Cell In[9], line 2
1 with tracker.start(verbose=True):
----> 2 run_bad_task()
Cell In[1], line 51, in run_bad_task()
49 def run_bad_task():
50 logger.info("run bad task")
---> 51 raise TaskError("task failed")
TaskError: task failed
[10]:
# Third attempt reaches the maximum number of attempts (3), so instead of
# failed the task is set to status 30, which the next cell reports as exhausted.
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=20 (failed), attempts=3) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ run bad task
| +----- end task logging -----------------------------------------------------+
| ❌ task failed 3 times already, set status = 30 (🚫 ignored) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+
---------------------------------------------------------------------------
TaskError Traceback (most recent call last)
Cell In[10], line 2
1 with tracker.start(verbose=True):
----> 2 run_bad_task()
Cell In[1], line 51, in run_bad_task()
49 def run_bad_task():
50 logger.info("run bad task")
---> 51 raise TaskError("task failed")
TaskError: task failed
[11]:
# The task is already exhausted, so entering ``start`` raises NoMoreRetryError
# and the body never runs. The traceback shown below is expected.
with tracker.start(verbose=True):
    run_bad_task()
+----- ⏱ ⏩ start task(id=2, status=30 (exhausted), attempts=4) ----------------+
| the task is 🚫 exhausted, do nothing!
+----- ⏰ ⏹️ end task(id=2 status=30)) -----------------------------------------+
---------------------------------------------------------------------------
NoMoreRetryError Traceback (most recent call last)
Cell In[11], line 1
----> 1 with tracker.start(verbose=True):
2 run_bad_task()
File ~/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py:113, in _GeneratorContextManager.__enter__(self)
111 del self.args, self.kwds, self.func
112 try:
--> 113 return next(self.gen)
114 except StopIteration:
115 raise RuntimeError("generator didn't yield") from None
File ~/Documents/GitHub/abstract_tracker-project/abstract_tracker/base.py:351, in BaseTracker.start(self, lock, now, expire, ignore_lock, force_execution, verbose)
349 logger.info(f"the task is 🚫 exhausted, do nothing!")
350 logger.ruler(f"⏰ ⏹️ end task(id={self.id!r} status={self.status!r}))")
--> 351 raise NoMoreRetryError(
352 f"Already tried {self.attempts} times, No more retry for task {self.id}."
353 )
355 if self.status == status_enum.ignored.value: # pragma: no cover
356 if force_execution is False:
NoMoreRetryError: Already tried 3 times, No more retry for task 2.
Automatically Set a Concurrency Lock to Prevent Double Consumption
[12]:
# Worker 1 creates task 3 and locks it by entering ``start``.
tracker1 = MyTracker.new(id=3)
with tracker1.start(verbose=True):
    # Another worker loads the tracker data while worker 1 still holds the lock.
    tracker2 = MyTracker.load(id=3)
    with logger.nested():  # just makes the logging nicer
        try:
            # Worker 2 tries to run the task ...
            with tracker2.start(verbose=True):
                run_good_task()  # ... but never gets here: the task is locked
        except TaskLockedError:
            # Expected: worker 1 holds the lock, so worker 2 backs off.
            pass
    run_good_task()  # worker 1 can then finish the task
# Reload task 3 to confirm worker 1's run was persisted as succeeded.
tracker = MyTracker.load(id=3)
rprint(tracker)
print(f"{tracker.status_name = }")
+----- ⏱ ⏩ start task(id=3, status=0 (pending), attempts=1) -------------------+
| set status = 10 (⏳ in_progress) and 🔓 lock the task.
| +----- start task logging ---------------------------------------------------+
| ⏳ +----- ⏱ ⏩ start task(id=3, status=10 (in_progress), attempts=2) ----------+
| ⏳ | 🔓 the task is locked, do nothing!
| ⏳ +----- ⏰ ⏹️ end task(id=3 status=10)) -------------------------------------+
| ⏳ run good task
| +----- end task logging -----------------------------------------------------+
| task succeeded, set status = 40 (✅ succeeded) and 🔐 unlock the task.
+----- ⏰ ⏹️ end task(id=3 status=40)) -----------------------------------------+
MyTracker( id=3, status=40, attempts=1, create_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 928509, tzinfo=datetime.timezone.utc), update_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 936543, tzinfo=datetime.timezone.utc), lock=None, lock_time=datetime.datetime(2024, 1, 9, 4, 51, 8, 932794, tzinfo=datetime.timezone.utc), lock_expire_time=datetime.datetime(2024, 1, 9, 4, 51, 18, 932794, tzinfo=datetime.timezone.utc), data={}, errors={} )
tracker.status_name = 'succeeded'