Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rock/admin/metrics/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ class MetricsConstants:
TOTAL_MEM_RESOURCE = "resource.mem.total"
AVAILABLE_CPU_RESOURCE = "resource.cpu.available"
AVAILABLE_MEM_RESOURCE = "resource.mem.available"
# Disk-usage percent of the docker data-root dir on a worker pod
# (reported by rock.rocklet.monitor.WorkerMonitorService).
WORKER_DISK_DOCKER_DIR_PERCENT = "resource.worker_pod.disk.docker_dir.percent"
# Disk-usage percent of the log dir on a worker pod.
WORKER_DISK_LOG_DIR_PERCENT = "resource.worker_pod.disk.log_dir.percent"

SANDBOX_PHASE_FAILURE = "sandbox.phase.failure"
2 changes: 2 additions & 0 deletions rock/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

ROCK_PIP_INDEX_URL: str | None = "https://mirrors.aliyun.com/pypi/simple/"
ROCK_MONITOR_ENABLE: bool = False
# Docker data-root directory monitored by the worker resource monitor;
# disk metrics for it are only reported when this is set.
ROCK_DOCKER_ROOT: str | None = None
ROCK_PROJECT_ROOT: str | None = None
ROCK_WORKER_ENV_TYPE: str | None = "local"
ROCK_PYTHON_ENV_PATH: str | None = None
Expand Down Expand Up @@ -89,6 +90,7 @@
"ROCK_OSS_BUCKET_REGION": lambda: os.getenv("ROCK_OSS_BUCKET_REGION"),
"ROCK_PIP_INDEX_URL": lambda: os.getenv("ROCK_PIP_INDEX_URL", "https://mirrors.aliyun.com/pypi/simple/"),
"ROCK_MONITOR_ENABLE": lambda: os.getenv("ROCK_MONITOR_ENABLE", "false").lower() == "true",
"ROCK_DOCKER_ROOT": lambda: os.getenv("ROCK_DOCKER_ROOT"),
"ROCK_PROJECT_ROOT": lambda: os.getenv("ROCK_PROJECT_ROOT", str(Path(__file__).resolve().parents[1])),
"ROCK_WORKER_ENV_TYPE": lambda: os.getenv("ROCK_WORKER_ENV_TYPE", "local"),
"ROCK_PYTHON_ENV_PATH": lambda: os.getenv("ROCK_PYTHON_ENV_PATH", sys.base_prefix),
Expand Down
113 changes: 113 additions & 0 deletions rock/rocklet/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# rock/rocklet/monitor.py
"""Per-worker resource monitor that runs as a background asyncio task inside rocklet."""
import asyncio
import socket

import psutil
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

from rock import env_vars
from rock.admin.metrics.constants import MetricsConstants
from rock.logger import init_logger
from rock.utils import get_uniagent_endpoint

logger = init_logger(__name__)


def _xrl_gateway_metric_name(name: str) -> str:
return f"xrl_gateway.{name}"


def _get_worker_ip() -> str:
try:
return socket.gethostbyname(socket.gethostname())
except socket.gaierror:
return "unknown"


