In this exercise, you will use the Metadata Explorer to explore and view a batch of files stored in a cloud storage account such as an Amazon S3 bucket. Then, using the Pipeline Modeler, you will read this entire batch of files and consolidate them into a single csv file that will be used in a later exercise.
Objective: To better understand the data we will be working with in the later sections of this exercise, we will first preview the datasets using the Metadata Explorer.
- Select the Metadata Explorer from the drop-down menu in the top-left of the screen or by clicking on the Metadata Explorer tile in the Launchpad.
- Click on Browse Connections
- Select the `device_performance` directory. This directory contains all of the source .csv files that we will be appending in the next section of this exercise.
- Click on the View Factsheet button on any `cellpm` file.
- Click on the Data Preview button. You should see that our source file contains four unlabelled columns: a timestamp, a device ID, and two value columns for the device.
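As an illustration of what one of these headerless source files might look like (the sample values below are hypothetical, not taken from the workshop data), such a four-column csv can be parsed in plain Python:

```python
import csv
import io

# Hypothetical sample mimicking one of the headerless source files:
# a timestamp, a device ID, and two value columns.
sample = io.StringIO(
    "2020-10-01 12:00:00,cellpm_01,0.42,17\n"
    "2020-10-01 12:01:00,cellpm_01,0.44,18\n"
)

# With no header row, each record is just a list of four fields.
rows = list(csv.reader(sample))
for timestamp, device_id, value_a, value_b in rows:
    print(timestamp, device_id, value_a, value_b)
```

This is why the Data Preview labels the columns generically: the files carry no header row of their own.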
Video walkthrough: Write files and Message Filters
Objective: In the previous section you observed that the `device_performance` directory contains multiple small source csv files with four columns each. In this section we will read the contents of these source files and re-write them into a single output csv file.
- Return to the Modeler by using the drop-down menu in the upper left corner of your screen.
- Create a new graph by first selecting the `Graphs` tab (shown vertically) and then the + button.
- When creating a new graph the Modeler should automatically switch to the `Operators` tab (shown vertically); if not, select it. Use the search field to find the `List Files` operator and drag-and-drop it into the new, empty graph.
- The `List Files` operator takes a directory as an input or configurable parameter and outputs a list of all files and sub-directories in a given storage service, such as an Amazon S3 bucket. You can view more information about this operator by right-clicking it and selecting `Open Documentation`.
- Like many other operators, the `List Files` operator will by default read configuration parameters from its input node `dirRef` during runtime. However, in this exercise the parameters will be provided during design time:
  - Configure the `List Files` operator by right-clicking it and selecting `Open Configuration`.
  - Set the list operation to occur only `Once`.
  - Select the pencil icon to define which storage account to read from.
- Set the Configuration Type to `Connection Management` and set Connection ID to your cloud storage connection (e.g. `TechEd2020_S3`), then select Save.
- Browse the connection by clicking on the monitor icon, select the path directory `/device_performance`, and click OK.
- Now would be a great time to save your progress. Click on the floppy disk icon located in the toolbar at the top of the screen. Enter the name `FileCollectionPerformance_TAxx`, where xx is the ID assigned to you at the beginning of the workshop.
- Use the search field to find the `Read File` operator. Drag and drop it into your graph. This operator reads the contents of files from various storage services.
- The file path to be read by the `Read File` operator can be provided either via its input node at runtime or configured at design time. In this exercise we will provide the path at runtime using the `List Files` operator. Connect its output node `ref` to the input node of the `Read File` operator.
- Use the search field to find the `Write File` operator. Drag and drop it into your graph. The `Write File` operator writes any content provided to its input node as a file to various object store services. Connect the `Read File` operator's output node `file` to the input node of the `Write File` operator.
Like the
Read File
operator theWrite File
operator can be configured at runtime or design time. In this exercise we will provide the target directory at design time.
-
Select Path Mode to
Static (from configuration)
-
Click on the pencil button to select the
DI_DATA_LAKE
connection. -
Set Mode to
Append
-
Since the file we want to write does not yet exist we cannot browse for the path. Instead manually enter the following path:
/shared/<user_id>/performance.csv
. This directory will be automatically created if it does not already exist.If you are doing this tutorial as part of a workshop then your user ID will be in the format of taXX, where XX is your assigned number.
- [Optional] You can rename the label of the `Write File` operator to `Append File`. This can help other users understand at a glance what the pipeline is designed to do.
- When the `Write File` operator has successfully written the last batch of data, it will mark its last output message with the attribute `lastBatch`. We can listen for this attribute and create a trigger to safely terminate the pipeline.
- Use the search field to find the `Python3` operator and drag and drop it into your graph.
- Right-click it and select Open Configuration. Set the label to read `Message Filter`.
- Right-click the operator again and select `Add Port`. Do this twice to add the following two ports:
Name | Type | Data Type | Data Type Name |
---|---|---|---|
input | input port | Basic | message.file |
output | output port | Basic | any.* |
- Connect the `file` output node of the `Write File` operator to the `Python3` operator's input node.
- Right-click the `Python3` operator and select `Open Script`. This will open a code editor as a new tab inside the Modeler.
- Delete any pre-existing code and copy/paste the text snippet below. Afterwards you may close the code editor tab.
def on_input(data):
    # If the attribute lastBatch is set then send an output, else do nothing
    if data.attributes.get("message.lastBatch", False):
        api.send("output", True)

# When an input is received, call the function on_input()
api.set_port_callback("input", on_input)
- Use the search field to find the `Graph Terminator` operator. Drag and drop it into your graph. This operator will terminate the pipeline execution when it receives any input on its input node. Connect the output node of your `Python3` operator to the input node of the `Graph Terminator`.

Since the custom `Message Filter` operator filters out all messages that do not contain the `lastBatch` attribute, the graph will not be terminated until we are sure that the last batch of data was written by the `Write File` operator.
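This filtering behaviour can be sketched outside the Modeler with a small stand-in for the operator API (the `Message` and `FakeApi` classes below are illustrative test doubles, not part of SAP Data Intelligence):

```python
class Message:
    """Minimal stand-in for a pipeline message carrying attributes."""
    def __init__(self, attributes):
        self.attributes = attributes

class FakeApi:
    """Records what the script would send to its output port."""
    def __init__(self):
        self.sent = []
    def send(self, port, value):
        self.sent.append((port, value))

api = FakeApi()

def on_input(data):
    # Forward a signal only when the lastBatch attribute is set.
    if data.attributes.get("message.lastBatch", False):
        api.send("output", True)

# Two intermediate batches followed by the final one.
for msg in [Message({}), Message({}), Message({"message.lastBatch": True})]:
    on_input(msg)

print(api.sent)  # only the final message produces an output
```

Only the third message reaches the output port, so the `Graph Terminator` fires exactly once, after the last batch.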
- Save the graph by clicking on the floppy disk icon located in the toolbar at the top of the screen.
Video walkthrough: Running and copying graphs
Now that you've completed designing your graph, it is time to execute it and inspect the output data.
- To execute the pipeline, click on the play button at the top of the screen. The pipeline will appear under the Status tab. It will first appear in status `Pending` and then, after a few seconds, `Running`.
- If the pipeline did not run into any errors, the execution should reach the `Graph Terminator` operator and the status will eventually switch to `Completed`.
Tip: If the pipeline terminates with an error you can click on the title of the failed graph to view a detailed error message.
- After the pipeline reaches status `Completed`, return to the Metadata Explorer using the drop-down menu in the top left corner of the screen.
- Click on Browse Connections and navigate to the `/shared/TAxx/` directory (your folder/directory may be different) in the cloud storage connection (e.g. `DI_DATA_LAKE`).
- Click on the View Factsheet button on the `performance.csv` file.
- Click on the Data Preview button. Observe that all of the individual files from the source directory have been appended into a single consolidated csv file (the file size should be approx. 83 kB).
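Conceptually, the `Write File` operator's Append mode behaves like concatenating the source files in the order they arrive. A minimal local sketch of that consolidation (the file contents below are made up for illustration) is:

```python
import io

# Hypothetical contents of three small headerless source csv files.
source_files = [
    "t1,dev1,0.1,10\n",
    "t2,dev1,0.2,11\n",
    "t3,dev2,0.3,12\n",
]

# Append each file's contents to a single output, as the pipeline does.
output = io.StringIO()
for contents in source_files:
    output.write(contents)

consolidated = output.getvalue()
print(consolidated)
```

Because the source files have no header rows, simple appending yields a valid combined csv with no duplicated headers to clean up.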
We need the same data pipeline to consolidate the configuration files into one configuration.csv file.
- 'Save as...' the previous pipeline and name it `FileCollectionConfiguration_TAxx`. Click on the floppy disk icon located in the toolbar at the top of the screen.
- Change the configuration parameters:
  - `List Files` operator, Path: `/device_configuration`
  - `Write File` operator, Path: `/shared/TAxx/configuration.csv`
- Run the pipeline to create the configuration.csv file.
In this exercise you have consolidated a batch of csv files into a single csv file using the List, Read, and Write file operators. By using the Message Filter operator you were also able to gracefully terminate the pipeline when all files have been processed and written to cloud storage. Finally, you were able to verify the correctness of your output using the Metadata Explorer.
Continue to Exercise 2: Joining and writing workflow data to SAP HANA where you will learn to use the workflow operators to join and aggregate two csv files and store the result in a HANA database.