You can run this notebook in a live session on Binder or view it on GitHub.


Futures - non-blocking distributed calculations

Submit arbitrary functions for computation in a parallelized, eager, and non-blocking way.

The futures interface (derived from the built-in concurrent.futures) provides fine-grained, real-time execution for custom situations. We can submit individual functions for evaluation with one set of inputs using submit(), or evaluate a function over a sequence of inputs using map(). The call returns immediately, giving one or more futures whose status begins as “pending” and later becomes “finished”. The local Python session is never blocked.

This is the important difference between futures and delayed. Both support arbitrary task scheduling, but delayed is lazy (it just constructs a graph) whereas futures are eager. With futures, as soon as the inputs are available and there is compute capacity, the computation starts.
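The eager, non-blocking behavior can be sketched with the built-in concurrent.futures module that the Dask interface is derived from. This uses a local thread pool rather than a Dask cluster, but the shape of the API is the same:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def inc(x):
    sleep(0.5)
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(inc, 1)  # returns immediately; the work starts right away
    print(future.done())          # likely False: the computation is still running
    print(future.result())        # blocks until finished, then prints 2
```

With a Dask Client, `client.submit` plays the role of `pool.submit`, and the work runs on cluster workers instead of local threads.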

Related Documentation

[1]:
from dask.distributed import Client

client = Client(n_workers=4)
client
[1]:

Client

Client-fe36203f-17c0-11ed-9270-00224804ae66

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

A Typical Workflow

This is the same workflow that we saw in the delayed notebook. It is for-loopy and the data is not necessarily an array or a dataframe. The following example outlines a read-transform-write:

def process_file(filename):
    data = read_a_file(filename)
    data = do_a_transformation(data)
    destination = f"results/{filename}"
    write_out_data(data, destination)
    return destination

futures = []
for filename in filenames:
    future = client.submit(process_file, filename)
    futures.append(future)

futures

Basics

Just like we did in the delayed notebook, let’s make some toy functions, inc, double, and add, that sleep for a while to simulate work. We’ll then time running these functions normally.

[2]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def double(x):
    sleep(2)
    return 2 * x

def add(x, y):
    sleep(1)
    return x + y

We can run these locally:

[3]:
inc(1)
[3]:
2

Or we can submit them to run remotely with Dask. This immediately returns a future that points to the ongoing computation, and eventually to the stored result.

[4]:
future = client.submit(inc, 1)  # returns immediately with pending future
future
[4]:
Future: inc status: pending, type: NoneType, key: inc-cdc8a98879bc6d18bd017529d9015f70

If you wait a second, and then check on the future again, you’ll see that it has finished.

[5]:
future
[5]:
Future: inc status: finished, type: int, key: inc-cdc8a98879bc6d18bd017529d9015f70

You can block on the computation and gather the result with the .result() method.

[6]:
future.result()
[6]:
2

Other ways to wait for a future

from dask.distributed import wait, progress
progress(future)

shows a progress bar in the notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn’t block the execution of other code in the meantime.

wait(future)

blocks and forces the notebook to wait until the computation pointed to by future is done. However, note that if the result of inc() is already sitting on the cluster, retrieving it takes no time, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.
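The same blocking wait exists in the standard-library interface that Dask mirrors; a minimal sketch with concurrent.futures.wait, again using a thread pool as a stand-in for the cluster:

```python
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep

def inc(x):
    sleep(0.5)
    return x + 1

pool = ThreadPoolExecutor(max_workers=4)
futures = [pool.submit(inc, i) for i in range(4)]

# wait() blocks until all the given futures are done,
# returning (done, not_done) sets
done, not_done = wait(futures)
print(sorted(f.result() for f in done))  # [1, 2, 3, 4]
pool.shutdown()
```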

Other ways to gather results

client.gather(futures)

gathers results from more than one future.

client.compute

Generally, any Dask operation that is executed using .compute() or dask.compute() can be submitted for asynchronous execution using client.compute() instead.

Here is an example from the delayed notebook:

[7]:
import dask

@dask.delayed
def inc(x):
    sleep(1)
    return x + 1

@dask.delayed
def add(x, y):
    sleep(1)
    return x + y

x = inc(1)
y = inc(2)
z = add(x, y)

So far we have a regular dask.delayed output. When we pass z to client.compute we get a future back and Dask starts evaluating the task graph.

[8]:
# notice the difference from z.compute()
# notice that this cell completes immediately
future = client.compute(z)
future
[8]:
Future: add status: pending, type: NoneType, key: add-64a7aa0d-6193-4e0c-9a33-ada60dd54b44
[9]:
future.result()  # waits until result is ready
[9]:
5

When using futures, the computation moves to the data rather than the other way around, and the client, in the local Python session, need never see the intermediate values.

client.submit

client.submit takes a function and arguments, pushes these to the cluster, and returns a Future representing the result to be computed. The function is passed to a worker process for evaluation. This looks a lot like doing client.compute(), above, except now we are passing the function and arguments directly to the cluster.

[10]:
def inc(x):
    sleep(1)
    return x + 1

future_x = client.submit(inc, 1)
future_y = client.submit(inc, 2)
future_z = client.submit(sum, [future_x, future_y])
future_z
[10]:
Future: sum status: pending, type: NoneType, key: sum-360394aa927a1668eebf271e35cc3de5
[11]:
future_z.result()  # waits until result is ready
[11]:
5

The arguments to client.submit can be regular Python functions and objects, futures from other submit operations, or dask.delayed objects.

Each future represents a result held by, or being evaluated by, the cluster. Thus we can control caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the example above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.

We can explicitly pass data from our local session into the cluster using client.scatter(), but usually it is better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialize and communicate the data. Most of the loading functions within Dask, such as dd.read_csv, work this way. Similarly, we normally don’t want to gather() results that are too big in memory.

Example: Sporadically failing task

Let’s imagine a task that sometimes fails. You might encounter this when dealing with input data where sometimes a file is malformed, or maybe a request times out.

[12]:
from random import random

def flaky_inc(i):
    if random() < 0.2:
        raise ValueError("You hit the error!")
    return i + 1

If you run this function over and over again, it will sometimes fail.

>>> flaky_inc(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [65], in <cell line: 1>()
----> 1 flaky_inc(2)

Input In [61], in flaky_inc(i)
      3 def flaky_inc(i):
      4     if random() < 0.2:
----> 5         raise ValueError("You hit the error!")
      6     return i + 1

ValueError: You hit the error!

We can run this function on a range of inputs using client.map.

[13]:
futures = client.map(flaky_inc, range(10))
2022-08-09 08:55:26,512 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-d26268bef42695a31499474052bc29fc
Function:  flaky_inc
args:      (0)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,516 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-2a052b46ffeeb97eb08d150fd9aaea17
Function:  flaky_inc
args:      (4)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,519 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-c982faa8532f77bd735eee9d2ac0ad13
Function:  flaky_inc
args:      (6)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

Notice how the cell returned even though some of the computations failed. We can inspect these futures one by one and find the ones that failed:

[14]:
for i, future in enumerate(futures):
    print(i, future.status)
0 error
1 pending
2 pending
3 pending
4 pending
5 pending
6 pending
7 pending
8 pending
9 pending
2022-08-09 08:55:26,522 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-301c3196e793187fc9461b63293697c9
Function:  flaky_inc
args:      (2)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

You can rerun those specific futures to try to get the task to successfully complete:

[15]:
futures[5].retry()
[16]:
for i, future in enumerate(futures):
    print(i, future.status)
0 error
1 finished
2 error
3 finished
4 error
5 finished
6 error
7 finished
8 finished
9 finished

A more concise way of retrying in the case of sporadic failures is to set the number of retries in the client.compute, client.submit, or client.map call.

Note: In this example we also need to set pure=False to let Dask know that the arguments to the function do not totally determine the output.

[17]:
futures = client.map(flaky_inc, range(10), retries=5, pure=False)
future_z = client.submit(sum, futures)
future_z.result()
2022-08-09 08:55:26,576 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-2
Function:  flaky_inc
args:      (2)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,578 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-3
Function:  flaky_inc
args:      (3)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,579 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-8
Function:  flaky_inc
args:      (8)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,580 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-9
Function:  flaky_inc
args:      (9)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,592 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-8
Function:  flaky_inc
args:      (8)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2022-08-09 08:55:26,599 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-a66ea46d-1aaa-4ccd-afd2-c970b245fe29-8
Function:  flaky_inc
args:      (8)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

[17]:
55

You will see a lot of warnings, but the computation should eventually succeed.
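The effect of the retries= keyword can be emulated locally with a small wrapper; this is only a sketch of the behavior per task, not Dask’s actual implementation:

```python
from random import random

def flaky_inc(i):
    if random() < 0.2:
        raise ValueError("You hit the error!")
    return i + 1

def with_retries(fn, *args, retries=5):
    # Re-invoke fn until it succeeds or the retry budget is exhausted,
    # mirroring the retries= keyword on client.submit/client.map.
    for attempt in range(retries + 1):
        try:
            return fn(*args)
        except Exception:
            if attempt == retries:
                raise

results = [with_retries(flaky_inc, i) for i in range(10)]
print(sum(results))  # 55, barring an unlucky streak of six failures in a row
```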

Why use Futures?

The futures API offers a work submission style that can easily emulate the map/reduce paradigm. If that paradigm is familiar to you, then futures might be the simplest entry point into Dask.

The other big benefit of futures is that intermediate results, represented by futures, can be passed to new tasks without having to pull data locally from the cluster, and new operations can be set up to work on the output of previous jobs that haven’t even begun yet.
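A minimal map/reduce sketch in this submission style, with the standard-library executor standing in for a Dask client:

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # map step: one future per input, all running concurrently
    futures = [pool.submit(square, i) for i in range(10)]
    # reduce step: combine the results as they arrive
    total = sum(f.result() for f in futures)

print(total)  # 285
```

With Dask, the reduce step could itself be submitted as a task (as in the `client.submit(sum, futures)` example above), so the intermediate values never leave the cluster.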