Ray: A Framework for Scaling Python Applications
Ray is a framework for scaling Python applications. Ray allows you to execute
the same Python program that you would run on your laptop on a cluster of
computers with minimal effort. All you need to do is tell Ray which parts of your
program need to be scalable by decorating your Python functions and
classes, and Ray takes care of the rest for you. Ray has been used by
several companies, including
OpenAI for building ChatGPT.
Ray provides both task-parallel and actor-based
computations. The programming model is simple and intuitive; it feels like
just an extension to Python. You simply instantiate tasks/actors out of
Python functions/classes. By passing around references to these tasks and
actors, you can define dependencies between them. Ray provides high
flexibility and allows dynamic execution necessary for many ML problems.
There are a lot of educational materials on Ray. Besides the resources you can
find on the
Ray GitHub,
you might want to check out Anyscale Academy, Ray Summit, and
the Scaling Python with Ray book. In this post, I share a quick overview of the Ray
programming model, some examples, and my understanding of Ray internals and how it
achieves scalability and fault-tolerance.
What is the deal with Ray?
Using Ray you can scale the same Python program you run on your laptop to run
on a cluster of computers in no time; you don't need to worry about managing a
cluster, scheduling your tasks, network communications, health checking,
tolerating faults, etc. Instead, you simply decorate your Python
functions/classes with @ray.remote,
and Ray takes care of the rest for you. Considering that many data scientists
or ML practitioners are not distributed systems experts, what Ray provides is
very valuable. What distinguishes Ray from other data processing frameworks
such as Spark is its flexibility, fine-grained control, and dynamic execution
among other things. While Spark is very good for data processing and classical
machine learning, Ray is better for deep learning and RL applications.
The Ray programming model has two main concepts: Tasks and Actors.
Tasks
Tasks are used to perform stateless computation. A task is an execution
of a Python function decorated with
@ray.remote.
For example, consider the following function that adds two arrays.
import numpy as np
import ray

@ray.remote
def add_array(arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
    return np.add(arr1, arr2)
We can call a remote function using
.remote(). The requested task will be executed on a node in the cluster
picked by Ray. Thus, simply by decorating our function with @ray.remote and calling it with .remote(), we can benefit from the processing power of our cluster.
result1 = add_array.remote(arr1, arr2)
The above call is an asynchronous call, and result1 is a future. You can get
the value of the object that this future refers to using ray.get().
arr_result = ray.get(result1)
You can pass the future to other tasks as a parameter, thereby defining
dependencies between tasks. For example:
result2 = add_array.remote(arr3, result1)
In this example, the task that computes result2 depends on the one that
computes result1. This ability to define dependencies between tasks using
futures is one of the key advantages of Ray.
The .remote() asynchronous API
lets us define parallelism. For example, suppose we are given a list
of tuples of arrays, and we want to compute the sum of the two arrays in each
tuple. With tasks, we can run this computation in parallel as follows.
sum_list = []
for arr1, arr2 in list_of_tuples:
    sum_list.append(add_array.remote(arr1, arr2))
With the code above, entries of the sum_list will be computed in parallel.
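To retrieve the actual sums, you can pass the whole list of futures to ray.get(), which blocks until all the tasks have finished:

results = ray.get(sum_list)  # a list of result arrays, one per task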
Actors
Actors are used to perform stateful computation. An actor is an
instance of a Python class that is decorated with
@ray.remote. You can execute the
methods of an actor, just like tasks, somewhere in the cluster, except that they
access the state of the actor, i.e., its instance variables.
For example, consider the following class that multiplies the sum of two
arrays by a number.
import numpy as np
import ray

@ray.remote
class AddArray:
    def __init__(self, m: int):
        self.m = m

    def update_m(self, new_m: int):
        self.m = new_m

    def add_array(self, arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
        return np.multiply(np.add(arr1, arr2), self.m)
You can instantiate an actor using
.remote() and call its
methods.
add_array_actor = AddArray.remote(2)
result3 = add_array_actor.add_array.remote(arr1, arr2)
In add_array, the state of the actor (i.e., the variable m) is used
to compute the result. We can mutate this state by calling the update_m method
of the actor, as shown below.
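For example, here is a small sketch reusing the actor handle from above; note that method calls submitted by the same caller execute on the actor one at a time, in submission order:

add_array_actor.update_m.remote(3)                      # mutate the actor's state
result4 = add_array_actor.add_array.remote(arr1, arr2)  # sees the new m
print(ray.get(result4))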
Supporting stateful distributed computation is another key feature of Ray.
Ray Clusters
After installing Ray on your laptop, you can start using it by simply
calling ray.init in
your Python program (see the examples below). To run your code on a cluster,
however, you should first start a Ray cluster. Deploying a Ray
cluster manually on-premises is very straightforward. A Ray cluster consists
of a head node and a set of worker nodes.
To create a cluster, you first start Ray on the head node
(via ray start) and then
start Ray on the worker nodes by giving them the address of the head node. The
head node is the node that runs the system processes (needed for Ray
internals) and usually the driver process, which runs the user's Python
code.
Starting the head node:
ray start --head --port=6379
Starting a worker node:
ray start --address=<head-node-address:port>
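Once the cluster is up, a driver script running on one of its nodes can attach to it by passing an address to ray.init; a minimal sketch:

import ray

# Connect to the existing cluster instead of starting a new local instance;
# "auto" tells Ray to discover the running cluster on this node.
ray.init(address="auto")

# Print the aggregate resources (e.g., CPUs, GPUs, memory) across all nodes.
print(ray.cluster_resources())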
Ray also supports deployment on Kubernetes, AWS, and GCP. Read more
on Ray clusters.
Examples
Installing Ray on Apple M2
Ray has experimental support for machines running Apple Silicon (such as
M1/M2). I was able to install and use Ray on an Apple M2
following these instructions. Note that Ray clusters are not supported on Windows and OSX.
This is how I installed Ray on M2:
mkdir myRayEnv
cd myRayEnv
wget https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-MacOSX-arm64.sh
bash Miniforge3-MacOSX-arm64.sh
rm Miniforge3-MacOSX-arm64.sh
source ~/.bash_profile
conda create --name myRayEnv
conda activate myRayEnv
conda install grpcio=1.43.0 -c conda-forge
pip install ray
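To check that the installation works, here is a quick smoke test I would run (the function name is mine) with the myRayEnv environment active:

import ray

ray.init()

@ray.remote
def hello() -> str:
    return "hello from a Ray worker"

# If this prints, tasks are being scheduled and executed correctly.
print(ray.get(hello.remote()))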
Parallel Computing
As the first example, let's see how, using Ray, we can easily speed up
our computation by running our tasks in parallel. Inside the myRayEnv folder, I created parallel_computing.py with this content:
import os
import time
import logging

import ray

ray.init(ignore_reinit_error=True,
         logging_level=logging.ERROR,
         include_dashboard=False)

def some_work():
    time.sleep(1)
    return "some_result"

@ray.remote
def distributed_some_work():
    return some_work()

print(f'Number of CPUs: {os.cpu_count()}')

def run_local():
    results = [some_work() for _ in range(os.cpu_count())]
    return results

start_time = time.time()
print(f'result from local: {run_local()}')
end_time = time.time()
print(f'elapsed time (s): {(end_time - start_time)}')

def run_remote():
    results = ray.get([distributed_some_work.remote() for _ in range(os.cpu_count())])
    return results

start_time = time.time()
print(f'result from Ray: {run_remote()}')
end_time = time.time()
print(f'elapsed time (s): {(end_time - start_time)}')
Output:
(myRayEnv)$ python3 parallel_computing.py
Number of CPUs: 8
result from local: ['some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result']
elapsed time (s): 8.035311937332153
result from Ray: ['some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result', 'some_result']
elapsed time (s): 1.0391979217529297
Fault-tolerance
For the second example, let's see how Ray handles task failures. Note
that I changed the logging level to WARNING to see the task-failure
warning messages.
import logging
import random
import sys

import ray

ray.init(ignore_reinit_error=True,
         logging_level=logging.WARNING,
         include_dashboard=False)

@ray.remote
def some_work():
    # Fail randomly (with probability 1/3) by killing the worker process.
    if random.randint(0, 2) == 1:
        sys.exit(0)
    return "some_result"

print(f'result from Ray: {ray.get(some_work.remote())}')
Output (added new lines to the warning message to make it more
readable):
(myRayEnv)$ python3 fault-tolerance.py
2023-05-06 21:49:33,121 WARNING worker.py:1986 -- A worker died or was killed while executing a task by an unexpected system error.
To troubleshoot the problem, check the logs for the dead worker.
RayTask ID: 1109b4db659eb718fd8ef13a3193fe2d6dfea77701000000
Worker ID: 1312a49ddc8363414c1395a08ea3596a84c448f537135790228fd217
Node ID: b854a53730bc9c8f6f546f922f665f1562697319d8cdd69cb97f1a7f
Worker IP address: 127.0.0.1
Worker port: 53391
Worker PID: 5160
Worker exit type: SYSTEM_ERROR
Worker exit detail: The leased worker has unrecoverable failure.
Worker is requested to be destroyed when it is returned.
result from Ray: some_result
So after the task failed the first time, Ray retried it, and the second
attempt finished successfully.
Ray Internals
Ray has been actively under development since the publication of [1] in
2018. It seems [2] is the most recent design. Despite the changes, Ray
remains a distributed futures system [3] that utilizes the
processing power of a cluster of computers. Ray achieves that goal by
distributing the computation. Specifically, to execute a task, Ray picks a
node in the cluster and sends the function definition to it to execute. Each
node maintains the objects it works with in its memory. The node selected to
execute a task may not have a copy of an object that the task needs. In that
case, the object is copied to the memory of that node. The result of a
remote task can be accessed through a distributed future, i.e., a future
whose eventual value might be stored on a remote node.
Selecting a node to execute a task is the job of the
scheduler. Maintaining the objects in each node's memory is the job
of the object store. Finally, maintaining the system state is the job of the
Global Control Store (GCS). Section 4.3 of [1] provides a good example
that demonstrates how a remote task is executed on Ray.
Scheduling
By default, Ray tries to schedule a task locally if the local node has all
the objects needed by the task. However, the user can change the default
scheduling strategy, for example, to spread the tasks for better
performance.
You can define resource requirements for your task or class with the @ray.remote decorator. Note that these are hard requirements: a node that does not meet the resource requirements of a task is never selected to run that task.
@ray.remote(num_cpus=2, num_gpus=2)
def func():
    ...
Strategies
- DEFAULT:
  - First prefers locality: Ray picks the node that already holds most of the task's argument data. So if a function needs a and b, and b is larger than a, the task will be scheduled on the node hosting b.
  - If no node provides locality, Ray ranks nodes based on different aspects such as utilization and network latency [1], and picks a random machine out of the top k nodes, where k is configurable.
- SPREAD:
  - Ignores locality.
  - Tries to maximize the spreading of the workload throughout the cluster (a sketch follows below).

You can also reserve resources for certain tasks using Placement Groups (PGs).
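As a quick illustration of the SPREAD strategy, here is a minimal sketch (the function name is mine) using the scheduling_strategy option of @ray.remote:

import ray

ray.init()

@ray.remote(scheduling_strategy="SPREAD")
def spread_task(i: int) -> int:
    # Ray tries to place these tasks on as many different nodes as possible.
    return i * i

print(ray.get([spread_task.remote(i) for i in range(10)]))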
The original Ray [1] used local and global schedulers. That design has
changed, and now local schedulers directly schedule tasks on remote machines.
The GCS now pulls information about the resources on all nodes, and the
schedulers refer to the GCS for this information when scheduling a task [2].
Scalability
The GCS is the only stateful component in Ray; the rest of the components
(including the scheduler) are stateless, which makes it easy to scale them by
simply adding more instances. The GCS itself is sharded, so to scale it, we
simply add more shards.
What if we have a big object (like a large matrix) that does not fit into
the memory of a single node?
At least as of the writing of [1], distributed objects are not supported
by Ray, so you need to break large objects down in the application
layer, i.e., in your Python program. For example, to read a
large file that does not fit into a single node's memory, you have to load it
into two (or more) objects hosted by different nodes, as sketched below.
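Here is a minimal sketch of such application-level chunking; the file path, chunk size, and helper name are mine, and I am assuming the file is reachable from every node (e.g., on a shared filesystem):

import ray

ray.init()

CHUNK_SIZE = 512 * 1024 * 1024  # hypothetical 512 MB chunks

@ray.remote
def load_chunk(path: str, offset: int, size: int) -> bytes:
    # Each task loads one slice of the file; each resulting object lives
    # in the object store of whichever node executed the task.
    with open(path, "rb") as f:
        f.seek(offset)
        return f.read(size)

# Two chunk objects instead of one big object.
chunk_refs = [load_chunk.remote("big_file.bin", i * CHUNK_SIZE, CHUNK_SIZE)
              for i in range(2)]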
Fault-tolerance
All components except the GCS are stateless. Thus, to recover from their
failure, we can simply spawn new instances of them.
But what if we lose a node? After failure, the objects in its memory are
gone. How does Ray tolerate node failures?
When an object is lost, Ray first tries to find a copy of it on other nodes.
In cases where no copy is available, Ray relies on lineage-based
fault-tolerance. Specifically, the owner of each object, i.e., the worker
process that created it, keeps track of how the object was created,
and upon failure, it reconstructs the object by re-executing the task that
created it. If any of the task's parameters are also lost, they are
recursively recovered.
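How many times Ray re-executes a failed task is configurable per task through the max_retries option of @ray.remote; a tiny sketch on a placeholder function:

import ray

@ray.remote(max_retries=3)  # re-run up to 3 times if a worker or node dies
def recoverable_task():
    ...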
For the GCS, the original paper [1] uses one Redis server per shard and provides
high availability for each shard using chain replication [4]. However, that seems
not to be the case anymore [2]. The GCS has been renamed to Global Control
Service (instead of Store), and it is now maintained in the head node's
memory. Read more on GCS changes
here.
It seems you can still back your Ray cluster with Redis HA. By storing GCS data
in Redis, upon failure of the current head node, the new head node can recover
the state from Redis. To back Ray with Redis, simply provide the Redis cluster
information to the Ray head node when starting it:
RAY_REDIS_ADDRESS=redis_ip:port ray start --head --redis-password PASSWORD
Read more
here.
Consistency
You can imagine a situation where a single object is passed to more than one
task running concurrently. If several tasks could update the object, we would
have to deal with consistency challenges. To avoid
inconsistency altogether, Ray simply makes all objects
immutable. Specifically, given a future (which is basically an
object reference), you can read the value of the object using
ray.get(future), but there is
no ray.set(future) to change
the value of the object; once an object is created, you can only read it.
Note that there is
future = ray.put(object_value),
which allows you to create a brand-new object out of a value and receive
its reference/future.
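In code, this immutability contract looks as follows:

import ray

ray.init()

ref = ray.put([1, 2, 3])  # create a new immutable object and get its reference
print(ray.get(ref))       # read it: [1, 2, 3]
# There is no ray.set(ref, ...); to "update", you create a brand-new object.
new_ref = ray.put([1, 2, 3, 4])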
Inter-Process Communication
Each worker process maintains an in-process store that holds small objects
(<100KB). Larger objects are stored in a distributed in-memory object
store. To access a small object owned by another process, the object is copied to
the in-process store of the requester. Workers on the same node access larger
objects in the object store through shared memory. To access a large
remote object, the local object store copies it from the object store of the
owner's node. Ray relies on gRPC for non-local inter-process communication. Read more here.
Conclusion
Ray is a great tool for ML practitioners to scale their Python applications
with minimum effort. With its intuitive and flexible programming model, ML
engineers can focus on their core ML problems instead of dealing with the challenges of managing and utilizing a distributed system. Ray's design has been
evolving since its initial introduction, but the overall goal, which is to be a
distributed futures system, remains the same. What I love about Ray is its simplicity, its intuitive programming model, and its tight integration with Python.
References
[1] Moritz, P., Nishihara, R., Wang, S., Tumanov, A., Liaw, R.,
Liang, E., Elibol, M., Yang, Z., Paul, W., Jordan, M.I. and Stoica, I., 2018.
Ray: A Distributed Framework for Emerging AI Applications. In 13th USENIX
Symposium on Operating Systems Design and Implementation (OSDI 18) (pp.
561-577).
[2] Ray v2 Architecture, Ray Team. https://docs.google.com/document/d/1tBw9A4j62ruI5omIJbMxly-la5w4q_TjyJgJL_jN2fI/preview#heading=h.iyrm5j2gcdoq
[3] Wang, S., Liang, E., Oakes, E., Hindman, B., Luan, F.S., Cheng, A.
and Stoica, I., 2021, April. Ownership: A Distributed Futures System for
Fine-Grained Tasks. In NSDI (pp. 671-686).
[4] Van Renesse, R. and Schneider, F.B., 2004, December. Chain
Replication for Supporting High Throughput and Availability. In OSDI (Vol.
4, pp. 91-104).