Ray job backend (kind="ray")
The default Ray job backend uses Ray Core with a detached registry actor and detached per-job controller actors.
This page explains why Ray is a good fit, how the default "ray" job backend maps onto Ray's execution model, and how to use it from checkmaite. For the direct process-local Ray task-based job backend, see Ray simple job backend.
Why Ray
Ray is a distributed Python runtime designed for:
- task-parallel execution,
- actor-based stateful services, where an actor is an instance of a Python class that runs in its own worker process and keeps state across remote method calls,
- and dynamic CPU/GPU scheduling.
That lines up well with checkmaite's needs:
- notebook users want to submit work without blocking,
- capabilities may need CPUs or GPUs,
- development should work locally while the same API scales to larger infrastructure when available.
Ray's execution model in one paragraph
At the Ray Core level, distributed computation is built from a few primitives:
- ray.init(...) connects to or starts a Ray runtime,
- ray.remote(...) turns a Python function or class into distributed work,
- calling .remote(...) submits that work and returns an ObjectRef,
- ray.get(...) waits for the value,
- ray.wait(...) checks whether it is ready,
- and ray.cancel(...) requests cancellation.
The current checkmaite job backend uses one detached per-job controller actor plus one Ray task per submitted capability run. The controller actor owns the live ObjectRef, while the public RayJob handle reads shared lifecycle metadata from the registry.
End-to-end flow
```mermaid
sequenceDiagram
    participant Client
    participant JobBackend as RayJobBackend
    participant Registry as Detached registry actor
    participant Controller as Detached job controller actor
    participant Ray as Ray runtime
    participant Worker as Ray worker
    participant Store as AnalyticsStore
    Client->>JobBackend: submit_capability(...)
    JobBackend->>Registry: register_or_get(scope, scoped_run_key)
    alt existing
        Registry-->>JobBackend: existing job metadata + controller name
        JobBackend-->>Client: RayJob (reattached)
    else new
        Registry-->>JobBackend: new reservation
        JobBackend->>Controller: create/get named detached controller
        JobBackend->>Controller: start(capability, kwargs, resources)
        Controller->>Registry: mark_running(...)
        Controller->>Ray: ray.remote(_execute_capability_ref).remote(...)
        JobBackend-->>Client: RayJob
        Ray->>Worker: execute _execute_capability_ref
        Worker->>Worker: capability.run(..., use_cache=False)
        Worker->>Store: write_with_receipt([run])
        Store-->>Worker: payload URI
        Worker-->>Controller: CapabilityRunRef or exception
        Controller->>Registry: update_terminal(..., result_ref/error)
        Registry->>Registry: retain controller briefly, then cleanup/sweep
    end
```
The registry/controller split is the key design shown in the flow:
- The registry actor is the shared directory for job metadata. register_or_get(...) either reserves a new job ID or returns the existing record for the same scoped run key. Later get_job(...) and list_jobs(...) calls read from this registry, so clients do not need the original submitting process to still be alive.
- The controller actor is the per-job owner of live execution. After the registry accepts a new reservation, the backend creates or finds the named detached controller and asks it to start the job; the controller marks the job RUNNING in the registry and launches the Ray worker task.
- The controller, not the notebook/client, owns the Ray task ObjectRef. It watches for task completion, handles cancellation requests, and writes the terminal status plus the serialized CapabilityRunRef back to the registry.
- The public RayJob handle is therefore a lightweight client-side view over shared registry state, plus the controller's reconciliation and cancellation methods while the controller is still retained.
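To make the registry's dedupe role concrete, here is a minimal single-process sketch of register_or_get(...) and get_job(...). It is an illustrative model, not checkmaite's actor code: the InMemoryRegistry and JobRecord classes, the (record, created) return tuple, and the job-ID and controller-name formats are all hypothetical. It only shows the behavior described above: the same (scope, scoped_run_key) returns the existing record, while a different scope gets a fresh reservation.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class JobRecord:
    # Hypothetical, deliberately small record (see "Memory and payload assumptions").
    job_id: str
    scope: str
    scoped_run_key: str
    status: str = "SUBMITTING"
    controller_name: Optional[str] = None


class InMemoryRegistry:
    """Hypothetical stand-in for the detached registry actor's dedupe directory."""

    def __init__(self) -> None:
        self._jobs: Dict[str, JobRecord] = {}
        self._dedupe: Dict[Tuple[str, str], str] = {}  # (scope, run key) -> job_id
        self._ids = itertools.count(1)

    def register_or_get(self, scope: str, scoped_run_key: str) -> Tuple[JobRecord, bool]:
        key = (scope, scoped_run_key)
        existing_id = self._dedupe.get(key)
        if existing_id is not None:
            # Duplicate submit in the same scope: reattach to the existing job.
            return self._jobs[existing_id], False
        job_id = f"job-{next(self._ids)}"
        record = JobRecord(job_id, scope, scoped_run_key, controller_name=f"controller-{job_id}")
        self._jobs[job_id] = record
        self._dedupe[key] = job_id
        return record, True

    def get_job(self, job_id: str) -> Optional[JobRecord]:
        # Any client can read metadata; the submitting process need not be alive.
        return self._jobs.get(job_id)


registry = InMemoryRegistry()
first, created = registry.register_or_get("team-a-notebooks", "run-1")
again, created_again = registry.register_or_get("team-a-notebooks", "run-1")
other_scope, _ = registry.register_or_get("team-b", "run-1")
```

Because the real registry is a single Ray actor, its methods run one call at a time, which is what makes a check-then-insert like this safe without extra locking.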
Public usage
1. Configure the job backend
```python
from checkmaite.jobs import configure_job_backend

configure_job_backend(
    "ray",
    address="local",
    analytics_store={"backend": "parquet", "uri": "./analytics_store"},
    idempotency_scope="team-a-notebooks",
    registry_namespace="checkmaite_jobs",
)
```
Important:
- analytics_store=... is required; analytics-store configuration is separate from Ray connection/runtime settings and is forwarded to worker tasks so they know where durable results should be written,
- idempotency_scope=... is required, has no default, and should be a stable workspace, project, or experiment identifier,
- submission is deduplicated by (idempotency_scope, scoped_run_key), and get_job(job_id)/list_jobs() can reattach across client restarts as long as the same registry actor name, namespace, and idempotency scope are reused,
- and capability-local caching is disabled in job submission mode: use_cache=True is rejected because Ray workers are ephemeral and do not share a local cache.
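The requirements above amount to a simple split of the configuration arguments. The sketch below is a hypothetical helper, not part of checkmaite: split_backend_config and its output keys are invented for illustration. It shows the two required arguments being validated and the analytics-store settings kept apart from the other backend settings so they can be forwarded to workers.

```python
from typing import Any, Dict, Mapping, Optional


def split_backend_config(
    *,
    analytics_store: Optional[Mapping[str, Any]] = None,
    idempotency_scope: Optional[str] = None,
    **backend_settings: Any,
) -> Dict[str, Any]:
    """Hypothetical validation/splitting of configure_job_backend("ray", ...) arguments."""
    if not analytics_store:
        raise ValueError("analytics_store=... is required")
    if idempotency_scope is None or not idempotency_scope.strip():
        raise ValueError("idempotency_scope=... is required and has no default")
    return {
        # Ray address, registry namespace, and other backend/runtime settings.
        "backend_settings": dict(backend_settings),
        # Forwarded to every worker task so results land in durable storage.
        "worker_analytics_store": dict(analytics_store),
        # First half of the (idempotency_scope, scoped_run_key) dedupe key.
        "idempotency_scope": idempotency_scope,
    }


parts = split_backend_config(
    address="local",
    analytics_store={"backend": "parquet", "uri": "./analytics_store"},
    idempotency_scope="team-a-notebooks",
    registry_namespace="checkmaite_jobs",
)
```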
2. Submit work
```python
from checkmaite.jobs import submit_capability

job = submit_capability(
    capability,
    datasets=[dataset],
    models=[model],
    metrics=[metric],
    config=config,
    use_cache=False,
)
```
use_cache=False is not just a recommendation: job submission rejects
use_cache=True. Worker-local caches are ephemeral and are not shared with the
client or other workers. Reuse in this backend comes from registry dedupe for the
same (idempotency_scope, scoped_run_key) and durable analytics-store writes;
future shared-cache support would need an explicit remote cache backend.
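A minimal sketch of that rejection rule, assuming a hypothetical prepare_run_kwargs helper (the real check and its exception type may differ). It forwards an explicit use_cache=False, matching the capability.run(..., use_cache=False) call in the flow diagram, and raises on use_cache=True.

```python
from typing import Any, Dict


def prepare_run_kwargs(**run_kwargs: Any) -> Dict[str, Any]:
    """Hypothetical submit-time check; ValueError is an assumed error type."""
    # Worker-local caches are ephemeral and unshared, so caching cannot help here.
    if run_kwargs.get("use_cache", False):
        raise ValueError("use_cache=True is not supported in job submission mode")
    # Always forward an explicit use_cache=False to the worker-side run call.
    return {**run_kwargs, "use_cache": False}


worker_kwargs = prepare_run_kwargs(config="cfg")
```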
3. Inspect lifecycle and retrieve the result reference
```python
print(job.job_id)
print(job.status)
print(job.wait(timeout=0.1))
ref = job.result(timeout=300)
print(ref.run_uid)
print(ref.store_uri)
print(ref.outputs_uri)  # None today
```
The returned object is CapabilityRunRef, not a full CapabilityRunBase.
4. List jobs
```python
from checkmaite.jobs import JobStatus, list_jobs

recent = list_jobs(limit=100)
completed = list_jobs(limit=50, status_filter=JobStatus.COMPLETED)
```
list_jobs(...) is intentionally limited/paginated. The registry applies
filters and limits before copying records back to the client. Handles returned
from listing are attached to the current backend instance, so shutdown(wait=True)
treats them as tracked jobs. Use before_submitted_at_ts as a simple cursor for
older pages.
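The registry-side behavior (filter first, then apply the limit, then copy only that page back) can be sketched as below. This is an illustrative model only: JobSummary, list_page, the newest-first ordering, and the strict "older than" cursor comparison are assumptions, and statuses are plain strings instead of JobStatus members. heapq.nlargest keeps only the top limit records rather than sorting everything.

```python
import heapq
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class JobSummary:
    job_id: str
    status: str
    submitted_at_ts: float


def list_page(
    records: Iterable[JobSummary],
    limit: int,
    status_filter: Optional[str] = None,
    before_submitted_at_ts: Optional[float] = None,
) -> List[JobSummary]:
    """Hypothetical registry-side listing: filter, then keep the newest `limit` records."""
    matching = (
        r
        for r in records
        if (status_filter is None or r.status == status_filter)
        and (before_submitted_at_ts is None or r.submitted_at_ts < before_submitted_at_ts)
    )
    # Only `limit` records are materialized, newest first.
    return heapq.nlargest(limit, matching, key=lambda r: r.submitted_at_ts)


records = [
    JobSummary(f"job-{i}", "COMPLETED" if i % 2 == 0 else "FAILED", float(i))
    for i in range(10)
]
first_page = list_page(records, limit=3)
# Use the oldest timestamp on the current page as the cursor for the next page.
second_page = list_page(records, limit=3, before_submitted_at_ts=first_page[-1].submitted_at_ts)
completed_only = list_page(records, limit=50, status_filter="COMPLETED")
```

One trade-off of a timestamp-only cursor with a strict comparison: records that share the cursor's exact timestamp can be skipped between pages.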
Resource scheduling
The job backend resolves CPU/GPU requirements in the following order:
- explicit resources={...} passed at submission time,
- hints on the config object (config.num_cpus, config.num_gpus),
- capability defaults (default_num_cpus, default_num_gpus),
- fallback (num_cpus=1, num_gpus=0).
Example:
```python
job = submit_capability(
    capability,
    datasets=[dataset],
    resources={"num_cpus": 4, "num_gpus": 1},
)
```
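The resolution order can be sketched as a small function. This is a hypothetical model, not the backend's code: resolve_resources is an invented name, and it assumes the order is applied per key (so an explicit num_cpus can combine with a config num_gpus) and that None means "not set". The real backend might instead take the first source that sets anything.

```python
from types import SimpleNamespace
from typing import Any, Dict, Mapping, Optional

# Fallback values from the resolution order above.
FALLBACK_RESOURCES = {"num_cpus": 1, "num_gpus": 0}


def resolve_resources(
    explicit: Optional[Mapping[str, Any]],
    config: Any,
    capability: Any,
) -> Dict[str, Any]:
    """Hypothetical per-key resolution: explicit > config hint > capability default > fallback."""
    explicit = explicit or {}
    resolved: Dict[str, Any] = {}
    for key in ("num_cpus", "num_gpus"):
        candidates = (
            explicit.get(key),
            getattr(config, key, None),
            getattr(capability, f"default_{key}", None),
            FALLBACK_RESOURCES[key],
        )
        # The first source that actually sets a value wins for this key.
        resolved[key] = next(value for value in candidates if value is not None)
    return resolved


config = SimpleNamespace(num_cpus=None, num_gpus=1)
capability = SimpleNamespace(default_num_cpus=2, default_num_gpus=None)
print(resolve_resources(None, config, capability))  # {'num_cpus': 2, 'num_gpus': 1}
```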
Status mapping
RayJob is a thin protocol wrapper over shared registry lifecycle state.
Primary mapping comes from registry state:
- SUBMITTING -> PENDING
- RUNNING / CANCELLING -> RUNNING
- COMPLETED -> COMPLETED
- FAILED -> FAILED
- CANCELLED -> CANCELLED
RayJob does not own or use the live task ObjectRef directly. The detached
controller owns that reference. Client handles resolve terminal payloads from
registry metadata (result_ref) and ask the controller to reconcile registry
state from its owned task when needed.
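The mapping is a plain lookup table. In the sketch below, the local JobStatus enum is a stand-in for checkmaite.jobs.JobStatus (its member values are assumed), and public_status is a hypothetical helper. Note that CANCELLING deliberately collapses to RUNNING until a terminal state is committed.

```python
from enum import Enum


class JobStatus(Enum):
    # Local stand-in for checkmaite.jobs.JobStatus; member values are assumed.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Registry lifecycle state -> public status, as listed above.
REGISTRY_TO_PUBLIC = {
    "SUBMITTING": JobStatus.PENDING,
    "RUNNING": JobStatus.RUNNING,
    "CANCELLING": JobStatus.RUNNING,  # still running until the controller commits a terminal state
    "COMPLETED": JobStatus.COMPLETED,
    "FAILED": JobStatus.FAILED,
    "CANCELLED": JobStatus.CANCELLED,
}


def public_status(registry_state: str) -> JobStatus:
    """Hypothetical helper translating a registry state into the public status."""
    try:
        return REGISTRY_TO_PUBLIC[registry_state]
    except KeyError:
        raise ValueError(f"unknown registry state: {registry_state!r}") from None
```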
Timeouts and cancellation
```python
from checkmaite.jobs import JobTimeoutError

try:
    ref = job.result(timeout=120)
except JobTimeoutError:
    # The timeout is client-side only; cancellation must be requested explicitly.
    job.cancel()
    raise
```
Notes:
- job.result(timeout=...) is a client-side wait timeout,
- timing out does not automatically cancel the remote task,
- job.cancel() records shared cancellation intent (CANCELLING) and asks the detached controller actor to call ray.cancel(...) when the controller is reachable,
- CANCELLING still maps to public RUNNING until the controller observes and commits a terminal state,
- if the cancellation request is dispatched but the acknowledgement times out, cancel() may return True even though final cancellation is still pending,
- if the controller cannot be reached, cancellation returns False because the handle cannot prove that the underlying Ray task stopped,
- if the controller has already been cleaned up, the job is already terminal and cancellation is a no-op.
Shutdown semantics
RayJobBackend.shutdown(wait=True) waits for jobs attached to that backend
instance and then calls ray.shutdown() for the local client process.
RayJobBackend.shutdown(wait=False) is intentionally a non-blocking local-handle
handoff: it does not wait for jobs and does not call ray.shutdown(). This is
used by configure_job_backend(...) when replacing the active backend so running
detached controllers can continue in the Ray cluster and a new backend can
reattach to them. Call ray.shutdown() explicitly, or use shutdown(wait=True),
when you really want to disconnect the local Ray runtime.
Registry configuration notes
Keep registry_namespace and idempotency_scope stable across sessions if you
want reattach behavior. By default, the registry actor name is a deterministic
hash of idempotency_scope, which avoids accidental collisions between unrelated
teams on the same Ray cluster. Pass registry_actor_name explicitly only when
clients should intentionally share one registry actor across scopes.
Different scopes intentionally do not dedupe with each other.
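A deterministic name derivation could look like the sketch below. The helper name, the SHA-256 choice, the 16-character truncation, and the "checkmaite-registry-" prefix are all hypothetical; the page only states that the default name is a deterministic hash of idempotency_scope. A hashlib digest is used because it is stable across processes and machines, whereas Python's built-in hash() of a str is salted per interpreter process and would break reattachment.

```python
import hashlib
from typing import Optional


def derive_registry_actor_name(idempotency_scope: str, registry_actor_name: Optional[str] = None) -> str:
    """Hypothetical derivation of the registry actor name from the scope."""
    # An explicit name intentionally lets several scopes share one registry actor.
    if registry_actor_name is not None:
        return registry_actor_name
    # Stable across restarts and hosts, so later clients find the same actor.
    digest = hashlib.sha256(idempotency_scope.encode("utf-8")).hexdigest()[:16]
    return f"checkmaite-registry-{digest}"


name_a = derive_registry_actor_name("team-a-notebooks")
name_b = derive_registry_actor_name("team-b-notebooks")
shared = derive_registry_actor_name("team-a-notebooks", registry_actor_name="shared-registry")
```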
Pending-call limits and backpressure
The registry is a single serialized actor, so queued registry calls should be
bounded. By default, registry_max_pending_calls=1024 caps queued calls on each
registry actor handle and controller_max_pending_calls=64 caps queued calls on
each per-job controller actor handle. These limits protect the Ray control plane
from unbounded memory growth during bursty submits, polling, or cancellation.
If Ray rejects a call because an actor pending-call limit is full, client APIs
raise BackpressureError. Treat that as a retryable overload signal: retry with
exponential backoff and jitter, reduce client-side concurrency, or tune
registry_max_pending_calls / controller_max_pending_calls for expected burst
size. Higher limits absorb larger bursts but increase memory use and tail
latency; lower limits fail fast and protect the registry/controller actors.
Passing None opts back into Ray's unbounded pending-call behavior and should be
reserved for controlled local/debug workloads. These options are applied when an
actor is created; an already-running detached registry/controller actor keeps the
limits it was created with until that actor is replaced.
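The recommended client-side response (exponential backoff with jitter) can be sketched as a small wrapper. BackpressureError here is a local stand-in class because this page does not show its import path, and call_with_backoff and its parameters are hypothetical. The delay uses "full jitter": a uniform random sleep between zero and an exponentially growing cap, which spreads out retries from many clients.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class BackpressureError(RuntimeError):
    """Local stand-in for the BackpressureError raised by checkmaite's client APIs."""


def call_with_backoff(
    fn: Callable[[], T],
    *,
    max_attempts: int = 6,
    base_delay_s: float = 0.1,
    max_delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Retry fn on BackpressureError with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    rng = rng or random.Random()
    attempt = 0
    while True:
        try:
            return fn()
        except BackpressureError:
            attempt += 1
            if attempt >= max_attempts:
                raise  # still overloaded: surface the error to the caller
            cap = min(max_delay_s, base_delay_s * 2 ** (attempt - 1))
            sleep(rng.uniform(0.0, cap))


attempts = {"n": 0}


def flaky_registry_call() -> str:
    # Simulates a registry whose pending-call limit is full for two calls.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise BackpressureError("pending-call limit reached")
    return "accepted"


delays = []
result = call_with_backoff(flaky_registry_call, sleep=delays.append, rng=random.Random(0))
```

In a notebook, the same wrapper could guard a polling or listing call, for example call_with_backoff(lambda: list_jobs(limit=100)), once the real exception class is imported in place of the stand-in.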
Dedupe policy in the current implementation:
- SUBMITTING, RUNNING, and COMPLETED records keep the dedupe key, so a duplicate submit in the same scope returns the existing job.
- FAILED and CANCELLED records release the dedupe key, so a later submit of the same logical work can create a fresh job.
- expired SUBMITTING reservations are marked FAILED and also release the dedupe key.
- stale RUNNING/CANCELLING records whose controller heartbeat lease expires are marked FAILED by registry sweep and release the dedupe key.
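The policy above can be modeled as a sweep over in-memory records. Everything in this sketch is hypothetical (SweptJob, the single deadline_ts field standing in for both the reservation expiry and the heartbeat lease, and the sweep function); it only illustrates which states hold or release the dedupe key. Treating a live CANCELLING record as holding the key is inferred from the stale-lease rule rather than stated explicitly.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

DedupeKey = Tuple[str, str]  # (idempotency_scope, scoped_run_key)


@dataclass
class SweptJob:
    job_id: str
    status: str
    # Reservation expiry for SUBMITTING, heartbeat lease expiry for RUNNING/CANCELLING.
    deadline_ts: float


RELEASES_DEDUPE_KEY = frozenset({"FAILED", "CANCELLED"})
LEASED_STATES = frozenset({"SUBMITTING", "RUNNING", "CANCELLING"})


def sweep(jobs: Dict[str, SweptJob], dedupe: Dict[DedupeKey, str], now_ts: float) -> None:
    """Hypothetical registry sweep: fail expired/stale jobs, then release their keys."""
    for job in jobs.values():
        if job.status in LEASED_STATES and now_ts > job.deadline_ts:
            job.status = "FAILED"
    for key, job_id in list(dedupe.items()):
        if jobs[job_id].status in RELEASES_DEDUPE_KEY:
            del dedupe[key]  # a later identical submit may create a fresh job


jobs = {
    "a": SweptJob("a", "SUBMITTING", 10.0),  # expired reservation
    "b": SweptJob("b", "RUNNING", 30.0),     # lease still fresh
    "c": SweptJob("c", "CANCELLING", 15.0),  # stale lease
    "d": SweptJob("d", "COMPLETED", 0.0),    # terminal success keeps its key
    "e": SweptJob("e", "CANCELLED", 0.0),    # terminal cancel releases its key
}
dedupe = {("team-a", f"run-{job_id}"): job_id for job_id in jobs}
sweep(jobs, dedupe, now_ts=20.0)
```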
Controller terminal updates are best-effort and bounded by
registry_update_timeout_s (default: 5 seconds). If one terminal update times
out, the controller keeps the terminal result in its own actor state, retries the
registry commit with bounded backoff, and clients can also commit that terminal
state on the next observe/reconcile path. While a live controller still has
uncommitted terminal state, its heartbeat keeps the registry lease fresh so the
job is not swept as stale.
Assumptions
Memory and payload assumptions
The registry actor is intentionally thin. Its Python dictionaries live in the registry actor worker's heap, not in Ray's object store. Registry records should stay small and JSON-like:
- job id, scope, status,
- timestamps,
- controller actor name,
- short error summary,
- small terminal CapabilityRunRef metadata.
The registry and controllers must not store large logs, model weights, datasets,
checkpoints, full CapabilityRunBase payloads, or unbounded history. Large data
belongs in the analytics store, object storage, shared filesystems, databases,
or platform logging systems. Terminal job records are retained only for the
configured registry retention window/count; once purged, their dedupe entries are
also removed and identical future submissions may create fresh jobs. If markdown
summaries or reports become large, store them externally and keep only a
URI/preview in CapabilityRunRef.summary.
During controller.start(...), the controller temporarily receives the
capability and run arguments so it can launch the worker task. It does not retain
those objects on actor state after submission. If future workloads involve very
large submission payloads, a more memory-conscious design can pass a Ray object
store reference instead:
```
client -> ray.put(submission_payload)
client -> controller.start(payload_ref)
controller -> worker task consumes payload_ref
```
That optimization trades controller heap pressure for Ray object-store pressure and should be introduced only with explicit object-lifetime management.
Cluster-lifetime and durability assumptions
Detached registry/controller actors survive notebook or client-process exits, but they do not survive Ray cluster loss. Current behavior assumes:
- if a client process exits but the Ray cluster remains alive, running jobs can continue and later clients can reattach using the same derived or explicit registry actor name, namespace, and idempotency scope;
- if the Ray cluster dies, in-memory registry state, controller actors, and running tasks are lost;
- durable run data that was successfully written before cluster loss remains in the configured analytics store;
- jobs that were RUNNING when the cluster died require a future persistent job-metadata store and recovery policy to mark them failed, reconcile them from the analytics store, or resubmit them.
The current in-memory registry is not a long-term job-history database. A production deployment that needs history across cluster restarts should add a persistent registry metadata store (for example Postgres, MongoDB, or object storage) with one small row/document per job. The current record shape is kept small and serializable to leave room for that future durable store.
Scaling assumptions
This design targets relatively heavyweight ML evaluation jobs where each job runs long enough that one controller actor per job is acceptable. It is not a high-throughput system for huge numbers of tiny tasks.
Operational considerations:
- each detached controller actor has Ray control-plane metadata and Python heap overhead;
- controller_num_cpus defaults to a small non-zero value to give the Ray scheduler backpressure;
- controller_memory can reserve Ray actor heap memory when supported by the deployed Ray version;
- platform deployments can use controller_resources to place controllers on control-plane nodes via custom Ray resources;
- terminal controller retention must be bounded with controller_retention_s and max_retained_terminal_controllers.
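How the two retention bounds could combine is sketched below. RetainedController and controllers_to_release are hypothetical names, and applying the age bound before the count bound is an assumption. The function only selects which terminal controllers to release; an actual cleanup would then need to terminate those detached actors, for example with ray.kill.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class RetainedController:
    name: str
    terminal_at_ts: float  # when the controller's job reached a terminal state


def controllers_to_release(
    retained: Sequence[RetainedController],
    now_ts: float,
    controller_retention_s: float,
    max_retained_terminal_controllers: int,
) -> List[str]:
    """Hypothetical selection of terminal controllers that exceed either bound."""
    # Age bound: anything past the retention window is released.
    expired = [c for c in retained if now_ts - c.terminal_at_ts > controller_retention_s]
    fresh = [c for c in retained if now_ts - c.terminal_at_ts <= controller_retention_s]
    # Count bound: keep only the newest N of the remaining controllers.
    fresh.sort(key=lambda c: c.terminal_at_ts, reverse=True)
    overflow = fresh[max_retained_terminal_controllers:]
    return [c.name for c in expired + overflow]


retained = [
    RetainedController("c1", 0.0),
    RetainedController("c2", 50.0),
    RetainedController("c3", 80.0),
    RetainedController("c4", 90.0),
]
to_release = controllers_to_release(
    retained, now_ts=100.0, controller_retention_s=60.0, max_retained_terminal_controllers=2
)
```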
If a deployment needs thousands of concurrent small jobs or very high submit rates, a fixed controller-pool design may be better than one detached controller actor per job. In that model, a bounded set of actors owns many job refs and the registry maps each job to a pool actor. That design reduces actor churn but is more complex and has a larger failure domain per pool actor.