Tecton

Efficient & Accurate Data Generation for ML Models

, By , Published: October 17, 2023
Last updated: October 31, 2023

Every effective machine learning model depends on well-structured, representative training data. This involves building relevant features, keys, timestamps, and organizing them into a dataset that guides the model’s learning.

In this post, we’ll cover the main challenges faced when constructing queries for feature retrieval and how we solved them.

Concepts: Transforming data for real-time machine learning

Materialization

Materialization is the process of precomputing feature data using a feature pipeline, followed by writing the results to either the online or offline feature store. The main objective of materialization is to enable quick feature retrieval during training and inference, which helps reduce latencies and improves the efficiency of ML applications.

Training data generation

Training data generation is the process of computing the associated feature data for a given set of training events. The complexity and scale of these queries can vary extensively. They may be as simple as retrieving a single row in conjunction with a small table or as intricate as operations spanning trillions of training events across hundreds of tables. In some cases, queries may involve vast aggregations and joins across large datasets, encapsulating tens of terabytes of data amassed over half a decade, all while upholding point-in-time accuracy.

Challenge: Avoiding training / serving skew

In real-time ML, ensuring data integrity is key. For accurate training, the system should automatically compute the most recent feature based on the training event’s timing. This often depends on which features would have been available in the online store at the training event’s timestamp. There are a few factors we must factor in to determine which features are eligible:

  1. Materialization schedule: Check when data enters the online store. You want to join against the value available at the time of request, not some future value.
  2. Backfill range: Consider the backfill period and ignore data outside this range.
  3. Time-to-Live (TTL): Discard values outside the feature store’s TTL range.

Our training events can only use data available prior to the prediction time, since we can’t see into the future at inference time. Using future data in offline training data can skew results, making offline performance seem better than it really is.

Challenge: Difficulties with our original query engine

With Tecton, you can build data pipelines for batch, streaming, and request-time data, enabling seamless integration of user-defined transformation code written in PySpark, Snowflake SQL, Python, and Pandas.

In our original query engine, the absence of a shared foundation across these dialects posed a significant hurdle. Whenever new functionality was added to Tecton, it often required substantial effort to port to all platforms due to the lack of a unified framework. This disjointed approach hindered our team’s ability to build new capabilities.

Furthermore, we had implemented several performance optimizations over the years, but it was difficult to tell whether our query hints or choices of join types would be optimal for new use cases. Because changes could have a lot of unforeseen impact, only a few engineers were comfortable making changes to the query generation code.

We briefly experimented with generating code using JINJA templates rather than in Python, thinking it could make the queries more readable. However, this resulted in several issues. For instance, templates were inflexible, so we had to compose queries in a very generic and wasteful fashion—but doing so caused the query to be inefficient, slow to compile, and in some cases, even exceeded the system character limit.

We needed something that could allow us to construct queries in a modular way and translate them into a performant implementation on multiple compute engines. So we built QueryTree.

QueryTree: An abstracted approach to query construction & execution

QueryTree is a framework that re-imagines the logical components of a query as nodes within a tree-like structure. This abstraction opens up a whole new way of managing and manipulating data, allowing us to handle complex queries with enhanced readability and maintainability. Each node within this QueryTree signifies a distinct part of the query, making it easier to understand, modify, and optimize. This is a concept borrowed from traditional SQL query planners; however, our QueryTree operates at a higher level of abstraction.

The primary workflow of the QueryTree framework involves three steps:

1. Building the tree: The first step involves constructing the QueryTree using Python. This is where you define your query’s logic using the building blocks provided by the framework. You start by creating a root node, then branch out to construct the various parts of your query. Each node represents a logical unit of your query, and the parent-child relationships between nodes illustrate the dependencies between different parts of the query.

data_source = DataSourceScanNode(
    data_source_spec=feature_view.input_data_source,
    time_limits=feature_view.input_data_source.get_time_limits(time_limits),
)
feature_view_pipeline = FeatureViewPipelineNode(
    inputs=(data_source,),
    feature_view_spec=feature_view,
    feature_time_limits=time_limits,
)
tree = RenameColsNode(feature_view_pipeline, mapping={TECTON_INTERNAL_TIME_KEY: feature_view.time_key})

