Scheduler - Referencia de API

Sistema de programación de tareas robusta tipo Airflow con dependencias, reintentos y métricas.

Clase Scheduler

class ctrutils.scheduler.Scheduler(logger=None, timezone='UTC', max_workers=10, coalesce=True, misfire_grace_time=300, **scheduler_options)[fuente]

Bases: object

Scheduler robusto y eficiente para gestión de tareas programadas.

Características: - Ejecución continua sin terminar - Dependencias entre tareas - Reintentos automáticos - Callbacks y hooks - Monitoreo de estado - Gestión de señales para shutdown graceful

__init__(logger=None, timezone='UTC', max_workers=10, coalesce=True, misfire_grace_time=300, **scheduler_options)[fuente]

Inicializa el Scheduler.

Parámetros:
  • logger (Logger | None) – Instancia de logger a utilizar.

  • timezone (str) – Zona horaria para el scheduler.

  • max_workers (int) – Número máximo de workers concurrentes.

  • coalesce (bool) – Si True, combina múltiples ejecuciones pendientes en una.

  • misfire_grace_time (int) – Tiempo de gracia para ejecuciones perdidas (segundos).

  • scheduler_options (Any) – Opciones adicionales para APScheduler.

add_task(task)[fuente]

Añade una nueva tarea al scheduler.

Parámetros:

task (Task) – Instancia de Task a añadir.

add_job(func, trigger, job_id, trigger_args, max_retries=3, retry_delay=60, dependencies=None, on_success=None, on_failure=None, **job_kwargs)[fuente]

Añade un nuevo trabajo al scheduler (método simplificado).

Parámetros:
  • func (Callable) – La función a ejecutar.

  • trigger (str) – Tipo de trigger (“cron”, “interval”, “date”).

  • job_id (str) – ID único para el trabajo.

  • trigger_args (Dict[str, Any]) – Argumentos para el trigger.

  • max_retries (int) – Número máximo de reintentos.

  • retry_delay (int) – Delay entre reintentos (segundos).

  • dependencies (List[str] | None) – Lista de job_ids que deben completarse antes.

  • on_success (Callable | None) – Callback en caso de éxito.

  • on_failure (Callable | None) – Callback en caso de fallo.

  • job_kwargs (Any) – Argumentos adicionales (args, kwargs, etc.).

remove_job(job_id)[fuente]

Elimina un trabajo del scheduler.

Parámetros:

job_id (str) – ID del trabajo a eliminar.

get_jobs()[fuente]

Obtiene la lista de trabajos programados.

Devuelve:

Lista de jobs del scheduler.

Tipo del valor devuelto:

List[Any]

get_task_metrics(task_id)[fuente]

Obtiene las métricas de una tarea específica.

Parámetros:

task_id (str) – ID de la tarea.

Devuelve:

Diccionario con las métricas o None.

Tipo del valor devuelto:

Dict[str, Any] | None

get_all_metrics()[fuente]

Obtiene todas las métricas del scheduler.

Devuelve:

Diccionario con métricas globales y por tarea.

Tipo del valor devuelto:

Dict[str, Any]

start(blocking=False)[fuente]

Inicia el scheduler.

Parámetros:

blocking (bool) – Si True, mantiene el scheduler ejecutándose indefinidamente.

shutdown(wait=True)[fuente]

Detiene el scheduler.

Parámetros:

wait (bool) – Si True, espera a que terminen los jobs en ejecución.

pause_job(job_id)[fuente]

Pausa temporalmente la ejecución de una tarea programada.

La tarea permanece registrada pero no se ejecutará según su schedule hasta que se llame a resume_job(). Las ejecuciones que ocurran mientras está pausada se considerarán «missed» según la política de misfire.

Parámetros:

job_id (str) – Identificador único de la tarea a pausar.

Muestra:

JobLookupError – Si el job_id no existe en el scheduler.

Ejemplos

>>> scheduler.pause_job("tarea_nocturna")
>>> # Durante el día, hacer mantenimiento...
>>> scheduler.resume_job("tarea_nocturna")

Ver también

