Scheduler - Referencia de API
Sistema de programación de tareas robusta tipo Airflow con dependencias, reintentos y métricas.
Clase Scheduler
- class ctrutils.scheduler.Scheduler(logger=None, timezone='UTC', max_workers=10, coalesce=True, misfire_grace_time=300, **scheduler_options)[fuente]
Bases:
objectScheduler robusto y eficiente para gestión de tareas programadas.
Características: - Ejecución continua sin terminar - Dependencias entre tareas - Reintentos automáticos - Callbacks y hooks - Monitoreo de estado - Gestión de señales para shutdown graceful
- __init__(logger=None, timezone='UTC', max_workers=10, coalesce=True, misfire_grace_time=300, **scheduler_options)[fuente]
Inicializa el Scheduler.
- Parámetros:
logger (Logger | None) – Instancia de logger a utilizar.
timezone (str) – Zona horaria para el scheduler.
max_workers (int) – Número máximo de workers concurrentes.
coalesce (bool) – Si True, combina múltiples ejecuciones pendientes en una.
misfire_grace_time (int) – Tiempo de gracia para ejecuciones perdidas (segundos).
scheduler_options (Any) – Opciones adicionales para APScheduler.
- add_task(task)[fuente]
Añade una nueva tarea al scheduler.
- Parámetros:
task (Task) – Instancia de Task a añadir.
- add_job(func, trigger, job_id, trigger_args, max_retries=3, retry_delay=60, dependencies=None, on_success=None, on_failure=None, **job_kwargs)[fuente]
Añade un nuevo trabajo al scheduler (método simplificado).
- Parámetros:
func (Callable) – La función a ejecutar.
trigger (str) – Tipo de trigger (“cron”, “interval”, “date”).
job_id (str) – ID único para el trabajo.
max_retries (int) – Número máximo de reintentos.
retry_delay (int) – Delay entre reintentos (segundos).
dependencies (List[str] | None) – Lista de job_ids que deben completarse antes.
on_success (Callable | None) – Callback en caso de éxito.
on_failure (Callable | None) – Callback en caso de fallo.
job_kwargs (Any) – Argumentos adicionales (args, kwargs, etc.).
- remove_job(job_id)[fuente]
Elimina un trabajo del scheduler.
- Parámetros:
job_id (str) – ID del trabajo a eliminar.
- start(blocking=False)[fuente]
Inicia el scheduler.
- Parámetros:
blocking (bool) – Si True, mantiene el scheduler ejecutándose indefinidamente.
- shutdown(wait=True)[fuente]
Detiene el scheduler.
- Parámetros:
wait (bool) – Si True, espera a que terminen los jobs en ejecución.
- pause_job(job_id)[fuente]
Pausa temporalmente la ejecución de una tarea programada.
La tarea permanece registrada pero no se ejecutará según su schedule hasta que se llame a resume_job(). Las ejecuciones que ocurran mientras está pausada se considerarán «missed» según la política de misfire.
- Parámetros:
job_id (str) – Identificador único de la tarea a pausar.
- Muestra:
JobLookupError – Si el job_id no existe en el scheduler.
Ejemplos
>>> scheduler.pause_job("tarea_nocturna") >>> # Durante el día, hacer mantenimiento... >>> scheduler.resume_job("tarea_nocturna")
Ver también
resume_job: Reactiva una tarea pausada. remove_job: Elimina permanentemente una tarea.
- resume_job(job_id)[fuente]
Reanuda la ejecución de una tarea previamente pausada.
La tarea volverá a ejecutarse según su schedule configurado. Si la siguiente ejecución ya pasó, se ejecutará inmediatamente o según la política de misfire.
- Parámetros:
job_id (str) – Identificador único de la tarea a reanudar.
- Muestra:
JobLookupError – Si el job_id no existe en el scheduler.
Ejemplos
>>> scheduler.pause_job("backup_nocturno") >>> # Realizar mantenimiento... >>> scheduler.resume_job("backup_nocturno")
Ver también
pause_job: Pausa una tarea temporalmente.
- reschedule_job(job_id, trigger_type, **trigger_args)[fuente]
Re-programa un job existente con un nuevo trigger.
- is_running()[fuente]
Verifica si el scheduler está actualmente ejecutándose.
- Devuelve:
- True si el scheduler está activo y procesando tareas,
False en caso contrario.
- Tipo del valor devuelto:
Ejemplos
>>> scheduler.start() >>> assert scheduler.is_running() is True >>> scheduler.shutdown() >>> assert scheduler.is_running() is False
Ver también
start: Inicia el scheduler. shutdown: Detiene el scheduler.
- print_jobs()[fuente]
Imprime información de todos los jobs programados en el scheduler.
Muestra una tabla formateada con información de cada job incluyendo: ID, nombre, trigger, próxima ejecución y estado.
Esta función es útil para debugging y monitoreo del estado del scheduler.
Ejemplos
>>> scheduler.add_job(func=tarea1, trigger='cron', ... job_id='tarea1', ... trigger_args={'minute': '*/5'}) >>> scheduler.print_jobs() Jobstore default: tarea1 (trigger: cron[minute='*/5'], next run at: ...)
Ver también
get_jobs: Obtiene lista programática de jobs. get_all_metrics: Obtiene métricas de todos los jobs.
Clase Task
- class ctrutils.scheduler.Task(task_id, func, trigger_type, trigger_args, max_retries=3, retry_delay=60, retry_backoff=2.0, timeout=None, dependencies=None, on_success=None, on_failure=None, on_retry=None, condition=None, args=None, kwargs=None)[fuente]
Bases:
objectRepresenta una tarea programable con dependencias y reintentos.
- __init__(task_id, func, trigger_type, trigger_args, max_retries=3, retry_delay=60, retry_backoff=2.0, timeout=None, dependencies=None, on_success=None, on_failure=None, on_retry=None, condition=None, args=None, kwargs=None)[fuente]
Inicializa una tarea.
- Parámetros:
task_id (str) – Identificador único de la tarea.
func (Callable) – Función a ejecutar.
trigger_type (str) – Tipo de trigger (“cron”, “interval”, “date”).
max_retries (int) – Número máximo de reintentos en caso de fallo.
retry_delay (int) – Delay inicial entre reintentos (segundos).
retry_backoff (float) – Multiplicador para el delay de reintento (backoff exponencial).
timeout (int | None) – Timeout de ejecución en segundos.
dependencies (List[str] | None) – Lista de task_ids que deben completarse antes.
on_success (Callable | None) – Callback a ejecutar en caso de éxito.
on_failure (Callable | None) – Callback a ejecutar en caso de fallo.
on_retry (Callable | None) – Callback a ejecutar en cada reintento.
condition (Callable[[], bool] | None) – Función que retorna True/False para ejecutar condicionalmente.
args (tuple | None) – Argumentos posicionales para la función.
kwargs (Dict[str, Any] | None) – Argumentos nombrados para la función.
Enum JobState
- class ctrutils.scheduler.JobState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[fuente]
Bases:
EnumEstados posibles de un job durante su ciclo de vida.
Los estados representan las diferentes fases por las que pasa una tarea desde su creación hasta su finalización o fallo.
- PENDING
La tarea está programada pero aún no ha comenzado su ejecución.
- RUNNING
La tarea está actualmente en ejecución.
- SUCCESS
La tarea se completó exitosamente sin errores.
- FAILED
La tarea falló después de agotar todos los reintentos.
- RETRYING
La tarea falló pero se reintentará automáticamente.
- SKIPPED
La tarea se omitió porque no cumplió su condición de ejecución o porque una dependencia falló.
Ejemplos
>>> if task.state == JobState.SUCCESS: ... print("Tarea completada") >>> elif task.state == JobState.RETRYING: ... print(f"Reintentando...")
- PENDING = 'pending'
- RUNNING = 'running'
- SUCCESS = 'success'
- FAILED = 'failed'
- RETRYING = 'retrying'
- SKIPPED = 'skipped'
Clase JobMetrics
- class ctrutils.scheduler.JobMetrics[fuente]
Bases:
objectMétricas de ejecución y monitoreo de un job.
Esta clase mantiene estadísticas detalladas sobre las ejecuciones de una tarea, incluyendo contadores de éxitos/fallos, tiempos de ejecución, y tasa de éxito.
Las métricas se actualizan automáticamente en cada ejecución y se pueden exportar a diccionario para logging o visualización.
- last_run_time
Timestamp de la última ejecución.
- Type:
Optional[datetime]
- last_duration
Duración en segundos de la última ejecución.
- Type:
Optional[float]
- last_state
Estado final de la última ejecución.
- Type:
Optional[JobState]
Ejemplos
>>> metrics = JobMetrics() >>> metrics.record_run(duration=1.5, state=JobState.SUCCESS) >>> print(f"Tasa de éxito: {metrics.to_dict()['success_rate']:.2%}") Tasa de éxito: 100.00%
- __init__()[fuente]
Inicializa todas las métricas en cero.
Las métricas comienzan vacías y se actualizan mediante el método record_run() después de cada ejecución de la tarea.
- record_run(duration, state)[fuente]
Registra los resultados de una ejecución del job.
Actualiza todos los contadores y métricas según el estado de la ejecución. Para ejecuciones exitosas, también actualiza el promedio de duración.
- Parámetros:
Ejemplos
>>> metrics = JobMetrics() >>> metrics.record_run(1.5, JobState.SUCCESS) >>> metrics.record_run(2.1, JobState.SUCCESS) >>> assert metrics.avg_duration == 1.8 # (1.5 + 2.1) / 2
- to_dict()[fuente]
Convierte las métricas a un diccionario serializable.
- Devuelve:
total_runs (int): Total de ejecuciones
successes (int): Ejecuciones exitosas
failures (int): Ejecuciones fallidas
retries (int): Total de reintentos
success_rate (float): Porcentaje de éxito (0.0 a 1.0)
last_run_time (str|None): ISO timestamp de última ejecución
last_duration (float|None): Duración de última ejecución
last_state (str|None): Estado de última ejecución
avg_duration (float): Duración promedio de ejecuciones exitosas
- Tipo del valor devuelto:
Dict con las siguientes claves
Ejemplos
>>> metrics = JobMetrics() >>> metrics.record_run(1.5, JobState.SUCCESS) >>> data = metrics.to_dict() >>> print(data['success_rate']) # 1.0 1.0