Guía de Inicio
Instalación
Instalación básica:
pip install ctrutils
Instalación con Poetry:
poetry add ctrutils
Verificar la instalación:
python -c "import ctrutils; print(ctrutils.__version__)"
Scheduler Básico
Crear un scheduler básico:
from ctrutils.scheduler import Scheduler
def mi_tarea():
print("Ejecutando tarea...")
scheduler = Scheduler(timezone="Europe/Madrid")
scheduler.add_job(
func=mi_tarea,
trigger="cron",
job_id="tarea_cada_5min",
trigger_args={"minute": "*/5"},
)
# Iniciar (nunca termina)
scheduler.start(blocking=True)
Pipeline ETL con Dependencias
Crear un pipeline con dependencias entre tareas:
from ctrutils.scheduler import Scheduler, Task
scheduler = Scheduler()
extract = Task(
task_id="extract",
func=extract_data,
trigger_type="cron",
trigger_args={"minute": "*/15"},
max_retries=3,
)
transform = Task(
task_id="transform",
func=transform_data,
trigger_type="cron",
trigger_args={"minute": "*/15"},
dependencies=["extract"], # Solo ejecuta si extract OK
)
scheduler.add_task(extract)
scheduler.add_task(transform)
scheduler.start(blocking=True)
InfluxDB Básico
Conexión y escritura básica:
from ctrutils import InfluxdbOperation
import pandas as pd
influx = InfluxdbOperation(
host='localhost',
port=8086,
username='admin',
password='password'
)
df = pd.DataFrame({
'value': [1.0, 2.0, 3.0],
'sensor': ['A', 'B', 'C']
})
stats = influx.write_dataframe(
measurement='mediciones',
data=df,
database='mi_db',
validate_data=True, # Limpia NaN automáticamente
)
print(f"Escritos: {stats['successful_points']} puntos")
Consultar Datos
Consultar datos con DataFrame:
result = influx.query(
query='SELECT * FROM mediciones LIMIT 10',
database='mi_db',
return_dataframe=True
)
print(result.head())
Logging Básico
Configurar logger con múltiples handlers:
from ctrutils.handler import LoggingHandler
import logging
logger = LoggingHandler(
logger_name="mi_app",
level=logging.INFO,
)
logger.add_handlers([
logger.create_stream_handler(),
logger.create_rotating_file_handler(
filename="app.log",
max_bytes=10*1024*1024, # 10MB
backup_count=5,
)
])
logger.info("Aplicación iniciada")
Próximos Pasos
Ver Referencia de API para referencia completa de la API
Ver Ejemplos de InfluxDB para ejemplos de InfluxDB
Ver Ejemplos de Scheduler para ejemplos de Scheduler