Ejemplos de Scheduler
Scheduler Básico con Cron
Ejecutar tarea cada 5 minutos:
from ctrutils.scheduler import Scheduler
def enviar_reporte():
print("Enviando reporte...")
scheduler = Scheduler()
scheduler.add_job(
func=enviar_reporte,
trigger='cron',
job_id='reporte_cada_5min',
trigger_args={'minute': '*/5'}
)
scheduler.start(blocking=True)
Pipeline ETL con Dependencias
Tareas secuenciales (extract -> transform -> load):
from ctrutils.scheduler import Scheduler, Task
def extract_data():
print("Extrayendo datos...")
return {"status": "ok"}
def transform_data():
print("Transformando datos...")
return {"status": "ok"}
def load_data():
print("Cargando datos...")
scheduler = Scheduler()
extract = Task(
task_id='extract',
func=extract_data,
trigger_type='cron',
trigger_args={'minute': '0', 'hour': '2'},
max_retries=3
)
transform = Task(
task_id='transform',
func=transform_data,
trigger_type='cron',
trigger_args={'minute': '0', 'hour': '2'},
dependencies=['extract']
)
load = Task(
task_id='load',
func=load_data,
trigger_type='cron',
trigger_args={'minute': '0', 'hour': '2'},
dependencies=['transform']
)
scheduler.add_task(extract)
scheduler.add_task(transform)
scheduler.add_task(load)
scheduler.start(blocking=True)
Tarea con Callbacks
Ejecutar callbacks en caso de éxito o fallo:
from ctrutils.scheduler import Scheduler
def tarea_critica():
# Simular tarea que puede fallar
import random
if random.random() > 0.5:
raise Exception("Error simulado")
return "Éxito"
def on_success(result):
print(f"Tarea exitosa: {result}")
def on_failure(error):
print(f"Tarea falló: {error}")
# Enviar alerta
def on_retry(error, attempt):
print(f"Reintentando (intento {attempt}): {error}")
scheduler = Scheduler()
scheduler.add_job(
func=tarea_critica,
trigger='cron',
job_id='backup',
trigger_args={'hour': '3', 'minute': '0'},
max_retries=5,
retry_delay=300, # 5 minutos
on_success=on_success,
on_failure=on_failure,
on_retry=on_retry
)
scheduler.start(blocking=True)
Ejecución Condicional
Ejecutar tarea solo si se cumple una condición:
from ctrutils.scheduler import Scheduler, Task
def es_dia_laboral():
import datetime
return datetime.datetime.now().weekday() < 5 # Lunes a Viernes
def tarea_laboral():
print("Ejecutando tarea de día laboral")
task = Task(
task_id='tarea_laboral',
func=tarea_laboral,
trigger_type='cron',
trigger_args={'minute': '0', 'hour': '9'},
condition=es_dia_laboral # Solo ejecuta si retorna True
)
scheduler = Scheduler()
scheduler.add_task(task)
scheduler.start(blocking=True)
Monitoreo de Métricas
Obtener métricas de ejecución:
from ctrutils.scheduler import Scheduler
scheduler = Scheduler()
# ... agregar tareas ...
scheduler.start()
# En otro hilo o proceso, obtener métricas
metrics = scheduler.get_all_metrics()
print(f"Uptime: {metrics['global']['uptime_seconds']}s")
print(f"Total jobs ejecutados: {metrics['global']['total_jobs_executed']}")
print(f"Total fallos: {metrics['global']['total_failures']}")
# Métricas por tarea
for task_id, task_metrics in metrics['tasks'].items():
print(f"\n{task_id}:")
print(f" Success rate: {task_metrics['success_rate']:.2%}")
print(f" Avg duration: {task_metrics['avg_duration']:.2f}s")
Referencia de API
Para documentación completa, consulta Scheduler - Referencia de API.