> **Beta.** The Ray integration is the most complex surface in `roar`. It's exercised end-to-end against a real Ray cluster in CI (see [Validation](#validation) below), but some edges — client-mode connections, exotic `runtime_env` policies, framework-side library mocks — may still bite. Treat it as beta: API and capture details may shift in minor releases. Issue reports welcome.

## Why Ray gets its own integration

[Ray](https://www.ray.io/) is a common way teams scale ML work across machines: distributed training, multi-node data preprocessing, hyperparameter sweeps. The unit of execution is a `@ray.remote` task or actor method that runs on whichever worker the scheduler picks, possibly on a different host than the driver. A tracer attached only to the driver process sees none of that work.

So `roar`'s Ray integration is what makes lineage tracking work when workloads scale out: it ensures the tracer rides along to every Ray worker, captures per-task I/O at the right granularity, attributes events back to the originating task, and stitches everything back into the driver's local `.roar/roar.db` after the job ends.

You shouldn't have to think about most of this. The user-facing surface is a single command, `roar run ray job submit ...` (or `roar run python your_script.py` for local Ray), and the rest just works. The [Cluster setups](#cluster-setups) section has the specifics for each topology.

## What gets captured

Per Ray task, `roar` records:

- **Local file I/O** — reads and writes through `builtins.open` (Python) and the LD_PRELOAD native tracer (raw libc calls). Content hashes are computed for writes made through `builtins.open`; the native tracer records paths only.
- **S3 operations** — `PutObject`, `GetObject`, `UploadPart`, `CompleteMultipartUpload`, `DeleteObject` through patched boto3 and pyarrow filesystem clients. ETags are recorded as the hash.
- **Per-task attribution** — every event is tagged with the Ray task ID that produced it, plus actor/node/worker IDs when applicable. This works across `subprocess.Popen` and `threading.Thread` so library code that spawns helpers stays attached to the right task.
- **Task ordering** — derived from the artifact dependency graph: task B reads X, task A wrote X, therefore A → B.
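The ordering rule in the last bullet can be sketched in a few lines of Python. This is a minimal illustration, not `roar`'s implementation: the `(task_id, op, path)` event tuples and the names `derive_task_edges` and `task_order` are hypothetical, and the sketch matches on path alone, whereas `roar` also uses the producer's hash (see [Hashing strategy](#hashing-strategy)).

```python
from collections import defaultdict
from graphlib import TopologicalSorter


def derive_task_edges(events):
    """Return {(producer, consumer)} edges from (task_id, op, path) events.

    A task that reads a path depends on every task that wrote it.
    Hypothetical event shape; matches on path only (no hash check).
    """
    writers = defaultdict(set)  # path -> task IDs that wrote it
    readers = defaultdict(set)  # path -> task IDs that read it
    for task_id, op, path in events:
        if op == "write":
            writers[path].add(task_id)
        elif op == "read":
            readers[path].add(task_id)
        else:
            raise ValueError(f"unknown op: {op!r}")

    edges = set()
    for path, consumers in readers.items():
        for producer in writers.get(path, ()):
            for consumer in consumers:
                if producer != consumer:  # skip self-edges
                    edges.add((producer, consumer))
    return edges


def task_order(edges):
    """Return one execution order consistent with the edges."""
    sorter = TopologicalSorter()
    for producer, consumer in edges:
        sorter.add(consumer, producer)  # consumer depends on producer
    return list(sorter.static_order())


events = [
    ("A", "write", "s3://bucket/x.parquet"),
    ("B", "read", "s3://bucket/x.parquet"),
    ("B", "write", "/tmp/y.npy"),
    ("C", "read", "/tmp/y.npy"),
]
print(sorted(derive_task_edges(events)))  # [('A', 'B'), ('B', 'C')]
print(task_order(derive_task_edges(events)))  # ['A', 'B', 'C']
```

Deriving order from data flow rather than wall-clock timestamps means two tasks that never share an artifact stay unordered, which is the honest answer when they ran in parallel on different nodes.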

Each Ray task becomes a job record in `.roar/roar.db` alongside the local-mode jobs that drive them. `roar dag`, `roar show`, and `roar register` work the same way they do for purely local pipelines.
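The per-task attribution bullet above relies on the current task ID surviving thread and process handoffs. One common Python pattern for that, sketched here under assumptions, is a `contextvars.ContextVar` that threads inherit through `copy_context()` and child processes inherit through an environment variable. On standard CPython builds a new `threading.Thread` typically starts with a fresh context, which is why the explicit copy is needed. `current_task_id`, `AttributedThread`, `child_env`, and the `ROAR_HYPOTHETICAL_TASK_ID` variable are illustrative names, not `roar`'s actual patch.

```python
import contextvars
import os
import threading

# Hypothetical: the Ray task ID currently being attributed.
current_task_id = contextvars.ContextVar("current_task_id", default=None)

# Hypothetical variable name used to hand the task ID to child processes.
TASK_ID_ENV = "ROAR_HYPOTHETICAL_TASK_ID"


class AttributedThread(threading.Thread):
    """A Thread whose target runs in a snapshot of its creator's context."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Snapshot the creating thread's context variables at construction.
        self._captured_ctx = contextvars.copy_context()

    def run(self):
        # Execute the normal Thread.run() inside the captured context.
        self._captured_ctx.run(super().run)


def child_env(base_env=None):
    """Environment for a subprocess.Popen call that carries the task ID."""
    env = dict(os.environ if base_env is None else base_env)
    task_id = current_task_id.get()
    if task_id is not None:
        env[TASK_ID_ENV] = task_id
    return env


token = current_task_id.set("task-123")
seen = []
worker = AttributedThread(target=lambda: seen.append(current_task_id.get()))
worker.start()
worker.join()
print(seen)  # ['task-123']
current_task_id.reset(token)
```

A child process would receive the ID via `subprocess.Popen(cmd, env=child_env())` and re-seed its own context variable at startup.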

## How it works

The integration boots `roar` inside every worker process via Ray's own extension points — no monkey-patching of the user's script, no special imports.

1. **`roar run ray job submit ...`** rewrites the entrypoint through `python -m roar.execution.runtime.driver_entrypoint` and registers a `worker_process_setup_hook` that calls into `roar.execution.runtime.worker_bootstrap.startup` on every worker.
2. **Workers also boot through `roar-worker`** — a tiny entrypoint binary installed alongside `roar` itself — which sets `LD_PRELOAD` before exec'ing Python. By the time user code runs, the libc-interposed native tracer is already attached.
3. **`sitecustomize.py`** activates inside the worker's Python interpreter (via a packaged `roar_inject.pth` that puts `roar` on `sys.path`). It installs the patches on `builtins.open`, `boto3.client`, `pandas.DataFrame.to_parquet`, `subprocess.Popen`, `threading.Thread`, and the relevant Ray internals.
4. **Per-task fragments** stream out as the task runs. Each task event becomes a `TaskFragment` carrying the task's reads/writes, attribution, and timing.
5. **Fragments are shipped** via one of two paths:
   - to GLaaS as encrypted fragment batches (`GlaasFragmentStreamer`) when GLaaS is configured for the session;
   - or to a local actor + filesystem fallback when it isn't.
6. **After the Ray job exits**, the driver-side reconstituter pulls fragments back, decrypts them, dedupes, and merges them into the host's `.roar/roar.db` — restoring `working_dir`-extracted paths to their host equivalents and materializing composite outputs along the way.
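Steps 4 to 6 can be pictured as a small data flow. The sketch below is hypothetical: the `Fragment` fields stand in for `TaskFragment`, the `(task_id, seq)` dedupe key is an assumption, and decryption and GLaaS transport are omitted. It shows the two reconstitution duties named in step 6: dropping duplicate fragments, and mapping paths under the worker's extracted `working_dir` back to the host checkout.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class Fragment:
    """Hypothetical stand-in for roar's TaskFragment."""
    task_id: str
    seq: int  # per-task sequence number (assumed dedupe key)
    reads: tuple = ()
    writes: tuple = ()


def restore_path(path, extract_root, host_root):
    """Map a path under the worker's extracted working_dir to the host."""
    try:
        rel = PurePosixPath(path).relative_to(extract_root)
    except ValueError:
        return path  # outside working_dir (or an S3 URI): keep as-is
    return str(PurePosixPath(host_root) / rel)


def reconstitute(fragments, extract_root, host_root):
    """Dedupe fragments and merge them into per-task read/write sets."""
    seen = set()
    merged = {}
    for frag in fragments:
        key = (frag.task_id, frag.seq)
        if key in seen:
            continue  # e.g. the same batch delivered twice
        seen.add(key)
        job = merged.setdefault(frag.task_id, {"reads": set(), "writes": set()})
        job["reads"].update(restore_path(p, extract_root, host_root) for p in frag.reads)
        job["writes"].update(restore_path(p, extract_root, host_root) for p in frag.writes)
    return merged


frags = [
    Fragment("t1", 0, reads=("/w/pkg/data/in.csv",), writes=("/w/pkg/out/a.parquet",)),
    Fragment("t1", 0, reads=("/w/pkg/data/in.csv",), writes=("/w/pkg/out/a.parquet",)),
    Fragment("t2", 0, reads=("/w/pkg/out/a.parquet",), writes=("s3://bucket/b.parquet",)),
]
merged = reconstitute(frags, extract_root="/w/pkg", host_root="/home/me/proj")
print(merged["t1"]["writes"])  # {'/home/me/proj/out/a.parquet'}
```

Here `/w/pkg` is a made-up extraction root; on a real cluster the extracted `working_dir` lives somewhere under Ray's session directory, which is exactly why the host-side rewrite is needed before paths can match the driver's `.roar/roar.db`.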

The driver and the workers both produce fragments. The driver's fragments record S3 traffic that passes through the per-node proxy; workers' fragments record everything observed inside each task.

> **For developers**: the canonical architecture diagram lives in [`docs/developer/ray-integration.md`](https://github.com/treqs/roar/blob/main/docs/developer/ray-integration.md) in the `roar` repo. The integration landed in PRs [#17](https://github.com/treqs/roar/pull/17) (initial Ray support) and [#32](https://github.com/treqs/roar/pull/32) (the rework around `roar-worker`, fragments, and host-submit lineage that's the current shape).

## Hashing strategy

Hashing is **per-worker, computed locally on the machine that handled the I/O**. There is no centralized hasher; the driver doesn't read worker bytes; nothing ships data around the cluster just to compute a digest.

The recorded hash depends on the capture surface:

| Surface | Hash | When computed |
|---|---|---|
| `builtins.open` (write) | `blake3` (with `sha256` fallback if blake3 isn't installed) | Streamed during `write()` calls in the worker's interpreter. Final digest emitted on `close()`. |
| `builtins.open` (read) | none recorded — the file's content hash isn't needed for "task X read this file"; lineage matches on path + producer's hash. | — |
| S3 via boto3 / pyarrow | `etag` from the S3 response | Computed server-side by S3; recorded as the artifact's `etag` hash. Not always a full-content digest (multipart uploads return a hash-of-part-hashes). |
| Native LD_PRELOAD tracer | none — the native event records the path only. | — (the hasher path runs at the Python layer above, which sees the content) |
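The write-path hashing in the first row amounts to a streaming digest over the bytes passed to `write()`, finalized on `close()`. Here is a minimal sketch of that pattern, assuming a buffered binary file object (such as one from `open(path, "wb")`, whose `write()` returns the byte count) and the optional third-party `blake3` package, whose `blake3.blake3()` hasher exposes `update()` and `hexdigest()`, with `hashlib.sha256` as the fallback. `HashingWriter` is a hypothetical name; a real `open` patch also has to handle text mode, `writelines`, and the rest of the file-object interface, which this omits.

```python
import hashlib

try:
    import blake3  # optional third-party package

    def _new_hasher():
        return "blake3", blake3.blake3()
except ImportError:

    def _new_hasher():
        return "sha256", hashlib.sha256()


class HashingWriter:
    """Wrap a buffered binary file object and hash bytes as they're written."""

    def __init__(self, raw):
        self._raw = raw
        self.algo, self._hasher = _new_hasher()
        self.digest = None  # hex digest, available after close()

    def write(self, data):
        n = self._raw.write(data)
        # Buffered binary files return how many bytes they accepted.
        self._hasher.update(memoryview(data)[:n])
        return n

    def close(self):
        if self.digest is None:
            self.digest = self._hasher.hexdigest()
        self._raw.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False
```

Writing `b"hello "` and then `b"world"` through the wrapper yields the same digest as hashing `b"hello world"` in one call; that property is what lets the worker hash incrementally without ever rereading the file.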

Two consequences worth knowing:

- **Composite artifacts are aggregated on the driver side.** Per-worker hashes feed up into composite-artifact reconstruction at reconstitute time — but each component's hash is still the worker's local computation.
- **ETag is not a content-equality guarantee for large S3 objects.** Two byte-identical multipart uploads with different part-size choices produce different ETags. `roar` records both `etag` (whatever S3 returned) and, when the worker also opened the file directly, a content `blake3` for the same artifact.
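The ETag caveat is easy to reproduce. The sketch below assumes the widely observed (but not formally guaranteed) S3 behavior for objects without SSE-KMS: a single-part upload's ETag is the hex MD5 of the body, and a multipart ETag is the MD5 of the concatenated binary part MD5s followed by `-<part count>`. The part sizes are tiny for illustration; real S3 requires parts of at least 5 MiB except the last one.

```python
import hashlib


def single_part_etag(data: bytes) -> str:
    """Assumed ETag for a single-part, non-KMS PutObject: hex MD5 of the body."""
    return hashlib.md5(data).hexdigest()


def multipart_etag(data: bytes, part_size: int) -> str:
    """Assumed multipart ETag: MD5 of concatenated part MD5s, plus '-<parts>'."""
    if not data or part_size <= 0:
        raise ValueError("need non-empty data and a positive part_size")
    part_digests = [
        hashlib.md5(data[i:i + part_size]).digest()
        for i in range(0, len(data), part_size)
    ]
    combined = hashlib.md5(b"".join(part_digests)).hexdigest()
    return f"{combined}-{len(part_digests)}"


data = bytes(range(256)) * 4096  # 1 MiB of identical content
print(multipart_etag(data, 256 * 1024))  # ends in "-4"
print(multipart_etag(data, 512 * 1024))  # ends in "-2", with a different digest
```

A content hash such as `blake3` or `sha256` over the same bytes doesn't depend on how the upload was split, which is why keeping one next to the ETag gives downstream consumers a stable identifier.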

## What you do as a user

For local Ray (driver and workers in the same process tree, no `ray job submit`):

```bash
roar run python your_script.py
```

For host-submit (the common multi-node case):

```bash
roar run ray job submit \
  --runtime-env-json '{"working_dir": "."}' \
  -- python your_script.py
```

`roar` inserts itself into the runtime env's `worker_process_setup_hook`. You don't need to install `roar` separately on the cluster; it ships into the `working_dir` alongside your code.
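Conceptually, the runtime env that reaches Ray is your `--runtime-env-json` plus the setup hook. Ray's `worker_process_setup_hook` field accepts either a callable or a module-path string, so the merge can be pictured as below. This is a sketch, not `roar`'s code: the function name `add_roar_to_runtime_env` and the choice to reject a conflicting user hook are assumptions.

```python
import json

# Module-path string; Ray's worker_process_setup_hook accepts a callable
# or a string like this.
ROAR_SETUP_HOOK = "roar.execution.runtime.worker_bootstrap.startup"


def add_roar_to_runtime_env(runtime_env_json: str) -> dict:
    """Return the user's runtime env with roar's setup hook added.

    Hypothetical helper; rejecting a different user hook is an assumed policy.
    """
    runtime_env = json.loads(runtime_env_json) if runtime_env_json else {}
    existing = runtime_env.get("worker_process_setup_hook")
    if existing not in (None, ROAR_SETUP_HOOK):
        raise ValueError(f"runtime env already sets a setup hook: {existing!r}")
    # User keys such as working_dir pass through untouched.
    runtime_env["worker_process_setup_hook"] = ROAR_SETUP_HOOK
    return runtime_env


print(json.dumps(add_roar_to_runtime_env('{"working_dir": "."}')))
```

The serialized result is the shape of value `ray job submit --runtime-env-json` accepts, which is why the user never has to edit the hook by hand.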

After the job finishes:

```bash
roar status        # see what got recorded
roar dag           # the per-task DAG
roar show @5       # details of one Ray task
```

These are the same commands you'd run for a local pipeline; Ray tasks become regular jobs in the DAG.

## Configuration

Under `[ray]` in `roar.toml`:

| Key | Default | Effect |
|---|---|---|
| `ray.enabled` | `true` | Master switch for Ray instrumentation. Set `false` to disable injection. |
| `ray.actor_attribution` | `per_call` | `per_call` — attribute each actor method call separately. `per_actor` — group attribution by actor instance. |
| `ray.pip_install` | (varies) | Controls whether `roar` injects itself into `runtime_env.pip`. Disable it when you ship `roar` via a `working_dir` bundle and don't want a pip install step on workers. |

Relevant environment variables (mostly set automatically by `roar run`):

- `ROAR_WRAP=1` — enables the `sitecustomize.py` patching of `ray.init` / `ray.shutdown`. Set automatically.
- `ROAR_PROJECT_DIR` — the host repo root that owns `.roar/roar.db`. Set automatically by `roar run`.
- `ROAR_CLUSTER_GLAAS_URL` — cluster-visible GLaaS URL override. Use when workers can't reach the same URL the host uses (e.g., the host uses `localhost` but workers need a NAT'd address).
- `ROAR_CLUSTER_AWS_ENDPOINT_URL` — same idea for the upstream S3 endpoint.
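The two `ROAR_CLUSTER_*` overrides suggest a simple rule: prefer the cluster-visible value when it's set, else fall back to what the host uses. Here is a hedged sketch of that rule, plus a check for the loopback case that usually motivates an override. `cluster_visible` and `is_loopback` are hypothetical helpers, not part of `roar`.

```python
import ipaddress
import os
from typing import Mapping, Optional
from urllib.parse import urlparse


def cluster_visible(host_value: Optional[str], override_var: str,
                    env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Prefer a non-empty cluster-visible override, else the host's value."""
    env = os.environ if env is None else env
    override = env.get(override_var, "").strip()
    return override or host_value


def is_loopback(url: str) -> bool:
    """True if the URL points at the local machine (unreachable from workers)."""
    host = urlparse(url).hostname
    if host is None:
        return False
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        return False  # a DNS name other than localhost


host_url = "http://localhost:8000"  # hypothetical host-side GLaaS URL
env = {"ROAR_CLUSTER_GLAAS_URL": "http://10.0.0.5:8000"}
url = cluster_visible(host_url, "ROAR_CLUSTER_GLAAS_URL", env)
print(url, is_loopback(url))  # http://10.0.0.5:8000 False
```

If `is_loopback` is true for the value workers would receive, remote tasks almost certainly can't reach it, and the matching override needs to be set.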

## Cluster setups

- **Local Ray** (`ray.init()` in the same process). `roar run python your_script.py` works. Driver and workers share a process tree; the tracer follows.
- **Local Docker Ray**. Same — just run `roar run python your_script.py` against the local cluster.
- **Host-submit to a remote cluster** (`ray job submit --address ...`). This is what the integration is primarily built for. `roar run ray job submit ...` wires up cluster-visible GLaaS and S3 endpoints, ships `roar` via the runtime env, and reconstitutes lineage after the job exits.
- **Client mode** (`ray://...`). Partially supported; client-mode connections don't go through `ray job submit`, so the worker setup hook injection isn't guaranteed. Use host-submit for confidence.

## Limitations

- **Cluster `runtime_env` policies** can block worker setup hooks or LD_PRELOAD. If you're seeing partial lineage on a managed Ray cluster, check whether the cluster's policy allows `worker_process_setup_hook` and arbitrary `env_vars`.
- **S3 ETag** isn't a full content digest for multipart uploads. `roar` records both ETag and (when available) a content blake3 from the local `open` capture so downstream consumers have an unambiguous identifier.
- **eBPF tracing** doesn't apply to Ray workers from the host's perspective; each worker uses the LD_PRELOAD native tracer instead. See [Tracers](/docs/tracers#cloud-and-managed-gpu-platforms) for which clouds support eBPF on the host side. Inside Ray workers, `roar-worker` ships the preload tracer.
- **Client-mode connections** (`ray://`) may register partial lineage. Use host-submit for the strongest coverage.
- **Hidden internal tasks** — `ray_task:unknown`, `ray_task:__init__`, `ray_task:shutdown`, and proxy/bootstrap commands are treated as internal noise and don't show up as jobs in the DAG.
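The hidden-task rule amounts to a name filter applied before jobs reach the DAG. Here is a minimal sketch: the three names come from the list above, while `extra_hidden` is a hypothetical parameter standing in for the proxy/bootstrap commands, whose exact names aren't listed here.

```python
# Names listed above. Proxy/bootstrap commands aren't named in this page,
# so callers pass them in via the hypothetical `extra_hidden` argument.
HIDDEN_TASK_NAMES = frozenset({
    "ray_task:unknown",
    "ray_task:__init__",
    "ray_task:shutdown",
})


def visible_jobs(job_names, extra_hidden=()):
    """Keep only user-facing job names, preserving their original order."""
    hidden = HIDDEN_TASK_NAMES | frozenset(extra_hidden)
    return [name for name in job_names if name not in hidden]


print(visible_jobs(["ray_task:__init__", "ray_task:train", "ray_task:shutdown"]))
# ['ray_task:train']
```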

## Validation

The Ray integration is exercised against a real Ray cluster in CI, not against a mock. The end-to-end harness brings up Ray (currently `rayproject/ray:2.54.0-py312`) plus a MinIO S3 backend via Docker Compose, then runs `roar run ray job submit ...` against the cluster for each scenario. A parallel live-test surface exercises the same workflows against a real GLaaS server build.

The cross-cutting paths that trip up most "I just patched libc" integrations have explicit coverage: native LD_PRELOAD attribution across `subprocess.Popen` boundaries, across `threading.Thread` boundaries, and across Ray task boundaries; fragment streaming under partial-crash conditions; reconstitution of composite outputs; and host-submit topology with cluster-visible-vs-host-visible endpoint overrides. Even the "does it still work when my Ray task spawns workers that spawn workers?" case has a test.

The full scenario list lives under `tests/backends/ray/e2e/` in the `roar` repo.

## Where to look next

- [Tracers](/docs/tracers) — the underlying observation mechanism, including preload (used by Ray workers) and platform notes.
- [roar Guide](/docs/roar-guide) — full CLI reference, including `roar register` for publishing Ray-shaped lineage to GLaaS.
- [Use Cases](/docs/use-cases) — example workflows.