class WorkerMonitorService:
    """Background worker-node resource monitor.

    Collects psutil disk metrics every ``_report_interval`` seconds and pushes
    them to an OTLP endpoint through a ``PeriodicExportingMetricReader``.
    Designed to run as an asyncio background task inside the rocklet process.

    Configuration (environment variables):
        ROCK_DOCKER_ROOT — docker data-root dir to monitor (optional)
        ROCK_LOGGING_PATH — log directory to monitor (optional)
    """

    _report_interval: int = 10  # seconds between two collections
    _export_interval_millis: int = 10_000  # OTLP push period in milliseconds

    def __init__(self) -> None:
        self._node_id: str = socket.gethostname()
        self._worker_ip: str = _get_worker_ip()
        self._docker_root: str | None = env_vars.ROCK_DOCKER_ROOT
        self._log_dir: str | None = env_vars.ROCK_LOGGING_PATH
        self._running: bool = False
        # Strong reference to the loop task: the event loop keeps only weak
        # references to tasks, so an unreferenced task may be garbage-collected
        # mid-run and the monitor would silently die.
        self._task: asyncio.Task | None = None
        self._gauges: dict = {}
        self._init_metrics()

    def _init_metrics(self) -> None:
        """Build the OTLP export pipeline and register the disk gauges."""
        host, port = get_uniagent_endpoint()
        endpoint = f"http://{host}:{port}/v1/metrics"
        logger.info(f"WorkerMonitorService OTLP endpoint: {endpoint}")
        exporter = OTLPMetricExporter(endpoint=endpoint)
        reader = PeriodicExportingMetricReader(exporter, export_interval_millis=self._export_interval_millis)
        provider = MeterProvider(metric_readers=[reader])
        meter = provider.get_meter(MetricsConstants.METRICS_METER_NAME)

        self._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT] = meter.create_gauge(
            name=_xrl_gateway_metric_name(MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT),
            description="Docker root dir disk usage percent on worker node",
            unit="1",
        )
        self._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT] = meter.create_gauge(
            name=_xrl_gateway_metric_name(MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT),
            description="Log dir disk usage percent on worker node",
            unit="1",
        )

    async def start(self) -> None:
        """Start background monitoring loop (idempotent)."""
        if self._running:
            return
        self._running = True
        logger.info(
            f"WorkerMonitorService starting: node_id={self._node_id} "
            f"worker_ip={self._worker_ip} docker_root={self._docker_root!r} "
            f"log_dir={self._log_dir!r}"
        )
        # Keep the task referenced so it cannot be GC'd and so stop() can
        # cancel it instead of waiting out the current sleep.
        self._task = asyncio.create_task(self._monitor_loop())

    def stop(self) -> None:
        """Stop the monitor; cancels the pending sleep instead of waiting it out."""
        self._running = False
        # getattr: tolerate instances built via __new__ (as the unit tests do).
        task = getattr(self, "_task", None)
        if task is not None and not task.done():
            task.cancel()

    async def _monitor_loop(self) -> None:
        """Collect and report every ``_report_interval`` seconds until stopped."""
        try:
            while self._running:
                try:
                    self._collect_and_report()
                except Exception as e:
                    # Best-effort monitoring: a collection error must never
                    # kill the loop.
                    logger.warning(f"WorkerMonitorService collect failed on {self._worker_ip}: {e}")
                await asyncio.sleep(self._report_interval)
        except asyncio.CancelledError:
            # Normal shutdown path triggered by stop(); exit quietly.
            pass

    def _collect_and_report(self) -> None:
        """Sample disk usage for the configured dirs and set the gauges.

        Each directory is optional and sampled independently, so a failure on
        one path does not prevent reporting the other.
        """
        attrs = {"node_id": self._node_id, "worker_ip": self._worker_ip}

        if self._docker_root:
            try:
                self._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set(
                    psutil.disk_usage(self._docker_root).percent, attributes=attrs
                )
            except Exception as e:
                logger.warning(f"disk_usage({self._docker_root!r}) failed: {e}")

        if self._log_dir:
            try:
                self._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set(
                    psutil.disk_usage(self._log_dir).percent, attributes=attrs
                )
            except Exception as e:
                logger.warning(f"disk_usage({self._log_dir!r}) failed: {e}")
32 changes: 31 additions & 1 deletion rock/rocklet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import traceback
import uuid
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Request
from fastapi.exception_handlers import http_exception_handler
Expand All @@ -20,7 +21,26 @@
from rock.utils import EAGLE_EYE_TRACE_ID, REQUEST_TIMEOUT_SECONDS, sandbox_id_ctx_var, trace_id_ctx_var

logger = init_logger("rocklet.server")
app = FastAPI()

