Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rock/actions/sandbox/sandbox_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ class SandboxInfo(TypedDict, total=False):
create_time: str
start_time: str
stop_time: str
extended_params: dict[str, str]
9 changes: 9 additions & 0 deletions rock/sandbox/operator/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod

from rock.actions.sandbox.sandbox_info import SandboxInfo
from rock.admin.core.redis_key import alive_sandbox_key
from rock.config import RuntimeConfig
from rock.deployments.config import DeploymentConfig
from rock.utils.providers.nacos_provider import NacosConfigProvider
Expand Down Expand Up @@ -29,3 +30,11 @@ def set_redis_provider(self, redis_provider: RedisProvider):

def set_nacos_provider(self, nacos_provider: NacosConfigProvider):
self._nacos_provider = nacos_provider

async def get_sandbox_info_from_redis(self, sandbox_id: str) -> dict | None:
    """Fetch the cached sandbox record from Redis.

    Args:
        sandbox_id: Sandbox identifier used to build the alive-sandbox key.

    Returns:
        The first JSON document stored under the key, or None when nothing
        is cached for this sandbox.

    Raises:
        RuntimeError: If no Redis provider has been configured on this operator.
    """
    if not self._redis_provider:
        raise RuntimeError("Redis provider is not configured")
    # RedisJSON "$" queries return a list of matches; the whole document is match [0].
    results = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$")
    return results[0] if results else None
187 changes: 143 additions & 44 deletions rock/sandbox/operator/k8s/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"""

import asyncio
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any

from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -54,7 +56,6 @@ def __init__(
watch_timeout_seconds: Watch timeout before reconnect (default: 60)
watch_reconnect_delay_seconds: Delay after watch failure (default: 5)
"""
self._api_client = api_client
self._group = group
self._version = version
self._plural = plural
Expand All @@ -73,6 +74,11 @@ def __init__(
self._cache_lock = asyncio.Lock()
self._watch_task = None
self._initialized = False
self._event_queue: asyncio.Queue | None = None
self._watch_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="k8s-watch-")
self._stop_event = threading.Event()
self._resource_version: str | None = None
self._resource_version_lock = threading.Lock()

async def start(self):
"""Start the API client and initialize cache watch."""
Expand Down Expand Up @@ -105,74 +111,167 @@ async def _list_and_sync_cache(self) -> str:
name = item.get("metadata", {}).get("name")
if name:
self._cache[name] = item
return resource_version
self._advance_resource_version(resource_version)
return self._resource_version

async def _watch_resources(self):
"""Background task to watch resources and maintain cache.
def _watch_in_thread(self, resource_version: str | None, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
    """Run a blocking Kubernetes watch and forward raw events to the event loop.

    Executed on a worker thread: each watch event is handed to the asyncio
    side via run_coroutine_threadsafe so cache mutation stays on the loop.
    A final ("exit", None) message is always enqueued so the consumer can
    detect the end of the stream.

    Args:
        resource_version: The resource version to watch from
        event_queue: Queue to put events for async processing
        loop: The asyncio event loop to use for queue operations
    """
    try:
        watcher = watch.Watch()
        stream = watcher.stream(
            self._custom_api.list_namespaced_custom_object,
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural=self._plural,
            resource_version=resource_version,
            timeout_seconds=self._watch_timeout_seconds,
        )
        for event in stream:
            # Cooperative cancellation requested from the asyncio side.
            if self._stop_event.is_set():
                break
            # Hand the event over to the async consumer thread-safely.
            asyncio.run_coroutine_threadsafe(event_queue.put(("event", event)), loop)
    except Exception as e:
        logger.warning(f"Watch thread error: {e}")
    finally:
        # Always signal termination, even on error, so the consumer unblocks.
        asyncio.run_coroutine_threadsafe(event_queue.put(("exit", None)), loop)

while True:
def _advance_resource_version(self, rv: str | None) -> bool:
"""Advance resource_version only when rv is strictly newer.

K8s resourceVersions are opaque strings but etcd encodes them as
monotonically increasing integers. Prevents stale responses from
rolling back the watch cursor.

Returns:
True if version was updated, False if skipped (old version or error)
"""
if not rv:
return False
with self._resource_version_lock:
if self._resource_version is None:
self._resource_version = rv
return True
try:
if int(rv) > int(self._resource_version):
self._resource_version = rv
return True
return False
except ValueError:
# Non-integer resourceVersion — skip to avoid downgrade
logger.error(f"Non-integer resourceVersion detected: rv={rv}, current={self._resource_version}, skipping")
return False

async def _process_events(self, event_queue: asyncio.Queue, thread_future: Future) -> str | None:
    """Drain watch events from the queue and apply them to the cache.

    Args:
        event_queue: Queue containing events from watch thread
        thread_future: Future representing the watch thread

    Returns:
        Latest resource version or None if disconnected
    """
    while True:
        try:
            msg_type, event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            # No message within the poll window: if the producer thread is
            # gone and the queue drained, the stream ended; else keep polling.
            if thread_future.done() and event_queue.empty():
                break
            continue

        if msg_type == "exit":  # Watch thread exited
            break
        if msg_type != "event":
            continue

        event_type = event["type"]
        obj = event["object"]
        name = obj.get("metadata", {}).get("name")
        new_rv = obj.get("metadata", {}).get("resourceVersion")

        # Skip events without a resourceVersion or whose version is stale.
        if not new_rv or not self._advance_resource_version(new_rv):
            continue
        if not name:
            continue

        async with self._cache_lock:
            if event_type in ("ADDED", "MODIFIED"):
                self._cache[name] = obj
            elif event_type == "DELETED":
                self._cache.pop(name, None)

        logger.debug(f"Cache updated: {event_type} {name}, rv={self._resource_version}")

    return self._resource_version

async def _watch_resources(self):
"""Background task to watch resources and maintain cache.

