"""
This module (``dataworkspaces.kits.scikit_learn``)
provides integration with the `scikit-learn <https://scikit-learn.org>`_
framework. The main class provided here is :class:`~LineagePredictor`,
which wraps any class following sklearn's predictor protocol. It captures
inputs, model parameters and results. This module also provides
:class:`~Metrics` and its subclasses, which
support the computation of common metrics and the writing of them
to a results file. Finally, there is
:func:`~train_and_predict_with_cv`, which
runs a common sklearn classification workflow, including grid search.
"""
from typing import Optional, Union, Dict, List, Any, cast
from abc import ABCMeta, abstractmethod
from sklearn.base import ClassifierMixin # type: ignore
from sklearn import metrics # type: ignore
from sklearn.model_selection import GridSearchCV, train_test_split # type: ignore
from sklearn.utils import Bunch # type: ignore
import sklearn.utils.metaestimators # type: ignore
import sys
import numpy as np # type: ignore
import os
from os.path import join, abspath, expanduser, exists, isabs
from tempfile import NamedTemporaryFile
from dataworkspaces.errors import ConfigurationError
from dataworkspaces.lineage import LineageBuilder
from dataworkspaces.workspace import (
find_and_load_workspace,
LocalStateResourceMixin,
FileResourceMixin,
)
from dataworkspaces.utils.lineage_utils import ResourceRef
from dataworkspaces.kits.wrapper_utils import _DwsModelState, _add_to_hash
from .jupyter import is_notebook, get_step_name_for_notebook, get_notebook_directory
# joblib is required for model serialization (see LineagePredictor._save_model
# and train_and_predict_with_cv); fail fast with an actionable message if the
# package is not installed.
try:
    import joblib  # type: ignore
except ImportError as e:
    raise ConfigurationError('Please install the joblib package (via "pip install joblib")') from e
