4. Lineage API

The Lineage API is provided by the module dataworkspaces.lineage.

This module provides an API for tracking data lineage – the history of how a given result was created, including the versions of original source data and the various steps run in the data pipeline to produce the final result.

The basic idea is that your workflow is a sequence of pipeline steps:

----------     ----------     ----------     ----------
|        |     |        |     |        |     |        |
| Step 1 |---->| Step 2 |---->| Step 3 |---->| Step 4 |
|        |     |        |     |        |     |        |
----------     ----------     ----------     ----------

A step could be a command line script, a Jupyter notebook, or perhaps a step in an automated workflow tool (e.g. Apache Airflow). Each step takes a number of inputs and parameters and generates outputs. The inputs are resources in your workspace (or subpaths within a resource) that the step reads to perform its task. The parameters are configuration values passed to the step (e.g. the command line arguments of a script). The outputs are the resources (or subpaths within a resource) that the step writes to. The outputs may represent results or intermediate data to be consumed by downstream steps.

The lineage API captures this data for each step. Here is a view of the data captured:

                    Parameters
                    ||  ||  ||
                    \/  \/  \/
                   ------------
                 =>|          |=>
Input resources  =>|  Step i  |=> Output resources
                 =>|          |=>
                   ------------
                        /\
                        ||
                       Code
                   Dependencies

To do this, we use the following classes:

  • ResourceRef - A reference to a resource for use as a step input or output. A ResourceRef contains a resource name and an optional path within that resource. This lets you manage lineage down to the directory or even file level. The APIs also support specifying a path on the local filesystem instead of a ResourceRef. This path is automatically resolved to a ResourceRef (it must map to a location under the local path of a resource). By storing ResourceRef objects instead of hard-coded filesystem paths, we can include non-local resources (like an S3 bucket) and ensure that the workspace is easily deployed on a new machine.

  • Lineage - The main lineage object, instantiated at the start of your step. At the beginning of your step, you specify the inputs, parameters, and outputs. At the end of the step, the data is saved, along with any results you might have from that step. Lineage instances are context managers, which means you can use a with statement to manage their lifecycle.

  • LineageBuilder - This is a helper class to guide the creation of your lineage object.

Example

Here is an example usage of the lineage API in a command line script:

import argparse
import sys
from dataworkspaces.lineage import LineageBuilder

def main():
    ...
    parser = argparse.ArgumentParser()
    parser.add_argument('--gamma', type=float, default=0.01,
                        help="Regularization parameter")
    parser.add_argument('input_data_dir', metavar='INPUT_DATA_DIR', type=str,
                        help='Path to input data')
    parser.add_argument('results_dir', metavar='RESULTS_DIR', type=str,
                        help='Path to where results should be stored')
    args = parser.parse_args()
    ...
    # Create a LineageBuilder instance to specify the details of the step
    # to the lineage API.
    builder = LineageBuilder()\
                .as_script_step()\
                .with_parameters({'gamma':args.gamma})\
                .with_input_path(args.input_data_dir)\
                .as_results_step(args.results_dir)

    # builder.eval() will construct the lineage object. We call it within a
    # with statement to get automatic save/cleanup when we leave the
    # with block.
    with builder.eval() as lineage:

        # ... do your work here, computing accuracy, precision,
        # recall, and roc_auc ...

        # all done, write the results
        lineage.write_results({'accuracy':accuracy,
                               'precision':precision,
                               'recall':recall,
                               'roc_auc':roc_auc})

    # When leaving the with block, the lineage is automatically saved to the
    # workspace. If an exception is thrown, the lineage is not saved, but the
    # outputs are marked as being in an unknown state.

    return 0

# boilerplate to call our main function if this is called as a script.
if __name__ == '__main__':
    sys.exit(main())

Classes

class dataworkspaces.lineage.ResourceRef(name: str, subpath: Optional[str] = None)[source]

A namedtuple that is used to identify an input or output of a step. The name parameter is the name of a resource. The optional subpath parameter is a relative path within that resource. The subpath lets you store inputs/outputs from multiple steps within the same resource and track them independently.
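
To illustrate how a local filesystem path might map to a resource name and subpath, here is a minimal standalone sketch. The ResourceRef below mirrors the two documented fields but is redefined locally so that the snippet runs without the library. The helper resolve_to_ref and the resource_roots mapping are hypothetical names introduced for this example; they are not part of dataworkspaces, and the library's actual resolution logic may differ.

```python
import os.path
from typing import Dict, NamedTuple, Optional


class ResourceRef(NamedTuple):
    """Local stand-in with the same two fields as the documented ResourceRef."""
    name: str
    subpath: Optional[str] = None


def resolve_to_ref(path: str, resource_roots: Dict[str, str]) -> ResourceRef:
    """Map a local path to a ResourceRef (hypothetical helper).

    resource_roots maps each resource name to its local root directory.
    """
    abs_path = os.path.abspath(path)
    for name, root in resource_roots.items():
        abs_root = os.path.abspath(root)
        if abs_path == abs_root:
            return ResourceRef(name)  # the path is the resource root itself
        if abs_path.startswith(abs_root + os.sep):
            return ResourceRef(name, os.path.relpath(abs_path, abs_root))
    raise ValueError(f"{path} is not under the local path of any resource")


# Example directory layout (assumed for illustration only).
roots = {"source-data": "/ws/data", "results": "/ws/results"}
ref = resolve_to_ref("/ws/results/step2/run1", roots)
```

Because the reference stores a resource name rather than an absolute path, the same lineage record stays meaningful when the workspace is cloned to a machine where the resources live in different directories.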

class dataworkspaces.lineage.Lineage[source]

This is the main object for tracking the execution of a step. Rather than instantiating it directly, use the LineageBuilder class to construct your Lineage instance.

abort()[source]

The step has failed, so we mark its outputs in an unknown state. If you create the lineage via a “with” statement, then this will be called for you automatically.

add_output_path(path: str) -> None[source]

Resolve the path to a resource name and subpath. Add that to the lineage as an output of the step. From this point on, if the step fails (abort() is called), the associated resource and subpath will be marked as being in an “unknown” state.

add_output_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef)[source]

Add the resource reference to the lineage as an output of the step. From this point on, if the step fails (abort() is called), the associated resource and subpath will be marked as being in an “unknown” state.

add_param(name: str, value) -> None[source]

Add or update one of the step’s parameters.

complete()[source]

The step has completed. Save the outputs. If you create the lineage via a “with” statement, then this will be called for you automatically.
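
The relationship between the with statement and the complete() and abort() methods can be sketched with a small stand-in class. ToyLineage is hypothetical and only records which lifecycle method ran; it does not touch a workspace. The sketch also assumes that an exception raised inside the block propagates to the caller after abort() runs, which the text above does not state explicitly.

```python
from typing import List


class ToyLineage:
    """Hypothetical stand-in that records which lifecycle method was called."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def complete(self) -> None:
        self.events.append("complete")

    def abort(self) -> None:
        self.events.append("abort")

    def __enter__(self) -> "ToyLineage":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        if exc_type is None:
            self.complete()  # normal exit: the lineage would be saved
        else:
            self.abort()     # failure: outputs would be marked as unknown
        return False         # assumption: the exception is not swallowed


# A step that finishes normally.
ok = ToyLineage()
with ok:
    pass

# A step that fails part-way through.
failed = ToyLineage()
try:
    with failed:
        raise RuntimeError("step failed")
except RuntimeError:
    pass
```

If you manage the object without a with statement, the equivalent pattern is a try/except block that calls abort() in the handler and complete() after the work succeeds.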

class dataworkspaces.lineage.ResultsLineage[source]

Lineage for a results step. This subclass is returned by the LineageBuilder when as_results_step() is called. This marks the Lineage object as generating results. It adds the write_results() method for writing a JSON summary of the final results.

Results resources will also have a lineage.json file added when the next snapshot is taken. This file contains the full lineage graph collected for the resource.

write_results(metrics: Dict[str, Any])[source]

Write a results.json file to the results directory specified when creating the lineage object (e.g. via as_results_step()). This JSON file contains information about the step execution (e.g. start time), the parameters, and the provided metrics.
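
As a rough picture of the kind of file this produces, the following standalone sketch writes a results.json containing execution information, parameters, and metrics. The key names, the timestamp format, and the function write_results_sketch are all assumptions made for illustration; the actual layout written by the library is not specified here and may differ.

```python
import datetime
import json
import os
import tempfile
from typing import Any, Dict


def write_results_sketch(results_dir: str, step_name: str,
                         parameters: Dict[str, Any],
                         metrics: Dict[str, Any]) -> str:
    """Write a results.json with a hypothetical layout and return its path."""
    payload = {
        # All key names below are assumptions, not the library's schema.
        "step": step_name,
        "start_time": datetime.datetime.now().isoformat(),
        "parameters": parameters,
        "metrics": metrics,
    }
    os.makedirs(results_dir, exist_ok=True)
    path = os.path.join(results_dir, "results.json")
    with open(path, "w") as f:
        json.dump(payload, f, indent=2)
    return path


# Write to a temporary directory and read the file back.
with tempfile.TemporaryDirectory() as tmp:
    results_path = write_results_sketch(
        tmp, "train", {"gamma": 0.01}, {"accuracy": 0.93})  # illustrative values
    with open(results_path) as f:
        loaded = json.load(f)
```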

class dataworkspaces.lineage.LineageBuilder[source]

Use this class to declaratively build Lineage objects. Instantiate a LineageBuilder instance, and call a sequence of configuration methods to specify your inputs, parameters, your workspace (if the script is not already inside the workspace), and whether this is a results step. Each configuration method returns the builder, so you can chain them together. Finally, call eval() to instantiate the Lineage object.

Configuration Methods

To specify the workflow step’s name, call one of:

  • as_script_step() - the script’s name will be used to infer the step and the associated code resource

  • with_step_name() - explicitly specify the step name

To specify the parameters of the step (e.g. command line arguments), use the with_parameters() method.

To specify the inputs of the step, call one or more of:

  • with_input_path() - resolve the local filesystem path to a resource and subpath and add it to the lineage as an input. May be called more than once.

  • with_input_paths() - resolve a list of local filesystem paths to resources and subpaths and add them to the lineage as inputs. May be called more than once.

  • with_input_ref() - add the resource and subpath to the lineage as an input. May be called more than once.

  • with_no_inputs() - mutually exclusive with the other input methods. This signals that there are no inputs to this step.

To specify code resource dependencies for the step, you can call with_code_ref(). For command-line Python scripts, the main code resource is handled automatically in as_script_step(). Other subclasses of the LineageBuilder may provide similar functionality (e.g. the LineageBuilder for Jupyter notebooks will try to figure out the resource containing your notebook and set it in the lineage).

If you need to specify the workspace’s root directory, use the with_workspace_directory() method. Otherwise, the lineage API will attempt to infer the workspace directory by looking at the path of the script.

Call as_results_step() to indicate that this step is producing results. This will add a method write_results() to the Lineage object returned by eval(). The method as_results_step() takes two parameters: results_dir and, optionally, run_description. The results directory should correspond to either the root directory of a results resource or a subdirectory within the resource. If you have multiple steps of your workflow that produce results, you can create separate subdirectories for each results-producing step.

Example

Here is an example where we build a Lineage object for a script that has one input and produces results:

lineage = LineageBuilder()\
            .as_script_step()\
            .with_parameters({'gamma':0.001})\
            .with_input_path(args.intermediate_data)\
            .as_results_step('../results').eval()
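
The chaining style used above works because each configuration method returns the builder itself, with validation deferred until eval(). The following standalone sketch shows that pattern with a hypothetical ToyBuilder. It borrows a few method names from LineageBuilder for readability, but the validation rules shown (a step name is required, and with_no_inputs() cannot be combined with inputs) are a simplified assumption about what eval() checks, and a plain dict stands in for the Lineage object.

```python
from typing import Any, Dict, List, Optional


class ToyBuilder:
    """Hypothetical builder showing method chaining and validation in eval()."""

    def __init__(self) -> None:
        self.step_name: Optional[str] = None
        self.parameters: Dict[str, Any] = {}
        self.inputs: List[str] = []
        self.no_inputs = False

    def with_step_name(self, step_name: str) -> "ToyBuilder":
        self.step_name = step_name
        return self  # returning self is what makes chaining possible

    def with_parameters(self, parameters: Dict[str, Any]) -> "ToyBuilder":
        self.parameters = dict(parameters)
        return self

    def with_input_path(self, path: str) -> "ToyBuilder":
        self.inputs.append(path)
        return self

    def with_no_inputs(self) -> "ToyBuilder":
        self.no_inputs = True
        return self

    def eval(self) -> Dict[str, Any]:
        """Validate the configuration; a dict stands in for the Lineage object."""
        if self.step_name is None:
            raise ValueError("a step name is required")
        if self.no_inputs and self.inputs:
            raise ValueError("with_no_inputs() cannot be combined with inputs")
        if not self.no_inputs and not self.inputs:
            raise ValueError("specify inputs or call with_no_inputs()")
        return {"step": self.step_name,
                "parameters": self.parameters,
                "inputs": list(self.inputs)}


# Parentheses allow chaining across lines without backslashes.
config = (ToyBuilder()
          .with_step_name("train")
          .with_parameters({"gamma": 0.001})
          .with_input_path("data/intermediate")
          .eval())
```

Deferring validation to eval() means the configuration methods can be called in any order, and mistakes such as conflicting input settings are reported once, before the step starts its work.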

Methods

as_results_step(results_dir: str, run_description: Optional[str] = None) -> dataworkspaces.lineage.LineageBuilder[source]
as_script_step() -> dataworkspaces.lineage.LineageBuilder[source]
eval() -> dataworkspaces.lineage.Lineage[source]

Validate the current configuration, making sure all required properties have been specified, and return a Lineage object with the requested configuration.

with_code_path(path: str) -> dataworkspaces.lineage.LineageBuilder[source]
with_code_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef) -> dataworkspaces.lineage.LineageBuilder[source]
with_input_path(path: str) -> dataworkspaces.lineage.LineageBuilder[source]
with_input_paths(paths: List[str]) -> dataworkspaces.lineage.LineageBuilder[source]
with_input_ref(ref: dataworkspaces.utils.lineage_utils.ResourceRef) -> dataworkspaces.lineage.LineageBuilder[source]
with_no_inputs() -> dataworkspaces.lineage.LineageBuilder[source]
with_parameters(parameters: Dict[str, Any]) -> dataworkspaces.lineage.LineageBuilder[source]
with_step_name(step_name: str) -> dataworkspaces.lineage.LineageBuilder[source]
with_workspace_directory(workspace_dir: str) -> dataworkspaces.lineage.LineageBuilder[source]

Using Lineage

Once you have instrumented the individual steps of your workflow, you can run the steps as normal. Lineage data is stored in the directory .dataworkspace/current_lineage, but not checked into the associated Git repository.

When you take a snapshot, this lineage data is copied to .dataworkspace/snapshot_lineage/HASH, where HASH is the hashcode associated with the snapshot, and checked into Git. This data is available as a record of how you obtained the results associated with the snapshot. In the future, more tools will be provided to analyze and operate on this lineage (e.g. replaying workflows).

When you restore a snapshot, the lineage data associated with the snapshot is restored to .dataworkspace/current_lineage.
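
The directory movements described above can be pictured with a short standalone sketch. It only illustrates the layout: the functions snapshot_lineage_sketch and restore_lineage_sketch, the file name step1.json, and the hash abc123 are hypothetical, and the snapshot and restore commands perform these steps for you (along with the Git operations, which are omitted here).

```python
import os
import shutil
import tempfile


def snapshot_lineage_sketch(workspace_dir: str, snapshot_hash: str) -> str:
    """Copy the current lineage into the per-snapshot directory."""
    base = os.path.join(workspace_dir, ".dataworkspace")
    target = os.path.join(base, "snapshot_lineage", snapshot_hash)
    shutil.copytree(os.path.join(base, "current_lineage"), target)
    return target


def restore_lineage_sketch(workspace_dir: str, snapshot_hash: str) -> None:
    """Replace the current lineage with the copy saved for a snapshot."""
    base = os.path.join(workspace_dir, ".dataworkspace")
    current = os.path.join(base, "current_lineage")
    if os.path.exists(current):
        shutil.rmtree(current)
    shutil.copytree(os.path.join(base, "snapshot_lineage", snapshot_hash), current)


with tempfile.TemporaryDirectory() as ws:
    current_dir = os.path.join(ws, ".dataworkspace", "current_lineage")
    os.makedirs(current_dir)
    lineage_file = os.path.join(current_dir, "step1.json")  # hypothetical name
    with open(lineage_file, "w") as f:
        f.write('{"version": 1}')

    snapshot_lineage_sketch(ws, "abc123")  # placeholder snapshot hash

    with open(lineage_file, "w") as f:
        f.write('{"version": 2}')  # later runs change the current lineage

    restore_lineage_sketch(ws, "abc123")
    with open(lineage_file) as f:
        restored_text = f.read()
```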

Consistency

In order to fully track the status of your workflow, we make a few restrictions:

  1. Independent steps should not write to the same ResourceRef, nor to two ResourceRefs where one refers to a subdirectory of the other.

  2. A step’s execution should not transitively depend on two different versions of the same ResourceRef. If you try to run a step in this situation, an exception will be thrown.

These restrictions should not impact reasonable workflows in practice. Furthermore, they help to catch some common classes of errors (e.g. not rerunning all the dependent steps when a change is made to an input).
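
Both restrictions can be expressed as simple checks, sketched below in standalone form. The ResourceRef is redefined locally so that the snippet runs without the library, and refs_overlap, conflicting_versions, and the version labels "v1" and "v2" are hypothetical names chosen for this example; the library's own checks and version identifiers may look different.

```python
from typing import Dict, List, NamedTuple, Optional, Set, Tuple


class ResourceRef(NamedTuple):
    """Local stand-in with the same two fields as the documented ResourceRef."""
    name: str
    subpath: Optional[str] = None


def refs_overlap(a: ResourceRef, b: ResourceRef) -> bool:
    """Restriction 1: True if the refs are equal or one contains the other."""
    if a.name != b.name:
        return False
    if a.subpath is None or b.subpath is None:
        return True  # a whole-resource ref contains every subpath
    parts_a = a.subpath.strip("/").split("/")
    parts_b = b.subpath.strip("/").split("/")
    shorter = min(len(parts_a), len(parts_b))
    # Compare path components, so "step1" does not match "step10".
    return parts_a[:shorter] == parts_b[:shorter]


def conflicting_versions(
        inputs: List[Tuple[ResourceRef, str]]) -> Dict[ResourceRef, List[str]]:
    """Restriction 2: refs seen with more than one version among the
    (ref, version) pairs a step depends on, directly or transitively."""
    seen: Dict[ResourceRef, Set[str]] = {}
    for ref, version in inputs:
        seen.setdefault(ref, set()).add(version)
    return {ref: sorted(versions)
            for ref, versions in seen.items() if len(versions) > 1}


nested = refs_overlap(ResourceRef("results", "step1"),
                      ResourceRef("results", "step1/run2"))
siblings = refs_overlap(ResourceRef("results", "step1"),
                        ResourceRef("results", "step10"))
conflicts = conflicting_versions([
    (ResourceRef("data"), "v1"),
    (ResourceRef("model"), "v1"),
    (ResourceRef("data"), "v2"),  # same ref reached through a stale path
])
```

In this example, nested is True because one output lies inside the other, siblings is False, and conflicts reports that the data resource is reached at two different versions, which is the situation that arises when a downstream step is rerun without rerunning everything that depends on a changed input.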