diff --git a/rock/admin/metrics/constants.py b/rock/admin/metrics/constants.py
index 388d9740a..958d5e57c 100644
--- a/rock/admin/metrics/constants.py
+++ b/rock/admin/metrics/constants.py
@@ -19,4 +19,6 @@ class MetricsConstants:
     TOTAL_MEM_RESOURCE = "resource.mem.total"
     AVAILABLE_CPU_RESOURCE = "resource.cpu.available"
     AVAILABLE_MEM_RESOURCE = "resource.mem.available"
+    WORKER_DISK_DOCKER_DIR_PERCENT = "resource.worker_pod.disk.docker_dir.percent"
+    WORKER_DISK_LOG_DIR_PERCENT = "resource.worker_pod.disk.log_dir.percent"
     SANDBOX_PHASE_FAILURE = "sandbox.phase.failure"
diff --git a/rock/env_vars.py b/rock/env_vars.py
index be1f98899..eadcd32ee 100644
--- a/rock/env_vars.py
+++ b/rock/env_vars.py
@@ -37,5 +37,6 @@
 ROCK_PIP_INDEX_URL: str | None = "https://mirrors.aliyun.com/pypi/simple/"
 ROCK_MONITOR_ENABLE: bool = False
+ROCK_DOCKER_ROOT: str | None = None
 ROCK_PROJECT_ROOT: str | None = None
 ROCK_WORKER_ENV_TYPE: str | None = "local"
 ROCK_PYTHON_ENV_PATH: str | None = None
@@ -89,6 +90,7 @@
     "ROCK_OSS_BUCKET_REGION": lambda: os.getenv("ROCK_OSS_BUCKET_REGION"),
     "ROCK_PIP_INDEX_URL": lambda: os.getenv("ROCK_PIP_INDEX_URL", "https://mirrors.aliyun.com/pypi/simple/"),
     "ROCK_MONITOR_ENABLE": lambda: os.getenv("ROCK_MONITOR_ENABLE", "false").lower() == "true",
+    "ROCK_DOCKER_ROOT": lambda: os.getenv("ROCK_DOCKER_ROOT"),
     "ROCK_PROJECT_ROOT": lambda: os.getenv("ROCK_PROJECT_ROOT", str(Path(__file__).resolve().parents[1])),
     "ROCK_WORKER_ENV_TYPE": lambda: os.getenv("ROCK_WORKER_ENV_TYPE", "local"),
     "ROCK_PYTHON_ENV_PATH": lambda: os.getenv("ROCK_PYTHON_ENV_PATH", sys.base_prefix),
