Step 6: Creation of pipeline
So far, we have understood and executed individual components of the TFX pipeline (not all of the components are needed for pipeline construction). Let us now construct the pipeline using the example_gen, statistics_gen, schema_gen, transform, trainer, and pusher components (the Pusher component pushes the trained model to a destination; in this example, the trained model will be pushed to Cloud Storage):
• Define a function (_create_pipeline) consisting of all the steps/components that need to be included in the pipeline; it returns the TFX pipeline object. Run the following code in a new cell to define the pipeline function:
# Note: tfx refers to the TFX v1 API (from tfx import v1 as tfx) imported in the earlier steps
def _create_pipeline(pl_name, pipeline_root_folder, data_root,
                     module_file_transform, module_file_train, model_dir_save,
                     ) -> tfx.dsl.Pipeline:
    # Ingest the CSV files from the input directory
    example_gen_csv = tfx.components.CsvExampleGen(input_base=data_root)
    # Compute statistics over the ingested examples
    gen_statistics = tfx.components.StatisticsGen(
        examples=example_gen_csv.outputs['examples'])
    # Infer a schema from the computed statistics
    gen_schema = tfx.components.SchemaGen(
        statistics=gen_statistics.outputs['statistics'])
    # Apply the preprocessing defined in the transform module file
    transform_data = tfx.components.Transform(
        examples=example_gen_csv.outputs['examples'],
        schema=gen_schema.outputs['schema'],
        module_file=module_file_transform)
    # Train the model using the trainer module file
    model_trainer = tfx.components.Trainer(
        module_file=module_file_train,
        examples=example_gen_csv.outputs['examples'],
        transform_graph=transform_data.outputs['transform_graph'],
        schema=gen_schema.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=200),
        eval_args=tfx.proto.EvalArgs(num_steps=10))
    # Push the trained model to the configured Cloud Storage location
    pusher = tfx.components.Pusher(
        model=model_trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=model_dir_save)))
    return tfx.dsl.Pipeline(
        pipeline_name=pl_name,
        pipeline_root=pipeline_root_folder,
        components=[example_gen_csv, gen_statistics, gen_schema,
                    transform_data, model_trainer, pusher])
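The function above, and the runner defined in the next step, reference constants such as NAME_PIPELINE, ROOT_PIPELINE, INPUT_DATA_DIR, MODULE_FOLDER, TRANSFORM_MODULE_PATH, and OUTPUT_MODEL_DIR that were set up in the earlier steps of this chapter. As a rough reference only, a minimal sketch of such a setup could look like the following; the project ID, bucket name, and paths are purely illustrative and depend on your own environment:
# Purely illustrative values -- the real values were defined in the earlier
# setup steps and depend on your GCP project and Cloud Storage bucket.
PROJECT_ID = "my-gcp-project"                  # assumption: your project ID
BUCKET = "gs://my-tfx-bucket"                  # assumption: your staging bucket
NAME_PIPELINE = "tfx-vertex-pipeline"          # assumption: any valid pipeline name
ROOT_PIPELINE = f"{BUCKET}/pipeline_root/{NAME_PIPELINE}"
INPUT_DATA_DIR = f"{BUCKET}/data"              # folder containing the CSV data
MODULE_FOLDER = f"{BUCKET}/modules"            # folder holding the module files
TRANSFORM_MODULE_PATH = "transform.py"         # preprocessing module file name
OUTPUT_MODEL_DIR = f"{BUCKET}/serving_model"   # where the Pusher writes the model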
Step 8: Defining a runner
As mentioned in the theory section, TFX is portable across environments and orchestration frameworks. TFX supports Airflow, Beam, and Kubeflow, and it also gives developers the flexibility to add their own orchestrators. Custom orchestrators must inherit from TfxRunner. TFX orchestrators schedule the pipeline components based on DAG dependencies, using the logical pipeline object, which comprises the pipeline arguments, components, and DAG. In our example, we will use Vertex AI Pipelines together with the Kubeflow V2 DAG runner. In the following code, we create a runner using the Kubeflow V2 DAG runner and run it by passing all the pipeline parameters.
Run the following code in a new cell to define the runner:
import os  # if not already imported in an earlier cell
trainer_file = "trainer.py"
file_transform = os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH)
file_train = os.path.join(MODULE_FOLDER, trainer_file)
# The runner compiles the pipeline into this JSON definition file
pl_def_file = NAME_PIPELINE + '.json'
pl_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=pl_def_file)
_ = pl_runner.run(
    _create_pipeline(
        pl_name=NAME_PIPELINE,
        pipeline_root_folder=ROOT_PIPELINE,
        data_root=INPUT_DATA_DIR,
        module_file_transform=file_transform,
        module_file_train=file_train,
        model_dir_save=OUTPUT_MODEL_DIR))
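Note that the runner does not execute the pipeline here; it compiles the pipeline into the JSON definition file named by output_filename, which is submitted to Vertex AI Pipelines in the next step. As an optional sanity check (a small sketch, not part of the original steps), you can confirm the file was written to the working directory:
import os
# Optional check: the compiled pipeline definition should now exist locally.
assert os.path.exists(pl_def_file), f"{pl_def_file} was not generated"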
Step 9: Pipeline execution
As the last step, we need to execute the pipeline. Run the following code in a new cell to start the pipeline execution:
# Initialize the Vertex AI SDK and submit the compiled pipeline definition
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location="us-central1")
job = aiplatform.pipeline_jobs.PipelineJob(
    template_path=pl_def_file,
    display_name=NAME_PIPELINE)
job.run(sync=False)
Once the pipeline starts, users are provided with a link to check its status, as shown in Figure 8.13:

Figure 8.13: Pipeline link
• Click on the link to check the status of the pipeline, as shown in the following screenshot:

Figure 8.14: Pipeline executed successfully
The pipeline will take a few minutes to complete the training and push the trained model to Cloud Storage. Whether the pipeline is constructed using TFX or Kubeflow, its user interface in GCP remains the same, as shown in Figure 8.14. Spend some time understanding each of the components and their artifacts in detail, as described in the previous chapter.
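Because job.run() was called with sync=False, the call returns immediately while the run continues on Vertex AI. If you prefer to monitor the run from the notebook rather than the console, a minimal sketch (assuming the same job object and OUTPUT_MODEL_DIR from the previous cells, and TensorFlow installed) could look like this:
import tensorflow as tf
# Block until the asynchronous pipeline run finishes, then inspect its state.
job.wait()
print(job.state)   # e.g. PipelineState.PIPELINE_STATE_SUCCEEDED
# After a successful run, the Pusher has written the trained model under
# OUTPUT_MODEL_DIR in Cloud Storage; list the contents to confirm.
print(tf.io.gfile.listdir(OUTPUT_MODEL_DIR))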