Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rock/actions/sandbox/sandbox_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ class SandboxInfo(TypedDict, total=False):
create_time: str
start_time: str
stop_time: str
extended_params: dict[str, str]
9 changes: 9 additions & 0 deletions rock/sandbox/operator/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod

from rock.actions.sandbox.sandbox_info import SandboxInfo
from rock.admin.core.redis_key import alive_sandbox_key
from rock.config import RuntimeConfig
from rock.deployments.config import DeploymentConfig
from rock.utils.providers.nacos_provider import NacosConfigProvider
Expand Down Expand Up @@ -29,3 +30,11 @@ def set_redis_provider(self, redis_provider: RedisProvider):

def set_nacos_provider(self, nacos_provider: NacosConfigProvider):
self._nacos_provider = nacos_provider

async def get_sandbox_info_from_redis(self, sandbox_id: str) -> dict | None:
    """Return the cached sandbox record from Redis, or None when absent.

    Args:
        sandbox_id: Sandbox identifier used to build the Redis key.

    Returns:
        The first JSON document stored under the sandbox's alive-key,
        or None when nothing (or an empty result) is stored.

    Raises:
        RuntimeError: If no Redis provider has been configured.
    """
    if not self._redis_provider:
        raise RuntimeError("Redis provider is not configured")
    entries = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$")
    # json_get with "$" yields a list of matches; unwrap the first one.
    return entries[0] if entries and len(entries) > 0 else None
187 changes: 143 additions & 44 deletions rock/sandbox/operator/k8s/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"""

import asyncio
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any

from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -54,7 +56,6 @@ def __init__(
watch_timeout_seconds: Watch timeout before reconnect (default: 60)
watch_reconnect_delay_seconds: Delay after watch failure (default: 5)
"""
self._api_client = api_client
self._group = group
self._version = version
self._plural = plural
Expand All @@ -73,6 +74,11 @@ def __init__(
self._cache_lock = asyncio.Lock()
self._watch_task = None
self._initialized = False
self._event_queue: asyncio.Queue | None = None
self._watch_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="k8s-watch-")
self._stop_event = threading.Event()
self._resource_version: str | None = None
self._resource_version_lock = threading.Lock()

async def start(self):
"""Start the API client and initialize cache watch."""
Expand Down Expand Up @@ -105,74 +111,167 @@ async def _list_and_sync_cache(self) -> str:
name = item.get("metadata", {}).get("name")
if name:
self._cache[name] = item
return resource_version
self._advance_resource_version(resource_version)
return self._resource_version

async def _watch_resources(self):
"""Background task to watch resources and maintain cache.
def _watch_in_thread(self, resource_version: str | None, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
"""Watch resources in a background thread and put events to queue.

Implements Kubernetes Informer pattern:
1. Initial list-and-sync to populate cache
2. Continuous watch for ADDED/MODIFIED/DELETED events
3. Auto-reconnect on watch timeout or network failures
4. Re-sync on reconnect to avoid event loss
Args:
resource_version: The resource version to watch from
event_queue: Queue to put events for async processing
loop: The asyncio event loop to use for queue operations
"""
resource_version = None
try:
resource_version = await self._list_and_sync_cache()
logger.info(
f"Initial cache populated with {len(self._cache)} resources, resourceVersion={resource_version}"
)
w = watch.Watch()
for event in w.stream(
self._custom_api.list_namespaced_custom_object,
group=self._group,
version=self._version,
namespace=self._namespace,
plural=self._plural,
resource_version=resource_version,
timeout_seconds=self._watch_timeout_seconds,
):
if self._stop_event.is_set():
break
# Put event to queue for async processing
asyncio.run_coroutine_threadsafe(event_queue.put(("event", event)), loop)
except Exception as e:
logger.error(f"Failed to populate initial cache: {e}")
logger.warning(f"Watch thread error: {e}")
finally:
# Signal thread exit
asyncio.run_coroutine_threadsafe(event_queue.put(("exit", None)), loop)

while True:
def _advance_resource_version(self, rv: str | None) -> bool:
"""Advance resource_version only when rv is strictly newer.

K8s resourceVersions are opaque strings but etcd encodes them as
monotonically increasing integers. Prevents stale responses from
rolling back the watch cursor.

