
# Parallel and Distributed Machine Learning

Dask-ML has resources for parallel and distributed machine learning.

## Types of Scaling

There are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you’re facing:

1. CPU-bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
2. Memory-bound: Data is larger than RAM, and sampling isn’t an option.

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use `dask_ml.joblib` and your favorite scikit-learn estimator.
* For large datasets, use `dask_ml` estimators.
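
For example, the two workflows look almost identical; often only the import changes. This is a rough sketch, and the `LogisticRegression` estimators here are just illustrative:

```
# In-memory data on a single machine: plain scikit-learn.
from sklearn.linear_model import LogisticRegression

# Larger-than-memory dask arrays/dataframes: the Dask-ML counterpart
# exposes the same fit/predict API.
from dask_ml.linear_model import LogisticRegression as DaskLogisticRegression
```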

## Scikit-Learn in 5 Minutes

Scikit-Learn has a nice, consistent API:

1. You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the model’s *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it’s created.
2. You call `estimator.fit(X, y)` to train the estimator.
3. You use `estimator` to inspect attributes, make predictions, etc.

Let’s generate some random data.

```
[ ]:
```

```
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
X[:8]
```

```
[ ]:
```

```
y[:8]
```

We’ll fit a Support Vector Classifier.

```
[ ]:
```

```
from sklearn.svm import SVC
```

Create the estimator and fit it.

```
[ ]:
```

```
estimator = SVC(random_state=0)
estimator.fit(X, y)
```

Inspect the learned attributes.

```
[ ]:
```

```
estimator.support_vectors_[:4]
```

Check the accuracy.

```
[ ]:
```

```
estimator.score(X, y)
```
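
The fitted estimator can also make predictions on new data; a quick illustration (this cell is not in the original notebook):

```
estimator.predict(X[:8])
```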

## Hyperparameters

Most models have *hyperparameters*. They affect the fit, but are specified up front instead of learned during training.

```
[ ]:
```

```
estimator = SVC(C=0.00001, shrinking=False, random_state=0)
estimator.fit(X, y)
estimator.support_vectors_[:4]
```

```
[ ]:
```

```
estimator.score(X, y)
```

## Hyperparameter Optimization

There are a few ways to learn the best *hyperparameters* while training. One is `GridSearchCV`. As the name implies, this does a brute-force search over a grid of hyperparameter combinations.

```
[ ]:
```

```
from sklearn.model_selection import GridSearchCV
```

```
[ ]:
```

```
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)
```

## Single-machine parallelism with scikit-learn

Scikit-Learn has nice *single-machine* parallelism, via Joblib. Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword. This controls the number of CPU cores that will be used.

```
[ ]:
```

```
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)
```

## Multi-machine parallelism with Dask

Dask can talk to scikit-learn (via joblib) so that your *cluster* is used to train a model.

If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a distributed cluster. That would mean passing a scheduler address in the call to `Client`, something like

```
c = Client('tcp://my.scheduler.address:8786')
```

Details on the many ways to create a cluster can be found in the Dask documentation.
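
For example, an explicit local cluster could be created like this (illustrative only; the worker counts are arbitrary, and you would use this *instead of* the default `Client()` below):

```
from dask.distributed import LocalCluster, Client

# Start a local "cluster" of four single-threaded worker processes,
# then connect a client to it.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
c = Client(cluster)
```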

Let’s try it on a larger problem (more hyperparameters).

```
[ ]:
```

```
import joblib
import dask.distributed
c = dask.distributed.Client()
```

```
[ ]:
```

```
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    # Uncomment this for larger Grid searches on a cluster
    # 'kernel': ['rbf', 'poly', 'linear'],
    # 'shrinking': [True, False],
}
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)
```

```
[ ]:
```

```
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)
```

```
[ ]:
```

```
grid_search.best_params_, grid_search.best_score_
```

## Training on Large Datasets

Sometimes you’ll want to train on a larger-than-memory dataset. `dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine’s RAM.

```
[ ]:
```

```
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np
```

We’ll make a small (random) dataset locally using scikit-learn.

```
[ ]:
```

```
n_centers = 12
n_features = 20
X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)
centers = np.zeros((n_centers, n_features))
for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
centers[:4]
```

The small dataset will be the template for our large random dataset. We’ll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is generated on our workers.

```
[ ]:
```

```
n_samples_per_block = 200000
n_blocks = 500
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
X
```

```
[ ]:
```

```
X = X.persist() # Only run this on the cluster.
```
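
With 500 blocks of 200,000 samples and 20 float64 features, this array is roughly 16 GB, larger than the RAM of many laptops. A quick sanity check (not part of the original notebook):

```
print(f"{X.nbytes / 1e9:.1f} GB in {X.numblocks[0]} blocks")
```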

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you’re familiar with scikit-learn, you’ll feel at home with Dask-ML.

```
[ ]:
```

```
from dask_ml.cluster import KMeans
```

```
[ ]:
```

```
clf = KMeans(init_max_iter=3, oversampling_factor=10)
```

```
[ ]:
```

```
%time clf.fit(X)
```

```
[ ]:
```

```
clf.labels_
```

```
[ ]:
```

```
clf.labels_[:10].compute()
```
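
The fitted model also exposes the learned cluster centers and, assuming it follows the usual scikit-learn convention, a `predict` method for assigning points to clusters. A hedged sketch; check the Dask-ML documentation for the exact API:

```
clf.cluster_centers_[:4]          # learned centers, a small NumPy array
clf.predict(X[:1000]).compute()   # assign points to their nearest center
```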