def _load_dataset_file(dataset_path, filename):
filepath = join(dataset_path, filename)
if filename.endswith(".txt") or filename.endswith(".rst"):
with open(filepath, "r") as f:
return f.read()
elif filename.endswith(".csv") or filename.endswith(".csv.gz") or filename.endswith(".csv.bz2"):
try:
return np.loadtxt(filepath, delimiter=",")
except ValueError:
# try with pandas
import pandas # type: ignore
df = pandas.read_csv(filepath)
if len(df.values.shape) == 2 and df.values.shape[1] == 1: # this is just a list
return df.values.reshape(df.values.shape[0])
else:
return df.values
elif filename.endswith(".npy"):
return np.load(filepath)
def load_dataset_from_resource(
    resource_name: str, subpath: Optional[str] = None, workspace_dir: Optional[str] = None
) -> Bunch:
    """
    Load a dataset (data and targets) from the specified resource, and returns an
    sklearn-style Bunch (a dictionary-like object). The bunch will include at least
    three attributes:

    * ``data`` - a NumPy array of shape number_samples * number_features
    * ``target`` - a NumPy array of length number_samples
    * ``resource`` - a :class:`~dataworkspaces.workspace.ResourceRef` that provides the resource name and
      subpath (if any) for the data

    Some other attributes that may also be present, depending on the data set:

    * ``DESCR`` - text containing a full description of the data set (for humans)
    * ``feature_names`` - an array of length number_features containing the name
      of each feature.
    * ``target_names`` - an array containing the name of each target class

    Data sets may define their own attributes as well (see below).

    **Parameters**

    resource_name
        The name of the resource containing the dataset.

    subpath
        Optional subpath within the resource where this specific dataset is located.
        If not specified, the root of the resource is used.

    workspace_dir
        The root directory of your workspace in the local file system. Usually,
        this can be left unspecified and inferred by DWS, which will search up
        from the current working directory.

    **Creating a Dataset**

    To create a dataset in your resource that is suitable for importing by this function,
    you simply need to create a file for each attribute you want in the bunch and place
    all these files in the same directory within your resource.
    The names of the files should be ``ATTRIBUTE.extn`` where ``ATTRIBUTE`` is the
    attribute name (e.g. ``data`` or ``DESCR``) and ``.extn`` is a file extension
    indicating the format. Supported file extensions are:

    * ``.txt`` or ``.rst`` - text files
    * ``.csv`` - csv files. These are read in using ``numpy.loadtxt()``. If this
      fails because the csv does not contain all numeric data, pandas is used to read
      in the file. It is then converted back to a numpy array.
    * ``.csv.gz`` or ``.csv.bz2`` - these are compressed csv files which are treated
      the same way as csv files (numpy and pandas will automatically uncompress before parsing).
    * ``.npy`` - this is a file containing a serialized NumPy array saved via ``numpy.save()``.
      It is loaded using ``numpy.load()``.
    """
    workspace = find_and_load_workspace(True, False, workspace_dir)
    workspace.validate_resource_name(resource_name, subpath)
    dataset_name = (
        "Resource " + resource_name + " subpath " + subpath
        if subpath is not None
        else "Resource " + resource_name
    )
    r = workspace.get_resource(resource_name)
    if not isinstance(r, LocalStateResourceMixin) or (r.get_local_path_if_any() is None):
        # TODO: Support a data access api
        raise ConfigurationError(
            "Unable to instantiate a data set for resource '%s': currently not supported for non-local resources"
            % resource_name
        )
    local_path = r.get_local_path_if_any()
    assert local_path is not None
    dataset_path = join(local_path, subpath) if subpath is not None else local_path
    result = {}  # this will be the args to the result Bunch

    def _find_required_csv(stem: str, kind: str) -> str:
        # Probe for the csv file, also accepting gzip/bzip2 compressed variants.
        path = join(dataset_path, stem)
        for candidate in (path, path + ".gz", path + ".bz2"):
            if exists(candidate):
                return candidate
        raise ConfigurationError(
            "Did not find %s file for %s at '%s'" % (kind, dataset_name, path)
        )

    # First load data and target files, which are required.
    data_file = _find_required_csv("data.csv", "data")
    result["data"] = np.loadtxt(data_file, delimiter=",")
    target_file = _find_required_csv("target.csv", "target")
    result["target"] = np.loadtxt(target_file, delimiter=",")
    if result["data"].shape[0] != result["target"].shape[0]:
        raise ConfigurationError(
            "Data matrix at '%s' has %d rows, but target at '%s' has %d rows"
            % (data_file, result["data"].shape[0], target_file, result["target"].shape[0])
        )
    result["resource"] = ResourceRef(resource_name, subpath)
    # Check for and load any other attributes: each supported file becomes a
    # bunch attribute named after the file with its extension stripped.
    for fname in os.listdir(dataset_path):
        for extn in (".txt", ".rst", ".csv", ".csv.gz", ".csv.bz2", ".npy"):
            if fname.endswith(extn):
                result[fname[: -len(extn)]] = _load_dataset_file(dataset_path, fname)
                break
    return Bunch(**result)
class Metrics(metaclass=ABCMeta):
    """Metrics and its subclasses are convenience classes
    for sklearn metrics. The subclasses
    of Metrics are used by :func:`~train_and_predict_with_cv`
    in printing a metrics report and generating the metrics
    json file.
    """

    def __init__(self, expected, predicted, sample_weight=None):
        # expected: the ground-truth target values
        # predicted: the values produced by the predictor
        # sample_weight: optional per-sample weights, passed through to sklearn
        self.expected = expected
        self.predicted = predicted
        self.sample_weight = sample_weight

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]:
        """Return the computed metrics as a json-serializable dictionary."""
        pass

    @abstractmethod
    def score(self) -> float:
        """Given the expected and predicted values, compute the metric
        for this type of predictor, as needed for the predictor's score()
        method. This is used in the wrapped classes to avoid multiple
        calls to predict()."""
        pass

    @abstractmethod
    def print_metrics(self, file=sys.stdout) -> None:
        """Print the metrics to a file
        """
        pass
class BinaryClassificationMetrics(Metrics):
    """Given an array of expected (target) values
    and the actual predicted values from a classifier,
    compute metrics that make sense for a binary
    classifier, including accuracy, precision, recall, roc auc,
    and f1 score.
    """

    def __init__(self, expected, predicted, sample_weight=None):
        super().__init__(expected, predicted, sample_weight)
        # Compute all metrics once up front so that to_dict()/score() and
        # print_metrics() never re-run sklearn.
        self.accuracy = metrics.accuracy_score(expected, predicted, sample_weight=sample_weight)
        self.precision = metrics.precision_score(expected, predicted, sample_weight=sample_weight)
        self.recall = metrics.recall_score(expected, predicted, sample_weight=sample_weight)
        self.roc_auc = metrics.roc_auc_score(expected, predicted, sample_weight=sample_weight)
        self.f1_score = metrics.f1_score(expected, predicted, sample_weight=sample_weight)

    def to_dict(self) -> Dict[str, Any]:
        """Return all five metrics as a json-serializable dictionary."""
        return {
            "accuracy": self.accuracy,
            "precision": self.precision,
            "recall": self.recall,
            "roc_auc_score": self.roc_auc,
            "f1_score": self.f1_score,
        }

    def score(self) -> float:
        """Metric for binary classification is accuracy
        """
        return self.accuracy

    def print_metrics(self, file=sys.stdout) -> None:
        """Print one metric per line (right-aligned names) to ``file``."""
        for k, v in self.to_dict().items():
            print("%13s: %.02f" % (k, cast(Union[int, float], v)), file=file)