Python

2. Generating the compute-specific query: Once you have built your QueryTree, the next step is to generate a compute-specific query from it. The framework supports various compute platforms and can generate queries in the respective language of the target compute infrastructure. This stage is where the framework’s abstraction capabilities truly shine, allowing you to create queries for a variety of compute infrastructures without worrying about the specifics of each.

spark_tree = translate.spark_convert(tree)

Python

3. Executing the query on compute infrastructure: The final step involves executing the generated query on your chosen compute infrastructure. The QueryTree framework’s adaptability allows for seamless execution of queries across different compute platforms, eliminating the need to write platform-specific code.

spark_df = spark_tree.to_dataframe(spark)

Python

Debugging experience

<1> RenameColsNode
└── <2> RespectFeatureStartTimeNode
    └── <3> AsofJoinFullAggNode(spine, partial_aggregates)
        ├── <4> [spine] AddRetrievalAnchorTimeNode
        │   └── <5> UserSpecifiedDataNode
        └── <6> [partial_aggregates] PartialAggNode
            └── <7> EntityFilterNode(feature_data, entities)
                ├── <8> [feature_data] FeatureViewPipelineNode(transactions)
                │   └── <9> [transactions] DataSourceScanNode
                └── <10> [entities] SelectDistinctNode
                    └── <11> AddRetrievalAnchorTimeNode
                        └── <12> UserSpecifiedDataNode

Python

The QueryTree framework enabled us to build a suite of debugging functionality on top of built query trees that enable interactive debugging and analysis of Tecton-defined queries, without digging into the underlying code!

Users (both internal and external) are able to call `tecton_df.explain()` to output the QueryTree representation of the resulting query from Tecton (with optional descriptions, and schemas). They can slice and dice the query into sub-trees using `tecton_df.subtree(node_id)` to do interactive, step-through debugging of their query.

Enhancements enabled by the QueryTree framework

The implementation of the QueryTree framework has enabled us to easily debug and improve the correctness to our offline training data generation. Fixing the issues once and have it translate to all supported compute engines:

Handling null-valued features

Previously, our offline query ignored null-valued features, introducing data skew from our online implementation that would return null-valued features. QueryTree’s design allowed us to easily adjust our query logic to return these null values, reducing this source of skew in our datasets.

Resolving point-in-time correctness bugs

The QueryTree framework has also helped us resolve a number of point-in-time correctness issues. These were the subject of one of our previous blog posts on online/offline skew.

Addressing edge cases

We’ve been able to better manage situations involving column renames, type casting, and instances where there are multiple features with the same name.

Performance improvements

Tecton’s QueryTree framework made it easy for us to test the impact of various performance changes. Here are some of the optimizations we evaluated and their results.

Window function implementation of ASOF join

To construct a historical training data set, we need to generate the values of features at the point-in-time of the training event. We can do this using an ASOF join, but popular compute platforms like Databricks, Dataproc, and Snowflake that are used with Tecton don’t have native support for ASOF joins.

The most simplistic method to implement an ASOF join is to join both tables with minimal time filtering, then use a window function to only return the feature values that came from the most recent source timestamp.

SELECT *,
       row_number() OVER (
PARTITION BY user_id, ORDER BY source_timestamp DESC
  ) AS rownum
FROM
  (SELECT training_events.*,
          historical_data.feature_value,
          historical_data.timestamp AS source_timestamp
   FROM training_events
   JOIN historical_data ON historical_data.timestamp < training_events.timestamp
   AND historical_data.user_id = training_events.user_id) sub
HAVING rownum = 1

This can work okay if your user data is updated infrequently, but the query can blow up—in the worst-case scenario, the join between training_events and historical_data could result in almost a full Cartesian product.

An alternative we tested is to use window function instead of join.

SELECT last(feature_value) 
OVER (PARTITION BY user_id ORDER BY timestamp DESC)
FROM (SELECT *, TRUE is_training_event FROM training_events) UNION (SELECT *, FALSE is_training_event FROM historical_data)
...

