In this exercise, we will validate the data that was saved to the HANA database in the previous exercise. For each record that fails the quality check, a service ticket containing the failed data is created.
Below is a list of all the operators that you are going to need for this exercise:
| Name | Category |
|---|---|
| HANA Table Consumer* | Connectivity (via Flowagent) |
| Flowagent CSV Producer | Connectivity (via Flowagent) |
| Python3 Operator | Processing |
| SAP HANA Client* | Connectivity |
\* There are several other operators you could use to read from and write to HANA; however, using them might also require additional data type converters.
If you would like to try it on your own first, here is a short summary of the tasks to be done:
- Read the table `CELLSTATUS` created in exercise 1.
- Add a `Python3` operator and program it to create a new record for each failed record and send it to the outport as a CSV string. The column structure is as follows: `("TIMESTAMP","STATUS","COMMENT","DATE","CELLID","NOM_KEY1","NOM_KEY2","KEY1","KEY2","ERROR_ACTION","ERROR_COLUMNS","ROW_ID")`
- Save the "service ticket" string to the HANA table `QMTICKET`.
Hint: Depending on the HANA DB operator, the header might be passed along with the data. You have to take this into account both in the "Validation Rule" operator and in the script of the `Python3` operator. For the latter, the script below contains a commented-out variant that consumes the header and renames the columns so that the following process steps still work.
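To see why the header matters: `pd.read_csv` with the `names=` parameter does not skip an existing header line; it ingests it as a regular data row. A minimal, self-contained demonstration with made-up values:

```python
import io
import pandas as pd

csv = "KEY1,KEY2\n1,2\n"

# names=... assumes the data has no header: the header line becomes a bogus data row
print(pd.read_csv(io.StringIO(csv), names=["KEY1", "KEY2"]))
#    KEY1 KEY2
# 0  KEY1 KEY2   <- header line ingested as data
# 1     1    2

# header=0 consumes the first line as the column names instead
print(pd.read_csv(io.StringIO(csv), header=0))
#    KEY1  KEY2
# 0     1     2
```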
- Add the following operators to the pipeline canvas and connect them in this order:
  1. Workflow Trigger
  2. HANA Table Consumer (use outport `outConfig`)
  3. Flowagent CSV Producer (use outport `outContent`)
  4. Wiretap
- Configure the `HANA Table Consumer` operator:
  - HANA Connection: `HANAC_WS`
  - Source Table: `"TECHED"."CELLSTATUS"`
  - Partition Type: default (`None`)
  - Additional session parameters: default (`''`)
  - Fetch size: default (`1000`)
- Save the pipeline with the name `<taxx>.ValidateDataQuality`, where `xx` is your assigned user ID.
- Before we continue designing the pipeline, we want to verify that the data is read correctly from HANA and rewritten into CSV format. Start the pipeline and wait for it to reach the status `Running`. Click on the Open UI icon of the `Wiretap` operator (this icon only appears while the pipeline is running). A new browser tab opens, and if everything is configured correctly, CSV data should be displayed.
In this part, we would like to analyse the data with a Python script.
- Add the `Python3` operator to the canvas and connect it either to the outport of the Wiretap or to the outport of the Flowagent CSV Producer (`outContent`).
- Add an inport and an outport to the `Python3` operator.
- Open the script tab by clicking on the Script icon of the `Python3` operator. This opens a new tab inside the Pipeline Modeler.
- The operator comes with some sample code. Select all of the text, delete it, and replace it with the script below. The basic idea of this script is to store the CSV records in a pandas DataFrame, add some additional columns and values to it, filter the records that deviate from the nominal values by more than a specified amount, convert the result back into CSV format, and send it to the outport. In the row `df["COMMENT"] = 'TAxx'` you can put your user name to find your QM tickets more easily.
```python
import pandas as pd
import io
from datetime import datetime

def on_input(data):
    # Read the CSV records into a pandas DataFrame
    data_stream = io.StringIO(data)
    # If the data arrives with a header row, use the commented variant instead,
    # which consumes the header and renames the columns for the following steps:
    # df = pd.read_csv(data_stream, header=0)
    # df.columns = ["DATE", "CELLID", "KEY1", "KEY2", "NOM_KEY1", "NOM_KEY2"]
    df = pd.read_csv(data_stream, names=["DATE", "CELLID", "KEY1", "KEY2", "NOM_KEY1", "NOM_KEY2"])

    # Add the ticket information
    df["TIMESTAMP"] = datetime.now()
    df["STATUS"] = 'open'
    df["COMMENT"] = 'TAxx'  # put your user name here to find your tickets more easily
    df["ROW_ID"] = df.index
    df["ERROR_COLUMNS"] = ''
    df["ERROR_ACTION"] = 'D'

    # Keep only the records that deviate from the nominal values by more than the tolerance
    max_diff_key1 = 30
    max_diff_key2 = 60
    df = df.loc[(df['KEY1'] < df['NOM_KEY1'] - max_diff_key1) | (df['KEY1'] > df['NOM_KEY1'] + max_diff_key1) |
                (df['KEY2'] < df['NOM_KEY2'] - max_diff_key2) | (df['KEY2'] > df['NOM_KEY2'] + max_diff_key2)]

    # Reorder the columns in case the column order matters downstream
    df = df[["TIMESTAMP", "STATUS", "COMMENT", "DATE", "CELLID", "KEY1", "KEY2",
             "NOM_KEY1", "NOM_KEY2", "ERROR_ACTION", "ERROR_COLUMNS", "ROW_ID"]]

    # Send the failed records to the outport as a CSV string
    api.send("outport", api.Message(attributes={'data': 'FAILED'},
                                    body=df.to_csv(index=False, date_format="%Y-%m-%d %H:%M:%S", header=False)))

api.set_port_callback("input", on_input)
```
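The two-sided range check in the script is equivalent to comparing the absolute deviation against the tolerance. If you want to sanity-check the filter logic outside the Modeler (where no `api` object is available), you can run it on a small, made-up DataFrame:

```python
import pandas as pd

# Toy data: CELL_B's KEY1 deviates from NOM_KEY1 by 50, beyond max_diff_key1 = 30
df = pd.DataFrame({
    "DATE": ["2021-01-01", "2021-01-02"],
    "CELLID": ["CELL_A", "CELL_B"],
    "KEY1": [100, 160],
    "KEY2": [200, 200],
    "NOM_KEY1": [110, 110],
    "NOM_KEY2": [210, 210],
})

max_diff_key1 = 30
max_diff_key2 = 60

# Same filter as in the operator script, written as an absolute-deviation check
failed = df.loc[((df["KEY1"] - df["NOM_KEY1"]).abs() > max_diff_key1) |
                ((df["KEY2"] - df["NOM_KEY2"]).abs() > max_diff_key2)]
print(failed)  # only the CELL_B row should remain
```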
- Add a new `Wiretap` operator to the canvas and connect it to the outport of the `Python3` operator.
- Save the pipeline as `<User>.QualityValidation`.
- Run the pipeline and check if the output is what you expected.
- Add the `SAP HANA Client` operator to the canvas and connect its `data` inport either to the outport of the Wiretap or directly to the outport of the `Python3` operator.
- Configure the SAP HANA Client:
  - Connection: `Configuration Manager`/`HANAC_WS`
  - Table name: `"TECHED"."QMTICKET"`. Don't forget to include the double quotes!
  - All other parameters can be left at their default values.
- Finally, add a `Graph Terminator` operator and connect it to the `SAP HANA Client` operator.
- Save and run the pipeline. It should eventually switch to the status `Completed`.
- Inspect the table `"TECHED"."QMTICKET"` via the Metadata Explorer to verify that your records were written as expected.
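If you prefer to check the result programmatically instead of via the Metadata Explorer, here is a small sketch using the `hdbcli` Python driver; host, port, user, and password are placeholders you have to replace with your own connection details:

```python
from hdbcli import dbapi  # SAP HANA Python client (pip install hdbcli)

# Placeholder connection details - replace with the values of your HANA instance
conn = dbapi.connect(address="<hana-host>", port=443, user="<user>", password="<password>")
try:
    cursor = conn.cursor()
    # Look up your own tickets via the COMMENT column set in the Python3 operator
    cursor.execute('SELECT COUNT(*) FROM "TECHED"."QMTICKET" WHERE "COMMENT" = ?', ("TAxx",))
    print("Tickets written:", cursor.fetchone()[0])
finally:
    conn.close()
```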
You have learned an alternative way to read from and write to a HANA database, compared with the Structured Data operators of the previous exercise. We used the "Validation Rule" operator for simple data quality checks and, finally, a custom `Python3` operator to leverage the full flexibility of a script.
Solution Example: Exercise 2 - Example
Continue to Exercise 3: Create RestAPI