Código fuente para ctrutils.handler.notification.loki_handler

"""
Handler personalizado para enviar logs a Grafana Loki.

Grafana Loki es un sistema de agregación de logs horizontalmente escalable,
altamente disponible y multi-tenant inspirado en Prometheus.

Características:
  - Envío de logs vía HTTP API
  - Soporte para labels personalizados (como en Prometheus)
  - Batching opcional para mejor performance
  - Manejo de errores robusto
  - Timeout configurable

API de Loki:
  POST /loki/api/v1/push
  Content-Type: application/json

  {
    "streams": [
      {
        "stream": {
          "label1": "value1",
          "label2": "value2"
        },
        "values": [
          ["<unix_epoch_timestamp_nanoseconds>", "<log_line>"]
        ]
      }
    ]
  }
"""

import json
import logging
import sys
import time
from typing import Dict, List, Optional, Tuple

try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False


[documentos] class LokiHandler(logging.Handler): """ Handler personalizado para enviar logs a Grafana Loki. Este handler permite enviar logs a un servidor Loki utilizando su API HTTP. Los logs se etiquetan con labels (similares a Prometheus) para facilitar consultas y filtrado en Grafana. **Características:** - Envío en tiempo real o con batching - Labels personalizados por aplicación/servicio/entorno - Manejo de errores sin bloquear la aplicación - Timeout configurable **Niveles de log recomendados:** - ``logging.DEBUG`` (10): Depuración detallada - ``logging.INFO`` (20): Información general - ``logging.WARNING`` (30): Advertencias - ``logging.ERROR`` (40): Errores - ``logging.CRITICAL`` (50): Errores críticos :param url: URL del servidor Loki (ej: "http://localhost:3100"). :type url: str :param labels: Diccionario de labels para identificar los logs. Ejemplo: {"app": "myapp", "env": "production", "host": "server1"} :type labels: Dict[str, str] :param level: Nivel mínimo de log para enviar a Loki. Defaults to ``logging.INFO``. :type level: int, optional :param timeout: Tiempo de espera para la solicitud HTTP en segundos. Defaults to 5. :type timeout: int, optional :param batch_size: Número de logs a acumular antes de enviar (0 = sin batching). Defaults to 0. :type batch_size: int, optional :ivar url: URL del endpoint de push de Loki. :ivar labels: Labels asociados a este handler. :ivar timeout: Timeout configurado para las peticiones HTTP. :ivar batch_size: Tamaño del batch configurado. :ivar batch: Lista temporal para acumular logs (si batching está habilitado). Ejemplo básico: --------------- .. code-block:: python import logging from loki_handler import LokiHandler handler = LokiHandler( url="http://localhost:3100", labels={"app": "myapp", "env": "dev"}, level=logging.INFO ) logger = logging.getLogger("myapp") logger.addHandler(handler) logger.setLevel(logging.INFO) logger.info("Aplicación iniciada") logger.error("Error al procesar datos") Ejemplo con batching: -------------------- .. code-block:: python handler = LokiHandler( url="http://loki:3100", labels={"app": "myapp", "env": "production"}, level=logging.INFO, batch_size=10 # Envía cada 10 logs ) Ejemplo en producción: --------------------- .. code-block:: python from ctrutils.handler import LoggingHandler logger = LoggingHandler.production_logger( name="myapp", log_file="app.log", loki_url="http://loki:3100", loki_labels={ "app": "myapp", "env": "production", "host": "server-01", "version": "1.0.0" } ) logger.info("Aplicación en producción iniciada") """
[documentos] def __init__( self, url: str, labels: Dict[str, str], level: int = logging.INFO, timeout: int = 5, batch_size: int = 0, ) -> None: """ Inicializa el LokiHandler. :param url: URL del servidor Loki (sin el path /loki/api/v1/push). :param labels: Diccionario de labels para los logs. :param level: Nivel mínimo de log. :param timeout: Timeout en segundos para las peticiones HTTP. :param batch_size: Número de logs a acumular antes de enviar (0 = sin batching). """ super().__init__(level) if not REQUESTS_AVAILABLE: raise ImportError( "El módulo 'requests' es requerido para LokiHandler. " "Instálalo con: pip install requests" ) # Normalizar URL (remover trailing slash) self.url = url.rstrip("/") + "/loki/api/v1/push" self.labels = labels self.timeout = timeout self.batch_size = batch_size self.batch: List[Tuple[str, str]] = [] # [(timestamp_ns, message), ...]
[documentos] def emit(self, record: logging.LogRecord) -> None: """ Envía el mensaje de log a Loki. Este método es llamado automáticamente por el logger cuando se registra un mensaje que cumple con el nivel mínimo configurado. Si el batching está habilitado, acumula los logs hasta alcanzar batch_size antes de enviarlos. Si no, envía inmediatamente. :param record: Registro de log a enviar. :type record: logging.LogRecord """ try: log_entry = self.format(record) timestamp_ns = str(int(time.time() * 1e9)) # Nanosegundos if self.batch_size > 0: # Modo batching self.batch.append((timestamp_ns, log_entry)) if len(self.batch) >= self.batch_size: self._send_batch() else: # Modo inmediato self._send_to_loki([(timestamp_ns, log_entry)]) except Exception as e: # No queremos que un error en logging rompa la aplicación print(f"Error al enviar log a Loki: {e}", file=sys.stderr)
def _send_batch(self) -> None: """ Envía el batch acumulado de logs a Loki y limpia el batch. """ if self.batch: self._send_to_loki(self.batch) self.batch = [] def _send_to_loki(self, values: List[Tuple[str, str]]) -> None: """ Envía una lista de logs a Loki vía HTTP POST. :param values: Lista de tuplas (timestamp_nanoseconds, log_message). """ payload = { "streams": [ { "stream": self.labels, "values": values } ] } try: response = requests.post( self.url, json=payload, headers={"Content-Type": "application/json"}, timeout=self.timeout ) if response.status_code not in (200, 204): print( f"Loki respondió con status {response.status_code}: {response.text}", file=sys.stderr ) except requests.exceptions.Timeout: print( f"Timeout al enviar logs a Loki (timeout={self.timeout}s)", file=sys.stderr ) except requests.exceptions.RequestException as e: print(f"Error de red al enviar logs a Loki: {e}", file=sys.stderr)
[documentos] def flush(self) -> None: """ Fuerza el envío de todos los logs pendientes en el batch. Este método debe ser llamado antes de cerrar la aplicación si se usa batching, para asegurar que todos los logs se envíen. Ejemplo: -------- .. code-block:: python import atexit loki_handler = LokiHandler(...) logger.addHandler(loki_handler) # Asegurar que se envíen los logs pendientes al cerrar atexit.register(loki_handler.flush) """ if self.batch_size > 0: self._send_batch() super().flush()
[documentos] def close(self) -> None: """ Cierra el handler enviando los logs pendientes. """ self.flush() super().close()
# Ejemplo de uso standalone if __name__ == "__main__": # Configurar handler de Loki loki = LokiHandler( url="http://localhost:3100", labels={ "app": "test_app", "env": "development", "host": "localhost" }, level=logging.DEBUG ) # Configurar logger logger = logging.getLogger("test_logger") logger.setLevel(logging.DEBUG) logger.addHandler(loki) # También mostrar en consola console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" )) logger.addHandler(console) # Generar logs de prueba logger.debug("Este es un mensaje de debug") logger.info("Este es un mensaje informativo") logger.warning("Esta es una advertencia") logger.error("Este es un error") logger.critical("Este es un error crítico") print("\n✅ Logs enviados a Loki. Verifica en Grafana.")