This is more efficient because the size of the intermediate data set is just the size of our two input tables.

With this change, small-scale queries ran 2-3x faster, and large-scale queries that previously OOMed were now able to succeed.

Join optimization

Feature values relevant to a prediction event may come from different sources. Tecton uses a FeatureService to represent the point-in-time join across all the different sources of feature data.

We noticed at around 13+ joins, performance would become severely degraded.

Our pseudo-SQL looked like the following:

SELECT *
FROM training_data AS td
ASOF_JOIN feature_data_1 AS fd1 ON td.common_column = fd1.common_column
ASOF_JOIN feature_data_2 AS fd2 ON td.common_column = fd2.common_column
ASOF_JOIN feature_data_3 AS fd3 ON td.common_column = fd3.common_column
-- ... repeat LEFT JOIN for feature_data_4 to feature_data_12 ...
ASOF_JOIN feature_data_13 AS fd13 ON td.common_column = fd13.common_column;

Especially because ASOF_JOIN is not a built-in join type, too many layers of nesting would result in an extremely complex query that would be optimized and executed inefficiently.

We improved things by pre-joining all the feature data individually, like the following pseudo-SQL:

SELECT *
FROM (
    SELECT *
    FROM training_data AS td
    ASOF_JOIN feature_data_1 AS fd1 ON td.common_column = fd1.common_column
) AS result1 INNER JOIN
( 
		SELECT *
    FROM training_data AS td
    ASOF_JOIN feature_data_2 AS fd2 ON td.common_column = fd2.common_column
) AS result2
...
INNER JOIN result13 ON result1.common_column = result2.common_column AND ...

This change improved runtimes by 10x and also reduced memory usage in several cases due to the query being easier for the optimizer to plan, resulting in shuffling a smaller amount of data to perform each join.

Registering temporary tables

Beyond a certain number of nested CTEs in Snowflake, the query compiler would OOM or time out. QueryTree made it easy to define certain breakpoints in query construction as temporary tables that would be later joined together. This allowed a query that previously would never finish compiling to compile and execute in just a few minutes.

Results / Summary

Since its release in Tecton 0.6, users have seen a drastic improvement in training dataset generation speeds. In one instance, queries that previously ran into S3 rate limits before timing out could now run in under 10 minutes.

Tecton’s engineers have also been able to build more capabilities on top of the new framework. Thanks to QueryTree, we were able to test the accuracy and performance in different environments for the launch of new aggregations like a HyperLogLog implementation of count distinct, as well as secondary key aggregations.

Acknowledgements

Thank you to Alvin Ho, Danny Chiao, Felix Wang, Jake Noble, Jiadong Zhang, Michael Eastham, Oleksii Moskalenko, Samantha Rydzewski, Sanika Natu, and Zhongyang Fu for all your contributions and extensions to QueryTree over the past year and a half!

If you’re building a feature platform, we hope you found this blog post helpful. Curious to see this in action? Check out this Tecton demo!

Request a Demo

Unfortunately, Tecton does not currently support these clouds. We’ll make sure to let you know when this changes!

However, we are currently looking to interview members of the machine learning community to learn more about current trends.

If you’d like to participate, please book a 30-min slot with us here and we’ll send you a $50 amazon gift card in appreciation for your time after the interview.

CTA link

or

CTA button

Request a free trial

Interested in trying Tecton? Leave us your information below and we’ll be in touch.​

Unfortunately, Tecton does not currently support these clouds. We’ll make sure to let you know when this changes!

However, we are currently looking to interview members of the machine learning community to learn more about current trends.

If you’d like to participate, please book a 30-min slot with us here and we’ll send you a $50 amazon gift card in appreciation for your time after the interview.

CTA link

or

CTA button

Contact Sales

Interested in trying Tecton? Leave us your information below and we’ll be in touch.​

Unfortunately, Tecton does not currently support these clouds. We’ll make sure to let you know when this changes!

However, we are currently looking to interview members of the machine learning community to learn more about current trends.

If you’d like to participate, please book a 30-min slot with us here and we’ll send you a $50 amazon gift card in appreciation for your time after the interview.

CTA link

or

CTA button