diff --git a/rock/rocklet/monitor.py b/rock/rocklet/monitor.py
new file mode 100644
index 000000000..8e3511dda
--- /dev/null
+++ b/rock/rocklet/monitor.py
@@ -0,0 +1,113 @@
+# rock/rocklet/monitor.py
+"""Per-worker resource monitor that runs as a background asyncio task inside rocklet."""
+import asyncio
+import socket
+
+import psutil
+from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+
+from rock import env_vars
+from rock.admin.metrics.constants import MetricsConstants
+from rock.logger import init_logger
+from rock.utils import get_uniagent_endpoint
+
+logger = init_logger(__name__)
+
+
+def _xrl_gateway_metric_name(name: str) -> str:
+    return f"xrl_gateway.{name}"
+
+
+def _get_worker_ip() -> str:
+    try:
+        return socket.gethostbyname(socket.gethostname())
+    except socket.gaierror:
+        return "unknown"
+
+
+class WorkerMonitorService:
+    """Background worker-node resource monitor.
+
+    Collects psutil metrics every 10 s and pushes them to an OTLP endpoint.
+    Designed to run as an asyncio background task inside the rocklet process.
+
+    Configuration (environment variables):
+        ROCK_DOCKER_ROOT — docker data-root dir to monitor (optional)
+        ROCK_LOGGING_PATH — log directory to monitor (optional)
+    """
+
+    _report_interval: int = 10
+    _export_interval_millis: int = 10_000
+
+    def __init__(self) -> None:
+        self._node_id: str = socket.gethostname()
+        self._worker_ip: str = _get_worker_ip()
+        self._docker_root: str | None = env_vars.ROCK_DOCKER_ROOT
+        self._log_dir: str | None = env_vars.ROCK_LOGGING_PATH
+        self._running: bool = False
+        self._gauges: dict = {}
+        self._init_metrics()
+
+    def _init_metrics(self) -> None:
+        host, port = get_uniagent_endpoint()
+        endpoint = f"http://{host}:{port}/v1/metrics"
+        logger.info(f"WorkerMonitorService OTLP endpoint: {endpoint}")
+        exporter = OTLPMetricExporter(endpoint=endpoint)
+        reader = PeriodicExportingMetricReader(exporter, export_interval_millis=self._export_interval_millis)
+        provider = MeterProvider(metric_readers=[reader])
+        meter = provider.get_meter(MetricsConstants.METRICS_METER_NAME)
+
+        self._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT] = meter.create_gauge(
+            name=_xrl_gateway_metric_name(MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT),
+            description="Docker root dir disk usage percent on worker node",
+            unit="1",
+        )
+        self._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT] = meter.create_gauge(
+            name=_xrl_gateway_metric_name(MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT),
+            description="Log dir disk usage percent on worker node",
+            unit="1",
+        )
+
+    async def start(self) -> None:
+        """Start background monitoring loop (idempotent)."""
+        if self._running:
+            return
+        self._running = True
+        logger.info(
+            f"WorkerMonitorService starting: node_id={self._node_id} "
+            f"worker_ip={self._worker_ip} docker_root={self._docker_root!r} "
+            f"log_dir={self._log_dir!r}"
+        )
+        self._task = asyncio.create_task(self._monitor_loop())  # keep a strong ref: the loop holds only weak refs, else the task can be GC'd
+
+    def stop(self) -> None:
+        self._running = False
+
+    async def _monitor_loop(self) -> None:
+        while self._running:
+            try:
+                self._collect_and_report()
+            except Exception as e:
+                logger.warning(f"WorkerMonitorService collect failed on {self._worker_ip}: {e}")
+            await asyncio.sleep(self._report_interval)
+
+    def _collect_and_report(self) -> None:
+        attrs = {"node_id": self._node_id, "worker_ip": self._worker_ip}
+
+        if self._docker_root:
+            try:
+                self._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set(
+                    psutil.disk_usage(self._docker_root).percent, attributes=attrs
+                )
+            except Exception as e:
+                logger.warning(f"disk_usage({self._docker_root!r}) failed: {e}")
+
+        if self._log_dir:
+            try:
+                self._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set(
+                    psutil.disk_usage(self._log_dir).percent, attributes=attrs
+                )
+            except Exception as e:
+                logger.warning(f"disk_usage({self._log_dir!r}) failed: {e}")
diff --git a/rock/rocklet/server.py b/rock/rocklet/server.py
index 8e1d9ef0e..285fe5358 100755
--- a/rock/rocklet/server.py
+++ b/rock/rocklet/server.py
@@ -6,6 +6,7 @@
 import time
 import traceback
 import uuid
+from contextlib import asynccontextmanager
 
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.exception_handlers import http_exception_handler
@@ -20,6 +21,25 @@
 from rock.utils import EAGLE_EYE_TRACE_ID, REQUEST_TIMEOUT_SECONDS, sandbox_id_ctx_var, trace_id_ctx_var
 
 logger = init_logger("rocklet.server")
-app = FastAPI()
+
+_enable_monitor: bool = False
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    if _enable_monitor:
+        from rock.rocklet.monitor import WorkerMonitorService
+        monitor = WorkerMonitorService()
+        await monitor.start()
+        logger.info("WorkerMonitorService started")
+    else:
+        monitor = None
+    yield
+    if monitor is not None:
+        monitor.stop()
+        logger.info("WorkerMonitorService stopped")
+
+
+app = FastAPI(lifespan=lifespan)
 
 app.include_router(local_router, tags=["local"])
@@ -121,5 +141,15 @@ def main():
     parser = argparse.ArgumentParser(description="Run the ROCKLET server")
     parser.add_argument("--host", default="0.0.0.0", help="Host to bind the server to")
     parser.add_argument("--port", type=int, default=8000, help="Port to run the server on")
+    parser.add_argument(
+        "--enable-monitor",
+        action="store_true",
+        default=False,
+        help="Enable background worker-node resource monitoring (for worker nodes only)",
+    )
     args = parser.parse_args(remaining_args)