Returns:
True if version was updated, False if skipped (old version or error)
"""
if not rv:
return False
with self._resource_version_lock:
if self._resource_version is None:
self._resource_version = rv
return True
try:
if int(rv) > int(self._resource_version):
self._resource_version = rv
return True
return False
except ValueError:
# Non-integer resourceVersion — skip to avoid downgrade
logger.error(f"Non-integer resourceVersion detected: rv={rv}, current={self._resource_version}, skipping")
return False

async def _process_events(self, event_queue: asyncio.Queue, thread_future: Future) -> str | None:
"""Process events from queue and update cache.

def _watch_in_thread():
w = watch.Watch()
stream = w.stream(
self._custom_api.list_namespaced_custom_object,
group=self._group,
version=self._version,
namespace=self._namespace,
plural=self._plural,
resource_version=resource_version,
timeout_seconds=self._watch_timeout_seconds,
)
events = []
for event in stream:
events.append(event)
return events
Args:
event_queue: Queue containing events from watch thread
thread_future: Future representing the watch thread

Returns:
Latest resource version or None if disconnected
"""
while True:
try:
msg_type, event = await asyncio.wait_for(event_queue.get(), timeout=1.0)

events = await asyncio.to_thread(_watch_in_thread)
if msg_type == "exit": # Watch thread exited
break

async with self._cache_lock:
for event in events:
event_type = event["type"]
obj = event["object"]
name = obj.get("metadata", {}).get("name")
new_rv = obj.get("metadata", {}).get("resourceVersion")
if msg_type == "event":
event_type = event["type"]
obj = event["object"]
name = obj.get("metadata", {}).get("name")
new_rv = obj.get("metadata", {}).get("resourceVersion")

if new_rv:
resource_version = new_rv
# Skip event if no resourceVersion or version is stale
if not new_rv:
continue
if not self._advance_resource_version(new_rv):
continue

if not name:
continue
if not name:
continue

async with self._cache_lock:
if event_type in ["ADDED", "MODIFIED"]:
self._cache[name] = obj
elif event_type == "DELETED":
self._cache.pop(name, None)

logger.debug(f"Cache updated: {event_type} {name}, rv={self._resource_version}")

except asyncio.TimeoutError:
# Check if thread has exited
if thread_future.done() and event_queue.empty():
break
continue
except asyncio.CancelledError:
raise

return self._resource_version

async def _watch_resources(self):
"""Background task to watch resources and maintain cache.