class MulticlassClassificationMetrics(Metrics):
    """Given an array of expected (target) values
    and the actual predicted values from a classifier,
    compute metrics that make sense for a multi-class
    classifier, including accuracy and sklearn's
    "classification report" showing per-class metrics.
    """

    def __init__(self, expected, predicted, sample_weight=None):
        super().__init__(expected, predicted, sample_weight)
        self.accuracy = metrics.accuracy_score(expected, predicted, sample_weight=sample_weight)
        # Stored as a dict (output_dict=True) so it is json-serializable.
        self.classification_report = metrics.classification_report(
            expected, predicted, sample_weight=sample_weight, output_dict=True
        )

    def score(self) -> float:
        """Metric for multiclass classification is accuracy
        """
        return self.accuracy

    def to_dict(self):
        """Return the accuracy and the per-class report as a dictionary."""
        return {"accuracy": self.accuracy, "classification_report": self.classification_report}

    def print_metrics(self, file=sys.stdout):
        """Print the accuracy and the human-readable classification report."""
        print("accuracy: %.02f" % self.accuracy, file=file)
        print("classification report:", file=file)
        # Bug fix: the report was previously printed to stdout regardless of the
        # ``file`` argument and without the sample weights used elsewhere in
        # this class; pass both through.
        print(
            metrics.classification_report(
                self.expected, self.predicted, sample_weight=self.sample_weight
            ),
            file=file,
        )
class RegressionMetrics(Metrics):
    """Metrics for regression models: captures the r-squared score and
    the mean squared error of the predictions.
    """

    def __init__(self, expected, predicted, sample_weight=None):
        super().__init__(expected, predicted, sample_weight)
        # Both metrics are computed eagerly so later accessors are cheap.
        self.r2_score = metrics.r2_score(expected, predicted, sample_weight=sample_weight)
        self.mean_squared_error = metrics.mean_squared_error(
            expected, predicted, sample_weight=sample_weight
        )

    def score(self) -> float:
        """For regression, the scalar metric is the r-squared score."""
        return self.r2_score

    def to_dict(self):
        """Return both metrics as a json-serializable dictionary."""
        return dict(r2_score=self.r2_score, mean_squared_error=self.mean_squared_error)

    def print_metrics(self, file=sys.stdout):
        """Write a human-readable summary of both metrics to ``file``."""
        print("r2_score: %.02f" % self.r2_score, file=file)
        print("mean_squared_error: %.02f" % self.mean_squared_error, file=file)
