Ejemplos de InfluxDB

Validación de Datos

Validar DataFrame antes de escribir:

import pandas as pd
import numpy as np
from ctrutils import InfluxdbOperation

# DataFrame con datos problemáticos
df = pd.DataFrame({
    'value': [1.0, np.nan, np.inf, 3.0],
    'sensor': ['A', 'B', 'C', 'D']
})

influx = InfluxdbOperation(host='localhost', port=8086)

# Escribir con limpieza automática
stats = influx.write_dataframe(
    measurement='mediciones',
    data=df,
    database='mi_db',
    validate_data=True,  # Limpia NaN/inf automáticamente
    action_on_invalid='remove'
)

print(f"Puntos válidos escritos: {stats['successful_points']}")
print(f"Puntos inválidos eliminados: {stats.get('invalid_points_removed', 0)}")

Escritura Paralela

Para DataFrames grandes (>10k filas):

import pandas as pd
from ctrutils import InfluxdbOperation

# DataFrame grande
df = pd.DataFrame({
    'value': range(100000),
    'sensor': ['A'] * 100000
})

influx = InfluxdbOperation(host='localhost', port=8086)

# Escritura paralela con callback
def progress_callback(batch_num, total_batches):
    print(f"Procesando batch {batch_num}/{total_batches}")

stats = influx.write_dataframe_parallel(
    measurement='datos_masivos',
    data=df,
    database='mi_db',
    batch_size=5000,
    max_workers=4,
    progress_callback=progress_callback
)

print(f"Total puntos escritos: {stats['total_points']}")
print(f"Tiempo total: {stats['total_time']:.2f}s")

Backup y Restore

Exportar y restaurar datos:

from ctrutils import InfluxdbOperation

influx = InfluxdbOperation(host='localhost', port=8086)

# Backup a CSV
backup_file = influx.backup_measurement(
    database='mi_db',
    measurement='mediciones',
    output_file='backup_mediciones.csv',
    start_time='2024-01-01T00:00:00Z',
    end_time='2024-12-31T23:59:59Z'
)

print(f"Backup guardado en: {backup_file}")

# Restore desde CSV
stats = influx.restore_measurement(
    database='mi_db',
    measurement='mediciones_restored',
    csv_file='backup_mediciones.csv',
    batch_size=1000
)

print(f"Restaurados: {stats['successful_points']} puntos")

Consultas Avanzadas

Usar query builder para consultas complejas:

from ctrutils import InfluxdbOperation

influx = InfluxdbOperation(host='localhost', port=8086)

# Construir query dinámica
query = influx.query_builder(
    measurement='sensores',
    fields=['temperatura', 'humedad'],
    start_time='2024-01-01T00:00:00Z',
    end_time='2024-01-31T23:59:59Z',
    where_conditions={'location': 'sala1'},
    limit=1000
)

result = influx.execute_query_builder(query, return_dataframe=True)
print(result.describe())

Ver Código Completo

El código completo de estos ejemplos está disponible en el repositorio GitHub.

Referencia de API

Para documentación completa de todos los métodos, consulta InfluxDB - Referencia de API.