resume_job: Reactiva una tarea pausada. remove_job: Elimina permanentemente una tarea.

resume_job(job_id)[fuente]

Reanuda la ejecución de una tarea previamente pausada.

La tarea volverá a ejecutarse según su schedule configurado. Si la siguiente ejecución ya pasó, se ejecutará inmediatamente o según la política de misfire.

Parámetros:

job_id (str) – Identificador único de la tarea a reanudar.

Muestra:

JobLookupError – Si el job_id no existe en el scheduler.

Ejemplos

>>> scheduler.pause_job("backup_nocturno")
>>> # Realizar mantenimiento...
>>> scheduler.resume_job("backup_nocturno")

Ver también

pause_job: Pausa una tarea temporalmente.

reschedule_job(job_id, trigger_type, **trigger_args)[fuente]

Re-programa un job existente con un nuevo trigger.

Parámetros:
  • job_id (str) – ID del job a re-programar.

  • trigger_type (str) – Nuevo tipo de trigger.

  • trigger_args (Any) – Argumentos para el nuevo trigger.

is_running()[fuente]

Verifica si el scheduler está actualmente ejecutándose.

Devuelve:

True si el scheduler está activo y procesando tareas,

False en caso contrario.

Tipo del valor devuelto:

bool

Ejemplos

>>> scheduler.start()
>>> assert scheduler.is_running() is True
>>> scheduler.shutdown()
>>> assert scheduler.is_running() is False

Ver también

start: Inicia el scheduler. shutdown: Detiene el scheduler.

print_jobs()[fuente]

Imprime información de todos los jobs programados en el scheduler.

Muestra una tabla formateada con información de cada job incluyendo: ID, nombre, trigger, próxima ejecución y estado.

Esta función es útil para debugging y monitoreo del estado del scheduler.

Ejemplos

>>> scheduler.add_job(func=tarea1, trigger='cron',
...                   job_id='tarea1',
...                   trigger_args={'minute': '*/5'})
>>> scheduler.print_jobs()
Jobstore default:
    tarea1 (trigger: cron[minute='*/5'], next run at: ...)

Ver también

get_jobs: Obtiene lista programática de jobs. get_all_metrics: Obtiene métricas de todos los jobs.

Clase Task

class ctrutils.scheduler.Task(task_id, func, trigger_type, trigger_args, max_retries=3, retry_delay=60, retry_backoff=2.0, timeout=None, dependencies=None, on_success=None, on_failure=None, on_retry=None, condition=None, args=None, kwargs=None)[fuente]

Bases: object

Representa una tarea programable con dependencias y reintentos.

__init__(task_id, func, trigger_type, trigger_args, max_retries=3, retry_delay=60, retry_backoff=2.0, timeout=None, dependencies=None, on_success=None, on_failure=None, on_retry=None, condition=None, args=None, kwargs=None)[fuente]

Inicializa una tarea.

Parámetros:
  • task_id (str) – Identificador único de la tarea.

  • func (Callable) – Función a ejecutar.

  • trigger_type (str) – Tipo de trigger (“cron”, “interval”, “date”).

  • trigger_args (Dict[str, Any]) – Argumentos para el trigger.

  • max_retries (int) – Número máximo de reintentos en caso de fallo.

  • retry_delay (int) – Delay inicial entre reintentos (segundos).

  • retry_backoff (float) – Multiplicador para el delay de reintento (backoff exponencial).

  • timeout (int | None) – Timeout de ejecución en segundos.

  • dependencies (List[str] | None) – Lista de task_ids que deben completarse antes.

  • on_success (Callable | None) – Callback a ejecutar en caso de éxito.

  • on_failure (Callable | None) – Callback a ejecutar en caso de fallo.

  • on_retry (Callable | None) – Callback a ejecutar en cada reintento.

  • condition (Callable[[], bool] | None) – Función que retorna True/False para ejecutar condicionalmente.

  • args (tuple | None) – Argumentos posicionales para la función.

  • kwargs (Dict[str, Any] | None) – Argumentos nombrados para la función.

Enum JobState

class ctrutils.scheduler.JobState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[fuente]