# Registry mapping the string names accepted by LineagePredictor's ``metrics``
# parameter to the Metrics subclass implementing them.
_METRICS = {
    "binary_classification": BinaryClassificationMetrics,
    "multiclass_classification": MulticlassClassificationMetrics,
    "regression": RegressionMetrics,
}
class LineagePredictor(sklearn.utils.metaestimators._BaseComposition):
    """This is a wrapper for adding lineage to any predictor in sklearn.
    To use it, instantiate the predictor (for classification or regression)
    and then create a new instance of :class:`~LineagePredictor`.

    The initializer finds the associated workspace and initializes a
    :class:`~dataworkspaces.lineage.Lineage` instance. The input_resource
    is recorded in this lineage. Other methods call the underlying wrapped
    predictor's methods, with additional functionality as needed (see below).

    **Parameters**

    predictor
        Any sklearn predictor instance. It must have ``fit`` and ``predict``
        methods.

    metrics
        Either a string naming a metrics type or a subclass of :class:`~Metrics`.
        If a string, it should be one of: binary_classification,
        multiclass_classification, or regression.

    input_resource
        Resource providing the input data to this model. May be
        specified by name, by a local file path, or via a
        :class:`~dataworkspaces.workspace.ResourceRef`.

    results_resource
        (optional) Resource where the results are to be stored.
        May be specified by name, by a local file path, or via a
        :class:`!ResourceRef`.
        If not specified, will try to infer from the workspace.

    model_save_file
        (optional) Name of file to store a (joblib-formatted)
        serialization of the trained model upon completion of the ``fit()``
        method. This should be a relative path, as it is stored under
        the results resource. If model_save_file is not specified,
        no model is saved.

    workspace_dir
        (optional) Directory specifying the workspace. Usually can be
        inferred from the current directory.

    verbose
        If True, print a lot of detailed information about the execution
        of Data Workspaces.

    **Example**

    Here is an example usage of the wrapper, taken from the
    :ref:`Quick Start <quickstart>`::

        from sklearn.svm import SVC
        from sklearn.model_selection import train_test_split
        from dataworkspaces.kits.scikit_learn import load_dataset_from_resource
        from dataworkspaces.kits.scikit_learn import LineagePredictor

        dataset = load_dataset_from_resource('sklearn-digits-dataset')
        X_train, X_test, y_train, y_test = train_test_split(
            dataset.data, dataset.target, test_size=0.5, shuffle=False)
        classifier = LineagePredictor(SVC(gamma=0.001),
                                      metrics='multiclass_classification',
                                      input_resource=dataset.resource,
                                      model_save_file='digits.joblib')

        classifier.fit(X_train, y_train)
        score = classifier.score(X_test, y_test)

    **Methods**
    """

    # Marker used to detect an already-wrapped predictor (see __init__).
    _dws_model_wrap = True

    def __init__(
        self,
        predictor,
        metrics: Union[str, type],
        input_resource: Union[str, ResourceRef],
        results_resource: Optional[Union[str, ResourceRef]] = None,
        model_save_file: Optional[str] = None,
        workspace_dir: Optional[str] = None,
        verbose: bool = False,
    ):
        if hasattr(predictor, "_dws_model_wrap") and predictor._dws_model_wrap is True:  # type: ignore
            # Bug fix: the original code did ``return predictor`` here, but a
            # non-None return from __init__ raises a TypeError at runtime.
            # To avoid double-wrapping, unwrap and use the inner predictor.
            print("dws>> %s is already wrapped" % repr(predictor))
            predictor = predictor.predictor
        self.predictor = predictor
        assert metrics in _METRICS.keys() or (
            isinstance(metrics, type) and issubclass(metrics, Metrics)
        ), (
            "%s is not a subclass of Metrics and not one of %s"
            % (repr(metrics), ", ".join([repr(s) for s in _METRICS.keys()]))
        )
        self.metrics = metrics
        self.input_resource = input_resource
        self.results_resource = results_resource
        self.model_save_file = model_save_file
        if model_save_file is not None:
            assert not isabs(model_save_file), "Model save file should not be an absolute path"
        self.workspace_dir = workspace_dir
        self.verbose = verbose
        # Used by score() to decide whether the lineage needs to be reset
        # (e.g. when scoring a model restored from a saved file).
        self.score_has_been_run = False
        self._init_dws_state()

    def _init_dws_state(self):
        # (Re)create the non-picklable DWS state: the workspace handle plus
        # the lineage tracking object. Called from __init__, __setstate__ and
        # set_params.
        workspace = find_and_load_workspace(
            batch=True, verbose=self.verbose, uri_or_local_path=self.workspace_dir
        )
        self._dws_state = _DwsModelState(workspace, self.input_resource, self.results_resource)

    def _save_model(self):
        """Serialize this wrapper (including the trained predictor) with joblib
        and upload it to the results resource under ``model_save_file``."""
        assert self.model_save_file
        if not self.model_save_file.endswith(".joblib"):
            model_save_file = self.model_save_file + ".joblib"
        else:
            model_save_file = self.model_save_file
        tempname = None
        try:
            # Dump to a temp file first, then upload via the resource API so
            # that remote results resources are supported.
            with NamedTemporaryFile(delete=False, suffix=".joblib") as f:
                tempname = f.name
            joblib.dump(self, tempname)
            resource = self._dws_state.workspace.get_resource(self._dws_state.results_ref.name)
            if self._dws_state.results_ref.subpath is not None:
                target_name = join(self._dws_state.results_ref.subpath, model_save_file)
            else:
                target_name = model_save_file
            cast(FileResourceMixin, resource).upload_file(tempname, target_name)
        finally:
            # Always remove the temp file, even if the dump or upload failed.
            if (tempname is not None) and exists(tempname):
                os.remove(tempname)
        if self.verbose:
            print("dws> saved model file to %s:%s" % (resource.name, target_name))

    def __getstate__(self):
        # _dws_state holds a live workspace handle and is not picklable; drop
        # it here and rebuild it on unpickle (see __setstate__).
        state = super().__getstate__()
        if "_dws_state" in state:
            del state["_dws_state"]
        return state

    def __setstate__(self, state):
        super().__setstate__(state)
        self._init_dws_state()

    def set_params(self, **params):
        """"""
        super().set_params(**params)
        self._init_dws_state()
        return self

    def fit(self, X, y, *args, **kwargs):
        """The underlying fit() method of a predictor trains the prediction
        based on the input data (X) and labels (y).

        If the input resource is an api resource, the wrapper captures the hash of
        the inputs.
        If ``model_save_file`` was specified, it also saves the trained model."""
        api_resource = self._dws_state.find_input_resources_and_return_if_api(X, y)
        if api_resource is not None:
            api_resource.init_hash_state()
            hash_state = api_resource.get_hash_state()
            _add_to_hash(X, hash_state)
            _add_to_hash(y, hash_state)
            api_resource.save_current_hash()  # in case we evaluate in a separate process
        result = self.predictor.fit(X, y, *args, **kwargs)
        if self.model_save_file is not None:
            self._save_model()
        return result

    def score(self, X, y, sample_weight=None):
        """This method makes predictions from a trained model and scores them
        according to the metrics specified when the wrapper was instantiated.

        If the input resource is an api resource, the wrapper captures its hash.
        The wrapper runs the wrapped predictor's :meth:`~predict` method to
        generate predictions. A `metrics` object is instantiated to compute the metrics
        for the predictions and a ``results.json`` file is written to the
        results resource. The lineage data is saved and finally the score
        is computed from the predictions and returned to the caller."""
        if self.score_has_been_run:
            # This might be from a saved model, so we reset the
            # execution time, etc.
            self._dws_state.reset_lineage()
        # Record all of the predictor's hyperparameters in the lineage.
        for (param, value) in self.predictor.get_params(deep=True).items():
            self._dws_state.lineage.add_param(param, value)
        api_resource = self._dws_state.find_input_resources_and_return_if_api(X, y)
        if api_resource is not None:
            # Hash on a duplicate of the state so the training-time hash
            # captured in fit() is left intact (pop restores it below).
            api_resource.dup_hash_state()
            hash_state = api_resource.get_hash_state()
            _add_to_hash(X, hash_state)
            if y is not None:
                _add_to_hash(y, hash_state)
            api_resource.save_current_hash()
            api_resource.pop_hash_state()
        predictions = self.predictor.predict(X)
        if isinstance(self.metrics, str):
            metrics_inst = _METRICS[self.metrics](y, predictions, sample_weight=sample_weight)  # type: ignore
        else:
            metrics_inst = self.metrics(y, predictions, sample_weight=sample_weight)
        self._dws_state.write_metrics_and_complete(metrics_inst.to_dict())
        self.score_has_been_run = True
        return metrics_inst.score()

    def predict(self, X):
        """The underlying :meth:`~predict` method is called directly,
        without affecting the lineage."""
        return self.predictor.predict(X)
