base

This module implements the abstraction layer of a BaseTracker.

abstract_tracker.base.get_utc_now() → datetime

Get the current time as a timezone-aware UTC datetime object.
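A timezone-aware UTC timestamp can be produced with the standard library alone. The sketch below is an assumption about what such a helper might look like, not the library's actual source; `utc_now_sketch` is a hypothetical name.

```python
from datetime import datetime, timezone


def utc_now_sketch() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(timezone.utc)


now = utc_now_sketch()
# An aware datetime carries tzinfo; a naive one (e.g. datetime.utcnow()) does not.
print(now.tzinfo is not None, now.utcoffset())
```

Aware datetimes can be compared safely with other aware datetimes, which matters when checking lock expiry across workers in different time zones.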

class abstract_tracker.base.BaseStatusEnum(value)

An enumeration.

class abstract_tracker.base.StatusEnum(value)

An enumeration.

class abstract_tracker.base.BaseTracker

Tracker is a metadata container to track the processing status of a task.

It should have the following features:

  • be able to track the status of a task

  • provide a lock mechanism to prevent multiple workers from processing the same task at the same time

  • provide a retry and exhaustion mechanism to prevent the task from being processed indefinitely

  • be able to capture the error information of the task for debugging

You can use any framework to implement the tracker subclass, for instance:

Parameters:
  • id – The unique identifier of the tracker. Usually, it’s the same as the task id.

  • status – The current status of the tracker.

  • create_time – When the tracker was created. Usually, this is the time the task was first scheduled (marked as to do).

  • update_time – When the tracker status was last updated.

  • attempts – How many times processing of the tracker has been attempted.

  • lock – A concurrency control mechanism, stored as a UUID string. A worker that holds the same lock value as the tracker can process the tracker even if it is locked.

  • lock_time – When the tracker was locked, so that other workers can’t work on it.

  • lock_expire_time – When the lock will expire.

  • data – Arbitrary data, as a Python dictionary.

  • errors – Arbitrary error data, as a Python dictionary.

classmethod get_status_enum() → Type[T_STATUS_ENUM]

The StatusEnum or a subclass bound to this tracker.

classmethod get_expire() → int

Number of seconds before a lock expires for this tracker.

classmethod get_max_attempts() → int

Maximum number of attempts before this task is considered exhausted.

classmethod load(id: str, **kwargs) → Optional[T_TRACKER]

Create a tracker object by loading from the backend. If not found, return None.

dump(**kwargs)

Write the tracker object to the backend.
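To illustrate the load/dump contract (return None when the id is unknown, persist the current state on dump), here is a minimal standalone sketch backed by a plain dictionary. It does not import abstract_tracker; `InMemoryTrackerSketch` and `_STORE` are hypothetical names, and a real subclass would inherit from BaseTracker and talk to an actual backend.

```python
import copy
from dataclasses import asdict, dataclass, field
from typing import Dict, Optional

# Hypothetical in-memory "backend": tracker id -> stored record.
_STORE: Dict[str, dict] = {}


@dataclass
class InMemoryTrackerSketch:
    id: str
    status: int = 0
    attempts: int = 0
    data: dict = field(default_factory=dict)
    errors: dict = field(default_factory=dict)

    @classmethod
    def load(cls, id: str, **kwargs) -> Optional["InMemoryTrackerSketch"]:
        """Return the stored tracker, or None if the id is unknown."""
        record = _STORE.get(id)
        if record is None:
            return None
        # Copy so that mutating the loaded object does not touch the store.
        return cls(**copy.deepcopy(record))

    def dump(self, **kwargs) -> None:
        """Write the current state of this tracker to the store."""
        _STORE[self.id] = asdict(self)


missing = InMemoryTrackerSketch.load("t-1")   # nothing stored yet
tracker = InMemoryTrackerSketch(id="t-1")
tracker.attempts += 1
tracker.dump()                                # persist the change
loaded = InMemoryTrackerSketch.load("t-1")
```

Changes made to a tracker in memory are not visible to other workers until dump is called, which is why the lock and status helpers note that they do not write to the backend themselves.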

property status_name: str

Get the human friendly name of the status.

lock_it(now: Optional[datetime] = None, expire_time: Optional[datetime] = None)

Lock the tracker.

Note

This method will NOT dump the tracker data to backend.

unlock_it()

Unlock the tracker.

Note

This method will NOT dump the tracker data to backend.

is_locked(lock: Optional[str] = None, now: Optional[datetime] = None) → bool

Check if the tracker is locked.

If self.lock is None, the tracker is considered not locked. If self.lock is not None, it is compared with the manually provided lock parameter: if they are the same, the tracker is considered not locked. Otherwise, the tracker is considered locked unless the lock has expired.

Parameters:
  • lock – the lock to compare with self.lock.

  • now – the current time. If not provided, use get_utc_now().
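The decision rules above can be sketched as a standalone function. This is an illustration under stated assumptions, not the library's implementation: `is_locked_sketch` is a hypothetical name, the tracker's lock and expiry are passed in as arguments rather than read from `self`, and treating the lock as expired exactly at `lock_expire_time` is a guess about the boundary.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_locked_sketch(
    tracker_lock: Optional[str],
    lock_expire_time: datetime,
    lock: Optional[str] = None,
    now: Optional[datetime] = None,
) -> bool:
    # No lock set on the tracker: not locked.
    if tracker_lock is None:
        return False
    # The caller holds the same lock value: not locked for this caller.
    if lock is not None and lock == tracker_lock:
        return False
    # Otherwise the tracker is locked until the lock expires.
    if now is None:
        now = datetime.now(timezone.utc)
    return now < lock_expire_time  # assumption: expired at the boundary


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
expire = t0 + timedelta(seconds=60)

unlocked = is_locked_sketch(None, expire, now=t0)
same_owner = is_locked_sketch("abc", expire, lock="abc", now=t0)
other_worker = is_locked_sketch("abc", expire, lock="xyz", now=t0)
after_expiry = is_locked_sketch("abc", expire, now=expire + timedelta(seconds=1))
```

Here only `other_worker` is True: a different worker inside the expiry window is blocked, while the lock owner and any worker arriving after expiry may proceed.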

mark_as_in_progress(now: Optional[datetime] = None, expire: Optional[int] = None)

Mark the tracker as in progress.

mark_as_failed_or_exhausted(e: Exception)

Mark the tracker as failed or exhausted.

mark_as_succeeded()

Mark the tracker as succeeded.

start(lock: Optional[str] = None, now: Optional[datetime] = None, expire: Optional[int] = None, ignore_lock: bool = False, force_execution: bool = False, verbose: bool = False)

A context manager that executes a task and handles errors automatically.

  1. It sets the status to in_progress and sets the lock. If the task is already locked, it raises a TaskLockedError.

  2. If the task succeeds, it sets the status to succeeded.

  3. If the task fails, it sets the status to failed and logs the error to the errors attribute.

  4. If the task fails N times in a row, it sets the status to exhausted.

Parameters:
  • lock – see is_locked()

  • now – the current time. If not provided, use get_utc_now().

  • expire – the lock expire time in seconds. If not provided, use get_expire().

  • ignore_lock – if True, ignore the lock and execute the task anyway.

  • force_execution – if True, force execution even if the current status is exhausted or ignored.
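The four steps of the context manager can be sketched with contextlib. This is a simplified standalone illustration, not the library's code: `TrackerSketch`, the integer status constants (including `PENDING`), and the class-level `MAX_ATTEMPTS` and `EXPIRE` values are assumptions, as is the choice to re-raise the task's exception after recording it. Only `TaskLockedError` and the status names come from the description above, and the `lock`, `ignore_lock`, `force_execution`, and `verbose` options are left out.

```python
import uuid
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone

# Hypothetical status codes for this sketch only.
PENDING, IN_PROGRESS, FAILED, EXHAUSTED, SUCCEEDED = range(5)


class TaskLockedError(Exception):
    """Raised when another worker still holds an unexpired lock."""


class TrackerSketch:
    MAX_ATTEMPTS = 3   # assumed value
    EXPIRE = 900       # assumed lock lifetime in seconds

    def __init__(self):
        self.status = PENDING
        self.attempts = 0
        self.lock = None
        self.lock_expire_time = None
        self.errors = {}

    @contextmanager
    def start(self, now=None):
        now = now or datetime.now(timezone.utc)
        if self.lock is not None and now < self.lock_expire_time:
            raise TaskLockedError("task is locked by another worker")
        # Step 1: mark as in progress and take the lock.
        self.status = IN_PROGRESS
        self.attempts += 1
        self.lock = uuid.uuid4().hex
        self.lock_expire_time = now + timedelta(seconds=self.EXPIRE)
        try:
            yield self
        except Exception as e:
            # Steps 3 and 4: failed, or exhausted after too many attempts.
            exhausted = self.attempts >= self.MAX_ATTEMPTS
            self.status = EXHAUSTED if exhausted else FAILED
            self.errors = {"error": repr(e)}
            self.lock = None
            raise  # assumption: the error is propagated to the caller
        else:
            # Step 2: succeeded.
            self.status = SUCCEEDED
            self.lock = None


failing = TrackerSketch()
for _ in range(3):
    try:
        with failing.start():
            raise ValueError("boom")
    except ValueError:
        pass  # the tracker has already recorded the failure

ok = TrackerSketch()
with ok.start():
    pass  # the task body would run here
```

After three failed attempts `failing.status` is `EXHAUSTED`, while `ok.status` is `SUCCEEDED` after a single clean run. A real tracker would also dump its state to the backend at each transition so other workers can see it.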