4. Lineage API
The Lineage API is provided by the module dataworkspaces.lineage.
This module provides an API for tracking data lineage – the history of how a given result was created, including the versions of original source data and the various steps run in the data pipeline to produce the final result.
The basic idea is that your workflow is a sequence of pipeline steps:
----------     ----------     ----------     ----------
|        |     |        |     |        |     |        |
| Step 1 |---->| Step 2 |---->| Step 3 |---->| Step 4 |
|        |     |        |     |        |     |        |
----------     ----------     ----------     ----------
A step could be a command line script, a Jupyter notebook or perhaps a step in an automated workflow tool (e.g. Apache Airflow). Each step takes a number of inputs and parameters and generates outputs. The inputs are resources in your workspace (or subpaths within a resource) from which the step will read to perform its task. The parameters are configuration values passed to the step (e.g. the command line arguments of a script). The outputs are the resources (or subpaths within a resource), which are written to by the step. The outputs may represent results or intermediate data to be consumed by downstream steps.
The lineage API captures this data for each step. Here is a view of the data captured:
                     Parameters
                     ||  ||  ||
                     \/  \/  \/
                    ------------
                  =>|          |=>
Input resources   =>|  Step i  |=> Output resources
                  =>|          |=>
                    ------------
                         /\
                         ||
                        Code
                    Dependencies
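As a rough illustration of the diagram above, the data captured for one step can be pictured as a small record. This is only a sketch: `StepRecord`, `Ref`, and the resource names used here are hypothetical and do not reflect the library's internal schema.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical stand-in: a (resource name, optional subpath) pair.
Ref = Tuple[str, Optional[str]]


@dataclass
class StepRecord:
    """Illustrative record of what is captured for one step."""
    name: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    inputs: List[Ref] = field(default_factory=list)
    outputs: List[Ref] = field(default_factory=list)
    code: List[Ref] = field(default_factory=list)


# Example values are made up for illustration.
train = StepRecord(
    name="train.py",
    parameters={"gamma": 0.01},
    inputs=[("intermediate-data", "features")],
    outputs=[("results", None)],
    code=[("code", "train.py")],
)
```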
To do this, we use the following classes:
ResourceRef
- A reference to a resource for use as a step input or output. A ResourceRef contains a resource name and an optional path within that resource. This lets you manage lineage down to the directory or even file level. The APIs also support specifying a path on the local filesystem instead of a ResourceRef. This path is automatically resolved to a ResourceRef (it must map to a location under the local path of a resource). By storing ResourceRefs instead of hard-coded filesystem paths, we can include non-local resources (like an S3 bucket) and ensure that the workspace is easily deployed on a new machine.
Lineage
- The main lineage object, instantiated at the start of your step. At the beginning of your step, you specify the inputs, parameters, and outputs. At the end of the step, the data is saved, along with any results you might have from that step. Lineage instances are context managers, which means you can use a with statement to manage their lifecycle.
LineageBuilder
- This is a helper class to guide the creation of your lineage object.
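The path-to-reference resolution described for ResourceRef can be sketched in a few lines. This is a simplified illustration, not the library's implementation: the `ResourceRef` below is a local stand-in with the documented fields, and `resolve_path` and the `roots` mapping are hypothetical.

```python
import os
from typing import Dict, NamedTuple, Optional


class ResourceRef(NamedTuple):
    # Local stand-in with the same fields as the documented class.
    name: str
    subpath: Optional[str] = None


def resolve_path(path: str, resource_roots: Dict[str, str]) -> ResourceRef:
    """Map a local path to a ResourceRef; fail if it lies outside every resource."""
    abs_path = os.path.abspath(path)
    for name, root in resource_roots.items():
        abs_root = os.path.abspath(root)
        if abs_path == abs_root:
            # The path is the resource root itself, so there is no subpath.
            return ResourceRef(name)
        if abs_path.startswith(abs_root + os.sep):
            return ResourceRef(name, os.path.relpath(abs_path, abs_root))
    raise ValueError(f"{path} is not under the local path of any resource")


# Hypothetical resource names and local paths.
roots = {"raw-data": "/workspace/raw", "results": "/workspace/results"}
ref = resolve_path("/workspace/results/run1", roots)
```

Because only the resource name and relative subpath are kept, the same reference remains valid when the workspace is cloned to a different directory or machine.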
Example
Here is an example usage of the lineage API in a command line script:
import argparse
import sys

from dataworkspaces.lineage import LineageBuilder

def main():
    ...
    parser = argparse.ArgumentParser()
    parser.add_argument('--gamma', type=float, default=0.01,
                        help="Regularization parameter")
    parser.add_argument('input_data_dir', metavar='INPUT_DATA_DIR', type=str,
                        help='Path to input data')
    parser.add_argument('results_dir', metavar='RESULTS_DIR', type=str,
                        help='Path to where results should be stored')
    args = parser.parse_args()
    ...
    # Create a LineageBuilder instance to specify the details of the step
    # to the lineage API.
    builder = LineageBuilder()\
        .as_script_step()\
        .with_parameters({'gamma': args.gamma})\
        .with_input_path(args.input_data_dir)\
        .as_results_step(args.results_dir)

    # builder.eval() will construct the lineage object. We call it within a
    # with statement to get automatic save/cleanup when we leave the
    # with block.
    with builder.eval() as lineage:
        # ... do your work here, computing accuracy, precision, etc. ...
        # all done, write the results
        lineage.write_results({'accuracy': accuracy,
                               'precision': precision,
                               'recall': recall,
                               'roc_auc': roc_auc})

    # When leaving the with block, the lineage is automatically saved to the
    # workspace. If an exception is thrown, the lineage is not saved, but the
    # outputs are marked as being in an unknown state.
    return 0

# boilerplate to call our main function if this is called as a script.
if __name__ == '__main__':
    sys.exit(main())
Classes
- class dataworkspaces.lineage.ResourceRef(name: str, subpath: Optional[str] = None)
A namedtuple that is used to identify an input or output of a step. The name parameter is the name of a resource. The optional subpath parameter is a relative path within that resource. The subpath lets you store inputs/outputs from multiple steps within the same resource and track them independently.
- class dataworkspaces.lineage.Lineage
This is the main object for tracking the execution of a step. Rather than instantiating it directly, use the LineageBuilder class to construct your Lineage instance.
- abort()
The step has failed, so we mark its outputs as being in an unknown state. If you create the lineage via a “with” statement, this is called for you automatically when an exception is thrown.
- add_output_path(path: str) -> None
Resolve the path to a resource name and subpath. Add that to the lineage as an output of the step. From this point on, if the step fails (abort() is called), the associated resource and subpath will be marked as being in an “unknown” state.
- add_output_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef)
Add the resource reference to the lineage as an output of the step. From this point on, if the step fails (abort() is called), the associated resource and subpath will be marked as being in an “unknown” state.
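The interaction between the with statement and abort() follows the standard Python context-manager protocol. The following is a minimal sketch of that lifecycle only: `ToyLineage` is a hypothetical stand-in, and its `complete()` method and `state` attribute are assumptions made for illustration, not part of the documented API.

```python
from typing import List


class ToyLineage:
    """Hypothetical stand-in showing only the with-statement lifecycle."""

    def __init__(self) -> None:
        self.outputs: List[str] = []
        self.state = "running"

    def add_output_path(self, path: str) -> None:
        self.outputs.append(path)

    def abort(self) -> None:
        # On failure, registered outputs can no longer be trusted.
        self.state = "unknown"

    def complete(self) -> None:
        # Assumed name for the success path; saves the lineage.
        self.state = "saved"

    def __enter__(self) -> "ToyLineage":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self.complete()
        else:
            self.abort()
        return False  # never swallow the exception


lineage = ToyLineage()
try:
    with lineage:
        lineage.add_output_path("results/run1")
        raise RuntimeError("step failed")
except RuntimeError:
    pass
```

Returning False from `__exit__` lets the original exception propagate, so the caller still sees the failure after the outputs have been marked.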
- class dataworkspaces.lineage.ResultsLineage
Lineage for a results step. This subclass is returned by the LineageBuilder when as_results_step() is called. This marks the Lineage object as generating results. It adds the write_results() method for writing a JSON summary of the final results.
Results resources will also have a lineage.json file added when the next snapshot is taken. This file contains the full lineage graph collected for the resource.
- write_results(metrics: Dict[str, Any])
Write a results.json file to the results directory specified when creating the lineage object (e.g. via as_results_step()). This JSON file contains information about the step execution (e.g. start time), parameters, and the provided metrics.
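To make the description of results.json concrete, here is a sketch of writing such a summary with the standard library. The field names and layout are assumptions chosen for illustration; the actual file written by write_results() may be structured differently, and `write_results_sketch` is a hypothetical helper.

```python
import datetime
import json
import os
import tempfile
from typing import Any, Dict


def write_results_sketch(results_dir: str, step_name: str,
                         start_time: datetime.datetime,
                         parameters: Dict[str, Any],
                         metrics: Dict[str, Any]) -> str:
    """Write an illustrative results.json and return its path."""
    payload = {
        "step": step_name,                       # assumed field name
        "start_time": start_time.isoformat(),    # assumed field name
        "parameters": parameters,
        "metrics": metrics,
    }
    os.makedirs(results_dir, exist_ok=True)
    path = os.path.join(results_dir, "results.json")
    with open(path, "w") as f:
        json.dump(payload, f, indent=2)
    return path


# Made-up values for illustration.
results_dir = tempfile.mkdtemp()
path = write_results_sketch(results_dir, "train.py",
                            datetime.datetime(2020, 1, 1, 12, 0, 0),
                            {"gamma": 0.01}, {"accuracy": 0.9})
with open(path) as f:
    saved = json.load(f)
```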
- class dataworkspaces.lineage.LineageBuilder
Use this class to declaratively build Lineage objects. Instantiate a LineageBuilder instance, and call a sequence of configuration methods to specify your inputs, parameters, your workspace (if the script is not already inside the workspace), and whether this is a results step. Each configuration method returns the builder, so you can chain them together. Finally, call eval() to instantiate the Lineage object.
Configuration Methods
To specify the workflow step’s name, call one of:
- as_script_step() - the script’s name will be used to infer the step and the associated code resource
- with_step_name() - explicitly specify the step name
To specify the parameters of the step (e.g. command line arguments), use the with_parameters() method.
To specify the inputs of the step, call one or more of:
- with_input_path() - resolve the local filesystem path to a resource and subpath and add it to the lineage as an input. May be called more than once.
- with_input_paths() - resolve a list of local filesystem paths to resources and subpaths and add them to the lineage as inputs. May be called more than once.
- with_input_ref() - add the resource and subpath to the lineage as an input. May be called more than once.
- with_no_inputs() - mutually exclusive with the other input methods. This signals that there are no inputs to this step.
To specify code resource dependencies for the step, you can call with_code_ref(). For command-line Python scripts, the main code resource is handled automatically in as_script_step(). Other subclasses of the LineageBuilder may provide similar functionality (e.g. the LineageBuilder for Jupyter notebooks will try to figure out the resource containing your notebook and set it in the lineage).
If you need to specify the workspace’s root directory, use the with_workspace_directory() method. Otherwise, the lineage API will attempt to infer the workspace directory by looking at the path of the script.
Call as_results_step() to indicate that this step is producing results. This will add a method write_results() to the Lineage object returned by eval(). The method as_results_step() takes two parameters: results_dir and, optionally, run_description. The results directory should correspond to either the root directory of a results resource or a subdirectory within the resource. If you have multiple steps of your workflow that produce results, you can create separate subdirectories for each results-producing step.
Example
Here is an example where we build a Lineage object for a script that has one input and that produces results:
lineage = LineageBuilder()\
    .as_script_step()\
    .with_parameters({'gamma': 0.001})\
    .with_input_path(args.intermediate_data)\
    .as_results_step('../results').eval()
Methods
- as_results_step(results_dir: str, run_description: Optional[str] = None) -> dataworkspaces.lineage.LineageBuilder
- as_script_step() -> dataworkspaces.lineage.LineageBuilder
- eval() -> dataworkspaces.lineage.Lineage
Validate the current configuration, making sure all required properties have been specified, and return a Lineage object with the requested configuration.
- with_code_path(path: str) -> dataworkspaces.lineage.LineageBuilder
- with_code_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef) -> dataworkspaces.lineage.LineageBuilder
- with_input_path(path: str) -> dataworkspaces.lineage.LineageBuilder
- with_input_paths(paths: List[str]) -> dataworkspaces.lineage.LineageBuilder
- with_input_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef) -> dataworkspaces.lineage.LineageBuilder
- with_no_inputs() -> dataworkspaces.lineage.LineageBuilder
- with_parameters(parameters: Dict[str, Any]) -> dataworkspaces.lineage.LineageBuilder
- with_step_name(step_name: str) -> dataworkspaces.lineage.LineageBuilder
- with_workspace_directory(workspace_dir: str) -> dataworkspaces.lineage.LineageBuilder
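The chaining and validation behavior of the builder can be illustrated with a small fluent-builder sketch. `ToyLineageBuilder` is a hypothetical stand-in that returns a plain dictionary; which properties the real eval() treats as required is an assumption here (step name, parameters, and either inputs or an explicit with_no_inputs()).

```python
from typing import Any, Dict, List, Optional


class ToyLineageBuilder:
    """Hypothetical stand-in for the fluent builder pattern; not the real class."""

    def __init__(self) -> None:
        self._step_name: Optional[str] = None
        self._parameters: Optional[Dict[str, Any]] = None
        self._inputs: List[str] = []
        self._no_inputs = False

    def with_step_name(self, step_name: str) -> "ToyLineageBuilder":
        self._step_name = step_name
        return self  # returning self is what enables chaining

    def with_parameters(self, parameters: Dict[str, Any]) -> "ToyLineageBuilder":
        self._parameters = parameters
        return self

    def with_input_path(self, path: str) -> "ToyLineageBuilder":
        if self._no_inputs:
            raise ValueError("with_no_inputs() excludes the other input methods")
        self._inputs.append(path)
        return self

    def with_no_inputs(self) -> "ToyLineageBuilder":
        if self._inputs:
            raise ValueError("inputs were already specified")
        self._no_inputs = True
        return self

    def eval(self) -> Dict[str, Any]:
        # Assumption: these three properties are the required ones.
        missing = []
        if self._step_name is None:
            missing.append("step name")
        if self._parameters is None:
            missing.append("parameters")
        if not self._inputs and not self._no_inputs:
            missing.append("inputs (or with_no_inputs())")
        if missing:
            raise ValueError("missing configuration: " + ", ".join(missing))
        return {"step_name": self._step_name,
                "parameters": self._parameters,
                "inputs": list(self._inputs)}


config = (
    ToyLineageBuilder()
    .with_step_name("train")
    .with_parameters({"gamma": 0.001})
    .with_input_path("../intermediate_data")
    .eval()
)
```

Deferring validation to eval() means the configuration methods can be called in any order, and a forgotten property is reported once, before the step starts doing work.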
Using Lineage
Once you have instrumented the individual steps of your workflow, you can run the steps as normal. Lineage data is stored in the directory .dataworkspace/current_lineage, but is not checked into the associated Git repository.
When you take a snapshot, this lineage data is copied to .dataworkspace/snapshot_lineage/HASH, where HASH is the hashcode associated with the snapshot, and checked into Git. This data is available as a record of how you obtained the results associated with the snapshot. In the future, more tools will be provided to analyze and operate on this lineage (e.g. replaying workflows).
When you restore a snapshot, the lineage data associated with the snapshot is restored to .dataworkspace/current_lineage.
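The snapshot and restore flow above amounts to copying a directory tree in each direction. The sketch below shows that idea with the standard library only; the helper names, the file name `step.json`, and the hash `abc123` are hypothetical, and the real tool also handles the Git commit, which is omitted here.

```python
import os
import shutil
import tempfile


def _lineage_dirs(workspace_dir: str, snapshot_hash: str):
    base = os.path.join(workspace_dir, ".dataworkspace")
    return (os.path.join(base, "current_lineage"),
            os.path.join(base, "snapshot_lineage", snapshot_hash))


def snapshot_lineage(workspace_dir: str, snapshot_hash: str) -> str:
    """Copy the current lineage into the per-snapshot directory."""
    current, snapshot = _lineage_dirs(workspace_dir, snapshot_hash)
    shutil.copytree(current, snapshot)
    return snapshot


def restore_lineage(workspace_dir: str, snapshot_hash: str) -> None:
    """Replace the current lineage with the copy saved for a snapshot."""
    current, snapshot = _lineage_dirs(workspace_dir, snapshot_hash)
    if os.path.exists(current):
        shutil.rmtree(current)
    shutil.copytree(snapshot, current)


# Demonstration in a throwaway workspace.
workspace = tempfile.mkdtemp()
current_dir, _ = _lineage_dirs(workspace, "abc123")
os.makedirs(current_dir)
lineage_file = os.path.join(current_dir, "step.json")
with open(lineage_file, "w") as f:
    f.write("version 1")

snapshot_lineage(workspace, "abc123")
with open(lineage_file, "w") as f:
    f.write("version 2")  # later work changes the current lineage
restore_lineage(workspace, "abc123")

with open(lineage_file) as f:
    restored = f.read()
```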
Consistency
In order to fully track the status of your workflow, we impose a few restrictions:
- Independent steps should not write to the same ResourceRef, nor to two ResourceRefs where one refers to a subdirectory of the other.
- A step’s execution should not transitively depend on two different versions of the same ResourceRef. If you try to run a step in this situation, an exception will be thrown.
These restrictions should not impact reasonable workflows in practice. Furthermore, they help to catch some common classes of errors (e.g. not rerunning all the dependent steps when a change is made to an input).
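The first restriction can be checked by comparing resource names and subpath prefixes. The following sketch shows one way to detect two references that collide; the `ResourceRef` here is a local stand-in with the documented fields, and `refs_overlap` is a hypothetical helper, not a function provided by the library.

```python
from typing import NamedTuple, Optional


class ResourceRef(NamedTuple):
    # Local stand-in with the same fields as the documented class.
    name: str
    subpath: Optional[str] = None


def refs_overlap(a: ResourceRef, b: ResourceRef) -> bool:
    """True if the refs are the same location or one contains the other."""
    if a.name != b.name:
        return False
    if a.subpath is None or b.subpath is None:
        # A ref without a subpath covers the whole resource.
        return True
    parts_a = a.subpath.strip("/").split("/")
    parts_b = b.subpath.strip("/").split("/")
    shorter = min(len(parts_a), len(parts_b))
    # Compare whole path components so "a" does not match "ab".
    return parts_a[:shorter] == parts_b[:shorter]
```

Comparing path components rather than raw string prefixes avoids treating sibling directories with a shared name prefix as nested.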