Distributed - spread your data and computation across a cluster

As we covered at the beginning, Dask can run work on multiple machines using the distributed scheduler.

Until now we have actually been using the distributed scheduler for our work, but just on a single machine.

When we instantiate a Client() object with no arguments, it attempts to locate a Dask cluster. It checks your local Dask config and environment variables to see whether connection information has been specified. If not, it creates an instance of LocalCluster and uses that.
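The lookup order described above can be pictured with a small, simplified model. This is only a sketch in plain Python, not Dask's implementation: the function name `resolve_scheduler_address` and its two dictionary arguments are hypothetical, and it assumes the connection information lives under the `scheduler-address` config key, which can also be supplied through the `DASK_SCHEDULER_ADDRESS` environment variable.

```python
def resolve_scheduler_address(environ, yaml_config):
    """Return a scheduler address, or None if a LocalCluster would be needed.

    Simplified model (an assumption, not Dask's actual code): environment
    variables take precedence over values read from config files.
    """
    address = environ.get("DASK_SCHEDULER_ADDRESS")
    if address:
        return address
    address = yaml_config.get("scheduler-address")
    if address:
        return address
    # Nothing configured: Client() would fall back to creating a LocalCluster.
    return None


# Hypothetical inputs; in practice you would pass os.environ and your parsed config.
print(resolve_scheduler_address({}, {"scheduler-address": "tcp://10.51.100.80:8786"}))
print(resolve_scheduler_address({}, {}))
```

The first call finds an address in the config, so a Client would connect to it. The second returns None, which corresponds to the case where Client() starts its own LocalCluster.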

Specifying connection information in config is useful for system administrators who want to provide access to their users. We do this in the Dask Helm Chart for Kubernetes: the chart installs a multi-node Dask cluster and a Jupyter server on a Kubernetes cluster, and Jupyter is preconfigured to discover the distributed cluster.

Local Cluster

Let’s explore the LocalCluster object ourselves and see what it is doing.

[1]:
from dask.distributed import LocalCluster, Client
[2]:
cluster = LocalCluster()
cluster

Creating a cluster object will create a Dask scheduler and a number of Dask workers. If no arguments are specified, it will autodetect the number of CPU cores and the amount of memory your system has and create workers to fill them appropriately.

You can also specify these arguments yourself. Let’s have a look at the docstring to see the options we have available.

These arguments can also be passed to Client; in the case where it creates a LocalCluster, they are simply passed on down the line.
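As an illustration of specifying these arguments yourself, here is a minimal sketch that splits a machine's resources into `n_workers`, `threads_per_worker` and `memory_limit` (a per-worker limit), which are keyword arguments accepted by LocalCluster. The helpers `plan_local_cluster` and `start_local_cluster` are hypothetical names introduced for this example, and the even split is an assumption; Dask's own autodetection may choose differently.

```python
def plan_local_cluster(total_cores, total_memory_gib, threads_per_worker=1):
    """Build LocalCluster keyword arguments that fill the given resources.

    Hypothetical helper for illustration; Dask's own autodetection may
    choose a different split.
    """
    if threads_per_worker < 1 or total_cores < threads_per_worker:
        raise ValueError("need at least one worker's worth of cores")
    n_workers = total_cores // threads_per_worker
    memory_per_worker = total_memory_gib / n_workers
    return {
        "n_workers": n_workers,
        "threads_per_worker": threads_per_worker,
        "memory_limit": f"{memory_per_worker:.2f}GiB",  # limit per worker
    }


def start_local_cluster(plan):
    """Start a LocalCluster from a plan and connect a Client to it."""
    # Imported lazily so the planning helper works without Dask installed.
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(**plan)
    return cluster, Client(cluster)


plan = plan_local_cluster(total_cores=4, total_memory_gib=8, threads_per_worker=2)
print(plan)
```

Calling `start_local_cluster(plan)` would then start two workers with two threads each. The same keyword arguments could instead be given to Client directly, as noted above.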

[3]:
?LocalCluster

Our cluster object has attributes and methods that we can use to access information about our cluster. For instance, we can get the log output from the scheduler and all the workers with the get_logs() method.

[4]:
cluster.get_logs()
[4]:
Cluster

Scheduler

2022-09-11 11:39:13,426 - distributed.scheduler - INFO - State start

2022-09-11 11:39:13,430 - distributed.scheduler - INFO - Clear task state

2022-09-11 11:39:13,431 - distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:38263

2022-09-11 11:39:13,431 - distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787

2022-09-11 11:39:14,646 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45751', name: 1, status: running, memory: 0, processing: 0>

2022-09-11 11:39:14,649 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45751

2022-09-11 11:39:14,677 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:42697', name: 0, status: running, memory: 0, processing: 0>

2022-09-11 11:39:14,679 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:42697

tcp://127.0.0.1:42697

2022-09-11 11:39:14,336 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:42697

2022-09-11 11:39:14,336 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:42697

2022-09-11 11:39:14,336 - distributed.worker - INFO - dashboard at: 127.0.0.1:46681

2022-09-11 11:39:14,336 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:38263

2022-09-11 11:39:14,336 - distributed.worker - INFO - -------------------------------------------------

2022-09-11 11:39:14,336 - distributed.worker - INFO - Threads: 1

2022-09-11 11:39:14,336 - distributed.worker - INFO - Memory: 3.39 GiB

2022-09-11 11:39:14,336 - distributed.worker - INFO - Local Directory: /home/runner/work/dask-tutorial/dask-tutorial/dask-worker-space/worker-l_9y9cvg

2022-09-11 11:39:14,336 - distributed.worker - INFO - -------------------------------------------------

2022-09-11 11:39:14,679 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:38263

2022-09-11 11:39:14,679 - distributed.worker - INFO - -------------------------------------------------

tcp://127.0.0.1:45751

2022-09-11 11:39:14,309 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:45751

2022-09-11 11:39:14,309 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:45751

2022-09-11 11:39:14,309 - distributed.worker - INFO - dashboard at: 127.0.0.1:38625

2022-09-11 11:39:14,309 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:38263

2022-09-11 11:39:14,309 - distributed.worker - INFO - -------------------------------------------------

2022-09-11 11:39:14,309 - distributed.worker - INFO - Threads: 1

2022-09-11 11:39:14,309 - distributed.worker - INFO - Memory: 3.39 GiB

2022-09-11 11:39:14,309 - distributed.worker - INFO - Local Directory: /home/runner/work/dask-tutorial/dask-tutorial/dask-worker-space/worker-_n_p34yu

2022-09-11 11:39:14,309 - distributed.worker - INFO - -------------------------------------------------

2022-09-11 11:39:14,651 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:38263

2022-09-11 11:39:14,651 - distributed.worker - INFO - -------------------------------------------------

We can access the URL at which the Dask dashboard is being hosted.

[5]:
cluster.dashboard_link
[5]:
'http://127.0.0.1:8787/status'

For Dask to use our cluster we still need to create a Client object, but since we have already created a cluster, we can pass it directly to our client.

[6]:
client = Client(cluster)
client
[6]:

Client

Client-5cf50f55-31c6-11ed-9236-0022480808f4

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status


[7]:
del client, cluster

Remote clusters via SSH

A common way to distribute your work onto multiple machines is via SSH. Dask has a cluster manager called SSHCluster which handles creating the SSH connections for you.

from dask.distributed import SSHCluster

When constructing this cluster manager we need to pass a list of addresses, either hostnames or IP addresses, which we will SSH into and attempt to start a Dask scheduler or worker on.

cluster = SSHCluster(["localhost", "hostA", "hostB"])
cluster

When we create our SSHCluster object we have given a list of three hostnames.

The first host in the list will be used as the scheduler, and all other hosts will be used as workers. If you’re on the same network, it wouldn’t be unreasonable to set your local machine as the scheduler and then use other machines as workers.

If your servers are remote to you, in the cloud for instance, you may want the scheduler to be a remote machine too to avoid network bottlenecks.
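The host-ordering convention described above can be made explicit with a short sketch. The helper names `split_hosts` and `start_ssh_cluster` are hypothetical and exist only for this illustration; the hostnames are the placeholders used earlier. Actually starting the cluster assumes working SSH access to every host and the asyncssh package.

```python
def split_hosts(hosts):
    """Return (scheduler_host, worker_hosts): first entry is the scheduler."""
    if len(hosts) < 2:
        raise ValueError("need one scheduler host and at least one worker host")
    return hosts[0], list(hosts[1:])


def start_ssh_cluster(scheduler_host, worker_hosts):
    """Start an SSHCluster with the scheduler host listed first."""
    # Imported lazily; SSHCluster also needs the asyncssh package and
    # working SSH access to every host.
    from dask.distributed import SSHCluster

    return SSHCluster([scheduler_host, *worker_hosts])


scheduler_host, worker_hosts = split_hosts(["localhost", "hostA", "hostB"])
print("scheduler:", scheduler_host)
print("workers:", worker_hosts)
```

Keeping the two roles in separate variables makes it harder to accidentally put a remote worker first in the list and end up with the scheduler on the wrong machine.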

Scalable clusters

Both of the clusters we have seen so far are fixed size clusters. We are either running locally and using all the resources in our machine, or we are using an explicit number of other machines via SSH.

With some cluster managers it is possible to increase and decrease the number of workers by calling cluster.scale(n) in your code, where n is the desired number of workers. Alternatively, you can let Dask do this dynamically by calling cluster.adapt(minimum=1, maximum=100), where minimum and maximum are your preferred limits for Dask to abide by.

It is always good to keep your minimum to at least 1, as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup, so keeping this at 1 or above means this profiling will start immediately.
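The two scaling styles can be wrapped in one small function. This is a sketch under stated assumptions: `clamp_workers` and `resize` are hypothetical helpers, and `cluster` is assumed to be any cluster manager object that provides the scale and adapt methods mentioned above.

```python
def clamp_workers(desired, minimum=1, maximum=100):
    """Keep a desired worker count within the configured limits."""
    return max(minimum, min(desired, maximum))


def resize(cluster, n=None, minimum=1, maximum=100):
    """Scale to a fixed size if n is given, otherwise enable adaptive scaling."""
    if n is not None:
        # Fixed size: request exactly n workers, kept within the limits.
        cluster.scale(clamp_workers(n, minimum, maximum))
    else:
        # Dynamic size: let Dask choose between minimum and maximum.
        cluster.adapt(minimum=minimum, maximum=maximum)
```

For example, `resize(cluster, n=10)` requests ten workers, while `resize(cluster)` hands control to adaptive scaling with a floor of one worker so that profiling can begin right away.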

We currently have cluster managers for Kubernetes, Hadoop/Yarn, cloud platforms and batch systems including PBS, SLURM and SGE.

These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters onto them. If an institution wishes to provide a central service from which users can request Dask clusters, there is also Dask Gateway.

Cluster components

The minimum requirements for a functioning Dask cluster are a scheduler process and one worker process.

We can start these processes manually via the CLI. Let’s start with the scheduler.

$ dask-scheduler
2022-07-07 14:11:35,661 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,405 - distributed.scheduler - INFO - State start
2022-07-07 14:11:37,408 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,409 - distributed.scheduler - INFO - Clear task state
2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   Scheduler at:   tcp://10.51.100.80:8786
2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   dashboard at:                     :8787

Then we can connect a worker on the address that the scheduler is listening on.

$ dask-worker tcp://10.51.100.80:8786 --nworkers=auto
2022-07-07 14:12:53,915 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58051'
2022-07-07 14:12:53,922 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58052'
2022-07-07 14:12:53,924 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58053'
2022-07-07 14:12:53,925 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58054'
2022-07-07 14:12:55,222 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58065
2022-07-07 14:12:55,222 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58065
2022-07-07 14:12:55,223 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58068
2022-07-07 14:12:55,223 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,223 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,223 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,223 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,224 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-hlvac6m5
2022-07-07 14:12:55,225 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58070
2022-07-07 14:12:55,227 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,227 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,228 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,228 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-e1suf_7o
2022-07-07 14:12:55,229 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,231 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58067
2022-07-07 14:12:55,233 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,233 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,234 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,234 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,235 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-oq39ihb4
2022-07-07 14:12:55,236 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,246 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,246 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,249 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,264 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,264 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,267 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,267 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,267 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,269 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,273 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58069
2022-07-07 14:12:55,273 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,274 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,274 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,275 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,275 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-zfie55ku
2022-07-07 14:12:55,276 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,299 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,300 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,302 - distributed.core - INFO - Starting established connection

Then in Python we can connect a client to this cluster and submit some work.

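>>> from dask.distributed import Client
>>> client = Client("tcp://10.51.100.80:8786")  # address printed by dask-scheduler above
>>> future = client.submit(lambda: 1 + 1)  # returns a Future immediately
>>> future.result()  # block until a worker has computed the value
2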

We can also do this in Python by importing cluster components and creating them directly.

[8]:
from dask.distributed import Scheduler, Worker, Client

async with Scheduler() as scheduler:
    async with Worker(scheduler.address) as worker:
        async with Client(scheduler.address, asynchronous=True) as client:
            print(await client.submit(lambda: 1 + 1))
/usr/share/miniconda3/envs/dask-tutorial/lib/python3.9/site-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 33951 instead
  warnings.warn(
2

Most of the time we never have to create these components ourselves and can instead rely on cluster manager objects to do this for us. But in some situations it can be useful to be able to construct a cluster manually.

You may also see a Nanny process being referenced from time to time. This is a wrapper for the worker that handles restarting the process if it is killed. When we run dask-worker via the CLI a nanny is automatically created for us.

Cluster networking

By default, Dask uses a custom TCP-based remote procedure call protocol to communicate between processes. The scheduler and workers all listen on TCP ports for communication.

When you start a scheduler it typically listens on port 8786. When a worker is created it listens on a random high port and communicates that port to the scheduler when it first connects.

The scheduler maintains a list of all workers and their addresses, which is also available to the workers. Therefore both the scheduler and any of the workers can open connections to any other worker at any time. Connections are closed automatically when not in use.

The Client will only ever connect to the scheduler and all communication to the workers will pass through it. This means that when deploying Dask clusters the scheduler and workers must typically be on the same network and able to access each other via IP and port directly. But the client can run wherever as long as it can access the scheduler communication port. It is common to configure firewall rules or load balancers to provide access to just the scheduler port.
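Because the client only needs to reach the scheduler's communication port, a quick reachability check can help when debugging firewall rules. The sketch below uses only the Python standard library; `parse_address` and `can_reach` are hypothetical helper names, and the check only confirms that a TCP connection can be opened, not that a Dask scheduler is answering on the other end.

```python
import socket
from urllib.parse import urlsplit


def parse_address(address):
    """Split an address such as 'tcp://10.51.100.80:8786' into its parts."""
    parts = urlsplit(address)
    return parts.scheme, parts.hostname, parts.port


def can_reach(address, timeout=1.0):
    """Return True if a TCP connection to the address can be opened."""
    _, host, port = parse_address(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts.
        return False


print(parse_address("tcp://10.51.100.80:8786"))
```

Running `can_reach("tcp://10.51.100.80:8786")` from the machine where the client will live indicates whether the scheduler port is open to it. Worker ports do not need to be reachable from the client.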

Dask also supports other network protocols such as TLS, websockets and UCX.

TLS/SSL for secure communication

Dask cluster components can use certificates to mutually authenticate and communicate securely if run in an untrusted environment. You can either generate certificates for the scheduler, worker and client automatically and distribute those, or you can generate temporary credentials.

Some cluster managers such as dask-cloudprovider will automatically enable TLS and generate one-time certificates when exposing clusters to the internet from the public cloud.

[9]:
from dask.distributed import Scheduler, Worker, Client
from distributed.security import Security

security = Security.temporary()

async with Scheduler(security=security) as scheduler:
    async with Worker(scheduler.address, security=security) as worker:
        async with Client(
            scheduler.address, security=security, asynchronous=True
        ) as client:
            print(await client.submit(lambda: 1 + 1))
/usr/share/miniconda3/envs/dask-tutorial/lib/python3.9/site-packages/distributed/node.py:180: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37149 instead
  warnings.warn(
2

Websockets

Dask can also communicate via websockets instead of TCP. There is a very small performance overhead to doing this, but it means that the dashboard and communication happen on the same port and can be reverse proxied by a layer 7 proxy like nginx. This is necessary for some deployment scenarios where you cannot expose ports but you can proxy web services.

UCX

On systems with high-performance networking such as InfiniBand or NVLink, Dask can also leverage UCX, which provides a unified communication protocol that automatically upgrades communication to use the fastest hardware available. This is vital for good performance on HPC systems with InfiniBand or systems with multiple GPU workers.