+
+    global _enable_monitor
+    _enable_monitor = args.enable_monitor  # must be set before uvicorn.run so lifespan sees it at startup
+
     uvicorn.run(app, host=args.host, port=args.port, access_log=False)
diff --git a/tests/unit/rocklet/test_worker_monitor.py b/tests/unit/rocklet/test_worker_monitor.py
new file mode 100644
index 000000000..e9c2a452f
--- /dev/null
+++ b/tests/unit/rocklet/test_worker_monitor.py
@@ -0,0 +1,106 @@
+# tests/unit/rocklet/test_worker_monitor.py
+"""Unit tests for WorkerMonitorService in rock/rocklet/monitor.py."""
+from unittest.mock import MagicMock
+
+import pytest
+
+from rock.admin.metrics.constants import MetricsConstants
+from rock.rocklet.monitor import WorkerMonitorService
+
+_ALL_GAUGE_KEYS = [
+    MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT,
+    MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT,
+]
+
+_DISK_DOCKER = 55.0
+_DISK_LOG = 30.0
+
+
+def _make_service(docker_root="/var/lib/docker", log_dir="/var/log/rock") -> WorkerMonitorService:
+    """Create a WorkerMonitorService with mocked gauges (no real OTLP)."""
+    svc = WorkerMonitorService.__new__(WorkerMonitorService)
+    svc._node_id = "node-abc"
+    svc._worker_ip = "10.0.0.5"
+    svc._docker_root = docker_root
+    svc._log_dir = log_dir
+    svc._running = False
+    svc._gauges = {key: MagicMock() for key in _ALL_GAUGE_KEYS}
+    return svc
+
+
+def _patch_psutil(monkeypatch):
+    disk_values = {"/var/lib/docker": _DISK_DOCKER, "/var/log/rock": _DISK_LOG}
+
+    def fake_disk_usage(path):
+        m = MagicMock()
+        m.percent = disk_values.get(path, 0.0)
+        return m
+
+    monkeypatch.setattr("rock.rocklet.monitor.psutil.disk_usage", fake_disk_usage)
+
+
+def test_collect_reports_docker_dir_disk_percent(monkeypatch):
+    svc = _make_service()
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    svc._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.assert_called_once_with(
+        _DISK_DOCKER, attributes={"node_id": "node-abc", "worker_ip": "10.0.0.5"}
+    )
+
+
+def test_collect_reports_log_dir_disk_percent(monkeypatch):
+    svc = _make_service()
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    svc._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set.assert_called_once_with(
+        _DISK_LOG, attributes={"node_id": "node-abc", "worker_ip": "10.0.0.5"}
+    )
+
+
+def test_docker_dir_gauge_skipped_when_docker_root_is_none(monkeypatch):
+    svc = _make_service(docker_root=None)
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    svc._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.assert_not_called()
+
+
+def test_log_dir_gauge_skipped_when_log_dir_is_none(monkeypatch):
+    svc = _make_service(log_dir=None)
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    svc._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set.assert_not_called()
+
+
+def test_both_gauges_skipped_when_both_dirs_absent(monkeypatch):
+    svc = _make_service(docker_root=None, log_dir=None)
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    svc._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.assert_not_called()
+    svc._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set.assert_not_called()
+
+
+def test_attributes_contain_node_id_and_worker_ip(monkeypatch):
+    svc = _make_service()
+    svc._node_id = "node-xyz"
+    svc._worker_ip = "192.168.1.99"
+    _patch_psutil(monkeypatch)
+    svc._collect_and_report()
+    call_kwargs = svc._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.call_args[1]
+    assert call_kwargs["attributes"]["node_id"] == "node-xyz"
+    assert call_kwargs["attributes"]["worker_ip"] == "192.168.1.99"
+
+
+@pytest.mark.asyncio
+async def test_start_is_idempotent(monkeypatch):
+    svc = _make_service()
+    tasks_created = []
+
+    def fake_create_task(coro):
+        tasks_created.append(coro)
+        coro.close()
+        return MagicMock()
+
+    monkeypatch.setattr("rock.rocklet.monitor.asyncio.create_task", fake_create_task)
+    await svc.start()
+    await svc.start()
+    assert len(tasks_created) == 1