Код IT
← Каталог

Логирование, мониторинг и наблюдаемость систем — Сквозное тестирование наблюдаемости

Фрагмент из «Логирование, мониторинг и наблюдаемость систем»: Сквозное тестирование наблюдаемости.

python infra-securityencyclopedia8-04-devops-ci-cd-19 embed URL статья в энциклопедии
Python main.py

import asyncio
import requests

from datetime import datetime, timedelta

class ObservabilityIntegrationTest:
    """Сквозное тестирование наблюдаемости."""
    
    def __init__(self, app_url, prometheus_url, loki_url, tempo_url):
        self.app_url = app_url
        self.prometheus_url = prometheus_url
        self.loki_url = loki_url
        self.tempo_url = tempo_url
    
    async def run_full_test(self):
        """Полный тест всех трёх столпов наблюдаемости."""
        test_id = f"obs-test-{datetime.utcnow().isoformat()}"
        
        # Генерация тестового трафика с уникальным идентификатором
        await self._generate_test_traffic(test_id)
        
        # Ожидание обработки данных
        await asyncio.sleep(30)
        
        results = {
            "metrics": await self._verify_metrics(test_id),
            "logs": await self._verify_logs(test_id),
            "traces": await self._verify_traces(test_id),
        }
        
        overall_pass = all(r["passed"] for r in results.values())
        
        return {
            "test_id": test_id,
            "timestamp": datetime.utcnow().isoformat(),
            "overall_pass": overall_pass,
            "results": results
        }
    
    async def _generate_test_traffic(self, test_id: str):
        """Генерация запросов с тестовым идентификатором."""
        headers = {"X-Test-Id": test_id}
        
        # Успешный запрос
        requests.get(f"{self.app_url}/api/health", headers=headers)
        
        # Запрос с ошибкой
        requests.get(f"{self.app_url}/api/nonexistent", headers=headers)
        
        # Медленный запрос
        requests.get(
            f"{self.app_url}/api/slow-operation",
            headers=headers,
            timeout=10
        )
    
    async def _verify_metrics(self, test_id: str) -> dict:
        """Проверка наличия метрик от тестового трафика."""
        query = f'http_requests_total{{test_id="{test_id}"}}'
        response = requests.get(
            f"{self.prometheus_url}/api/v1/query",
            params={"query": query}
        )
        data = response.json()
        
        has_data = len(data.get("data", {}).get("result", [])) > 0
        
        return {
            "passed": has_data,
            "details": "Метрики обнаружены" if has_data else "Метрики отсутствуют"
        }
    
    async def _verify_logs(self, test_id: str) -> dict:
        """Проверка наличия логов с тестовым идентификатором."""
        query = f'{{app="myapp"}} |= "{test_id}"'
        end = datetime.utcnow()
        start = end - timedelta(minutes=5)
        
        response = requests.get(
            f"{self.loki_url}/loki/api/v1/query_range",
            params={
                "query": query,
                "start": start.timestamp(),
                "end": end.timestamp(),
                "limit": 10
            }
        )
        data = response.json()
        
        log_count = sum(
            len(stream["values"])
            for stream in data.get("data", {}).get("result", [])
        )
        
        return {
            "passed": log_count > 0,
            "details": f"Обнаружено {log_count} лог-записей"
        }
    
    async def _verify_traces(self, test_id: str) -> dict:
        """Проверка наличия трассировок с тестовым идентификатором."""
        response = requests.get(
            f"{self.tempo_url}/api/search",
            params={
                "tags": f"test.id={test_id}",
                "limit": 10
            }
        )
        data = response.json()
        
        trace_count = len(data.get("traces", []))
        
        return {
            "passed": trace_count > 0,
            "details": f"Обнаружено {trace_count} трассировок"
        }

import asyncio
import requests

from datetime import datetime, timedelta

class ObservabilityIntegrationTest:
    """Сквозное тестирование наблюдаемости."""
    
    def __init__(self, app_url, prometheus_url, loki_url, tempo_url):
        self.app_url = app_url
        self.prometheus_url = prometheus_url
        self.loki_url = loki_url
        self.tempo_url = tempo_url
    
    async def run_full_test(self):
        """Полный тест всех трёх столпов наблюдаемости."""
        test_id = f"obs-test-{datetime.utcnow().isoformat()}"
        
        # Генерация тестового трафика с уникальным идентификатором
        await self._generate_test_traffic(test_id)
        
        # Ожидание обработки данных
        await asyncio.sleep(30)
        
        results = {
            "metrics": await self._verify_metrics(test_id),
            "logs": await self._verify_logs(test_id),
            "traces": await self._verify_traces(test_id),
        }
        
        overall_pass = all(r["passed"] for r in results.values())
        
        return {
            "test_id": test_id,
            "timestamp": datetime.utcnow().isoformat(),
            "overall_pass": overall_pass,
            "results": results
        }
    
    async def _generate_test_traffic(self, test_id: str):
        """Генерация запросов с тестовым идентификатором."""
        headers = {"X-Test-Id": test_id}
        
        # Успешный запрос
        requests.get(f"{self.app_url}/api/health", headers=headers)
        
        # Запрос с ошибкой
        requests.get(f"{self.app_url}/api/nonexistent", headers=headers)
        
        # Медленный запрос
        requests.get(
            f"{self.app_url}/api/slow-operation",
            headers=headers,
            timeout=10
        )
    
    async def _verify_metrics(self, test_id: str) -> dict:
        """Проверка наличия метрик от тестового трафика."""
        query = f'http_requests_total{{test_id="{test_id}"}}'
        response = requests.get(
            f"{self.prometheus_url}/api/v1/query",
            params={"query": query}
        )
        data = response.json()
        
        has_data = len(data.get("data", {}).get("result", [])) > 0
        
        return {
            "passed": has_data,
            "details": "Метрики обнаружены" if has_data else "Метрики отсутствуют"
        }
    
    async def _verify_logs(self, test_id: str) -> dict:
        """Проверка наличия логов с тестовым идентификатором."""
        query = f'{{app="myapp"}} |= "{test_id}"'
        end = datetime.utcnow()
        start = end - timedelta(minutes=5)
        
        response = requests.get(
            f"{self.loki_url}/loki/api/v1/query_range",
            params={
                "query": query,
                "start": start.timestamp(),
                "end": end.timestamp(),
                "limit": 10
            }
        )
        data = response.json()
        
        log_count = sum(
            len(stream["values"])
            for stream in data.get("data", {}).get("result", [])
        )
        
        return {
            "passed": log_count > 0,
            "details": f"Обнаружено {log_count} лог-записей"
        }
    
    async def _verify_traces(self, test_id: str) -> dict:
        """Проверка наличия трассировок с тестовым идентификатором."""
        response = requests.get(
            f"{self.tempo_url}/api/search",
            params={
                "tags": f"test.id={test_id}",
                "limit": 10
            }
        )
        data = response.json()
        
        trace_count = len(data.get("traces", []))
        
        return {
            "passed": trace_count > 0,
            "details": f"Обнаружено {trace_count} трассировок"
        }