Implements Kubernetes Informer pattern:
1. Initial list-and-sync to populate cache
2. Continuous watch for ADDED/MODIFIED/DELETED events (real-time)
3. Auto-reconnect on watch timeout or network failures
4. Re-sync on reconnect to avoid event loss
"""
try:
await self._list_and_sync_cache()
logger.info(
f"Initial cache populated with {len(self._cache)} resources, resourceVersion={self._resource_version}"
)
except Exception as e:
logger.error(f"Failed to populate initial cache: {e}")

self._event_queue = asyncio.Queue()
loop = asyncio.get_event_loop()

while True:
try:
self._stop_event.clear()

# Start watch in background thread
thread_future = self._watch_executor.submit(
self._watch_in_thread,
self._resource_version,
self._event_queue,
loop
)

# Process events in real-time
await self._process_events(self._event_queue, thread_future)

# Wait for thread to complete
if not thread_future.done():
self._stop_event.set()
try:
thread_future.result(timeout=5.0)
except Exception:
pass

except asyncio.CancelledError:
logger.info("Watch task cancelled")
self._stop_event.set()
raise
except Exception as e:
logger.warning(f"Watch stream disconnected: {e}, reconnecting immediately...")
try:
resource_version = await self._list_and_sync_cache()
await self._list_and_sync_cache()
logger.info(
f"Re-synced cache with {len(self._cache)} resources, resourceVersion={resource_version}"
f"Re-synced cache with {len(self._cache)} resources, resourceVersion={self._resource_version}"
)
except Exception as list_err:
logger.error(
Expand Down
1 change: 1 addition & 0 deletions rock/sandbox/operator/k8s/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class K8sConstants:
# Extension keys for DockerDeploymentConfig.extended_params
EXT_POOL_NAME = "pool_name"
EXT_TEMPLATE_NAME = "template_name"
EXT_RESOURCE_VERSION = "k8s_resource_version"

# Nacos config keys
NACOS_POOLS_KEY = "pools"
71 changes: 48 additions & 23 deletions rock/sandbox/operator/k8s/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,55 @@
from rock.deployments.config import DockerDeploymentConfig
from rock.logger import init_logger
from rock.sandbox.operator.abstract import AbstractOperator
from rock.sandbox.operator.k8s.constants import K8sConstants
from rock.sandbox.operator.k8s.provider import BatchSandboxProvider

logger = init_logger(__name__)


def _merge_sandbox_info(redis_info: dict, sandbox_info: SandboxInfo) -> SandboxInfo:
    """Merge Redis cached info with Provider real-time status.

    Merge rules:
    1. Compare resourceVersion in extended_params; if the Redis copy is
       strictly newer, it wins outright and is returned unmerged.
    2. Base fields: sandbox_info overrides redis_info (real-time status
       takes priority).
    3. extended_params: deep merge, values from sandbox_info take priority.

    Args:
        redis_info: Cached info from Redis (contains user_id, etc.)
        sandbox_info: Real-time status from Provider (IP, port_mapping, is_alive, etc.)

    Returns:
        Merged SandboxInfo

    Raises:
        ValueError: If both sides carry a resourceVersion but either one
            is not integer-formatted.
    """
    cached_ext = redis_info.get("extended_params", {}) or {}
    live_ext = sandbox_info.get("extended_params", {}) or {}

    redis_rv = cached_ext.get(K8sConstants.EXT_RESOURCE_VERSION)
    sandbox_rv = live_ext.get(K8sConstants.EXT_RESOURCE_VERSION)
    if redis_rv is not None and sandbox_rv is not None:
        try:
            # Redis snapshot newer than the provider's view: keep it as-is.
            if int(redis_rv) > int(sandbox_rv):
                return redis_info
        except (ValueError, TypeError) as e:
            raise ValueError(f"Invalid resourceVersion format: redis_rv={redis_rv}, sandbox_rv={sandbox_rv}") from e

    # Base fields: provider status overrides the cache; extended_params
    # are deep-merged with provider values taking priority.
    combined = {**redis_info, **sandbox_info}
    combined_ext = {**cached_ext, **live_ext}
    if combined_ext:
        combined["extended_params"] = combined_ext
    return combined


class K8sOperator(AbstractOperator):
"""Operator for managing sandboxes via Kubernetes BatchSandbox CRD."""

Expand Down Expand Up @@ -62,32 +106,13 @@ async def get_status(self, sandbox_id: str) -> SandboxInfo:

# Get user info from redis if available
if self._redis_provider:
redis_info = await self._get_sandbox_info_from_redis(sandbox_id)
redis_info = await self.get_sandbox_info_from_redis(sandbox_id)
if redis_info:
redis_info.update(sandbox_info)
return redis_info

return _merge_sandbox_info(redis_info, sandbox_info)
else:
raise Exception(f"Sandbox {sandbox_id} not found in Redis")
return sandbox_info

async def _get_sandbox_info_from_redis(self, sandbox_id: str) -> dict | None:
"""Get sandbox user info from Redis.

Args:
sandbox_id: Sandbox identifier

Returns:
Sandbox info dict from Redis or None if not found
"""
from rock.admin.core.redis_key import alive_sandbox_key

try:
sandbox_status = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$")
if sandbox_status and len(sandbox_status) > 0:
return sandbox_status[0]
except Exception as e:
logger.debug(f"Failed to get sandbox info from redis for {sandbox_id}: {e}")
return None

async def stop(self, sandbox_id: str) -> bool:
"""Stop and delete a sandbox.

Expand Down
Loading
Loading