
Distributed, Advanced

Distributed futures

[1]:
from dask.distributed import Client
c = Client(n_workers=4)
c.cluster

In the Distributed chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to using any other executor. However, we now have access to additional functionality, and control over which data is held in memory.

To begin, the futures interface (derived from the built-in concurrent.futures module) allows map-reduce-like functionality. We can submit individual functions for evaluation on one set of inputs with submit(), or evaluate a function over a sequence of inputs with map(). Notice that the calls return immediately, giving one or more futures, whose status begins as “pending” and later becomes “finished”. There is no blocking of the local Python session.

Here is the simplest example of submit in action:

[2]:
def inc(x):
    return x + 1

fut = c.submit(inc, 1)
fut
[2]:
Future: inc status: pending, key: inc-ae0ff1cb6f5c736d2f30b1b1b13931fb

We can re-execute the following cell as often as we want as a way to poll the status of the future. This could of course be done in a loop, pausing for a short time on each iteration. Alternatively, we could continue with our work, view a progress bar of the work still going on, or force a wait until the future is ready.
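For example, a minimal polling loop might look like the following (just a sketch; in practice you would more likely use wait() or progress(), shown below):

import time
while fut.status == 'pending':   # poll the future's status
    time.sleep(0.1)
fut.status                       # 'finished', or 'error' if the task failed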

In the meantime, the status dashboard (linked above, next to the Cluster widget) has gained a new element in the task stream, indicating that inc() has completed, and the progress section at the bottom shows one task complete and held in memory.

[3]:
fut
[3]:
Future: inc status: finished, type: builtins.int, key: inc-ae0ff1cb6f5c736d2f30b1b1b13931fb

Possible alternatives you could investigate:

from dask.distributed import wait, progress
progress(fut)

would show a progress bar in this notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn’t block the execution of other code in the meantime.

wait(fut)

would block and force the notebook to wait until the computation pointed to by fut was done. However, note that since the result of inc() is already sitting in the cluster, it would take no time to execute the computation now, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.

[4]:
# grab the information back - this blocks if fut is not ready
c.gather(fut)
# equivalent action when only considering a single future
# fut.result()
[4]:
2

Here we see an alternative way to execute work on the cluster: when you submit or map with the inputs as futures, the computation moves to the data rather than the other way around, and the client, in the local Python session, need never see the intermediate values. This is similar to building the graph using delayed, and indeed, delayed can be used in conjunction with futures. Here we use the delayed object total from before.

[5]:
# Some trivial work that takes time
# repeated from the Distributed chapter.

from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
[6]:
# notice the difference from total.compute()
# notice that this cell completes immediately
fut = c.compute(total)
[7]:
c.gather(fut) # waits until result is ready
[7]:
3

Client.submit

submit takes a function and arguments, pushes these to the cluster, returning a Future representing the result to be computed. The function is passed to a worker process for evaluation. Note that this cell returns immediately, while computation may still be ongoing on the cluster.

[8]:
fut = c.submit(inc, 1)
fut
[8]:
Future: inc status: pending, key: inc-22a0514160481fa01cbf9621a2db3fc1

This looks a lot like doing compute(), above, except now we are passing the function and arguments directly to the cluster. To anyone used to concurrent.futures, this will look familiar. This new fut behaves the same way as the one above. Note that we have now overwritten the previous definition of fut, which will get garbage-collected and, as a result, that previous result is released by the cluster.

Exercise: Rebuild the above delayed computation using Client.submit instead

The arguments passed to submit can be futures from other submit operations or delayed objects. The former, in particular, demonstrates the concept of moving the computation to the data, which is one of the most powerful elements of programming with Dask.

[9]:
# Your code here
[10]:
x = c.submit(inc, 1)
y = c.submit(dec, 2)
total = c.submit(add, x, y)

print(total)     # This is still a future
c.gather(total)   # This blocks until the computation has finished

<Future: pending, key: add-26587b4557bda00a6b4240998c9537dc>
[10]:
3

Each future represents a result held, or being evaluated, by the cluster. Thus we can control caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the solution above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.
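For example, here is a small sketch that reuses the futures x and y from the solution above; only the new add() call is executed, since inc(1) and dec(2) are already held in cluster memory:

# x and y are finished futures, so their 5s and 3s sleeps are not repeated
total2 = c.submit(add, y, x)   # only the 7s add() runs
c.gather(total2)               # 3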

We can explicitly pass data from our local session into the cluster using scatter(), but it is usually better to construct functions that load the data within the workers themselves, so that there is no need to serialise and communicate the data. Most of the loading functions within Dask, such as dd.read_csv, work this way. Similarly, we normally don’t want to gather() results that are too big to fit in local memory.
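As a small sketch of scatter() (the NumPy array here is just placeholder data), the local data is sent to a worker once, and the returned future can then be passed to submit() so the function runs where the data lives:

import numpy as np
data = np.arange(100)                 # placeholder local data
remote_data = c.scatter(data)         # future pointing to the data on a worker
fut2 = c.submit(np.sum, remote_data)  # computation happens next to the data
c.gather(fut2)                        # 4950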

The full API of the distributed scheduler gives details of interacting with the cluster, which, remember, can be on your local machine or possibly on a massive computational resource.

The futures API offers a work submission style that can easily emulate the map/reduce paradigm (see c.map()) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to pull the data locally from the cluster, and new work can be assigned to operate on the output of previous jobs that haven’t even begun yet.
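For instance, a small map/reduce-style sketch using c.map() together with c.submit(), reusing the inc function defined above (the reduce step is scheduled on the cluster, on futures that may not have finished yet):

futs = c.map(inc, range(8))      # eight inc() calls, run in parallel on the workers
total_fut = c.submit(sum, futs)  # reduce step, submitted before the inc() calls finish
c.gather(total_fut)              # 36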

Generally, any Dask operation that is executed using .compute() can be submitted for asynchronous execution using c.compute() instead, and this applies to all collections. Here is an example with the calculation previously seen in the Bag chapter. We have replaced the .compute() method there with the distributed client version, so, again, we could continue to submit more work (perhaps based on the result of the calculation), or, in the next cell, follow the progress of the computation. A similar progress bar appears on the monitoring UI page.

[11]:
%run prep.py -d accounts
[12]:
import dask.bag as db
import os
import json
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
js = lines.map(json.loads)

f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
       .pluck('transactions')
       .flatten()
       .pluck('amount')
       .mean())
[13]:
from dask.distributed import progress
# note that progress must be the last line of a cell
# in order to show up
progress(f)
[14]:
# get result.
c.gather(f)
[14]:
366.1473166946851
[15]:
# release values by deleting the futures
del f, fut, x, y, total

Persist

Deciding which data should be loaded by the workers, as opposed to passed from the client, and which intermediate values to persist in worker memory, will in many cases determine the computational efficiency of a process.

In the example here, we repeat a calculation from the Array chapter - notice that each call to compute() is roughly the same speed, because the loading of the data is included every time.

[16]:
%run prep.py -d random
[17]:
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']
import dask.array as da
x = da.from_array(dset, chunks=(1000000,))

%time x.sum().compute()
%time x.sum().compute()
CPU times: user 43.9 ms, sys: 678 µs, total: 44.6 ms
Wall time: 584 ms
CPU times: user 20.4 ms, sys: 0 ns, total: 20.4 ms
Wall time: 57.4 ms
[17]:
4999958.5

If, instead, we persist the data to RAM up front (this takes a few seconds to complete - we could wait() on this process), then further computations will be much faster.
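If we did want to block until the data is actually in memory, a small sketch of the wait() mentioned above would be:

from dask.distributed import wait
x = c.persist(x)   # as in the next cell
wait(x)            # block until every chunk is held in RAM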

[18]:
# changes x from a set of delayed prescriptions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = c.persist(x)
[19]:
%time x.sum().compute()
%time x.sum().compute()
CPU times: user 15.4 ms, sys: 1.19 ms, total: 16.5 ms
Wall time: 37.1 ms
CPU times: user 11 ms, sys: 496 µs, total: 11.5 ms
Wall time: 27.1 ms
[19]:
4999958.5

Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often.
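As an illustrative sketch only (the file pattern and cleaning step here are hypothetical), a common pattern with a dataframe is:

import dask.dataframe as dd
df = dd.read_csv('data/records.*.csv')   # hypothetical input files
cleaned = df.dropna()                    # end of the data-cleaning steps
cleaned = c.persist(cleaned)             # hold the cleaned data in cluster RAM
# repeated queries against `cleaned` now avoid re-reading and re-cleaning the CSVs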

Exercise: how is the memory associated with x released, once we know we are done with it?

[ ]:

Asynchronous computation

One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in, and submitting new points to compute while others are still running.

Watching the diagnostics dashboard as this runs, you can see computations being run concurrently while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.

Let’s perform a very simple minimization using dynamically submitted tasks. The function of interest is known as the Rosenbrock function:

[20]:
# a simple function with interesting minima
import time

def rosenbrock(point):
    """Compute the rosenbrock function and return the point and result"""
    time.sleep(0.1)
    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
    return point, score

Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in.

[21]:
from bokeh.io import output_notebook, push_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, show
import numpy as np
output_notebook()

# set up plot background
N = 500
x = np.linspace(-5, 5, N)
y = np.linspace(-5, 5, N)
xx, yy = np.meshgrid(x, y)
d = (1 - xx)**2 + 2 * (yy - xx**2)**2
d = np.log(d)

p = figure(x_range=(-5, 5), y_range=(-5, 5))
p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");
Loading BokehJS ...

We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as results come in, we test to see if we have a new best point, and choose random points around that new best point as the search box shrinks.

We print the function value and current best location each time we have a new best value.

[22]:
from dask.distributed import as_completed
from random import uniform

scale = 5                  # Initial random perturbation scale
best_point = (0, 0)        # Initial guess
best_score = float('inf')  # Best score so far
startx = [uniform(-scale, scale) for _ in range(10)]
starty = [uniform(-scale, scale) for _ in range(10)]

# set up plot
source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
p.circle(source=source, x='x', y='y', color='c')
t = show(p, notebook_handle=True)

# initial 10 random points
futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
iterator = as_completed(futures)

for res in iterator:
    # take a completed point, is it an improvement?
    point, score = res.result()
    if score < best_score:
        best_score, best_point = score, point
        print(score, point)

    x, y = best_point
    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))

    # update plot
    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
    push_notebook(t)

    # add new point, dynamically, to work on the cluster
    new_point = c.submit(rosenbrock, (newx, newy))
    iterator.add(new_point)  # Start tracking new task as well

    # Narrow search and consider stopping
    scale *= 0.99
    if scale < 0.001:
        break
point  # the most recently completed point when the search stopped