Implements Kubernetes Informer pattern:
1. Initial list-and-sync to populate cache
2. Continuous watch for ADDED/MODIFIED/DELETED events (real-time)
3. Auto-reconnect on watch timeout or network failures
4. Re-sync on reconnect to avoid event loss
"""
try:
await self._list_and_sync_cache()
logger.info(
f"Initial cache populated with {len(self._cache)} resources, resourceVersion={self._resource_version}"
)
except Exception as e:
logger.error(f"Failed to populate initial cache: {e}")

self._event_queue = asyncio.Queue()
loop = asyncio.get_event_loop()

while True:
try:
self._stop_event.clear()

# Start watch in background thread
thread_future = self._watch_executor.submit(
self._watch_in_thread,
self._resource_version,
self._event_queue,
loop
)

# Process events in real-time
await self._process_events(self._event_queue, thread_future)

# Wait for thread to complete
if not thread_future.done():
self._stop_event.set()
try:
thread_future.result(timeout=5.0)
except Exception:
pass

except asyncio.CancelledError:
logger.info("Watch task cancelled")
self._stop_event.set()
raise
except Exception as e:
logger.warning(f"Watch stream disconnected: {e}, reconnecting immediately...")
try:
resource_version = await self._list_and_sync_cache()
await self._list_and_sync_cache()
logger.info(
f"Re-synced cache with {len(self._cache)} resources, resourceVersion={resource_version}"
f"Re-synced cache with {len(self._cache)} resources, resourceVersion={self._resource_version}"
)
except Exception as list_err:
logger.error(
Expand Down
1 change: 1 addition & 0 deletions rock/sandbox/operator/k8s/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class K8sConstants:
# Extension keys for DockerDeploymentConfig.extended_params
EXT_POOL_NAME = "pool_name"
EXT_TEMPLATE_NAME = "template_name"
EXT_RESOURCE_VERSION = "resourceVersion"

# Nacos config keys
NACOS_POOLS_KEY = "pools"
71 changes: 48 additions & 23 deletions rock/sandbox/operator/k8s/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,55 @@
from rock.deployments.config import DockerDeploymentConfig
from rock.logger import init_logger
from rock.sandbox.operator.abstract import AbstractOperator
from rock.sandbox.operator.k8s.constants import K8sConstants
from rock.sandbox.operator.k8s.provider import BatchSandboxProvider

logger = init_logger(__name__)


def _merge_sandbox_info(redis_info: dict, sandbox_info: SandboxInfo) -> SandboxInfo:
    """Combine the Redis-cached record with the Provider's live status.

    Merge rules:
    1. If both sides carry a resourceVersion in extended_params and the
       Redis copy is strictly newer, the Redis record wins outright.
    2. Otherwise base fields come from sandbox_info (live status has
       priority), layered on top of redis_info.
    3. extended_params dicts are shallow-merged, sandbox_info winning.

    Args:
        redis_info: Cached info from Redis (contains user_id, etc.)
        sandbox_info: Real-time status from Provider (IP, port_mapping, is_alive, etc.)

    Returns:
        Merged SandboxInfo

    Raises:
        ValueError: If either resourceVersion is present on both sides
            but cannot be parsed as an integer.
    """
    cached_ext = redis_info.get("extended_params") or {}
    live_ext = sandbox_info.get("extended_params") or {}

    cached_rv = cached_ext.get(K8sConstants.EXT_RESOURCE_VERSION)
    live_rv = live_ext.get(K8sConstants.EXT_RESOURCE_VERSION)
    if cached_rv is not None and live_rv is not None:
        try:
            cached_is_newer = int(cached_rv) > int(live_rv)
        except (ValueError, TypeError) as e:
            raise ValueError(f"Invalid resourceVersion format: redis_rv={cached_rv}, sandbox_rv={live_rv}") from e
        if cached_is_newer:
            # Redis snapshot is strictly newer — keep it untouched.
            return redis_info

    # Base fields: live status overrides the cached record.
    merged = dict(redis_info)
    merged.update(sandbox_info)

    # extended_params: shallow merge, live values take priority.
    combined_ext = dict(cached_ext)
    combined_ext.update(live_ext)
    if combined_ext:
        merged["extended_params"] = combined_ext

    return merged


class K8sOperator(AbstractOperator):
"""Operator for managing sandboxes via Kubernetes BatchSandbox CRD."""

Expand Down Expand Up @@ -62,32 +106,13 @@ async def get_status(self, sandbox_id: str) -> SandboxInfo:

# Get user info from redis if available
if self._redis_provider:
redis_info = await self._get_sandbox_info_from_redis(sandbox_id)
redis_info = await self.get_sandbox_info_from_redis(sandbox_id)
if redis_info:
redis_info.update(sandbox_info)
return redis_info

return _merge_sandbox_info(redis_info, sandbox_info)
else:
raise Exception(f"Sandbox {sandbox_id} not found in Redis")
return sandbox_info

async def _get_sandbox_info_from_redis(self, sandbox_id: str) -> dict | None:
    """Look up cached sandbox user info in Redis (best-effort).

    Args:
        sandbox_id: Sandbox identifier

    Returns:
        The first cached record for the sandbox, or None when missing
        or when the lookup fails (failures are only logged at debug).
    """
    # Imported lazily to avoid a module-level dependency cycle.
    from rock.admin.core.redis_key import alive_sandbox_key

    key = alive_sandbox_key(sandbox_id)
    try:
        records = await self._redis_provider.json_get(key, "$")
        # json_get with "$" yields a list of matches; unwrap the first one.
        if records and len(records) > 0:
            return records[0]
    except Exception as e:
        logger.debug(f"Failed to get sandbox info from redis for {sandbox_id}: {e}")
    return None

async def stop(self, sandbox_id: str) -> bool:
"""Stop and delete a sandbox.

Expand Down
Loading
Loading