base
This module implements the abstraction layer of a BaseTracker.
- class abstract_tracker.base.BaseTracker
Tracker is a metadata container to track the processing status of a task.
It should have the following features:
- be able to track the status of a task
- provide a lock mechanism to prevent multiple workers from processing the same task at the same time
- provide a retry and exhaustion mechanism to prevent the task from being processed indefinitely
- be able to capture the error information of the task for debugging
You can use any framework to implement the tracker subclass, for instance:
- Parameters:
id – The unique identifier of the tracker. Usually, it’s the same as the task id.
status – Indicate the status of the tracker.
create_time – when the tracker is created. Usually, it’s the time a task is scheduled as to do.
update_time – when the tracker status is updated.
attempts – how many times we have tried to process the tracker.
lock – a concurrency control mechanism. It is a UUID string. If the worker holds the same lock as the tracker, it can process the tracker even if it is locked.
lock_time – when this tracker is locked, so that other workers can’t work on it.
lock_expire_time – when this lock will expire.
data – arbitrary data as a Python dictionary.
errors – arbitrary error data as a Python dictionary.
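To make the parameter list concrete, here is a minimal sketch of a container holding the same fields. It is an illustration only, not the library's class: the name InMemoryTracker, the helper utc_now, the integer status code, and all defaults are assumptions made for this example.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


def utc_now() -> datetime:
    """Hypothetical helper: the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class InMemoryTracker:
    """Illustrative container mirroring the documented parameters."""

    id: str
    status: int = 0  # assumption: an integer code, with 0 standing for "to do"
    create_time: datetime = field(default_factory=utc_now)
    update_time: datetime = field(default_factory=utc_now)
    attempts: int = 0
    lock: Optional[str] = None  # a UUID string while locked, None otherwise
    lock_time: Optional[datetime] = None
    lock_expire_time: Optional[datetime] = None
    data: dict = field(default_factory=dict)
    errors: dict = field(default_factory=dict)


# A freshly created tracker is unlocked and has no attempts yet.
tracker = InMemoryTracker(id="task-1", data={"source": "example"})
```

Using default_factory gives each instance its own data and errors dictionaries, so one tracker's errors never leak into another's.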
- classmethod get_status_enum() -> Type[T_STATUS_ENUM]
The StatusEnum or a subclass bound to this tracker.
- classmethod get_max_attempts() -> int
Maximum number of attempts before this task is considered exhausted.
- classmethod load(id: str, **kwargs) -> Optional[T_TRACKER]
Create a tracker object by loading from the backend. If not found, return None.
- lock_it(now: Optional[datetime] = None, expire_time: Optional[datetime] = None)
Lock the tracker.
Note
This method will NOT dump the tracker data to backend.
- unlock_it()
Unlock the tracker.
Note
This method will NOT dump the tracker data to backend.
- is_locked(lock: Optional[str] = None, now: Optional[datetime] = None) -> bool
Check if the tracker is locked.
If self.lock is None, the tracker is considered not locked. If self.lock is not None, it is compared with the manually provided lock parameter. If they are the same, the tracker is considered not locked. Otherwise, check whether the lock has expired.
- Parameters:
lock – the lock to compare with self.lock.
now – the current time. If not provided, use get_utc_now().
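The decision order described for is_locked() can be sketched as a standalone function. This is a hedged illustration rather than the library's implementation: the function takes the tracker's lock and expiry as plain arguments, and treating a lock as expired once now reaches lock_expire_time is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_locked(
    current_lock: Optional[str],
    lock_expire_time: datetime,
    lock: Optional[str] = None,
    now: Optional[datetime] = None,
) -> bool:
    """Sketch of the documented check; current_lock stands in for self.lock."""
    # No lock on the tracker: not locked.
    if current_lock is None:
        return False
    # The caller presents the same lock the tracker holds: not locked for them.
    if lock is not None and lock == current_lock:
        return False
    # Otherwise the answer depends on whether the lock has expired.
    if now is None:
        now = datetime.now(timezone.utc)
    # Assumption: the lock counts as expired once now reaches the expiry time.
    return now < lock_expire_time


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
expires = now + timedelta(minutes=15)

locked_for_other = is_locked("abc", expires, lock="xyz", now=now)
locked_for_owner = is_locked("abc", expires, lock="abc", now=now)
locked_after_expiry = is_locked("abc", expires, now=now + timedelta(hours=1))
```

Passing now explicitly keeps the check deterministic, which is convenient in tests.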
- mark_as_in_progress(now: Optional[datetime] = None, expire: Optional[int] = None)
Mark the tracker as in progress.
- start(lock: Optional[str] = None, now: Optional[datetime] = None, expire: Optional[int] = None, ignore_lock: bool = False, force_execution: bool = False, verbose: bool = False)
A context manager that executes a task and handles errors automatically.
- It sets the status to in_progress and sets the lock. If the task is already locked, it raises a TaskLockedError.
- If the task succeeds, it sets the status to success.
- If the task fails, it sets the status to failed and logs the error to the errors attribute.
- If the task fails N times in a row, it sets the status to exhausted.
- Parameters:
lock – see is_locked().
now – the current time. If not provided, use get_utc_now().
expire – the lock expiration time in seconds. If not provided, use get_expire().
ignore_lock – if True, ignore the lock and execute the task anyway.
force_execution – if True, force execution even if the current status is exhausted or ignored.
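The status transitions described for start() can be sketched with contextlib. This is a simplified illustration, not the library's code: locking is left out, and SketchTracker, MAX_ATTEMPTS, the string status values, the shape of the errors dictionary, and re-raising the exception are all assumptions made for the example.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator

MAX_ATTEMPTS = 3  # hypothetical limit, standing in for get_max_attempts()


@dataclass
class SketchTracker:
    """Hypothetical minimal tracker with only the fields this sketch needs."""

    status: str = "pending"
    attempts: int = 0
    errors: dict = field(default_factory=dict)


@contextmanager
def start(tracker: SketchTracker) -> Iterator[SketchTracker]:
    # Entering the block counts as one attempt and marks the task in progress.
    tracker.status = "in_progress"
    tracker.attempts += 1
    try:
        yield tracker
    except Exception as exc:
        # Record the error, then decide between "failed" and "exhausted".
        tracker.errors = {"error": repr(exc)}
        if tracker.attempts >= MAX_ATTEMPTS:
            tracker.status = "exhausted"
        else:
            tracker.status = "failed"
        raise  # assumption: the error is propagated to the caller
    else:
        tracker.status = "success"


# A task that completes without raising ends in "success".
ok_tracker = SketchTracker()
with start(ok_tracker):
    pass

# A task that keeps failing ends in "exhausted" after MAX_ATTEMPTS tries.
bad_tracker = SketchTracker()
for _ in range(MAX_ATTEMPTS):
    try:
        with start(bad_tracker):
            raise ValueError("boom")
    except ValueError:
        pass
```

Keeping the bookkeeping in the context manager means the task body contains only the work itself, while every exit path still leaves the tracker in a well-defined status.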