Bases: Enum

Estados posibles de un job durante su ciclo de vida.

Los estados representan las diferentes fases por las que pasa una tarea desde su creación hasta su finalización o fallo.

PENDING

La tarea está programada pero aún no ha comenzado su ejecución.

RUNNING

La tarea está actualmente en ejecución.

SUCCESS

La tarea se completó exitosamente sin errores.

FAILED

La tarea falló después de agotar todos los reintentos.

RETRYING

La tarea falló pero se reintentará automáticamente.

SKIPPED

La tarea se omitió porque no cumplió su condición de ejecución o porque una dependencia falló.

Ejemplos

>>> if task.state == JobState.SUCCESS:
...     print("Tarea completada")
>>> elif task.state == JobState.RETRYING:
...     print(f"Reintentando...")
PENDING = 'pending'
RUNNING = 'running'
SUCCESS = 'success'
FAILED = 'failed'
RETRYING = 'retrying'
SKIPPED = 'skipped'

Clase JobMetrics

class ctrutils.scheduler.JobMetrics[fuente]

Bases: object

Métricas de ejecución y monitoreo de un job.

Esta clase mantiene estadísticas detalladas sobre las ejecuciones de una tarea, incluyendo contadores de éxitos/fallos, tiempos de ejecución, y tasa de éxito.

Las métricas se actualizan automáticamente en cada ejecución y se pueden exportar a diccionario para logging o visualización.

total_runs

Número total de ejecuciones (incluye éxitos, fallos y reintentos).

Type:

int

successes

Número de ejecuciones exitosas.

Type:

int

failures

Número de ejecuciones fallidas (después de agotar reintentos).

Type:

int

retries

Número total de intentos de reintento realizados.

Type:

int

last_run_time

Timestamp de la última ejecución.

Type:

Optional[datetime]

last_duration

Duración en segundos de la última ejecución.

Type:

Optional[float]

last_state

Estado final de la última ejecución.

Type:

Optional[JobState]

avg_duration

Duración promedio de las ejecuciones exitosas en segundos.

Type:

float

Ejemplos

>>> metrics = JobMetrics()
>>> metrics.record_run(duration=1.5, state=JobState.SUCCESS)
>>> print(f"Tasa de éxito: {metrics.to_dict()['success_rate']:.2%}")
Tasa de éxito: 100.00%
__init__()[fuente]

Inicializa todas las métricas en cero.

Las métricas comienzan vacías y se actualizan mediante el método record_run() después de cada ejecución de la tarea.

record_run(duration, state)[fuente]

Registra los resultados de una ejecución del job.

Actualiza todos los contadores y métricas según el estado de la ejecución. Para ejecuciones exitosas, también actualiza el promedio de duración.

Parámetros:
  • duration (float) – Tiempo de ejecución en segundos (debe ser >= 0).

  • state (JobState) – Estado final de la ejecución (SUCCESS, FAILED, RETRYING).

Ejemplos

>>> metrics = JobMetrics()
>>> metrics.record_run(1.5, JobState.SUCCESS)
>>> metrics.record_run(2.1, JobState.SUCCESS)
>>> assert metrics.avg_duration == 1.8  # (1.5 + 2.1) / 2
to_dict()[fuente]

Convierte las métricas a un diccionario serializable.

Devuelve:

  • total_runs (int): Total de ejecuciones

  • successes (int): Ejecuciones exitosas

  • failures (int): Ejecuciones fallidas

  • retries (int): Total de reintentos

  • success_rate (float): Porcentaje de éxito (0.0 a 1.0)

  • last_run_time (str|None): ISO timestamp de última ejecución

  • last_duration (float|None): Duración de última ejecución

  • last_state (str|None): Estado de última ejecución

  • avg_duration (float): Duración promedio de ejecuciones exitosas

Tipo del valor devuelto:

Dict con las siguientes claves

Ejemplos

>>> metrics = JobMetrics()
>>> metrics.record_run(1.5, JobState.SUCCESS)
>>> data = metrics.to_dict()
>>> print(data['success_rate'])  # 1.0
1.0