def train_and_predict_with_cv(
    classifier_class: ClassifierMixin,
    param_grid: Union[Dict[str, List[Any]], List[Dict[str, List[Any]]]],
    dataset: Bunch,
    results_dir: str,
    test_size: float = 0.2,
    folds: int = 5,
    cv_scoring: str = "accuracy",
    model_name: Optional[str] = None,
    random_state: Optional[int] = None,
    run_description: Optional[str] = None,
) -> None:
    """NOTE: This function is under consideration for DEPRECATION and
    may be removed from a future version.

    This function implements a common workflow for sklearn classifiers:

    1. Splits the data into training set and a final validation set.
    2. Runs a grid search cross validation to find the best combination
       of hyperparameters for the classifier on the training data.
    3. Trains the model on the training data using the best hyperparameter
       values.
    4. Predicts the classes of the validation test data set and computes
       common metrics comparing the training and testing data.
    5. Writes the metrics to a results file at RESULTS_DIR/results.json.
    6. If the ``model_name`` parameter was specified, retrain the classifier
       on all the data and save the (pickled) model to the results directory.
    7. Write out the lineage data for this experiment.

    **Parameters**

    classifier_class
        An sklearn classifier or a class implementing the same interface

    param_grid
        As described in the documentation for
        `GridSearchCV <https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html#sklearn.model_selection.GridSearchCV>`_,
        a dictionary with parameters names as keys and lists of parameter settings to
        try as values, or a list of such dictionaries. The various combinations
        of these parameters will be searched to find the best classification results
        on the training data.

    dataset
        A sklearn Bunch object with members for data, target, and resource. This can
        be loaded by calling :func:`~load_dataset_from_resource`

    results_dir
        The directory on the local filesystem to which the results should be
        written.

    test_size
        The fraction of the input data samples to be held back for the final
        validation. Defaults to 0.2 (20%).

    folds
        Number of cross validation folds. Defaults to 5.

    cv_scoring
        Name of scoring algorithm to use in evaluating the hyperparameter
        combinations in cross validation. Defaults to 'accuracy'. See
        `here <https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter>`_
        for details.

    model_name
        If specified, retrain the model using the entire data set (train + test)
        and the best parameters found during cross validation. Pickle and save
        this model to the file RESULTS_DIR/MODEL_NAME.pkl. If the model name is
        not specified, skip this step.

    random_state
        Optional integer to be used as a random seed.

    run_description
        Optional text describing this particular run. This is saved in the results
        file and the lineage.

    **Example**

    Here is an example (taken from the :ref:`Quick Start <quickstart>`)::

        import numpy as np
        from os.path import join
        from sklearn.svm import SVC
        from dataworkspaces.kits.scikit_learn import load_dataset_from_resource,\\
                                                    train_and_predict_with_cv

        RESULTS_DIR='../results'

        dataset = load_dataset_from_resource('sklearn-digits-dataset')
        train_and_predict_with_cv(SVC, {'gamma':[0.01, 0.001, 0.0001]}, dataset,
                                  RESULTS_DIR, random_state=42)

    This trains a Support Vector Classifier with three different values of gamma
    (0.01, 0.001, and 0.0001) and then evaluates the trained classifier on the
    holdout data. The results are written to ``../results/results.json``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        dataset.data, dataset.target, test_size=test_size, random_state=random_state
    )
    # find the best combination of hyperparameters
    search = GridSearchCV(classifier_class(), param_grid=param_grid, scoring=cv_scoring, cv=folds)
    search.fit(X_train, y_train)
    best_params = search.best_params_
    print("Best params were: %s" % repr(best_params))
    lineage_params = {
        "classifier": classifier_class.__name__,
        "test_size": test_size,
        "cv_params": param_grid,
        "cv_scoring": cv_scoring,
        "random_state": random_state,
    }
    # Record the winning hyperparameter values in the lineage as well.
    lineage_params.update(best_params)
    lb = (
        LineageBuilder()
        .with_parameters(lineage_params)
        .as_results_step(results_dir, run_description)
        .with_input_ref(dataset.resource)
    )
    if is_notebook():
        lb = lb.with_code_path(get_notebook_directory())
        step_name = get_step_name_for_notebook()
        if step_name is not None:
            lb = lb.with_step_name(step_name)  # not always able to determine this
    else:
        lb = lb.as_script_step()
    with lb.eval() as lineage:
        # Instantiate a classifier with the best parameters and train
        classifier = classifier_class(**best_params)
        classifier.fit(X_train, y_train)
        # Now predict the value of the digit on the test set
        predicted = classifier.predict(X_test)
        m = (
            MulticlassClassificationMetrics(y_test, predicted)
            if len(np.unique(dataset.target)) > 2
            else BinaryClassificationMetrics(y_test, predicted)
        )  # type: Metrics
        m.print_metrics()
        lineage.write_results(m.to_dict())
        if model_name is not None:
            # Retrain on the full dataset with the best parameters and save
            # the pickled model to RESULTS_DIR/MODEL_NAME.pkl.
            classifier = classifier_class(**best_params)
            classifier.fit(dataset.data, dataset.target)
            model_file = join(abspath(expanduser(results_dir)), model_name + ".pkl")
            joblib.dump(classifier, model_file)
            print("Wrote trained model to %s" % model_file)