Data for feature store exercise – Vertex AI Feature Store

For this exercise, the data is downloaded from Kaggle (the link is provided below), and the dataset is listed under the CC0 public domain license. The dataset contains employee promotion records. Since we are not building a model from the data, we consider only five attributes (employee ID, education, gender, number of trainings, and age) and only 50 samples.

https://www.kaggle.com/datasets/arashnic/hr-ana
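
Reducing the downloaded file to five attributes and 50 samples can be sketched with the standard library as follows. This is only an illustrative sketch: the column names in SELECTED_COLUMNS and the two sample rows are assumptions made for the example, so check them against the header of the actual Kaggle file.

```python
import csv
import io

# Hypothetical column names; verify them against the header of the downloaded file.
SELECTED_COLUMNS = ["employee_id", "education", "gender", "no_of_trainings", "age"]


def select_subset(csv_text, columns=SELECTED_COLUMNS, max_rows=50):
    """Keep only the given columns and at most the first max_rows data rows."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = []
    for row in reader:
        if len(rows) >= max_rows:
            break
        rows.append({name: row[name] for name in columns})
    return rows


def to_csv(rows, columns=SELECTED_COLUMNS):
    """Serialize the reduced rows back to CSV text, header included."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Tiny made-up sample standing in for the Kaggle file.
sample = (
    "employee_id,department,education,gender,no_of_trainings,age\n"
    "1,Sales,Bachelors,m,1,35\n"
    "2,HR,Masters,f,2,41\n"
)
subset = select_subset(sample, max_rows=1)
print(to_csv(subset))
```

The resulting CSV text is what would be saved and uploaded to the bucket.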

The feature_store_input bucket is created under us-central1 (single region), and the CSV file is uploaded to the bucket as shown in Figure 9.2:

Figure 9.2: Data stored in cloud storage

Working on feature store using GUI

Before we ingest data into the feature store, we need to create a feature store, an entity type, and features. The resources of the feature store can be created using the GUI or Python code. Follow the steps below to create the feature store resources using the GUI:

Step 1: Opening of feature store.

The landing page of the Vertex AI is shown in Figure 9.3:

Figure 9.3: landing page of Vertex AI

  1. Click Feature Store to open.

Step 2: Landing page of feature store

Feature stores are region specific; the landing page provides information on the feature store under a specific region as shown in Figure 9.4:

Figure 9.4: Landing page of feature store

  1. Click CREATE FEATURESTORE.

The region needs to be selected in this step (the region cannot be changed after this step).

Step 3: Creation of feature store

Follow the steps mentioned in Figure 9.5 to create a feature store:

Figure 9.5: Creation of feature store

  1. Provide a name for the feature store.
  2. Enable Online Serving if the features need to be made available for low-latency online serving.
  3. Since the volume of data is small, select Fixed Nodes and provide the value of 1.
  4. Click on CREATE.

Step 4: Feature store created successfully

Once the feature store is created it will be displayed on the landing page as shown in Figure 9.6:

Figure 9.6: Feature store created and listed on the landing page

  1. Newly created feature store.
  2. Click on Create Entity Type for its creation.

Step 5: Creation of entity type

Follow the steps shown in Figure 9.7 to create an entity type:

Figure 9.7: Creation of entity type

  1. Select the Region under which the feature store is created.
  2. All the feature stores under the region will be listed; select the newly created Featurestore.
  3. Provide Entity type name.
  4. Write the Description for the entity type.
  5. Feature monitoring (enable if the features need to be monitored).
  6. It enables the monitoring of feature stores and features. Feature store monitors CPU utilization, storage, and latency. Feature monitoring helps in monitoring changes in feature value distribution.
  7. Click CREATE.

Advantages of feature store – Vertex AI Feature Store

These are the advantages of feature store:

  • Extend features company-wide: Feature stores let you easily share features for training or serving. Different projects and use cases do not need feature re-engineering. Manage and deliver features from a central repository to preserve consistency throughout your business and prevent redundant efforts, especially for high-value features.

Vertex AI Feature Store lets people find and reuse features using search and filtering. View feature metadata to assess quality and usage. For instance, you may check feature coverage and feature value distribution.

  • Serving at scale: Online predictions require low-latency feature serving, which Vertex AI Feature Store manages. Vertex AI Feature Store automatically builds and scales the low-latency data serving infrastructure. You create the features and leave serving them to the service, so data scientists can create new features without worrying about deployment.
  • Reduce training-serving skew: Training-serving skew happens when the feature data distribution in production differs from the one used to train your model. This skew causes disparities between a model’s training and production performance. Vertex AI Feature Store helps reduce training-serving skew in the following ways:
    • Vertex AI Feature Store ensures that feature values are ingested once and reused for both training and serving. Without a feature store, training and serving may compute features through distinct code paths, so the feature values may differ.
    • Vertex AI Feature Store offers point-in-time lookups of historical data for training. By fetching only the feature values that were available before the prediction time, these lookups reduce data leakage.
  • Identify drift: Vertex AI Feature Store monitors the distribution of feature values and detects drift in it. As feature drift rises, retrain the models that use the impacted features.
  • Retention: Vertex AI Feature Store preserves feature values for the allotted retention period. This limit is determined by the feature values’ timestamp, not by the date and time the values were imported. Values with timestamps that go beyond the limit are scheduled for deletion by Vertex AI Feature Store.
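
The idea behind a point-in-time lookup, as described in the skew bullet above, can be sketched in a few lines. This is a minimal illustration of the concept and not the Vertex AI Feature Store API; the feature history and the integer timestamps are made up for the example.

```python
from bisect import bisect_right


def point_in_time_lookup(history, as_of):
    """Return the latest value whose timestamp is <= as_of, or None.

    history is a list of (timestamp, value) pairs sorted by timestamp.
    """
    timestamps = [ts for ts, _ in history]
    index = bisect_right(timestamps, as_of)
    if index == 0:
        # No value had been recorded yet at the requested time.
        return None
    return history[index - 1][1]


# Hypothetical history of a "number of trainings" feature for one employee;
# timestamps are plain integers (for example, days) to keep the sketch short.
trainings_history = [(10, 1), (20, 2), (30, 3)]

print(point_in_time_lookup(trainings_history, 25))  # value recorded at t=20
print(point_in_time_lookup(trainings_history, 5))   # nothing recorded yet
```

Because the lookup never returns a value stamped later than the requested time, a training example built for time 25 cannot see the value that only became known at time 30.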

Disadvantages of feature store

A feature store has overhead, which can make data science more complicated, especially for smaller projects. A feature store may complicate matters if a business has numerous small datasets. Feature stores are ineffective when the data is so diverse that no standard modeling approach will help. Reusing features created on separate data sources and metadata is difficult. Additionally, a feature store might not be the ideal choice when the features are not time-dependent or when features are needed only for batch predictions.

Knowing Vertex AI feature store – Vertex AI Feature Store

Introduction

After learning about the pipelines of the platform, we will move on to the feature store of GCP. In this chapter, we will start with an understanding of the feature store and its advantages, followed by a hands-on feature store exercise.

Structure

In this chapter, we will cover the following topics:

  • Knowing Vertex AI feature store
  • Hierarchy of feature store
  • Advantages of feature store
  • Disadvantages of feature store
  • Working on feature store using GUI
  • Working on feature store using Python
  • Deleting resources
  • Best practices for feature store

Objectives

By the end of this chapter, users will have a good idea about the feature store, when to use it, and how to employ it with the web console of GCP and Python.

Knowing Vertex AI feature store

Vertex AI Feature Store is a centralized repository for managing and delivering machine learning features. To speed up the process of creating and delivering high-quality ML applications, many organizations are turning to centralized feature stores to facilitate the sharing, discovery, and re-use of ML features at scale.

The storage and processing power, as well as other components of the backend infrastructure, are handled by Vertex AI Feature Store, making it a fully managed solution. As a result of this strategy, data scientists may ignore the difficulties associated with delivering features into production and instead concentrate on the feature computation logic.

The feature store in Vertex AI is an integral aspect of the overall system. Use Vertex AI Feature Store on its own or include it in your existing Vertex AI workflows. For instance, the Vertex AI Feature Store may be queried for information to be used in the training of custom or AutoML models.

Hierarchy of feature store

The collection of entities for a certain entity type is stored in a feature store. Fields like entity ID, timestamp, and a series of attributes like feature 1, feature 2, and so on, are all defined for each entity type. The hierarchy of the feature store is described in Figure 9.1:

Figure 9.1: Hierarchy of feature store

  • Feature store: A top-level container for entity types, features, and their values.
  • Entity type: A collection of semantically related features (real or virtual).
  • Entity: An instance of the entity type.
  • Feature: A measurable property or attribute of an entity type.
  • Feature values: These contain values of the features at a specific point in time.
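
The hierarchy above can be modeled with a few plain data classes. This is only a conceptual sketch; the class names, the hr_store and employee names, and the entity ID are hypothetical and are not part of the Vertex AI SDK.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class FeatureValue:
    """Value of one feature at a specific point in time."""
    value: object
    timestamp: datetime


@dataclass
class Entity:
    """An instance of an entity type, identified by its entity ID."""
    entity_id: str
    # feature name -> list of timestamped values
    values: dict = field(default_factory=dict)


@dataclass
class EntityType:
    """A collection of semantically related features and their entities."""
    name: str
    features: list = field(default_factory=list)
    entities: dict = field(default_factory=dict)

    def write(self, entity_id, feature, value, timestamp):
        """Record a timestamped value for a declared feature of an entity."""
        if feature not in self.features:
            raise KeyError(f"unknown feature: {feature}")
        entity = self.entities.setdefault(entity_id, Entity(entity_id))
        entity.values.setdefault(feature, []).append(FeatureValue(value, timestamp))


@dataclass
class FeatureStore:
    """Top-level container for entity types."""
    name: str
    entity_types: dict = field(default_factory=dict)


store = FeatureStore("hr_store")
employee = EntityType("employee", features=["education", "age"])
store.entity_types[employee.name] = employee
employee.write("65438", "age", 35, datetime(2023, 1, 1, tzinfo=timezone.utc))
print(store.entity_types["employee"].entities["65438"].values["age"][0].value)
```

Reading the last line from left to right follows the hierarchy: feature store, entity type, entity, feature, feature value.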

Pipeline artifacts stored in cloud storage – Pipelines using TensorFlow Extended

Step 10: Pipeline artifacts stored in cloud storage

Pipeline artifacts are stored in the cloud storage as shown in Figure 8.15:

  • Module folder contains Python files which we had pushed while creating transform and trainer components.
  • Output_model contains trained classification model.
  • Root folder contains artifacts of each of the component of the pipeline:

Figure 8.15: Pipeline artifacts stored in the cloud storage

Deletion of resources

We have used a workbench and cloud storage to store the data and the artifacts of the pipeline. To clean up, delete the workbench and clear the data stored in cloud storage.

Conclusion

We learned about TFX and a few of its components, and constructed a pipeline using some of the standard components. We also understood how to use Kubeflow for the orchestration of a TFX pipeline on Vertex AI.

In the next chapter, we will start understanding and working on the feature store of Vertex AI.

Questions

  1. Which artifact of the transform component is used in the training component of the pipeline?
  2. What are the different orchestration options TFX supports?
  3. Try using the evaluator component between the trainer and pusher components and reconstruct the pipeline. (Use the evaluator component to evaluate the model performance and push the model only if the performance is good.)

Creation of pipeline – Pipelines using TensorFlow Extended

Step 6: Creation of pipeline
So far, we have understood and executed certain components of the TFX pipeline (not all the components are needed for the pipeline construction). Let’s start constructing the pipeline using the example_gen, statistics_gen, schema_gen, transform, trainer and pusher components (the Pusher component is used to push the trained model to a location; in this example, the trained model will be pushed to cloud storage):
• Define a function (_create_pipeline) consisting of all the steps/components that need to be included in the pipeline; it returns the TFX pipeline. Run the code below in a new cell to define the pipeline function:
def _create_pipeline(pl_name, pipeline_root_folder, data_root,
                     module_file_transform, module_file_train, model_dir_save,
                     ) -> tfx.dsl.Pipeline:
    # Read the CSV files under data_root and convert them to examples
    example_gen_csv = tfx.components.CsvExampleGen(input_base=data_root)
    # Compute statistics and infer a schema from them
    gen_statistics = tfx.components.StatisticsGen(examples=example_gen_csv.outputs['examples'])
    gen_schema = tfx.components.SchemaGen(statistics=gen_statistics.outputs['statistics'])
    # Apply preprocessing_fn from the transform module passed to this function
    transform_data = tfx.components.Transform(
        examples=example_gen_csv.outputs['examples'],
        schema=gen_schema.outputs['schema'],
        module_file=module_file_transform)
    # Train the model using run_fn from the trainer module
    model_trainer = tfx.components.Trainer(
        module_file=module_file_train,
        examples=example_gen_csv.outputs['examples'],
        transform_graph=transform_data.outputs['transform_graph'],
        schema=gen_schema.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=200),
        eval_args=tfx.proto.EvalArgs(num_steps=10))
    # Push the trained model to model_dir_save
    pusher = tfx.components.Pusher(
        model=model_trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=model_dir_save)))
    return tfx.dsl.Pipeline(
        pipeline_name=pl_name,
        pipeline_root=pipeline_root_folder,
        components=[example_gen_csv, gen_statistics, gen_schema,
                    transform_data, model_trainer, pusher])

Step 8: Defining a runner
As mentioned in the theory section, TFX is portable across environments and orchestration frameworks. TFX supports Airflow, Beam, and Kubeflow. It also gives developers the flexibility to add their own orchestrators. Orchestrators must inherit from TfxRunner. TFX orchestrators schedule pipeline components based on DAG dependencies using the logical pipeline object, which comprises pipeline args, components, and the DAG. In our example, we will be using Vertex Pipelines together with the Kubeflow V2 DAG runner. In the code, we create a runner using the Kubeflow V2 DAG runner and run it by passing all the pipeline parameters.
Run the following code to define the runner:
trainer_file = "trainer.py"
file_transform = os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH)
file_train = os.path.join(MODULE_FOLDER, trainer_file)
pl_def_file = NAME_PIPELINE + '.json'
pl_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=pl_def_file)
_ = pl_runner.run(
    _create_pipeline(
        pl_name=NAME_PIPELINE,
        pipeline_root_folder=ROOT_PIPELINE,
        data_root=INPUT_DATA_DIR,
        module_file_transform=file_transform,
        module_file_train=file_train,
        model_dir_save=OUTPUT_MODEL_DIR))
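
To make the idea of scheduling components based on DAG dependencies concrete, the following sketch orders the six components of this pipeline with a topological sort from the standard library. It illustrates the concept only and is not how TFX or Kubeflow implement scheduling; the dependency map is written by hand from the inputs each component receives in _create_pipeline.

```python
from graphlib import TopologicalSorter

# component -> set of components whose outputs it consumes
dependencies = {
    "example_gen": set(),
    "statistics_gen": {"example_gen"},
    "schema_gen": {"statistics_gen"},
    "transform": {"example_gen", "schema_gen"},
    "trainer": {"example_gen", "transform", "schema_gen"},
    "pusher": {"trainer"},
}

# static_order() yields every component after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

For this pipeline the dependencies form a single chain, so only one valid execution order exists.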

Step 9: Pipeline execution
As the last step, we need to execute the pipeline. Run the following code in a new cell to start the pipeline execution:
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location="us-central1")
job = aiplatform.PipelineJob(template_path=pl_def_file,
                             display_name=NAME_PIPELINE)
job.run(sync=False)

Once the pipeline starts, users will be provided the link to check the status as shown in Figure 8.13:

Figure 8.13: Pipeline link

  1. Click on the link to check the status of the pipeline as shown in the following screenshot:

Figure 8.14: Pipeline executed successfully
The pipeline will take a few minutes to complete the training and push the trained model to cloud storage. Whether the pipeline is constructed using TFX or Kubeflow, the user interface of the pipeline in GCP remains the same, as shown in Figure 8.14. Spend some time understanding each of the components and its artifacts in detail, as described in the previous chapter.

Importing packages – Pipelines using TensorFlow Extended-2

Transform:
We will create file_transform.py, which will contain information about the data labels and the feature engineering steps:
• Run the following code in a new cell to declare a variable containing the file name:
TRANSFORM_MODULE_PATH = 'file_transform.py'

• Run the following code in a new cell to create the file_transform.py file. The preprocessing function (preprocessing_fn) is where the actual alteration of the dataset occurs. It receives and returns a dictionary of tensors, where a tensor refers to a Tensor or SparseTensor. In our example, we are not applying any transformations; the code just maps the inputs to the output dictionary. (The %%writefile command will create the .py file with the following code in it.):
%%writefile {TRANSFORM_MODULE_PATH}
import tensorflow as tf
import tensorflow_transform as tft
NAMES = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
LABEL = 'eyeDetection'
def preprocessing_fn(raw_inputs):
    # Copy every feature and the label unchanged into the output dictionary
    processed_data = dict()
    for items in NAMES:
        processed_data[items] = raw_inputs[items]
    processed_data[LABEL] = raw_inputs[LABEL]
    return processed_data

  1. The file needs to be copied into the GCS bucket. Run the following line of code to copy it:
    !gsutil cp file_transform.py {MODULE_FOLDER}/
  2. Run the following lines of code in a new cell to configure the transform component. The transform component takes inputs from example_gen and schema_gen:
    transform_data = tfx.components.Transform(
        examples=example_gen_csv.outputs['examples'],
        schema=gen_schema.outputs['schema'],
        module_file=os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH))
    context_in.run(transform_data, enable_cache=False)

Output of the transform component is as shown in Figure 8.10. Transform graph is one of the artifacts generated by the transform component and it will be used for the trainer module.

Figure 8.10: Output of transform component
Run the following code to check a few records of the transformed data (this code is not needed during the pipeline construction):
train_sam = os.path.join(transform_data.outputs['transformed_examples'].get()[0].uri, 'Split-train')
filenames_tfr = [os.path.join(train_sam, name) for name in os.listdir(train_sam)]
dataset = tf.data.TFRecordDataset(filenames_tfr, compression_type='GZIP')

for record in dataset.take(1):
    sample = tf.train.Example()
    sample.ParseFromString(record.numpy())
    print(sample)

Output of 1 record will be as shown in Figure 8.11:

Figure 8.11: Transformed data example
Trainer:
The Trainer component is in charge of preparing the input data and training the model. It requires the ExampleGen examples, the transform graph, and the training code. TensorFlow Estimators, Keras models, or custom training loops can be used in the training code. Compared to other components, the trainer requires more modifications in the code:
• Run the following code in a new cell; it will generate the trainer.py file (the complete code is available in the repository). The trainer.py file (training code) is the input for the trainer component. The trainer component generates two artifacts: model (the trained model itself) and model_run, which can be used for storing logs; this can be seen in Figure 8.12. The trainer.py file contains four functions; a high-level description of those functions is:
o run_fn is the entry point to execute the training process
o _input_fn generates features and labels for training
o _get_serve_tf_examples_fn returns a function that parses a serialized tf.Example
o _make_keras_model creates and returns the model for classification
%%writefile trainer.py
from typing import List
from absl import logging
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

COL_NAMES = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
LABEL = "eyeDetection"
BATCH_SIZE_TRAIN = 40
BATCH_SIZE_EVAL = 20
def _input_fn(files, accessor, transform_output, size) -> tf.data.Dataset:
    # Creates the dataset, applies the transformations to it and returns it.
    # Refer to the repository for the code block of the function.
    return dataset.map(apply_transform_fn).repeat()
def _get_serve_tf_examples_fn(model, transform_output):
    # Parses the serialized examples and returns the serving function.
    # Refer to the repository for the code block of the function.
    return serve_tf_examples_fn
def _make_keras_model() -> tf.keras.Model:
    # Creates the model with its layers and loss function and returns it.
    # Refer to the repository for the code block of the function.
    return model_classification

def run_fn(fn_args: tfx.components.FnArgs):
    tf_transform = tft.TFTransformOutput(fn_args.transform_output)
    train_samples = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform, size=BATCH_SIZE_TRAIN)
    eval_samples = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform, size=BATCH_SIZE_EVAL)
    model_classification = _make_keras_model()
    model_classification.fit(
        train_samples,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_samples,
        validation_steps=fn_args.eval_steps)
    sign = {
        "serving_default": _get_serve_tf_examples_fn(model_classification, tf_transform),
    }
    model_classification.save(fn_args.serving_model_dir, save_format='tf', signatures=sign)

  1. Run the following code to copy trainer.py to the GCS bucket:
    !gsutil cp trainer.py {MODULE_FOLDER}/
  2. Run the following code to initiate the trainer component:
    trainer_file = "trainer.py"
    trainer_file_path = os.path.join(MODULE_FOLDER, trainer_file)
    model_trainer = tfx.components.Trainer(
        examples=example_gen_csv.outputs["examples"],
        transform_graph=transform_data.outputs["transform_graph"],
        train_args=tfx.proto.TrainArgs(num_steps=200),
        eval_args=tfx.proto.EvalArgs(num_steps=10),
        module_file=trainer_file_path,
    )
    context_in.run(model_trainer, enable_cache=False)

The output of the trainer component is as shown in the following figure:

Figure 8.12: Output of the trainer component

Importing packages – Pipelines using TensorFlow Extended-1

Step 6: Importing packages
Run the following lines of code to import the required packages:
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tfx.components.base import executor_spec
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tensorflow_metadata.proto.v0 import schema_pb2
import os
from typing import List

Step 7: Understanding a few TFX components from code
Before jumping into pipeline creation and execution, let us try to understand how a few of the TFX components can be used individually and analyze the output of those components. Let us start with the ExampleGen component.
ExampleGen: Run the following code in a new cell:
context_in = InteractiveContext()
example_gen_csv = tfx.components.CsvExampleGen(input_base=INPUT_DATA_DIR)
context_in.run(example_gen_csv)

The ExampleGen component can read data from various sources and formats, such as CSV files, TFRecord files, and BigQuery. An interactive widget displaying the results of ExampleGen will appear in the notebook once the run is complete, as shown in Figure 8.6. ExampleGen typically generates two types of artifacts, known as training and evaluation examples. By default, ExampleGen divides the data into two thirds for the training set and one third for the evaluation set. The location where these artifacts are stored can also be viewed as shown:

Figure 8.6: Output for example_gen component
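
The default two-thirds/one-third split can be pictured as assigning each record to one of three hash buckets, two for training and one for evaluation. The sketch below illustrates that idea with the standard library only; the hash function and the record keys are assumptions for the example, and TFX’s own implementation differs in detail.

```python
import hashlib


def assign_split(record_key, train_buckets=2, eval_buckets=1):
    """Deterministically map a record to 'train' or 'eval' by hashing its key."""
    total = train_buckets + eval_buckets
    digest = hashlib.sha256(record_key.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % total
    return "train" if bucket < train_buckets else "eval"


# Hypothetical record keys standing in for the rows of the CSV file.
keys = [f"row-{i}" for i in range(3000)]
splits = [assign_split(k) for k in keys]
train_share = splits.count("train") / len(splits)
print(round(train_share, 2))
```

Because the assignment depends only on the record key, the same record always lands in the same split, and the share of training records comes out close to two thirds.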
StatisticsGen: Run the following code in a new cell:
gen_statistics = tfx.components.StatisticsGen(examples=example_gen_csv.outputs['examples'])
context_in.run(gen_statistics)
context_in.show(gen_statistics.outputs['statistics'])

Statistics over your dataset are computed using the StatisticsGen component. These statistics provide a quick overview of your data, including details such as shape, features, and value distribution. You will use the output from the ExampleGen as input to compute statistics about the data. An interactive widget displaying the statistics of train and evaluation dataset separately appears once the run is complete as shown in Figure 8.7:

Figure 8.7: Output for statistics_gen component
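
As a rough picture of the kind of per-feature summary StatisticsGen produces, the following sketch computes a few basic statistics for each column of a small in-memory dataset. It uses only the standard library; the rows are made-up values, and the real component computes far richer statistics.

```python
import statistics


def column_statistics(rows, columns):
    """Compute count, missing, min, max and mean for each numeric column."""
    summary = {}
    for name in columns:
        values = [row.get(name) for row in rows]
        present = [v for v in values if v is not None]
        summary[name] = {
            "count": len(present),
            "missing": len(values) - len(present),
            "min": min(present) if present else None,
            "max": max(present) if present else None,
            "mean": statistics.fmean(present) if present else None,
        }
    return summary


# Made-up rows; None marks a missing value.
rows = [
    {"AF3": 4300.0, "F7": 4000.0},
    {"AF3": 4310.0, "F7": None},
    {"AF3": 4290.0, "F7": 4010.0},
]
stats = column_statistics(rows, ["AF3", "F7"])
print(stats["AF3"])
print(stats["F7"])
```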
SchemaGen: Run the following code in a new cell:
gen_schema = tfx.components.SchemaGen(statistics=gen_statistics.outputs['statistics'])
context_in.run(gen_schema)
context_in.show(gen_schema.outputs['schema'])

From the statistics, the SchemaGen component will generate a schema for your data. A Schema is simply a data definition. It defines the data features’ types, expected properties, bounds, and so on. Output of the SchemaGen is as shown in Figure 8.8:

Figure 8.8: Output for schema_gen component
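
The idea of deriving a data definition from observed data can be sketched as follows. This is a simplified illustration, not SchemaGen itself: it infers a type name and numeric bounds per column from made-up rows.

```python
def infer_schema(rows):
    """Infer a type name and, for numbers, min/max bounds for each column."""
    schema = {}
    for row in rows:
        for name, value in row.items():
            entry = schema.setdefault(name, {"type": type(value).__name__})
            if isinstance(value, (int, float)):
                # Widen the bounds so that they cover every value seen so far.
                entry["min"] = min(entry.get("min", value), value)
                entry["max"] = max(entry.get("max", value), value)
    return schema


# Made-up rows with one feature and the label.
rows = [
    {"AF3": 4300.0, "eyeDetection": 0},
    {"AF3": 4310.0, "eyeDetection": 1},
    {"AF3": 4290.0, "eyeDetection": 0},
]
schema = infer_schema(rows)
print(schema)
```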
ExampleValidator: Run the following code in a new cell:
stats_validate = tfx.components.ExampleValidator(
    statistics=gen_statistics.outputs['statistics'],
    schema=gen_schema.outputs['schema'])
context_in.run(stats_validate)
context_in.show(stats_validate.outputs['anomalies'])

Based on the defined schema, this component validates your data and detects anomalies. In production, it can be used to validate any new data that enters your pipeline. It can detect drift, changes, and skew in new data, as well as unexpected types and new columns that were not in the schema. The output of ExampleValidator is as shown in Figure 8.9:

Figure 8.9: Output of example validator component
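
Validation against a schema can be pictured with a small stand-alone sketch. It checks made-up rows against a hand-written schema and reports unexpected columns, missing columns and type mismatches; ExampleValidator covers many more kinds of anomalies, so treat this as an illustration of the idea only.

```python
def find_anomalies(rows, schema):
    """Return human-readable anomaly messages for rows that violate the schema."""
    anomalies = []
    for index, row in enumerate(rows):
        for name in row:
            if name not in schema:
                anomalies.append(f"row {index}: unexpected column '{name}'")
        for name, expected_type in schema.items():
            if name not in row:
                anomalies.append(f"row {index}: missing column '{name}'")
            elif not isinstance(row[name], expected_type):
                anomalies.append(
                    f"row {index}: column '{name}' has type "
                    f"{type(row[name]).__name__}, expected {expected_type.__name__}"
                )
    return anomalies


# Hand-written schema: column name -> expected Python type.
schema = {"AF3": float, "eyeDetection": int}
new_rows = [
    {"AF3": 4300.0, "eyeDetection": 0},
    {"AF3": "4300", "eyeDetection": 1, "extra": 7},
]
anomalies = find_anomalies(new_rows, schema)
for message in anomalies:
    print(message)
```

The first row passes; the second is reported twice, once for the extra column and once for the string stored in a float column.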

Data for pipeline building – Pipelines using TensorFlow Extended

For this exercise, the data is downloaded from Kaggle (the link is provided below), and the dataset is listed under the CC0: Public Domain license. The data contains various EEG measurements together with the eye state, which was captured via camera. 1 indicates a closed eye and 0 indicates an open eye.
https://www.kaggle.com/datasets/robikscube/eye-state-classification-eeg-dataset
The tfx_pipeline_input_data bucket is created under us-central1 (single region), and the CSV file is uploaded to the bucket as shown in Figure 8.2:

Figure 8.2: Data in GCS for pipeline construction
Pipeline code walk through
A workbench needs to be created to run the pipeline code. Follow the steps mentioned in the chapter Vertex AI workbench and custom model training to create the workbench (choose TensorFlow Enterprise | TensorFlow Enterprise 2.9 | Without GPUs; see Figure 8.3 for reference). All other steps are the same as mentioned in that chapter.

Figure 8.3: Workbench creation using TensorFlow enterprise
Step 1: Create Python notebook file
Once the workbench is created, open JupyterLab and follow the steps mentioned in Figure 8.4 to create a Python notebook file:

Figure 8.4: New launcher window

  1. Click New launcher.
  2. Double click on the Python3 Notebook file to create one.

From Step 2 onwards, run the code in separate cells.
Step 2: Package installation
Run the following commands to install TFX (with the Kubeflow Pipelines extra), Apache Beam and the python-snappy package. (It will take a few minutes to install the packages):
USER_FLAG = "--user"
!pip install {USER_FLAG} --upgrade "tfx[kfp]<2"
!pip install {USER_FLAG} apache-beam[interactive]
!pip install python-snappy

Step 3: Kernel restart
Type the following commands in the next cell, to restart the kernel. (Users can restart kernel from the GUI as well):
import os
import IPython
if not os.getenv(""):
    IPython.Application.instance().kernel.do_shutdown(True)

Step 4: Verify packages are installed
Run the following lines of code to check whether the packages are installed (if the packages are not installed properly, try upgrading the pip package before installing the tfx and kfp packages):
import snappy
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
from tfx import v1 as tfx
import kfp
print('TensorFlow version:', tf.__version__)
print('TFX version: ', tfx.__version__)
print('KFP version: ', kfp.__version__)

If the packages are installed properly, you should see the versions of the TensorFlow, TensorFlow Extended and Kubeflow packages as shown in Figure 8.5:

Figure 8.5: Packages installed successfully
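
If importing a package fails, it can help to check which versions are installed without importing anything. The following sketch uses importlib.metadata from the standard library; the installed_version helper is a hypothetical name introduced for this example, and the distribution names are the ones used with pip in Step 2.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version string, or None if it is not installed."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


for name in ("tensorflow", "tfx", "kfp"):
    print(name, installed_version(name) or "not installed")
```
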
Step 5: Setting up the project and other variables
Run the following lines of code in a new cell to set the project to the current one and to define variables that store the paths used for various purposes:
PROJECT_ID = "vertex-ai-gcp-1"
!gcloud config set project {PROJECT_ID}
BUCKET_NAME = "tfx_pipeline_demo"
NAME_PIPELINE = "tfx-pipeline"
ROOT_PIPELINE = f'gs://{BUCKET_NAME}/root/{NAME_PIPELINE}'
MODULE_FOLDER = f'gs://{BUCKET_NAME}/module/{NAME_PIPELINE}'
OUTPUT_MODEL_DIR = f'gs://{BUCKET_NAME}/output_model/{NAME_PIPELINE}'
INPUT_DATA_DIR = 'gs://tfx_pipeline_input_data'

ROOT_PIPELINE is used to store the artifacts of the pipeline, MODULE_FOLDER is used to store the .py files for the transform and trainer components, OUTPUT_MODEL_DIR is used to store the trained model, and INPUT_DATA_DIR is the GCS location where the input data is located.