{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# dask.delayed - parallelize any code\n", "\n", "What if you don't have an array or dataframe? Instead of having blocks where the function is applied to each block, you can decorate functions with `@delayed` and _have the functions themselves be lazy_.\n", "\n", "This is a simple way to use `dask` to parallelize existing codebases or build [complex systems](https://blog.dask.org/2018/02/09/credit-models-with-dask).\n", "\n", "**Related Documentation**\n", "\n", "* [Delayed documentation](https://docs.dask.org/en/latest/delayed.html)\n", "* [Delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU)\n", "* [Delayed API](https://docs.dask.org/en/latest/delayed-api.html)\n", "* [Delayed examples](https://examples.dask.org/delayed.html)\n", "* [Delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we'll see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a `dask.distributed.Client`. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(n_workers=4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A Typical Workflow\n", "\n", "Typically, if a workflow contains a for-loop, it can benefit from `dask.delayed`. 
The following example outlines a read-transform-write workflow:\n", "\n", "```python\n", "import dask\n", "\n", "# Outline only: read_a_file, do_a_transformation, write_out_data\n", "# and filenames are placeholders for your own code\n", "@dask.delayed\n", "def process_file(filename):\n", "    data = read_a_file(filename)\n", "    data = do_a_transformation(data)\n", "    destination = f\"results/{filename}\"\n", "    write_out_data(data, destination)\n", "    return destination\n", "\n", "results = []\n", "for filename in filenames:\n", "    results.append(process_file(filename))\n", "\n", "dask.compute(results)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Basics\n", "\n", "First let's make some toy functions, `inc` and `add`, that sleep for a while to simulate work. We'll then time running these functions normally.\n", "\n", "In the next section we'll parallelize this code." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import sleep\n", "\n", "\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "def add(x, y):\n", "    sleep(1)\n", "    return x + y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We time the execution of this normal code using the `%%time` magic, which is a special function of the Jupyter Notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# This takes three seconds to run because we call each\n", "# function sequentially, one after the other\n", "\n", "x = inc(1)\n", "y = inc(2)\n", "z = add(x, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parallelize with the `dask.delayed` decorator\n", "\n", "Those two `inc` calls *could* run in parallel, because they are completely independent of one another.\n", "\n", "We'll make the `inc` and `add` functions lazy using the `dask.delayed` decorator. 
When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet, which is why the cell finishes very quickly.\n", "Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "\n", "\n", "@dask.delayed\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "@dask.delayed\n", "def add(x, y):\n", "    sleep(1)\n", "    return x + y" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# This runs immediately; all it does is build a graph\n", "\n", "x = inc(1)\n", "y = inc(2)\n", "z = add(x, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This ran immediately, since nothing has really happened yet.\n", "\n", "To get the result, call `compute`. Notice that this runs faster than the original code." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# This actually runs our computation on the workers of the Client created above\n", "\n", "z.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What just happened?\n", "\n", "The `z` object is a lazy `Delayed` object. This object holds everything we need to compute the final result, including references to all of the required functions, their inputs, and their relationships to one another. We can evaluate the result with `.compute()` as above, or we can visualize the task graph for this value with `.visualize()`." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "z" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Look at the task graph for `z`\n", "z.visualize()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that this includes the names of the functions from before, and the logical flow of the outputs of the `inc` functions to the inputs of `add`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Some questions to consider:\n", "\n", "- Why did we go from 3s to 2s? Why weren't we able to parallelize down to 1s?\n", "- What would have happened if the `inc` and `add` functions didn't include the `sleep(1)`? Would Dask still be able to speed up this code?\n", "- What if we have multiple outputs or also want to get access to `x` or `y`?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise: Parallelize a for loop\n", "\n", "`for` loops are one of the most common things that we want to parallelize. Use `dask.delayed` on `inc` and `sum` to parallelize the computation below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = [1, 2, 3, 4, 5, 6, 7, 8]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Sequential code\n", "\n", "\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "results = []\n", "for x in data:\n", "    y = inc(x)\n", "    results.append(y)\n", "\n", "total = sum(results)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Your parallel code here..." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [] }, "outputs": [], "source": [ "@dask.delayed\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "results = []\n", "for x in data:\n", "    y = inc(x)\n", "    results.append(y)\n", "\n", "total = sum(results)\n", "print(\"Before computing:\", total)  # Let's see what type of thing total is\n", "result = total.compute()\n", "print(\"After computing :\", result)  # After it's computed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How does the task graph of the given solution, which calls the built-in `sum` directly, compare with that of a version in which `sum` is wrapped with `delayed`? Can you explain the graph of the given solution? You might find the result of the following expression illuminating:\n", "```python\n", "inc(1) + inc(2)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise: Parallelize a for-loop with control flow\n", "\n", "Often we want to delay only *some* functions, running a few of them immediately. This is especially helpful when those functions are fast and help us to determine what other slower functions we should call. This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.\n", "\n", "In the example below we iterate through a list of inputs. If the input is even, we want to call `double`. If the input is odd, we want to call `inc`. This `is_even` decision to call `double` or `inc` has to be made immediately (not lazily) in order for our graph-building Python code to proceed." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Redefine `inc` as a plain function so the sequential version below runs eagerly\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "def double(x):\n", "    sleep(1)\n", "    return 2 * x\n", "\n", "\n", "def is_even(x):\n", "    return not x % 2\n", "\n", "\n", "data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Sequential code\n", "\n", "results = []\n", "for x in data:\n", "    if is_even(x):\n", "        y = double(x)\n", "    else:\n", "        y = inc(x)\n", "    results.append(y)\n", "\n", "total = sum(results)\n", "print(total)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Your parallel code here...\n", "# TODO: parallelize the sequential code above using dask.delayed\n", "# You will need to delay some functions, but not all" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [] }, "outputs": [], "source": [ "# Delay the slow functions; `is_even` stays eager because it decides the graph's shape\n", "@dask.delayed\n", "def inc(x):\n", "    sleep(1)\n", "    return x + 1\n", "\n", "\n", "@dask.delayed\n", "def double(x):\n", "    sleep(1)\n", "    return 2 * x\n", "\n", "\n", "results = []\n", "for x in data:\n", "    if is_even(x):  # even\n", "        y = double(x)\n", "    else:  # odd\n", "        y = inc(x)\n", "    results.append(y)\n", "\n", "total = sum(results)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time total.compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total.visualize()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Some questions to consider:\n", "\n", "- What are other examples of control flow where we can't use `delayed`?\n", "- What would have happened if we had delayed the evaluation of `is_even(x)` in the example above?\n", "- What are your thoughts on delaying `sum`? This function does real computation, but it is also fast to run." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise: Parallelize a Pandas Groupby Reduction\n", "\n", "In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code to do this and parallelize it with `dask.delayed`.\n", "\n", "The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data. We will do this by using `dask.delayed` together with `pandas`. In a future section we will do this same exercise with `dask.dataframe`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create data\n", "\n", "Run this code to prep some data.\n", "\n", "This downloads and extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is originally from [here](http://stat-computing.org/dataexpo/2009/the-data.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run prep.py -d flights" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Inspect data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "sorted(os.listdir(os.path.join(\"data\", \"nycflights\")))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read one file with `pandas.read_csv` and compute mean departure delay" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df = pd.read_csv(os.path.join(\"data\", \"nycflights\", \"1990.csv\"))\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What is the schema?\n", "df.dtypes" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What originating airports are in the data?\n", "df.Origin.unique()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": 
[], "source": [ "# Mean departure delay per airport for one year\n", "df.groupby(\"Origin\").DepDelay.mean()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sequential code: Mean Departure Delay Per Airport\n", "\n", "The above cell computes the mean departure delay per airport for one year. Here we expand that to all years using a sequential for loop." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from glob import glob\n", "\n", "filenames = sorted(glob(os.path.join(\"data\", \"nycflights\", \"*.csv\")))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "sums = []\n", "counts = []\n", "for fn in filenames:\n", "    # Read in file\n", "    df = pd.read_csv(fn)\n", "\n", "    # Groupby origin airport\n", "    by_origin = df.groupby(\"Origin\")\n", "\n", "    # Sum of all departure delays by origin\n", "    total = by_origin.DepDelay.sum()\n", "\n", "    # Number of flights by origin\n", "    count = by_origin.DepDelay.count()\n", "\n", "    # Save the intermediates\n", "    sums.append(total)\n", "    counts.append(count)\n", "\n", "# Combine intermediates to get total mean-delay-per-origin\n", "total_delays = sum(sums)\n", "n_flights = sum(counts)\n", "mean = total_delays / n_flights" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mean" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parallelize the code above\n", "\n", "Use `dask.delayed` to parallelize the code above. Some extra things you will need to know:\n", "\n", "1. Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.\n", "\n", "2. Calling the `.compute()` method works well when you have a single output. 
When you have multiple outputs you might want to use the `dask.compute` function. This way Dask can share intermediate values between the outputs instead of computing them more than once.\n", "\n", "So your goal is to parallelize the code above using `dask.delayed`. You may also want to visualize a bit of the computation to see if you're doing it correctly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# your code here" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you load the solution, add `%%time` to the top of the cell to measure the running time." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [] }, "outputs": [], "source": [ "%%time\n", "\n", "# This is just one possible solution; there are\n", "# several ways to do this using `dask.delayed`\n", "\n", "\n", "@dask.delayed\n", "def read_file(filename):\n", "    # Read in file\n", "    return pd.read_csv(filename)\n", "\n", "\n", "sums = []\n", "counts = []\n", "for fn in filenames:\n", "    # Delayed read in file\n", "    df = read_file(fn)\n", "\n", "    # Groupby origin airport\n", "    by_origin = df.groupby(\"Origin\")\n", "\n", "    # Sum of all departure delays by origin\n", "    total = by_origin.DepDelay.sum()\n", "\n", "    # Number of flights by origin\n", "    count = by_origin.DepDelay.count()\n", "\n", "    # Save the intermediates\n", "    sums.append(total)\n", "    counts.append(count)\n", "\n", "# Combine intermediates to get total mean-delay-per-origin\n", "total_delays = sum(sums)\n", "n_flights = sum(counts)\n", "# dask.compute returns a tuple, so unpack its single element\n", "mean, *_ = dask.compute(total_delays / n_flights)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sum(sums).visualize()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ensure the results still match\n", "mean" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Some questions to 
consider:\n", "\n", "- How much speedup did you get? Is this how much speedup you'd expect?\n", "- Experiment with where to call `compute`. What happens when you call it on `sums` and `counts`? What happens if you wait and call it on `mean`?\n", "- Experiment with delaying the call to `sum`. What does the graph look like if `sum` is delayed? What does the graph look like if it isn't?\n", "- Can you think of any reason why you'd want to do the reduction one way over the other?\n", "\n", "### Learn More\n", "\n", "Visit the [Delayed documentation](https://docs.dask.org/en/latest/delayed.html). In particular, this [delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU) will reinforce the concepts you learned here and the [delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html) document collects advice on using `dask.delayed` well." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Close the Client\n", "\n", "Before moving on to the next exercise, make sure to close your client or stop this kernel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 4 }