# Set by main() from the --enable-monitor CLI flag before uvicorn starts the
# app; read once at application startup.
_enable_monitor: bool = False


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: optionally run the worker resource monitor.

    When ``_enable_monitor`` is set (via the --enable-monitor CLI flag), a
    WorkerMonitorService is started before the app serves requests and stopped
    when it shuts down. The import is local so worker-only dependencies are
    not loaded on nodes that don't monitor.
    """
    monitor = None
    if _enable_monitor:
        from rock.rocklet.monitor import WorkerMonitorService

        monitor = WorkerMonitorService()
        await monitor.start()
        logger.info("WorkerMonitorService started")
    try:
        yield
    finally:
        # try/finally: the original skipped stop() if shutdown raised through
        # the yield; always release the monitor.
        if monitor is not None:
            monitor.stop()
            logger.info("WorkerMonitorService stopped")


# lifespan (defined above) starts/stops the optional worker monitor around
# the application's lifetime.
app = FastAPI(lifespan=lifespan)

app.include_router(local_router, tags=["local"])

Expand Down Expand Up @@ -121,8 +141,18 @@ def main():
parser = argparse.ArgumentParser(description="Run the ROCKLET server")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind the server to")
parser.add_argument("--port", type=int, default=8000, help="Port to run the server on")
parser.add_argument(
"--enable-monitor",
action="store_true",
default=False,
help="Enable background worker-node resource monitoring (for worker nodes only)",
)

args = parser.parse_args(remaining_args)

global _enable_monitor
_enable_monitor = args.enable_monitor

uvicorn.run(app, host=args.host, port=args.port, access_log=False)


Expand Down
106 changes: 106 additions & 0 deletions tests/unit/rocklet/test_worker_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# tests/unit/rocklet/test_worker_monitor.py
"""Unit tests for WorkerMonitorService in rock/rocklet/monitor.py."""
from unittest.mock import MagicMock

import pytest

from rock.admin.metrics.constants import MetricsConstants
from rock.rocklet.monitor import WorkerMonitorService

# Gauge keys the service registers; used to build the mocked gauge dict.
_ALL_GAUGE_KEYS = [
    MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT,
    MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT,
]

# Canned psutil disk-usage percentages for the two monitored paths.
_DISK_DOCKER = 55.0
_DISK_LOG = 30.0


def _make_service(docker_root="/var/lib/docker", log_dir="/var/log/rock") -> WorkerMonitorService:
    """Create a WorkerMonitorService with mocked gauges (no real OTLP)."""
    # __new__ bypasses __init__, which would build a real OTLP pipeline.
    service = WorkerMonitorService.__new__(WorkerMonitorService)
    service._node_id = "node-abc"
    service._worker_ip = "10.0.0.5"
    service._docker_root = docker_root
    service._log_dir = log_dir
    service._running = False
    gauges = {}
    for gauge_key in _ALL_GAUGE_KEYS:
        gauges[gauge_key] = MagicMock()
    service._gauges = gauges
    return service


def _patch_psutil(monkeypatch):
    """Replace psutil.disk_usage in the monitor module with a canned fake."""
    canned_percents = {"/var/lib/docker": _DISK_DOCKER, "/var/log/rock": _DISK_LOG}

    def fake_disk_usage(path):
        # Unknown paths report 0.0 so the fake never raises.
        return MagicMock(percent=canned_percents.get(path, 0.0))

    monkeypatch.setattr("rock.rocklet.monitor.psutil.disk_usage", fake_disk_usage)


def test_collect_reports_docker_dir_disk_percent(monkeypatch):
    """The docker-root gauge receives the canned percent with node attributes."""
    _patch_psutil(monkeypatch)
    service = _make_service()
    service._collect_and_report()
    docker_gauge = service._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT]
    docker_gauge.set.assert_called_once_with(
        _DISK_DOCKER, attributes={"node_id": "node-abc", "worker_ip": "10.0.0.5"}
    )


def test_collect_reports_log_dir_disk_percent(monkeypatch):
    """The log-dir gauge receives the canned percent with node attributes."""
    _patch_psutil(monkeypatch)
    service = _make_service()
    service._collect_and_report()
    log_gauge = service._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT]
    log_gauge.set.assert_called_once_with(
        _DISK_LOG, attributes={"node_id": "node-abc", "worker_ip": "10.0.0.5"}
    )


def test_docker_dir_gauge_skipped_when_docker_root_is_none(monkeypatch):
    """No docker-root configured means the docker gauge is never set."""
    _patch_psutil(monkeypatch)
    service = _make_service(docker_root=None)
    service._collect_and_report()
    service._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.assert_not_called()


def test_log_dir_gauge_skipped_when_log_dir_is_none(monkeypatch):
    """No log dir configured means the log gauge is never set."""
    _patch_psutil(monkeypatch)
    service = _make_service(log_dir=None)
    service._collect_and_report()
    service._gauges[MetricsConstants.WORKER_DISK_LOG_DIR_PERCENT].set.assert_not_called()


def test_both_gauges_skipped_when_both_dirs_absent(monkeypatch):
    """With neither dir configured, no gauge is touched at all."""
    _patch_psutil(monkeypatch)
    service = _make_service(docker_root=None, log_dir=None)
    service._collect_and_report()
    for gauge_key in _ALL_GAUGE_KEYS:
        service._gauges[gauge_key].set.assert_not_called()


def test_attributes_contain_node_id_and_worker_ip(monkeypatch):
    """Reported attributes carry the service's node_id and worker_ip."""
    _patch_psutil(monkeypatch)
    service = _make_service()
    service._node_id = "node-xyz"
    service._worker_ip = "192.168.1.99"
    service._collect_and_report()
    _, kwargs = service._gauges[MetricsConstants.WORKER_DISK_DOCKER_DIR_PERCENT].set.call_args
    attributes = kwargs["attributes"]
    assert attributes["node_id"] == "node-xyz"
    assert attributes["worker_ip"] == "192.168.1.99"


@pytest.mark.asyncio
async def test_start_is_idempotent(monkeypatch):
    """Calling start() twice must schedule exactly one monitor task."""
    service = _make_service()
    scheduled = []

    def fake_create_task(coro):
        scheduled.append(coro)
        # Close the coroutine to avoid "never awaited" warnings.
        coro.close()
        return MagicMock()

    monkeypatch.setattr("rock.rocklet.monitor.asyncio.create_task", fake_create_task)
    await service.start()
    await service.start()
    assert len(scheduled) == 1
Loading