Ray Job Submission Tutorial¶
A short guide to running a checkmaite capability asynchronously with the default registry-backed ray job backend.
When to use this backend¶
Use ray when jobs should be tracked through shared Ray actors so clients can list, fetch, and reattach to jobs while the Ray cluster remains alive.
Important assumptions:
- idempotency_scope is required and should identify your workspace or project;
- the registry/controller actors live in the Ray cluster, not in the notebook process;
- completed results are written to the configured analytics store before job.result() succeeds.
Setup¶
import tempfile
import uuid
from pathlib import Path
from checkmaite.core.analytics_store import AnalyticsStore, ParquetBackend
from checkmaite.core.object_detection import DataevalCleaning
from checkmaite.core.object_detection.dataset_loaders import CocoDetectionDataset
from checkmaite.jobs import (
    JobStatus,
    configure_job_backend,
    get_job,
    list_jobs,
    shutdown_job_backend,
    submit_capability,
)
# Find the repository root whether the notebook is run from docs/tool-usage or elsewhere.
REPO_ROOT = next(
    path for path in [Path.cwd(), *Path.cwd().parents]
    if (path / "pyproject.toml").exists()
)
# Use the tiny COCO fixture included with the repository.
dataset_root = REPO_ROOT / "tests/data_for_tests/coco_dataset"
dataset_ann = dataset_root / "ann_file.json"
dataset = CocoDetectionDataset(
    root=str(dataset_root),
    ann_file=str(dataset_ann),
    dataset_id="coco-job-tutorial",
)
store_dir = Path(tempfile.mkdtemp(prefix="checkmaite_jobs_")) / "analytics_store"
analytics_store_config = {"backend": "parquet", "uri": str(store_dir)}
print(f"Dataset: {dataset.metadata['id']} ({len(dataset)} images)")
print(f"Analytics store: {store_dir}")
Dataset: coco-job-tutorial (4 images)
Analytics store: /tmp/checkmaite_jobs_eiljygz1/analytics_store
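The `next(...)` expression used to locate the repository root raises a bare `StopIteration` when no `pyproject.toml` exists in the current directory or any parent. A minimal sketch of the same search with a clearer failure mode follows; `find_repo_root` and its `marker` parameter are hypothetical names for illustration, not part of checkmaite.

```python
import tempfile
from pathlib import Path


def find_repo_root(start: Path, marker: str = "pyproject.toml") -> Path:
    """Return the nearest directory (start or an ancestor) that contains marker."""
    for candidate in [start, *start.parents]:
        if (candidate / marker).exists():
            return candidate
    raise FileNotFoundError(f"No {marker} found in {start} or any parent directory")


# Demonstrate on a throwaway directory tree instead of the real repository.
demo_root = Path(tempfile.mkdtemp(prefix="repo_root_demo_"))
(demo_root / "pyproject.toml").write_text("")
nested = demo_root / "docs" / "tool-usage"
nested.mkdir(parents=True)

found = find_repo_root(nested)
print(found == demo_root)
```

In the notebook, `find_repo_root(Path.cwd())` would stand in for the `next(...)` expression.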
Configure the default ray backend¶
This local tutorial uses a unique idempotency_scope for isolation. In a shared environment, choose a stable scope for the project or workspace.
idempotency_scope = f"tool-usage-ray-{uuid.uuid4().hex}"
configure_job_backend(
    "ray",
    address="local",
    force_reinit=True,
    idempotency_scope=idempotency_scope,
    analytics_store=analytics_store_config,
    # Keep the local tutorial cluster tidy after terminal state is committed.
    controller_retention_s=0.0,
    max_retained_terminal_controllers=0,
)
print(f"Configured ray job backend with scope: {idempotency_scope}")
2026-05-22 18:42:03,989 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Configured ray job backend with scope: tool-usage-ray-923e30da49f8498baeb6e93f30dc06d1
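For a shared environment, the scope needs to be the same string every time a given project submits work, unlike the random `uuid4` suffix used in this tutorial. One possible way to derive such a string is sketched below. The helper `stable_scope`, its arguments, and the slug format are assumptions for illustration; the tutorial does not state which characters checkmaite accepts in a scope.

```python
import re


def stable_scope(workspace: str, project: str) -> str:
    """Build a deterministic scope string from human-readable names."""

    def slug(value: str) -> str:
        # Lowercase, then collapse every run of non-alphanumerics into one dash.
        return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")

    return f"{slug(workspace)}-{slug(project)}"


# Hypothetical workspace and project names.
scope = stable_scope("Vision Team", "COCO cleaning")
print(scope)  # vision-team-coco-cleaning
```

The result could then be passed as `idempotency_scope=scope` in place of the random value.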
Submit a capability job¶
capability = DataevalCleaning()
job = submit_capability(
    capability,
    datasets=[dataset],
    use_cache=False,
)
print(f"Job ID: {job.job_id}")
print(f"Initial status: {job.status}")
Job ID: 5f22151bff884c9fa957ddb1d8b7a6d3
Initial status: JobStatus.RUNNING
Wait for completion and get the result reference¶
job.result() returns a CapabilityRunRef, not the full capability output object.
final_status = job.wait(timeout=120)
print(f"Final status: {final_status}")
ref = job.result(timeout=10)
print(f"Run UID: {ref.run_uid}")
print(f"Store URI: {ref.store_uri}")
Final status: JobStatus.COMPLETED
Run UID: 7a553ac2b7b42fd836159672b761d777fba055ad19f77f39cdf2c3c17b9234ed
Store URI: /tmp/checkmaite_jobs_eiljygz1/analytics_store/dataeval_cleaning/1779475338635_6969afdd.parquet
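In the output above, the store URI's parent directory (`dataeval_cleaning`) appears to name the table the run was written to. A small sketch for recovering that name from a reference follows. It assumes a `<store>/<table>/<file>.parquet` layout, which is inferred from this one output rather than documented, and `table_from_store_uri` is a hypothetical helper.

```python
from pathlib import PurePosixPath


def table_from_store_uri(store_uri: str) -> str:
    """Return the parent directory name, assumed here to be the table name."""
    return PurePosixPath(store_uri).parent.name


# The URI printed by the tutorial run above.
uri = (
    "/tmp/checkmaite_jobs_eiljygz1/analytics_store/"
    "dataeval_cleaning/1779475338635_6969afdd.parquet"
)
table = table_from_store_uri(uri)
print(table)  # dataeval_cleaning
```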
List and fetch tracked jobs¶
The default ray backend stores job metadata in the shared registry actor, so list_jobs() and get_job(...) read registry-backed state.
for tracked_job in list_jobs(limit=10):
    print(tracked_job.job_id, tracked_job.status)
fetched = get_job(job.job_id)
print("Fetched by ID:", fetched.job_id, fetched.status)
5f22151bff884c9fa957ddb1d8b7a6d3 JobStatus.COMPLETED
Fetched by ID: 5f22151bff884c9fa957ddb1d8b7a6d3 JobStatus.COMPLETED
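When the registry tracks many jobs, a per-status tally can be easier to read than a line per job. The sketch below relies only on each job object exposing a `status` attribute, as the tracked jobs above do. `count_by_status` is a hypothetical helper, and the stand-in jobs use plain strings rather than real `JobStatus` values.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_status(jobs) -> Counter:
    """Tally jobs by the string form of their status attribute."""
    return Counter(str(job.status) for job in jobs)


# Stand-in objects so the sketch runs without a Ray cluster.
fake_jobs = [
    SimpleNamespace(job_id="a", status="COMPLETED"),
    SimpleNamespace(job_id="b", status="RUNNING"),
    SimpleNamespace(job_id="c", status="COMPLETED"),
]
summary = count_by_status(fake_jobs)
print(dict(summary))
```

With the tutorial's backend, `count_by_status(list_jobs(limit=10))` would be the equivalent call.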
Duplicate submission in the same scope¶
Submitting the same logical run again in the same idempotency_scope returns the existing tracked job instead of launching duplicate work.
same_job = submit_capability(
    capability,
    datasets=[dataset],
    use_cache=False,
)
print(f"Original job: {job.job_id}")
print(f"Second submit: {same_job.job_id}")
print("Same tracked job:", same_job.job_id == job.job_id)
Original job: 5f22151bff884c9fa957ddb1d8b7a6d3
Second submit: 5f22151bff884c9fa957ddb1d8b7a6d3
Same tracked job: True
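The general idea behind scope-based deduplication can be illustrated with a toy in-memory registry. This is not checkmaite's implementation: `ToyRegistry`, its key derivation, and the choice of key fields are assumptions made purely to show how an idempotency key might map a repeated submission onto an existing job.

```python
import hashlib
import json
import uuid


class ToyRegistry:
    """In-memory stand-in for a registry that deduplicates by idempotency key."""

    def __init__(self, scope: str) -> None:
        self.scope = scope
        self._jobs: dict[str, str] = {}

    def _key(self, capability_name: str, dataset_ids: list[str]) -> str:
        # Hash the scope together with the logical run description.
        payload = json.dumps(
            {"scope": self.scope, "capability": capability_name, "datasets": dataset_ids},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def submit(self, capability_name: str, dataset_ids: list[str]) -> str:
        key = self._key(capability_name, dataset_ids)
        if key not in self._jobs:
            # First time this logical run is seen: mint a new job ID.
            self._jobs[key] = uuid.uuid4().hex
        return self._jobs[key]


registry = ToyRegistry(scope="demo-scope")
first = registry.submit("DataevalCleaning", ["coco-job-tutorial"])
second = registry.submit("DataevalCleaning", ["coco-job-tutorial"])
other = registry.submit("DataevalCleaning", ["another-dataset"])
print(first == second, first == other)
```

In this sketch the same logical run yields the same job ID, while a different dataset produces a new one.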
Query the analytics store¶
store = AnalyticsStore(ParquetBackend(str(store_dir)))
print(f"Tables: {store.list_tables()}")
cleaning_results = store.query_sql("""
    SELECT
        dataset_id,
        exact_duplicate_count,
        image_outlier_count,
        image_outlier_ratio,
        mean_brightness
    FROM dataeval_cleaning
""")
print(cleaning_results)
Tables: ['dataeval_cleaning', 'runs']
shape: (1, 5)
┌───────────────────┬────────────────────┬───────────────────┬───────────────────┬─────────────────┐
│ dataset_id        ┆ exact_duplicate_co ┆ image_outlier_cou ┆ image_outlier_rat ┆ mean_brightness │
│ ---               ┆ unt                ┆ nt                ┆ io                ┆ ---             │
│ str               ┆ ---                ┆ ---               ┆ ---               ┆ f64             │
│                   ┆ i64                ┆ i64               ┆ f64               ┆                 │
╞═══════════════════╪════════════════════╪═══════════════════╪═══════════════════╪═════════════════╡
│ coco-job-tutorial ┆ 0                  ┆ 0                 ┆ 0.0               ┆ 0.305882        │
└───────────────────┴────────────────────┴───────────────────┴───────────────────┴─────────────────┘
Shut down¶
shutdown_job_backend(wait=True)
print("Job backend shut down.")
Job backend shut down.
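In a script, an exception raised between configuration and shutdown would skip the shutdown call. One way to guard against that is a small context manager built on `try`/`finally`. The sketch below takes the configure and shutdown steps as plain callables so it runs without Ray; `managed_backend` is a hypothetical helper, not a checkmaite API.

```python
from contextlib import contextmanager


@contextmanager
def managed_backend(configure, shutdown):
    """Run configure() on entry and always run shutdown() on exit."""
    configure()
    try:
        yield
    finally:
        shutdown()


# Stand-in callables record the order of events, including a simulated failure.
events = []
try:
    with managed_backend(
        lambda: events.append("configured"),
        lambda: events.append("shut down"),
    ):
        events.append("working")
        raise RuntimeError("simulated failure")
except RuntimeError:
    events.append("error handled")

print(events)
```

With the tutorial's functions, the two callables could wrap the `configure_job_backend("ray", ...)` call and `shutdown_job_backend(wait=True)` respectively.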