Código fuente para ctrutils.database.influxdb.InfluxdbOperation

"""
Este modulo proporciona la clase `InfluxdbOperation` para manejar operaciones en una base de datos
InfluxDB utilizando un cliente `InfluxDBClient`.
"""

from collections import defaultdict
from typing import Any, Dict, List, Optional, Union, Callable
from datetime import datetime, timezone
import math
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager

import pandas as pd  # type: ignore
import numpy as np  # type: ignore
from influxdb import InfluxDBClient


[documentos] class InfluxdbOperation: """ Clase para manejar la conexion y operaciones en una base de datos InfluxDB. Esta clase proporciona una interfaz completa para interactuar con InfluxDB, incluyendo validacion avanzada de datos, limpieza de NaN, escritura por lotes de DataFrames grandes, y operaciones de administracion de base de datos. Ejemplos: >>> # Crear instancia con credenciales >>> influx = InfluxdbOperation( ... host='localhost', ... port=8086, ... username='admin', ... password='password' ... ) >>> # O usar un cliente existente >>> client = InfluxDBClient(host='localhost', port=8086) >>> influx = InfluxdbOperation(client=client) >>> # Escribir un DataFrame grande con validacion >>> df = pd.DataFrame(...) >>> influx.write_dataframe( ... measurement='mi_medicion', ... data=df, ... database='mi_db', ... batch_size=1000, ... validate_data=True ... ) """
[documentos] def __init__( self, host: Optional[str] = None, port: Optional[Union[int, str]] = None, timeout: Optional[Union[int, float]] = 5, client: Optional[InfluxDBClient] = None, **kwargs: Any, ): """ Inicializa la conexion y la clase `InfluxdbOperation`. Args: host: Direccion del servidor InfluxDB (requerido si client no se proporciona). port: Puerto del servidor InfluxDB (requerido si client no se proporciona). timeout: Tiempo de espera para las operaciones en segundos. client: Cliente InfluxDBClient existente (opcional). Si se proporciona, se usara este cliente en lugar de crear uno nuevo. **kwargs: Argumentos adicionales para InfluxDBClient (username, password, etc.). Raises: ValueError: Si no se proporciona ni cliente ni host/port. """ if client is not None: # Usar el cliente proporcionado self._client = client self._is_external_client = True # Intentar extraer informacion del cliente self.host = getattr(client, '_host', None) self.port = getattr(client, '_port', None) self.timeout = getattr(client, '_timeout', timeout) elif host is not None and port is not None: # Crear un nuevo cliente self.host = host self.port = port self.timeout = timeout self._is_external_client = False self._headers = {"Accept": "application/json"} self._gzip = True self._client = InfluxDBClient( host=host, port=port, timeout=timeout, headers=self._headers, gzip=self._gzip, **kwargs, ) else: raise ValueError( "Debe proporcionar un cliente existente (client) o " "las credenciales de conexion (host y port)." ) self._database: Optional[str] = None self._headers = {"Accept": "application/json"} self._gzip = True self._logger: Optional[logging.Logger] = None self._retry_attempts = 3 self._retry_delay = 1 # segundos self._metrics = { 'total_writes': 0, 'total_points': 0, 'failed_writes': 0, 'total_write_time': 0.0 }
# ==================== UTILIDADES INTERNAS ==================== @staticmethod def _convert_to_utc_iso(dt: Union[str, datetime, pd.Timestamp]) -> str: """ Convierte un datetime a formato ISO8601 en UTC. Args: dt: Datetime a convertir (string, datetime o Timestamp). Returns: String en formato ISO8601 UTC. """ if isinstance(dt, str): # Ya es string, asumimos que está en formato correcto return dt elif isinstance(dt, pd.Timestamp): dt = dt.to_pydatetime() # Convertir a UTC si tiene timezone, o asumirlo como UTC if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: dt = dt.astimezone(timezone.utc) return dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ') def _setup_logger( self, name: str = 'InfluxdbOperation', level: int = logging.INFO, logger: Optional[logging.Logger] = None ) -> None: """ Configura el logger para la clase. Args: name: Nombre del logger. level: Nivel de logging. logger: Logger personalizado (opcional). Si se proporciona, se usa en lugar de crear uno nuevo. """ if logger: self._logger = logger else: self._logger = logging.getLogger(name) self._logger.setLevel(level) if not self._logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self._logger.addHandler(handler)
[documentos] def enable_logging( self, level: int = logging.INFO, logger: Optional[logging.Logger] = None ) -> None: """ Activa el logging para debugging y monitoreo. Args: level: Nivel de logging (logging.DEBUG, INFO, WARNING, ERROR). logger: Instancia de logger personalizado (opcional). Si se proporciona, se usa en lugar de crear uno nuevo. Ejemplos: >>> # Con logging estándar >>> influx = InfluxdbOperation(host='localhost', port=8086) >>> influx.enable_logging(logging.DEBUG) >>> # Con LoggingHandler de ctrutils >>> from ctrutils.handler import LoggingHandler >>> handler = LoggingHandler() >>> custom_logger = handler.add_handlers([ ... handler.create_stream_handler(), ... handler.create_file_handler('influxdb.log') ... ]) >>> influx.enable_logging(logger=custom_logger) """ self._setup_logger(level=level, logger=logger)
[documentos] def get_metrics(self) -> Dict[str, Any]: """ Obtiene métricas de rendimiento de las operaciones. Returns: Diccionario con métricas de escritura. Ejemplos: >>> metrics = influx.get_metrics() >>> print(f"Total writes: {metrics['total_writes']}") >>> print(f"Avg write time: {metrics['avg_write_time']:.2f}s") """ avg_time = ( self._metrics['total_write_time'] / self._metrics['total_writes'] if self._metrics['total_writes'] > 0 else 0 ) return { **self._metrics, 'avg_write_time': avg_time, 'success_rate': ( (self._metrics['total_writes'] - self._metrics['failed_writes']) / self._metrics['total_writes'] * 100 if self._metrics['total_writes'] > 0 else 0 ) }
[documentos] def reset_metrics(self) -> None: """Reinicia las métricas de rendimiento.""" self._metrics = { 'total_writes': 0, 'total_points': 0, 'failed_writes': 0, 'total_write_time': 0.0 }
def _retry_operation( self, operation: Callable, *args, max_attempts: Optional[int] = None, **kwargs ) -> Any: """ Ejecuta una operación con reintentos automáticos. Args: operation: Función a ejecutar. max_attempts: Número máximo de intentos (usa self._retry_attempts si es None). *args, **kwargs: Argumentos para la operación. Returns: Resultado de la operación. Raises: Exception: Si todos los intentos fallan. """ attempts = max_attempts or self._retry_attempts last_exception = None for attempt in range(attempts): try: return operation(*args, **kwargs) except Exception as e: last_exception = e if self._logger: self._logger.warning( f"Intento {attempt + 1}/{attempts} falló: {e}" ) if attempt < attempts - 1: # Backoff exponencial sleep_time = self._retry_delay * (2 ** attempt) time.sleep(sleep_time) raise last_exception or Exception("Operación falló después de reintentos")
[documentos] @contextmanager def transaction(self, database: Optional[str] = None): """ Context manager para operaciones transaccionales. Args: database: Base de datos para la transacción. Ejemplos: >>> with influx.transaction('mi_db') as db: ... influx.write_dataframe(measurement='datos', data=df) ... influx.write_points(points=points) """ if database: original_db = self._database self.switch_database(database) try: yield self except Exception as e: if self._logger: self._logger.error(f"Error en transacción: {e}") raise finally: if database and original_db: self.switch_database(original_db)
@property def get_client_info(self) -> Dict[str, Any]: """ Obtiene informacion del cliente actual. """ return { "host": self.host, "port": self.port, "database": self._database, "timeout": self.timeout, "headers": self._headers, "gzip": self._gzip, } @property def get_client(self) -> InfluxDBClient: """ Obtiene el cliente actual `InfluxDBClient`. """ return self._client
[documentos] def close_client(self) -> None: """ Cierra la conexion actual del cliente `InfluxDBClient`. Nota: Si el cliente fue proporcionado externamente, no se cerrara automaticamente para evitar efectos secundarios en otros componentes que lo usen. """ if not self._is_external_client: self._client.close()
[documentos] def switch_database(self, database: str) -> None: """ Cambia la base de datos activa en el cliente de InfluxDB. """ if database not in self._client.get_list_database(): self._client.create_database(database) self._database = database self._client.switch_database(database)
[documentos] def get_data( self, query: str, database: Optional[str] = None, ) -> pd.DataFrame: """ Ejecuta una consulta en InfluxDB y devuelve los resultados en un DataFrame. """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) result_set = self._client.query( query=query, chunked=True, chunk_size=5000 ) data_list = [ point for chunk in result_set for point in chunk.get_points() ] if not data_list: raise ValueError( f"No hay datos disponibles para la query '{query}' en la base de datos '{database or self._database}'." ) df = pd.DataFrame(data_list) if "time" in df.columns: df = df.set_index("time") df.index = pd.to_datetime(df.index) return df
[documentos] def query_to_dataframe( self, measurement: str, fields: Optional[List[str]] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, where_conditions: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, database: Optional[str] = None, convert_to_local_tz: bool = False, ) -> pd.DataFrame: """ Consulta datos de un measurement y devuelve un DataFrame. Args: measurement: Nombre del measurement. fields: Lista de campos a consultar (None = todos). start_time: Tiempo de inicio (formato ISO8601 o string InfluxQL compatible). end_time: Tiempo de fin (formato ISO8601 o string InfluxQL compatible). where_conditions: Condiciones adicionales como dict {'tag': 'value'}. limit: Limitar numero de resultados. database: Base de datos (None = usa la actual). convert_to_local_tz: Si True, convierte el indice a la timezone local. Returns: DataFrame con los datos consultados, indice es time en UTC. Ejemplos: >>> # Leer ultimos 100 puntos >>> df = influx.query_to_dataframe( ... measurement='temperatura', ... limit=100, ... database='mi_db' ... ) >>> >>> # Leer rango de tiempo especifico >>> df = influx.query_to_dataframe( ... measurement='temperatura', ... start_time='2024-01-01T00:00:00Z', ... end_time='2024-01-31T23:59:59Z', ... database='mi_db' ... ) """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) # Construir query field_str = ", ".join(fields) if fields else "*" query = f'SELECT {field_str} FROM "{measurement}"' # Agregar condiciones WHERE where_clauses = [] if start_time: where_clauses.append(f"time >= '{start_time}'") if end_time: where_clauses.append(f"time <= '{end_time}'") if where_conditions: for key, value in where_conditions.items(): if isinstance(value, str): where_clauses.append(f'"{key}" = \'{value}\'') else: where_clauses.append(f'"{key}" = {value}') if where_clauses: query += " WHERE " + " AND ".join(where_clauses) # Ordenar por tiempo descendente query += " ORDER BY time DESC" # Limitar resultados if limit: query += f" LIMIT {limit}" if self._logger: self._logger.debug(f"Ejecutando query: {query}") # Ejecutar query result = self._client.query(query) points = list(result.get_points()) if not points: if self._logger: self._logger.warning(f"No se encontraron datos para measurement '{measurement}'") return pd.DataFrame() # Convertir a DataFrame df = pd.DataFrame(points) if 'time' in df.columns: df['time'] = pd.to_datetime(df['time'], utc=True) if convert_to_local_tz: df['time'] = df['time'].dt.tz_convert('local') df.set_index('time', inplace=True) return df
[documentos] def read_last_n_points( self, measurement: str, n: int = 100, fields: Optional[List[str]] = None, database: Optional[str] = None, ) -> pd.DataFrame: """ Lee los ultimos N puntos de un measurement. Args: measurement: Nombre del measurement. n: Numero de puntos a leer. fields: Lista de campos a leer (None = todos). database: Base de datos (None = usa la actual). Returns: DataFrame con los ultimos N puntos. Ejemplos: >>> # Leer ultimos 50 puntos >>> df = influx.read_last_n_points('temperatura', n=50) """ return self.query_to_dataframe( measurement=measurement, fields=fields, limit=n, database=database )
[documentos] def read_time_range( self, measurement: str, start_time: Union[str, datetime, pd.Timestamp], end_time: Union[str, datetime, pd.Timestamp], fields: Optional[List[str]] = None, database: Optional[str] = None, ) -> pd.DataFrame: """ Lee datos de un rango de tiempo especifico. Args: measurement: Nombre del measurement. start_time: Tiempo de inicio (acepta string, datetime o Timestamp). end_time: Tiempo de fin (acepta string, datetime o Timestamp). fields: Lista de campos a leer (None = todos). database: Base de datos (None = usa la actual). Returns: DataFrame con los datos del rango de tiempo. Ejemplos: >>> # Usando strings >>> df = influx.read_time_range( ... measurement='temperatura', ... start_time='2024-01-01T00:00:00Z', ... end_time='2024-01-31T23:59:59Z' ... ) >>> >>> # Usando datetime >>> from datetime import datetime, timedelta >>> now = datetime.now() >>> df = influx.read_time_range( ... measurement='temperatura', ... start_time=now - timedelta(hours=24), ... end_time=now ... ) """ # Convertir a formato ISO8601 si es necesario if not isinstance(start_time, str): start_time = self._convert_to_utc_iso(start_time) if not isinstance(end_time, str): end_time = self._convert_to_utc_iso(end_time) return self.query_to_dataframe( measurement=measurement, fields=fields, start_time=start_time, end_time=end_time, database=database )
[documentos] def get_measurements(self, database: Optional[str] = None) -> List[str]: """ Obtiene la lista de mediciones (measurements) en una base de datos. Este método es un alias conveniente de list_measurements() que devuelve solo los nombres en lugar de objetos Result completos. Args: database: Nombre de la base de datos. Si es None, usa la base de datos por defecto configurada en el cliente. Returns: List[str]: Lista de nombres de mediciones ordenadas alfabéticamente. Devuelve lista vacía si no hay mediciones o si la base de datos no existe. Raises: InfluxDBClientError: Si hay error de conexión o permisos insuficientes. Examples: >>> influx = InfluxdbOperation(host='localhost', port=8086) >>> measurements = influx.get_measurements('mi_db') >>> print(f"Encontradas {len(measurements)} mediciones") >>> for m in measurements: ... print(f" - {m}") See Also: list_measurements: Método original que devuelve objetos Result. get_databases: Obtiene lista de bases de datos. """ return self.list_measurements(database)
[documentos] def get_databases(self) -> List[str]: """ Obtiene la lista de bases de datos disponibles en InfluxDB. Este método es un alias conveniente de list_databases() para mantener consistencia de nomenclatura con get_measurements(). Returns: List[str]: Lista de nombres de bases de datos disponibles, incluyendo bases de datos del sistema como '_internal'. Raises: InfluxDBClientError: Si hay error de conexión al servidor. Examples: >>> influx = InfluxdbOperation(host='localhost', port=8086) >>> databases = influx.get_databases() >>> print("Bases de datos disponibles:") >>> for db in databases: ... print(f" - {db}") Bases de datos disponibles: - _internal - mi_db - produccion See Also: list_databases: Método original. get_measurements: Obtiene lista de mediciones en una BD. """ return self.list_databases()
[documentos] def normalize_value_to_write(self, value: Any) -> Any: """ Normaliza el valor para su escritura en InfluxDB. Esta funcion valida y convierte valores para asegurar compatibilidad con InfluxDB. Args: value: Valor a normalizar. Returns: Valor normalizado o None si no es valido. """ # Verificar si es NaN, None o infinito if value is None: return None # Manejar valores numericos especiales if isinstance(value, (int, float, np.integer, np.floating)): # Convertir tipos numpy a tipos nativos Python if isinstance(value, (np.integer, np.int64, np.int32, np.int16, np.int8)): value = int(value) elif isinstance(value, (np.floating, np.float64, np.float32, np.float16)): value = float(value) # Verificar NaN e infinitos if isinstance(value, float): if math.isnan(value) or math.isinf(value): return None return value elif isinstance(value, int): return float(value) # Manejar booleanos elif isinstance(value, (bool, np.bool_)): return bool(value) # Manejar strings elif isinstance(value, (str, np.str_)): value_str = str(value).strip() # Filtrar strings vacios o con valores especiales if value_str in ['', 'nan', 'NaN', 'None', 'null', 'NULL']: return None return value_str # Manejar pandas.NA o numpy.nan elif pd.isna(value): return None return value
def _validate_point(self, point: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Valida y limpia un punto antes de escribirlo en InfluxDB. Args: point: Diccionario representando un punto de datos. Returns: Punto validado o None si no es valido. """ if not point or 'fields' not in point: return None # Filtrar campos con valores None o NaN validated_fields = {} for field_key, field_value in point['fields'].items(): normalized_value = self.normalize_value_to_write(field_value) if normalized_value is not None: validated_fields[field_key] = normalized_value # Si no quedan campos validos, el punto es invalido if not validated_fields: return None point['fields'] = validated_fields return point
[documentos] def write_points( self, points: list, database: Optional[str] = None, tags: Optional[dict] = None, batch_size: int = 5000, validate_data: bool = True, ) -> Dict[str, int]: """ Escribe una lista de puntos directamente en InfluxDB, asegurando que el timestamp este en UTC. Args: points: Lista de puntos a escribir. Cada punto debe ser un diccionario con las claves 'measurement', 'time', 'fields' y opcionalmente 'tags'. database: Nombre de la base de datos donde escribir (opcional si ya esta configurada). tags: Tags adicionales para agregar a todos los puntos. batch_size: Tamaño del lote para escritura. Por defecto 5000. validate_data: Si True, valida y limpia los puntos antes de escribir. Returns: Diccionario con estadisticas de escritura: { 'total_points': int, 'written_points': int, 'invalid_points': int, 'batches': int } Raises: ValueError: Si no se proporciona base de datos o la lista de puntos esta vacia. """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) if not points: raise ValueError("La lista de puntos no puede estar vacia.") total_points = len(points) validated_points = [] invalid_count = 0 for point in points: # Convertir timestamp a UTC if "time" in point: point["time"] = self._convert_to_utc_iso(point["time"]) # Agregar tags adicionales if tags: point["tags"] = {**point.get("tags", {}), **tags} # Validar el punto si se solicita if validate_data: validated_point = self._validate_point(point) if validated_point is not None: validated_points.append(validated_point) else: invalid_count += 1 else: validated_points.append(point) # Escribir en lotes written_count = 0 batch_count = 0 for i in range(0, len(validated_points), batch_size): batch = validated_points[i:i + batch_size] self._client.write_points( points=batch, database=db_to_use, batch_size=batch_size ) written_count += len(batch) batch_count += 1 return { 'total_points': total_points, 'written_points': written_count, 'invalid_points': invalid_count, 'batches': batch_count }
[documentos] def write_dataframe( self, measurement: Optional[str] = None, data: Optional[pd.DataFrame] = None, df: Optional[pd.DataFrame] = None, tags: Optional[dict] = None, database: Optional[str] = None, batch_size: int = 1000, validate_data: bool = True, pass_to_float: bool = True, convert_bool_to_float: bool = False, suffix_bool_to_float: str = "_bool_to_float", drop_na_rows: bool = False, field_columns: Optional[List[str]] = None, tag_columns: Optional[List[str]] = None, convert_index_to_utc: bool = True, ) -> Dict[str, int]: """ Convierte un DataFrame en una lista de puntos y los escribe en InfluxDB, con validacion avanzada y limpieza de datos. Este metodo maneja automaticamente: - Valores NaN, None, e infinitos - Conversion de tipos numpy a tipos nativos Python - Validacion de cada punto antes de escribir - Escritura por lotes para DataFrames grandes - Conversion de timestamps a UTC Args: measurement: Nombre de la medicion en InfluxDB. data: DataFrame con los datos a escribir. El indice debe ser DatetimeIndex (acepta tambien 'df'). df: Alias para 'data' (por compatibilidad). tags: Tags adicionales para todos los puntos (opcional). database: Base de datos donde escribir (opcional si ya esta configurada). batch_size: Tamaño del lote para escritura. Por defecto 1000. Reducir si tiene problemas de memoria con DataFrames grandes. validate_data: Si True, valida y limpia datos antes de escribir. pass_to_float: Si True, convierte enteros a float para compatibilidad InfluxDB. convert_bool_to_float: Si True, convierte columnas booleanas a float. suffix_bool_to_float: Sufijo para columnas booleanas convertidas. drop_na_rows: Si True, elimina filas donde todos los valores son NaN. field_columns: Lista de columnas a usar como fields (None = todas excepto tag_columns). tag_columns: Lista de columnas a usar como tags adicionales. convert_index_to_utc: Si True, convierte el indice a UTC antes de escribir. Returns: Diccionario con estadisticas de escritura. Raises: ValueError: Si no se proporciona DataFrame o measurement. TypeError: Si el indice del DataFrame no es DatetimeIndex. Ejemplos: >>> df = pd.DataFrame({ ... 'temperatura': [20.5, np.nan, 21.0, 22.5], ... 'humedad': [45.0, 50.0, np.nan, 55.0] ... }, index=pd.date_range('2024-01-01', periods=4, freq='H')) >>> >>> stats = influx.write_dataframe( ... measurement='clima', ... data=df, ... database='mi_db', ... batch_size=1000, ... validate_data=True ... ) >>> print(f"Escritos: {stats['written_points']}/{stats['total_points']}") """ # Compatibilidad con ambos parametros dataframe = df if df is not None else data if dataframe is None or measurement is None: raise ValueError( "Debe proporcionar un DataFrame 'data' o 'df' y un 'measurement'." ) if not isinstance(dataframe.index, pd.DatetimeIndex): raise TypeError("El indice del DataFrame debe ser de tipo DatetimeIndex.") # Crear una copia para no modificar el original dataframe = dataframe.copy() # Convertir indice a UTC si se solicita if convert_index_to_utc: if dataframe.index.tz is None: # Localizar como UTC si no tiene timezone dataframe.index = dataframe.index.tz_localize('UTC') else: # Convertir a UTC si tiene otro timezone dataframe.index = dataframe.index.tz_convert('UTC') # Eliminar filas completamente vacias si se solicita if drop_na_rows: dataframe = dataframe.dropna(how='all') # Separar columnas para tags y fields tags_from_columns = {} if tag_columns: for col in tag_columns: if col in dataframe.columns: # Los tags no pueden cambiar por fila en este enfoque simple # Tomamos el primer valor no-nulo first_val = dataframe[col].dropna().iloc[0] if not dataframe[col].dropna().empty else None if first_val is not None: tags_from_columns[col] = str(first_val) dataframe = dataframe.drop(columns=[col for col in tag_columns if col in dataframe.columns]) # Determinar columnas de fields if field_columns: dataframe = dataframe[field_columns] # Convertir columnas booleanas si se solicita if convert_bool_to_float: for column in dataframe.select_dtypes(include=["bool"]).columns: dataframe[f"{column}{suffix_bool_to_float}"] = dataframe[column].astype(float) dataframe = dataframe.drop(columns=[column]) # Convertir DataFrame a lista de diccionarios de puntos points = [] for index, row in dataframe.iterrows(): # Construir los campos, aplicando normalizacion fields = {} for field, value in row.items(): # Saltar valores NaN if pd.isna(value): continue # Normalizar el valor if validate_data: normalized_value = self.normalize_value_to_write(value) if normalized_value is not None: fields[field] = normalized_value else: # Sin validacion, solo aplicar conversion basica if pass_to_float and isinstance(value, (int, np.integer)): fields[field] = float(value) else: fields[field] = value # Solo agregar el punto si tiene campos validos if fields: point = { "measurement": measurement, "time": self._convert_to_utc_iso(index), "fields": fields, } # Agregar tags (tanto los proporcionados como los de columnas) point_tags = {} if tags: point_tags.update(tags) if tags_from_columns: point_tags.update(tags_from_columns) if point_tags: point["tags"] = point_tags points.append(point) # Escribir los puntos usando el metodo write_points mejorado return self.write_points( points=points, database=database, tags=None, # Ya los agregamos arriba batch_size=batch_size, validate_data=False, # Ya validamos arriba )
[documentos] def write_dataframe_parallel( self, df: pd.DataFrame, measurement: str, tags: Optional[Dict[str, str]] = None, field_columns: Optional[List[str]] = None, tag_columns: Optional[List[str]] = None, batch_size: int = 5000, max_workers: int = 4, progress_callback: Optional[Callable[[int, int], None]] = None, database: Optional[str] = None, ) -> Dict[str, Any]: """ Escribe un DataFrame a InfluxDB usando procesamiento paralelo. Args: df: DataFrame con datos a escribir measurement: Nombre de la medicion tags: Tags adicionales a agregar a todos los puntos field_columns: Columnas a usar como fields (None = todas las numericas) tag_columns: Columnas a usar como tags batch_size: Tamaño de cada batch max_workers: Numero maximo de threads para procesamiento paralelo progress_callback: Funcion opcional(processed, total) para reportar progreso database: Nombre de la base de datos (None = usa la actual) Returns: Diccionario con estadisticas de la operacion """ if df.empty: return {"total_points": 0, "successful": 0, "failed": 0, "duration": 0.0} start_time = time.time() total_rows = len(df) processed = 0 successful = 0 failed = 0 # Dividir DataFrame en chunks chunks = [df.iloc[i:i + batch_size] for i in range(0, total_rows, batch_size)] # Funcion para procesar cada chunk def process_chunk(chunk_data): try: stats = self.write_dataframe( df=chunk_data, measurement=measurement, tags=tags, field_columns=field_columns, tag_columns=tag_columns, batch_size=batch_size, validate_data=True, database=database ) return stats['successful'], stats['failed'] except Exception as e: if self._logger: self._logger.error(f"Error procesando chunk: {e}") return 0, len(chunk_data) # Procesar chunks en paralelo with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(process_chunk, chunk): i for i, chunk in enumerate(chunks)} for future in futures: chunk_success, chunk_failed = future.result() successful += chunk_success failed += chunk_failed processed += chunk_success + chunk_failed if progress_callback: progress_callback(processed, total_rows) duration = time.time() - start_time result = { "total_points": total_rows, "successful": successful, "failed": failed, "duration": duration, "points_per_second": successful / duration if duration > 0 else 0 } if self._logger: self._logger.info(f"Escritura paralela completada: {result}") return result
[documentos] def downsample_data( self, measurement: str, target_measurement: str, aggregation_window: str, aggregation_func: str = "MEAN", fields: Optional[List[str]] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, database: Optional[str] = None, ) -> int: """ Crea una version downsampled de los datos. Args: measurement: Measurement de origen target_measurement: Measurement de destino aggregation_window: Ventana de agregacion (ej: '1h', '1d') aggregation_func: Funcion de agregacion (MEAN, SUM, MAX, MIN, COUNT) fields: Campos a agregar (None = todos) start_time: Tiempo de inicio (formato RFC3339) end_time: Tiempo de fin (formato RFC3339) database: Base de datos (None = usa la actual) Returns: Numero de puntos creados """ db_to_use = database or self._database if db_to_use is None: raise ValueError("Debe proporcionar una base de datos") self.switch_database(db_to_use) # Construir query field_list = ", ".join([f"{aggregation_func}({f}) AS {f}" for f in fields]) if fields else f"{aggregation_func}(*)" query = f""" SELECT {field_list} INTO "{target_measurement}" FROM "{measurement}" """ where_clauses = [] if start_time: where_clauses.append(f"time >= '{start_time}'") if end_time: where_clauses.append(f"time <= '{end_time}'") if where_clauses: query += " WHERE " + " AND ".join(where_clauses) query += f" GROUP BY time({aggregation_window}), *" result = self._client.query(query) # Contar puntos creados count_query = f'SELECT COUNT(*) FROM "{target_measurement}"' count_result = self._client.query(count_query) points_created = list(count_result.get_points())[0]['count'] if count_result else 0 if self._logger: self._logger.info(f"Downsampling completado: {points_created} puntos creados") return points_created
[documentos] def create_continuous_query( self, cq_name: str, measurement: str, target_measurement: str, aggregation_window: str, aggregation_func: str = "MEAN", fields: Optional[List[str]] = None, database: Optional[str] = None, ) -> None: """ Crea una continuous query para downsampling automatico. Args: cq_name: Nombre de la continuous query measurement: Measurement de origen target_measurement: Measurement de destino aggregation_window: Ventana de agregacion aggregation_func: Funcion de agregacion fields: Campos a agregar database: Base de datos """ db_to_use = database or self._database if db_to_use is None: raise ValueError("Debe proporcionar una base de datos") field_list = ", ".join([f"{aggregation_func}({f}) AS {f}" for f in fields]) if fields else f"{aggregation_func}(*)" query = f""" CREATE CONTINUOUS QUERY "{cq_name}" ON "{db_to_use}" BEGIN SELECT {field_list} INTO "{target_measurement}" FROM "{measurement}" GROUP BY time({aggregation_window}), * END """ self._client.query(query) if self._logger: self._logger.info(f"Continuous query '{cq_name}' creada exitosamente")
[documentos] def list_continuous_queries(self, database: Optional[str] = None) -> List[Dict[str, Any]]: """ Lista todas las continuous queries. Returns: Lista de diccionarios con informacion de las CQs """ db_to_use = database or self._database result = self._client.query("SHOW CONTINUOUS QUERIES") cqs = [] for point in result.get_points(): if not db_to_use or point.get('database') == db_to_use: cqs.append(dict(point)) return cqs
[documentos] def drop_continuous_query(self, cq_name: str, database: Optional[str] = None) -> None: """ Elimina una continuous query. Args: cq_name: Nombre de la CQ a eliminar database: Base de datos """ db_to_use = database or self._database if db_to_use is None: raise ValueError("Debe proporcionar una base de datos") query = f'DROP CONTINUOUS QUERY "{cq_name}" ON "{db_to_use}"' self._client.query(query) if self._logger: self._logger.info(f"Continuous query '{cq_name}' eliminada")
[documentos] def backup_measurement( self, measurement: str, output_file: str, start_time: Optional[str] = None, end_time: Optional[str] = None, database: Optional[str] = None, ) -> int: """ Exporta un measurement a archivo CSV. Args: measurement: Nombre del measurement output_file: Ruta del archivo CSV de salida start_time: Tiempo de inicio (opcional) end_time: Tiempo de fin (opcional) database: Base de datos (None = usa la actual) Returns: Numero de puntos exportados """ df = self.query_to_dataframe( measurement=measurement, start_time=start_time, end_time=end_time, database=database ) if df.empty: if self._logger: self._logger.warning(f"No hay datos para exportar del measurement '{measurement}'") return 0 df.to_csv(output_file, index=True) if self._logger: self._logger.info(f"Backup completado: {len(df)} puntos exportados a '{output_file}'") return len(df)
[documentos] def restore_measurement( self, measurement: str, input_file: str, tags: Optional[Dict[str, str]] = None, batch_size: int = 5000, database: Optional[str] = None, ) -> Dict[str, Any]: """ Restaura un measurement desde archivo CSV. Args: measurement: Nombre del measurement de destino input_file: Ruta del archivo CSV tags: Tags adicionales batch_size: Tamaño de batch para escritura database: Base de datos de destino Returns: Estadisticas de la operacion """ df = pd.read_csv(input_file, index_col=0, parse_dates=True) if df.empty: return {"total_points": 0, "successful": 0, "failed": 0} result = self.write_dataframe( df=df, measurement=measurement, tags=tags, batch_size=batch_size, database=database ) if self._logger: self._logger.info(f"Restauracion completada: {result}") return result
[documentos] def calculate_data_quality_metrics( self, measurement: str, fields: Optional[List[str]] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, database: Optional[str] = None, ) -> Dict[str, Dict[str, Any]]: """ Calcula metricas de calidad de datos para un measurement. Args: measurement: Nombre del measurement fields: Campos a analizar (None = todos) start_time: Tiempo de inicio end_time: Tiempo de fin database: Base de datos Returns: Diccionario con metricas por campo """ df = self.query_to_dataframe( measurement=measurement, fields=fields, start_time=start_time, end_time=end_time, database=database ) if df.empty: return {} metrics = {} for col in df.columns: if pd.api.types.is_numeric_dtype(df[col]): metrics[col] = { "count": int(df[col].count()), "missing": int(df[col].isna().sum()), "missing_percentage": float(df[col].isna().sum() / len(df) * 100), "mean": float(df[col].mean()) if df[col].count() > 0 else None, "std": float(df[col].std()) if df[col].count() > 1 else None, "min": float(df[col].min()) if df[col].count() > 0 else None, "max": float(df[col].max()) if df[col].count() > 0 else None, "zeros": int((df[col] == 0).sum()), "outliers": int(self._count_outliers(df[col])), } else: metrics[col] = { "count": int(df[col].count()), "missing": int(df[col].isna().sum()), "missing_percentage": float(df[col].isna().sum() / len(df) * 100), "unique_values": int(df[col].nunique()), } return metrics
@staticmethod def _count_outliers(series: pd.Series, threshold: float = 3.0) -> int: """ Cuenta outliers usando el metodo de desviacion estandar. Args: series: Serie de pandas threshold: Numero de desviaciones estandar para considerar outlier Returns: Numero de outliers """ if series.count() < 2: return 0 mean = series.mean() std = series.std() if std == 0: return 0 z_scores = np.abs((series - mean) / std) return int((z_scores > threshold).sum())
[documentos] def query_builder( self, measurement: str, fields: Optional[List[str]] = None, where_conditions: Optional[Dict[str, Any]] = None, group_by: Optional[List[str]] = None, order_by: str = "time DESC", limit: Optional[int] = None, database: Optional[str] = None, ) -> str: """ Constructor de queries InfluxQL avanzado. Args: measurement: Nombre del measurement fields: Campos a seleccionar (None = todos) where_conditions: Condiciones WHERE como diccionario group_by: Campos para agrupar order_by: Orden de resultados limit: Limite de resultados database: Base de datos Returns: Query InfluxQL como string """ # SELECT field_str = ", ".join(fields) if fields else "*" query = f'SELECT {field_str} FROM "{measurement}"' # WHERE if where_conditions: where_clauses = [] for key, value in where_conditions.items(): if isinstance(value, str): where_clauses.append(f'"{key}" = \'{value}\'') elif isinstance(value, (list, tuple)): # Para operadores como IN, >, <, etc operator, val = value if isinstance(val, str): where_clauses.append(f'"{key}" {operator} \'{val}\'') else: where_clauses.append(f'"{key}" {operator} {val}') else: where_clauses.append(f'"{key}" = {value}') if where_clauses: query += " WHERE " + " AND ".join(where_clauses) # GROUP BY if group_by: query += " GROUP BY " + ", ".join(group_by) # ORDER BY if order_by: query += f" ORDER BY {order_by}" # LIMIT if limit: query += f" LIMIT {limit}" return query
[documentos] def execute_query_builder( self, measurement: str, fields: Optional[List[str]] = None, where_conditions: Optional[Dict[str, Any]] = None, group_by: Optional[List[str]] = None, order_by: str = "time DESC", limit: Optional[int] = None, as_dataframe: bool = True, database: Optional[str] = None, ) -> Union[pd.DataFrame, Any]: """ Construye y ejecuta una query. Args: measurement: Nombre del measurement fields: Campos a seleccionar where_conditions: Condiciones WHERE group_by: Campos para agrupar order_by: Orden de resultados limit: Limite de resultados as_dataframe: Si retornar como DataFrame database: Base de datos Returns: DataFrame o resultado de query """ query = self.query_builder( measurement=measurement, fields=fields, where_conditions=where_conditions, group_by=group_by, order_by=order_by, limit=limit, database=database ) if as_dataframe: db_to_use = database or self._database if db_to_use: self.switch_database(db_to_use) result = self._client.query(query) if result: df = pd.DataFrame(list(result.get_points())) if not df.empty and 'time' in df.columns: df['time'] = pd.to_datetime(df['time']) df.set_index('time', inplace=True) return df return pd.DataFrame() else: return self._client.query(query)
[documentos] def delete( self, measurement: str, start_time: Optional[Union[str, pd.Timestamp]] = None, end_time: Optional[Union[str, pd.Timestamp]] = None, filters: Optional[Dict[str, str]] = None, database: Optional[str] = None, ) -> None: """ Elimina datos de una medición en InfluxDB. Permite eliminar datos con filtros por rango de tiempo y/o tags específicos. PRECAUCIÓN: Esta operación es irreversible. Args: measurement: Nombre de la medición de donde eliminar datos. start_time: Tiempo de inicio para el rango de eliminación (opcional). Acepta string ISO8601 o pd.Timestamp. end_time: Tiempo de fin para el rango de eliminación (opcional). Acepta string ISO8601 o pd.Timestamp. filters: Diccionario de filtros adicionales por tags {tag: valor}. database: Base de datos donde ejecutar el DELETE. Si es None, usa la BD actual. Returns: None: La operación no devuelve valor. Raises: ValueError: Si no se proporciona database y no hay BD configurada. InfluxDBClientError: Si hay error ejecutando la operación. Warning: Esta operación elimina datos permanentemente y no se puede deshacer. Considera hacer un backup antes de eliminar datos importantes. Examples: Eliminar todos los datos de una medición: >>> influx.delete(measurement='temperatura_antigua', database='mi_db') Eliminar datos de un rango de tiempo: >>> influx.delete( ... measurement='sensores', ... start_time='2024-01-01T00:00:00Z', ... end_time='2024-01-31T23:59:59Z', ... database='mi_db' ... ) Eliminar datos con filtro por tag: >>> influx.delete( ... measurement='sensores', ... filters={'sensor_id': 'sensor_001'}, ... database='mi_db' ... ) See Also: drop_measurement: Elimina la medición completa incluyendo esquema. backup_measurement: Crea backup antes de eliminar. """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f"DELETE FROM \"{measurement}\"" where_clauses = [] if start_time: start_time_utc = self._convert_to_utc_iso(start_time) where_clauses.append(f"time >= '{start_time_utc}'") if end_time: end_time_utc = self._convert_to_utc_iso(end_time) where_clauses.append(f"time <= '{end_time_utc}'") if filters: for key, value in filters.items(): where_clauses.append(f"\"{key}\" = '{value}'") if where_clauses: query += " WHERE " + " AND ".join(where_clauses) self._client.query(query)
[documentos] def get_field_keys_grouped_by_type(self, measurement: str) -> Dict[str, List[str]]: """ Obtiene las claves de fields de un measurement, agrupadas por tipo de dato. Este método es útil para construir queries dinámicas que necesitan aplicar diferentes operaciones según el tipo de dato del field. Args: measurement: Nombre del measurement a consultar. Returns: Dict[str, List[str]]: Diccionario donde las claves son los tipos de datos ('float', 'integer', 'string', 'boolean') y los valores son listas de nombres de fields con ese tipo. Raises: InfluxDBClientError: Si el measurement no existe o hay error de conexión. Examples: >>> fields_by_type = influx.get_field_keys_grouped_by_type('sensores') >>> print(fields_by_type) { 'float': ['temperatura', 'humedad', 'presion'], 'integer': ['contador', 'nivel'], 'string': ['estado', 'ubicacion'], 'boolean': ['activo', 'alarma'] } >>> >>> # Usar para construir queries diferentes por tipo >>> float_fields = fields_by_type.get('float', []) >>> for field in float_fields: ... print(f"SELECT MEAN({field}) FROM sensores") See Also: list_fields: Obtiene lista simple de fields sin agrupar por tipo. build_query_fields: Construye parte de query aplicando operaciones por tipo. """ query = f"SHOW FIELD KEYS FROM \"{measurement}\"" results = list(self._client.query(query).get_points()) field_type_dict = defaultdict(list) for result in results: field_type_dict[result["fieldType"]].append(result["fieldKey"]) return dict(field_type_dict)
[documentos] def build_query_fields( self, fields: Union[List[str], Dict[str, List[str]]], operation: str ) -> Dict[str, str]: """ Construye una parte de la consulta de InfluxDB aplicando una operación a cada campo. Este método helper facilita la construcción de queries complejas que aplican operaciones agregadas (MEAN, MAX, MIN, etc.) a múltiples fields, respetando los tipos de datos para evitar errores. Args: fields: Lista de nombres de fields O diccionario con fields agrupados por tipo (como el retornado por get_field_keys_grouped_by_type()). operation: Operación de InfluxDB a aplicar ('MEAN', 'MAX', 'MIN', 'SUM', 'COUNT', 'FIRST', 'LAST', etc.). Returns: Dict[str, str]: Diccionario con las partes construidas de la query. Si fields es List, retorna {'fields': 'MEAN("temp") AS "temp", ...'}. Si fields es Dict, retorna {tipo: query_string} para cada tipo. Examples: Con lista simple de fields: >>> fields = ['temperatura', 'humedad', 'presion'] >>> query_parts = influx.build_query_fields(fields, 'MEAN') >>> print(query_parts['fields']) MEAN("temperatura") AS "temperatura", MEAN("humedad") AS "humedad", MEAN("presion") AS "presion" Con fields agrupados por tipo (no aplica operación a boolean/integer): >>> fields_by_type = { ... 'float': ['temperatura', 'humedad'], ... 'integer': ['contador'], ... 'boolean': ['activo'] ... } >>> query_parts = influx.build_query_fields(fields_by_type, 'MEAN') >>> print(query_parts['float']) MEAN("temperatura") AS "temperatura", MEAN("humedad") AS "humedad" >>> print(query_parts['integer']) "contador" >>> print(query_parts['boolean']) "activo" Uso en query completa: >>> fields = influx.get_field_keys_grouped_by_type('sensores') >>> query_parts = influx.build_query_fields(fields, 'MEAN') >>> select_clause = ', '.join(query_parts.values()) >>> query = f"SELECT {select_clause} FROM sensores WHERE time > now() - 1h" See Also: get_field_keys_grouped_by_type: Obtiene fields agrupados por tipo. query_builder: Constructor de queries más completo. """ query_fields = defaultdict(str) if isinstance(fields, list): query_fields["fields"] = ", ".join( [f'{operation}("{field}") AS "{field}"' for field in fields] ) if isinstance(fields, dict): for field_type, field_list in fields.items(): if field_type in ["boolean", "integer"]: query_parts = [f'"{field}"' for field in field_list] else: query_parts = [ f'{operation}("{field}") AS "{field}"' for field in field_list ] query_fields[field_type] = ", ".join(query_parts) return dict(query_fields)
# ==================== METODOS ADMINISTRATIVOS ====================
[documentos] def list_databases(self) -> List[str]: """ Lista todas las bases de datos disponibles en InfluxDB. Returns: Lista con los nombres de las bases de datos. Ejemplos: >>> influx = InfluxdbOperation(host='localhost', port=8086) >>> databases = influx.list_databases() >>> print(databases) ['_internal', 'mi_db', 'otra_db'] """ result = self._client.get_list_database() return [db['name'] for db in result]
[documentos] def database_exists(self, database: str) -> bool: """ Verifica si una base de datos existe. Args: database: Nombre de la base de datos a verificar. Returns: True si la base de datos existe, False en caso contrario. """ databases = self.list_databases() return database in databases
[documentos] def create_database(self, database: str) -> None: """ Crea una nueva base de datos en InfluxDB. Args: database: Nombre de la base de datos a crear. Ejemplos: >>> influx.create_database('nueva_db') """ self._client.create_database(database)
[documentos] def drop_database(self, database: str, confirm: bool = False) -> None: """ Elimina una base de datos de InfluxDB. Args: database: Nombre de la base de datos a eliminar. confirm: Debe ser True para confirmar la eliminacion (seguridad). Raises: ValueError: Si confirm no es True. Ejemplos: >>> influx.drop_database('vieja_db', confirm=True) """ if not confirm: raise ValueError( "Debe confirmar la eliminacion de la base de datos estableciendo confirm=True" ) self._client.drop_database(database)
[documentos] def list_measurements(self, database: Optional[str] = None) -> List[str]: """ Lista todas las mediciones en una base de datos. Args: database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Lista con los nombres de las mediciones. Ejemplos: >>> measurements = influx.list_measurements('mi_db') >>> print(measurements) ['temperatura', 'humedad', 'presion'] """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) result = self._client.query("SHOW MEASUREMENTS") points = list(result.get_points()) return [point['name'] for point in points]
[documentos] def measurement_exists(self, measurement: str, database: Optional[str] = None) -> bool: """ Verifica si una medicion existe en la base de datos. Args: measurement: Nombre de la medicion a verificar. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: True si la medicion existe, False en caso contrario. """ measurements = self.list_measurements(database) return measurement in measurements
[documentos] def drop_measurement(self, measurement: str, database: Optional[str] = None, confirm: bool = False) -> None: """ Elimina una medicion (y todos sus datos) de la base de datos. Args: measurement: Nombre de la medicion a eliminar. database: Nombre de la base de datos (opcional si ya esta configurada). confirm: Debe ser True para confirmar la eliminacion (seguridad). Raises: ValueError: Si confirm no es True. Ejemplos: >>> influx.drop_measurement('vieja_medicion', confirm=True) """ if not confirm: raise ValueError( "Debe confirmar la eliminacion de la medicion estableciendo confirm=True" ) db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) self._client.query(f'DROP MEASUREMENT "{measurement}"')
[documentos] def list_tags(self, measurement: str, database: Optional[str] = None) -> List[str]: """ Lista todos los tags de una medicion. Args: measurement: Nombre de la medicion. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Lista con los nombres de los tags. Ejemplos: >>> tags = influx.list_tags('temperatura', 'mi_db') >>> print(tags) ['sensor_id', 'location', 'type'] """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f'SHOW TAG KEYS FROM "{measurement}"' result = self._client.query(query) points = list(result.get_points()) return [point['tagKey'] for point in points]
[documentos] def list_tag_values(self, measurement: str, tag_key: str, database: Optional[str] = None) -> List[str]: """ Lista todos los valores de un tag especifico en una medicion. Args: measurement: Nombre de la medicion. tag_key: Nombre del tag. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Lista con los valores del tag. Ejemplos: >>> values = influx.list_tag_values('temperatura', 'location', 'mi_db') >>> print(values) ['salon', 'cocina', 'dormitorio'] """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f'SHOW TAG VALUES FROM "{measurement}" WITH KEY = "{tag_key}"' result = self._client.query(query) points = list(result.get_points()) return [point['value'] for point in points]
[documentos] def list_fields(self, measurement: str, database: Optional[str] = None) -> Dict[str, str]: """ Lista todos los campos de una medicion con sus tipos. Args: measurement: Nombre de la medicion. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Diccionario con nombres de campos como claves y tipos como valores. Ejemplos: >>> fields = influx.list_fields('temperatura', 'mi_db') >>> print(fields) {'temperatura': 'float', 'humedad': 'float', 'activo': 'boolean'} """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f'SHOW FIELD KEYS FROM "{measurement}"' result = self._client.query(query) points = list(result.get_points()) return {point['fieldKey']: point['fieldType'] for point in points}
[documentos] def get_measurement_cardinality(self, measurement: str, database: Optional[str] = None) -> int: """ Obtiene la cardinalidad (numero de series unicas) de una medicion. La cardinalidad es el numero de combinaciones unicas de tags en una medicion. Una cardinalidad alta puede afectar el rendimiento. Args: measurement: Nombre de la medicion. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Numero de series unicas en la medicion. Ejemplos: >>> cardinality = influx.get_measurement_cardinality('temperatura', 'mi_db') >>> print(f"Series unicas: {cardinality}") """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f'SHOW SERIES FROM "{measurement}"' result = self._client.query(query) points = list(result.get_points()) return len(points)
[documentos] def get_retention_policies(self, database: Optional[str] = None) -> List[Dict[str, Any]]: """ Lista las politicas de retencion de una base de datos. Args: database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Lista de diccionarios con informacion de las politicas de retencion. Ejemplos: >>> policies = influx.get_retention_policies('mi_db') >>> for policy in policies: ... print(f"{policy['name']}: {policy['duration']}") """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) result = self._client.get_list_retention_policies(db_to_use) return result
[documentos] def count_points(self, measurement: str, database: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None) -> int: """ Cuenta el numero de puntos en una medicion. Args: measurement: Nombre de la medicion. database: Nombre de la base de datos (opcional si ya esta configurada). start_time: Tiempo de inicio para el conteo (opcional). end_time: Tiempo de fin para el conteo (opcional). Returns: Numero de puntos en la medicion. Ejemplos: >>> # Contar todos los puntos >>> total = influx.count_points('temperatura', 'mi_db') >>> print(f"Total de puntos: {total}") >>> >>> # Contar puntos en un rango de tiempo >>> total = influx.count_points( ... 'temperatura', ... 'mi_db', ... start_time='2024-01-01T00:00:00Z', ... end_time='2024-01-31T23:59:59Z' ... ) """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) self.switch_database(db_to_use) query = f'SELECT COUNT(*) FROM "{measurement}"' where_clauses = [] if start_time: where_clauses.append(f"time >= '{start_time}'") if end_time: where_clauses.append(f"time <= '{end_time}'") if where_clauses: query += " WHERE " + " AND ".join(where_clauses) result = self._client.query(query) points = list(result.get_points()) if points: # Obtener el primer valor de conteo disponible for key, value in points[0].items(): if key.startswith('count_'): return int(value) if value is not None else 0 return 0
[documentos] def get_database_info(self, database: Optional[str] = None) -> Dict[str, Any]: """ Obtiene informacion completa sobre una base de datos. Args: database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Diccionario con informacion de la base de datos: - name: Nombre de la base de datos - measurements: Lista de mediciones - retention_policies: Politicas de retencion Ejemplos: >>> info = influx.get_database_info('mi_db') >>> print(f"Base de datos: {info['name']}") >>> print(f"Mediciones: {len(info['measurements'])}") >>> for measurement in info['measurements']: ... print(f" - {measurement}") """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) return { 'name': db_to_use, 'measurements': self.list_measurements(db_to_use), 'retention_policies': self.get_retention_policies(db_to_use), }
[documentos] def get_measurement_info(self, measurement: str, database: Optional[str] = None) -> Dict[str, Any]: """ Obtiene informacion completa sobre una medicion. Args: measurement: Nombre de la medicion. database: Nombre de la base de datos (opcional si ya esta configurada). Returns: Diccionario con informacion de la medicion: - name: Nombre de la medicion - tags: Lista de tags - fields: Diccionario de campos con sus tipos - cardinality: Numero de series unicas - point_count: Numero total de puntos Ejemplos: >>> info = influx.get_measurement_info('temperatura', 'mi_db') >>> print(f"Medicion: {info['name']}") >>> print(f"Tags: {info['tags']}") >>> print(f"Campos: {info['fields']}") >>> print(f"Total puntos: {info['point_count']}") """ db_to_use = database or self._database if db_to_use is None: raise ValueError( "Debe proporcionar una base de datos o establecerla mediante el metodo 'switch_database'." ) return { 'name': measurement, 'tags': self.list_tags(measurement, db_to_use), 'fields': self.list_fields(measurement, db_to_use), 'cardinality': self.get_measurement_cardinality(measurement, db_to_use), 'point_count': self.count_points(measurement, db_to_use), }