diff --git a/.github/workflows/integrationTest.yml b/.github/workflows/integrationTest.yml index 3b4544ea5b..0bf55752fc 100644 --- a/.github/workflows/integrationTest.yml +++ b/.github/workflows/integrationTest.yml @@ -21,6 +21,7 @@ on: branches: - master + workflow_dispatch: concurrency: @@ -135,12 +136,12 @@ jobs: - name: Upload to s3 if: steps.cached_binaries.outputs.cache-hit != 'true' run: aws s3 cp build/bin s3://${S3_INTEGRATION_BUCKET}/integration-test/binary/${{ github.sha }} --recursive - GenerateTestMatrix: name: 'GenerateTestMatrix' runs-on: ubuntu-latest outputs: ec2_linux_matrix: ${{ steps.set-matrix.outputs.ec2_linux_matrix }} + ec2_performance_matrix: ${{steps.set-matrix.outputs.ec2_performance_matrix}} ec2_windows_matrix: ${{ steps.set-matrix.outputs.ec2_windows_matrix }} ecs_fargate_matrix: ${{ steps.set-matrix.outputs.ecs_fargate_matrix }} steps: @@ -156,12 +157,14 @@ jobs: run: | go run --tags=generator integration/generator/test_case_generator.go echo "::set-output name=ec2_linux_matrix::$(echo $(cat integration/generator/resources/ec2_linux_complete_test_matrix.json))" + echo "::set-output name=ec2_performance_matrix::$(echo $(cat integration/generator/resources/ec2_performance_complete_test_matrix.json))" echo "::set-output name=ec2_windows_matrix::$(echo $(cat integration/generator/resources/ec2_windows_complete_test_matrix.json))" echo "::set-output name=ecs_fargate_matrix::$(echo $(cat integration/generator/resources/ecs_fargate_complete_test_matrix.json))" - name: Echo test plan matrix run: | echo ${{ steps.set-matrix.outputs.ec2_linux_matrix }} + echo ${{ steps.set-matrix.outputs.ec2_performance_matrix}} echo ${{ steps.set-matrix.outputs.ec2_windows_matrix }} echo ${{ steps.set-matrix.outputs.ecs_fargate_matrix }} @@ -614,6 +617,10 @@ jobs: name: "PerformanceTrackingTest" needs: [MakeBinary, StartLocalStack, GenerateTestMatrix] runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + arrays: ${{ fromJson(needs.GenerateTestMatrix.outputs.ec2_performance_matrix) }} steps: - uses: actions/checkout@v2 @@ -633,52 +640,51 @@ jobs: - name: Echo Test Info run: echo run performance-tracking - - - name: Verify Terraform version - run: terraform --version - + - name: Get SHA + id: sha + run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" - name: Get git date id: sha_date run: echo "::set-output name=sha_date::$(git show -s --format=%ct ${{ steps.sha.outputs.sha_short }} )" - - name: Check env - run: echo "SHA ${GITHUB_SHA} | Date ${{ steps.sha_date.outputs.sha_date }}" - + run: echo "SHA ${{ steps.sha.outputs.sha_short }} | Date ${{ steps.sha_date.outputs.sha_date }} " + - name: Verify Terraform version + run: terraform --version - name: Terraform apply if: steps.performance-tracking.outputs.cache-hit != 'true' uses: nick-invision/retry@v2 with: max_attempts: 1 - timeout_minutes: 50 + timeout_minutes: 30 retry_wait_seconds: 5 command: | cd integration/terraform/ec2/linux terraform init if terraform apply --auto-approve \ -var="ssh_key=${PRIVATE_KEY}" -var="github_repo=${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git" \ - -var="github_sha=${GITHUB_SHA}" -var="install_agent=rpm -U ./amazon-cloudwatch-agent.rpm" \ - -var="user=ec2-user" \ - -var="ami=cloudwatch-agent-integration-test-al2*" \ - -var="ca_cert_path=/etc/ssl/certs/ca-bundle.crt" \ - -var="arc=amd64" \ - -var="binary_name=amazon-cloudwatch-agent.rpm" \ + -var="github_sha=${GITHUB_SHA}" -var="install_agent=${{ matrix.arrays.installAgentCommand }}" \ + -var="user=${{ matrix.arrays.username 
}}" \ + -var="ami=${{ matrix.arrays.ami }}" \ + -var="ca_cert_path=${{ matrix.arrays.caCertPath }}" \ + -var="arc=${{ matrix.arrays.arc }}" \ + -var="binary_name=${{ matrix.arrays.binaryName }}" \ -var="local_stack_host_name=${{ needs.StartLocalStack.outputs.local_stack_host_name }}" \ -var="s3_bucket=${S3_INTEGRATION_BUCKET}" \ -var="key_name=${KEY_NAME}" \ - -var="test_name=cw-integ-test-al2" \ -var="github_sha_date=${{ steps.sha_date.outputs.sha_date }}" \ - -var="test_dir=./integration/test/performancetest" ; then terraform destroy -auto-approve - + -var="test_name=cw-integ-test-${{ matrix.arrays.os }}" \ + -var="performance_number_of_logs=${{ matrix.arrays.performance_number_of_logs}}"\ + -var="test_dir=${{ matrix.arrays.test_dir }}" ; then terraform destroy -auto-approve else terraform destroy -auto-approve && exit 1 fi #This is here just in case workflow cancel - name: Terraform destroy - if: ${{ cancelled() && steps.performance-tracking.outputs.cache-hit != 'true' }} + if: ${{ cancelled() && steps.ec2-linux-integration-test.outputs.cache-hit != 'true' }} uses: nick-invision/retry@v2 with: max_attempts: 3 timeout_minutes: 8 retry_wait_seconds: 5 - command: cd integration/terraform/ec2/linux && terraform destroy --auto-approve + command: cd integration/terraform/ec2/linux && terraform destroy --auto-approve \ No newline at end of file diff --git a/.github/workflows/releaseTest.yml b/.github/workflows/releaseTest.yml new file mode 100644 index 0000000000..fb4b2d3163 --- /dev/null +++ b/.github/workflows/releaseTest.yml @@ -0,0 +1,52 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: MIT + +name: Release Update +env: + PRIVATE_KEY: ${{ secrets.AWS_PRIVATE_KEY }} + TERRAFORM_AWS_ACCESS_KEY_ID: ${{ secrets.TERRAFORM_AWS_ACCESS_KEY_ID }} + TERRAFORM_AWS_SECRET_ACCESS_KEY: ${{ secrets.TERRAFORM_AWS_SECRET_ACCESS_KEY }} + S3_INTEGRATION_BUCKET: ${{ secrets.S3_INTEGRATION_BUCKET }} + KEY_NAME: ${{ secrets.KEY_NAME }} + VPC_SECURITY_GROUPS_IDS: ${{ secrets.VPC_SECURITY_GROUPS_IDS }} + IAM_ROLE: ${{ secrets.IAM_ROLE }} + GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }} + PASSPHRASE: ${{ secrets.PASSPHRASE }} + GPG_KEY_NAME: ${{ secrets.GPG_KEY_NAME }} + GPG_TTY: $(tty) + +on: + release: + types: [created] + + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref_name }} + cancel-in-progress: true + +jobs: + UpdatePerformanceMetrics: + name: "UpdatePerformanceMetrics" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ~1.18.3 + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.TERRAFORM_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.TERRAFORM_AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + + - name: Update isRelease for this release + run: | + cd integration/test/performancetest + export IS_RELEASE=true + export SHA=$GITHUB_SHA + go test -run TestUpdateCommit -p 1 -v --tags=integration + \ No newline at end of file diff --git a/go.mod b/go.mod index 52a5097775..fadb74b8b0 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/gobwas/glob v0.2.3 github.com/google/cadvisor v0.44.0 github.com/google/go-cmp v0.5.8 + github.com/google/uuid v1.3.0 github.com/hashicorp/golang-lru v0.5.4 github.com/influxdata/telegraf v0.0.0-00010101000000-000000000000 github.com/influxdata/toml 
v0.0.0-20190415235208-270119a8ce65 @@ -177,7 +178,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.0.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.3.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gophercloud/gophercloud v0.24.0 // indirect diff --git a/go.sum b/go.sum index a83633fd1c..335629e9bb 100644 --- a/go.sum +++ b/go.sum @@ -2723,4 +2723,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= -sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= \ No newline at end of file diff --git a/integration/generator/resources/ec2_performance_test_matrix.json b/integration/generator/resources/ec2_performance_test_matrix.json new file mode 100644 index 0000000000..357095c196 --- /dev/null +++ b/integration/generator/resources/ec2_performance_test_matrix.json @@ -0,0 +1,32 @@ +[ + { + "os": "al2", + "username": "ec2-user", + "installAgentCommand": "rpm -U ./amazon-cloudwatch-agent.rpm", + "ami": "cloudwatch-agent-integration-test-al2*", + "caCertPath": "/etc/ssl/certs/ca-bundle.crt", + "arc": "amd64", + "binaryName": "amazon-cloudwatch-agent.rpm", + "performance_number_of_logs": "10" + }, + { + "os": "al2", + "username": "ec2-user", + "installAgentCommand": "rpm -U ./amazon-cloudwatch-agent.rpm", + "ami": "cloudwatch-agent-integration-test-al2*", + "caCertPath": "/etc/ssl/certs/ca-bundle.crt", + "arc": "amd64", + "binaryName": "amazon-cloudwatch-agent.rpm", + "performance_number_of_logs": "100" + }, + { + "os": "al2", + "username": "ec2-user", + "installAgentCommand": "rpm -U ./amazon-cloudwatch-agent.rpm", + "ami": "cloudwatch-agent-integration-test-al2*", + "caCertPath": "/etc/ssl/certs/ca-bundle.crt", + "arc": "amd64", + "binaryName": "amazon-cloudwatch-agent.rpm", + "performance_number_of_logs": "1000" + } +] \ No newline at end of file diff --git a/integration/generator/test_case_generator.go b/integration/generator/test_case_generator.go index 456c0e5f08..6527e150c0 100644 --- a/integration/generator/test_case_generator.go +++ b/integration/generator/test_case_generator.go @@ -25,6 +25,9 @@ var osToTestDirMap = map[string][]string{ "./integration/test/cloudwatchlogs", "./integration/test/metrics_number_dimension", }, + "ec2_performance":{ + "./integration/test/performancetest", + }, // @TODO add real tests "ec2_windows": {""}, "ec2_mac": {}, diff --git a/integration/terraform/ec2/linux/iam.tf b/integration/terraform/ec2/linux/iam.tf index 963791c773..d82b50522f 100644 --- a/integration/terraform/ec2/linux/iam.tf +++ b/integration/terraform/ec2/linux/iam.tf @@ -47,7 +47,9 @@ data "aws_iam_policy_document" "user-managed-policy-document" { "s3:ListBucket", "dynamodb:DescribeTable", "dynamodb:PutItem", - "dynamodb:CreateTable" + "dynamodb:CreateTable", + "dynamodb:Query", + "dynamodb:UpdateItem" ] resources = ["*"] } @@ -61,4 +63,4 @@ resource "aws_iam_policy" "cwagent_server_policy" { resource "aws_iam_role_policy_attachment" "cwagent_server_policy_attachment" { role = 
aws_iam_role.cwagent_role.name policy_arn = aws_iam_policy.cwagent_server_policy.arn -} \ No newline at end of file +} diff --git a/integration/terraform/ec2/linux/main.tf b/integration/terraform/ec2/linux/main.tf index b32eb7b9f1..525a057216 100644 --- a/integration/terraform/ec2/linux/main.tf +++ b/integration/terraform/ec2/linux/main.tf @@ -46,6 +46,7 @@ resource "null_resource" "integration_test" { # Prepare Integration Test provisioner "remote-exec" { inline = [ + "echo sha ${var.github_sha}", "cloud-init status --wait", "echo clone and install agent", "git clone ${var.github_repo}", @@ -80,10 +81,11 @@ resource "null_resource" "integration_test" { "export PATH=$PATH:/snap/bin:/usr/local/go/bin", "echo run integration test", "cd ~/amazon-cloudwatch-agent", - "go test ./integration/test/sanity -p 1 -v --tags=integration", + "echo run sanity test && go test ./integration/test/sanity -p 1 -v --tags=integration", "export SHA=${var.github_sha}", "export SHA_DATE=${var.github_sha_date}", - "go test ${var.test_dir} -p 1 -v --tags=integration" + "export PERFORMANCE_NUMBER_OF_LOGS=${var.performance_number_of_logs}", + "go test ${var.test_dir} -p 1 -timeout 30m -v --tags=integration " ] connection { type = "ssh" diff --git a/integration/terraform/ec2/linux/variables.tf b/integration/terraform/ec2/linux/variables.tf index aff2297d60..4afd05e8e4 100644 --- a/integration/terraform/ec2/linux/variables.tf +++ b/integration/terraform/ec2/linux/variables.tf @@ -82,4 +82,8 @@ variable "github_repo" { variable "github_sha_date"{ type = string default = "" -} \ No newline at end of file +} +variable "performance_number_of_logs"{ + type = string + default = "" +} diff --git a/integration/test/performancetest/performance_query_utils.go b/integration/test/performancetest/performance_query_utils.go index f1e2b44e10..bb8ec2661e 100644 --- a/integration/test/performancetest/performance_query_utils.go +++ b/integration/test/performancetest/performance_query_utils.go @@ -16,6 +16,8 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + + "github.com/google/uuid" ) const ( @@ -23,13 +25,23 @@ const ( DimensionName = "InstanceId" Stat = "Average" Period = 10 - METRIC_PERIOD = 5 * 60 // this const is in seconds , 5 mins PARTITION_KEY ="Year" HASH = "Hash" COMMIT_DATE= "CommitDate" SHA_ENV = "SHA" SHA_DATE_ENV = "SHA_DATE" + IS_RELEASE = "isRelease" + TEST_ID ="TestID" + TPS = "TPS" + PERFORMANCE_NUMBER_OF_LOGS = "PERFORMANCE_NUMBER_OF_LOGS" + RESULTS = "Results" + /* + TEST_ID is used for version control, in order to make sure the + item has not changed between item being editted and updated. + TEST_ID is checked atomicaly. + TEST_ID uses UIUD to give unique id to each packet. 
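+ When the conditional check on TEST_ID fails, UpdateItem in transmitter.go re-reads the item and retries after a random sleep, up to MAX_ATTEMPTS times.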
+ */ ) //struct that holds statistics on the returned data @@ -177,16 +189,19 @@ func GetPerformanceMetrics(instanceId string, agentRuntime, logNum, tps int, age packet[PARTITION_KEY] = time.Now().Year() packet[HASH] = os.Getenv(SHA_ENV) //fmt.Sprintf("%d", time.Now().UnixNano()) packet[COMMIT_DATE],_ = strconv.Atoi(os.Getenv(SHA_DATE_ENV)) - + packet[IS_RELEASE] = false //add test metadata - packet["NumberOfLogsMonitored"] = logNum - packet["TPS"] = tps + packet[TEST_ID] = uuid.New().String() + testSettings := fmt.Sprintf("%d-%d",logNum,tps) + testMetricResults := make(map[string]Stats) + //add actual test data with statistics for _, result := range metrics.MetricDataResults { - packet[*result.Label] = CalcStats(result.Values) + stats:= CalcStats(result.Values) + testMetricResults[*result.Label] = stats } - + packet[RESULTS] = map[string]map[string]Stats{ testSettings: testMetricResults} return packet, nil } diff --git a/integration/test/performancetest/performance_test.go b/integration/test/performancetest/performance_test.go index 72c6ff14cf..f510b72af8 100644 --- a/integration/test/performancetest/performance_test.go +++ b/integration/test/performancetest/performance_test.go @@ -9,7 +9,7 @@ import ( "fmt" "log" "os" - //"strconv" + "strconv" "sync" "testing" "time" @@ -35,11 +35,10 @@ type LogInfo struct { func TestPerformance(t *testing.T) { //get number of logs for test from github action //@TODO - //logNum, err := strconv.Atoi(os.Getenv(testLogNum)) //requires a commit from Okan that updates the workflow file so the log tests will run concurrently - // if err != nil { - // t.Fatalf("Error: cannot convert test log number to integer, %v", err) - // } - logNum := 10 //THIS IS TEMPORARY SO CODE RUNS + logNum, err := strconv.Atoi(os.Getenv(testLogNum)) + if err != nil { + t.Fatalf("Error: cannot convert test log number to integer, %v", err) + } agentContext := context.TODO() instanceId := test.GetInstanceId() @@ -105,8 +104,10 @@ func TestPerformance(t *testing.T) { if data == nil { t.Fatalf("No data") } - - _, err = dynamoDB.SendItem(data) + // this print shows the sendItem packet,it can be used to debug attribute issues + fmt.Printf("%v \n",data) + + _, err = dynamoDB.SendItem(data,tps) if err != nil { t.Fatalf("Error: couldn't upload metric data to table, %v", err) } @@ -251,3 +252,18 @@ func GetLogFilePaths(configPath string) ([]string, error) { return filePaths, nil } + +func TestUpdateCommit(t*testing.T){ + if(os.Getenv("IS_RELEASE") !="true"){ + t.SkipNow() + } + t.Log("Updating Release Commit",os.Getenv(SHA_ENV)) + dynamoDB := InitializeTransmitterAPI("CWAPerformanceMetrics") //add cwa version here + testHash := os.Getenv(SHA_ENV) + if dynamoDB == nil{ + t.Fatalf("Error: generating dynamo table") + return + } + + dynamoDB.UpdateReleaseTag(testHash) +} diff --git a/integration/test/performancetest/transmitter.go b/integration/test/performancetest/transmitter.go index fc9ac19ecd..a68a746e9f 100644 --- a/integration/test/performancetest/transmitter.go +++ b/integration/test/performancetest/transmitter.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "log" + "math/rand" + "os" + "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -14,6 +17,16 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) +const ( + UPDATE_DELAY_THRESHOLD = 60 // this is how long we want to wait for random sleep in seconds + MAX_ATTEMPTS = 5 // number of attemps before we stop retrying to update + /* + !Warning: if this value is less than 25 there is a risk of testCases being lost. 
+ This will only happen if all test threads and at the same time and get the same + sleep value after first attempt to add ITEM + */ +) + type TransmitterAPI struct { dynamoDbClient *dynamodb.Client DataBaseName string // this is the name of the table when test is run @@ -26,7 +39,7 @@ Side effects: Creates a dynamodb table if it doesn't already exist */ func InitializeTransmitterAPI(DataBaseName string) *TransmitterAPI { //setup aws session - cfg, err := config.LoadDefaultConfig(context.TODO(),config.WithRegion("us-west-2")) + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-west-2")) if err != nil { fmt.Printf("Error: Loading in config %s\n", err) } @@ -55,7 +68,7 @@ func InitializeTransmitterAPI(DataBaseName string) *TransmitterAPI { CreateTable() Desc: Will create a DynamoDB Table with given param. and config */ - //add secondary index space vs time +//add secondary index space vs time func (transmitter *TransmitterAPI) CreateTable() error { _, err := transmitter.dynamoDbClient.CreateTable( context.TODO(), &dynamodb.CreateTableInput{ @@ -65,9 +78,13 @@ func (transmitter *TransmitterAPI) CreateTable() error { AttributeType: types.ScalarAttributeTypeN, }, { - AttributeName: aws.String("CommitDate"), + AttributeName: aws.String(COMMIT_DATE), AttributeType: types.ScalarAttributeTypeN, }, + { + AttributeName: aws.String(HASH), + AttributeType: types.ScalarAttributeTypeS, + }, }, KeySchema: []types.KeySchemaElement{ { @@ -75,11 +92,32 @@ func (transmitter *TransmitterAPI) CreateTable() error { KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String("CommitDate"), - KeyType: types.KeyTypeRange, + AttributeName: aws.String(COMMIT_DATE), + KeyType: types.KeyTypeRange, + }, + }, + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ // this make sure we can query hashes in O(1) time + { + IndexName: aws.String("Hash-index"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(HASH), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(COMMIT_DATE), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: "ALL", + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, }, }, - ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), WriteCapacityUnits: aws.Int64(10), @@ -92,10 +130,10 @@ func (transmitter *TransmitterAPI) CreateTable() error { } //https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.CreateTable.html waiter := dynamodb.NewTableExistsWaiter(transmitter.dynamoDbClient) - err = waiter.Wait(context.TODO(), &dynamodb.DescribeTableInput{ - TableName: aws.String(transmitter.DataBaseName)}, 5* time.Minute) - if err != nil { - log.Printf("Wait for table exists failed. Here's why: %v\n", err) + err = waiter.Wait(context.TODO(), &dynamodb.DescribeTableInput{ + TableName: aws.String(transmitter.DataBaseName)}, 5*time.Minute) //5 minutes is the timeout value for a table creation + if err != nil { + log.Printf("Wait for table exists failed. 
Here's why: %v\n", err) } fmt.Println("Created the table", transmitter.DataBaseName) return nil @@ -111,16 +149,22 @@ Side effects: Adds an item to dynamodb table */ func (transmitter *TransmitterAPI) AddItem(packet map[string]interface{}) (string, error) { + var ae *types.ConditionalCheckFailedException // this exception represent the atomic check has failed item, err := attributevalue.MarshalMap(packet) if err != nil { panic(err) } _, err = transmitter.dynamoDbClient.PutItem(context.TODO(), &dynamodb.PutItemInput{ - Item: item, - TableName: aws.String(transmitter.DataBaseName), + Item: item, + TableName: aws.String(transmitter.DataBaseName), + ConditionExpression: aws.String("attribute_not_exists(#hash)"), + ExpressionAttributeNames: map[string]string{ + "#hash": HASH, + }, }) - if err != nil { + + if err != nil && !errors.As(err, &ae) { fmt.Printf("Error adding item to table. %v\n", err) } return fmt.Sprintf("%v", item), err @@ -149,16 +193,200 @@ func (transmitter *TransmitterAPI) TableExist() (bool, error) { return exists, err } - /* SendItem() Desc: Parses the input data and adds it to the dynamo table -Param: data []byte is the data collected by data collector +Param: packet map[string]interface{} is the data collected by data collector */ -func (transmitter *TransmitterAPI) SendItem(packet map[string]interface{}) (string, error) { - //this is currently a passthrough function and will change to have functionality with Okan's PR - //@TODO - sentItem, err := transmitter.AddItem(packet) +func (transmitter *TransmitterAPI) SendItem(packet map[string]interface{}, tps int) (string, error) { + var sentItem string + var ae *types.ConditionalCheckFailedException // this exception represent the atomic check has failed + // check if hash exists + currentItem, err := transmitter.Query(packet[HASH].(string)) + if err != nil { + return "", err + } + if len(currentItem) == 0 { // if an item with the same hash doesn't exist add it + sentItem, err = transmitter.AddItem(packet) + // this may be overwritten by other test threads, in that case it will return a specific error + + if !errors.As(err, &ae) { // check if our add call got overwritten by other threads + return sentItem, err + } + if err != nil { //any other error dont try again + return "", err + } + // addItem failed due to a competing thread + // instead of adding, proceed to update the item, with the same data + rand.Seed(time.Now().UnixNano()) + time.Sleep(time.Duration(rand.Intn(UPDATE_DELAY_THRESHOLD)) * time.Second) + fmt.Println("Item already exist going to update", len(currentItem)) + } + // item already exist so update the item instead + err = transmitter.UpdateItem(packet, tps) //try to update the item + //this may be overwritten by other test threads, in that case it will return a specific error + if err != nil { + return "", err + } + fmt.Println("SendItem Completed") return sentItem, err + +} + +/* +PacketMerger() +Desc: + This function updates the currentPacket with the unique parts of newPacket and returns in dynamo format +Params: + newPacket: this is the agentData collected in this test + currentPacket: this is the agentData stored in dynamo currently +*/ +func (transmitter *TransmitterAPI) PacketMerger(newPacket map[string]interface{}, currentPacket map[string]interface{}, tps int) (map[string]interface{}, error) { + testSettings := fmt.Sprintf("%s-%d", os.Getenv(PERFORMANCE_NUMBER_OF_LOGS), tps) + fmt.Println("The test is", testSettings) + item := currentPacket[RESULTS].(map[string]interface{}) + _, isPresent := 
item[testSettings] // check if we already had this test + if isPresent { + // we already had this test so ignore it + return nil, errors.New("Nothing to update") + } + newAttributes := make(map[string]interface{}) + mergedResults := make(map[string]interface{}) + if newPacket[RESULTS] != nil { + testSettingValue := newPacket[RESULTS].(map[string]map[string]Stats)[testSettings] + for attribute, value := range item { + _, isPresent := newPacket[RESULTS].(map[string]map[string]Stats)[attribute] + if isPresent { + continue + } + mergedResults[attribute] = value + + } + mergedResults[testSettings] = testSettingValue + newAttributes[RESULTS] = mergedResults + } + if newPacket[IS_RELEASE] != nil { + newAttributes[IS_RELEASE] = true + } + // newAttributes, _ := attributevalue.MarshalMap(mergedResults) + // newAttributes[IS_RELEASE] = &types.AttributeValueMemberBOOL{Value: true} + // return newAttributes, nil + return newAttributes, nil } +/* +UpdateItem() +Desc: + This function updates the item in dynamo if the atomic condition is true else it will return ConditionalCheckFailedException +Params: + hash: this is the commitHash + targetAttributes: this is the targetAttribute to be added to the dynamo item + testHash: this is the hash of the last item, used like a version check +*/ +func (transmitter *TransmitterAPI) UpdateItem(packet map[string]interface{}, tps int) error { + var ae *types.ConditionalCheckFailedException // this exception represent the atomic check has failed + rand.Seed(time.Now().UnixNano()) + randomSleepDuration := time.Duration(rand.Intn(UPDATE_DELAY_THRESHOLD)) * time.Second + hash := packet[HASH].(string) + for attemptCount := 0; attemptCount < MAX_ATTEMPTS; attemptCount++ { + fmt.Println("Updating:", hash) + item, err := transmitter.Query(hash) // get most Up to date item from dynamo | O(1) bcs of global sec. idx. 
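+ // The TEST_ID read here is fed into the ConditionExpression below, so the update only succeeds if the stored TEST_ID still matches this read; on a conditional check failure the loop sleeps and retries.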
+ if len(item) == 0 { // check if hash is in dynamo + return errors.New("ERROR: Hash is not found in dynamo") + } + commitDate := fmt.Sprintf("%d", int(item[0][COMMIT_DATE].(float64))) + year := fmt.Sprintf("%d", int(item[0][PARTITION_KEY].(float64))) + testHash := item[0][TEST_ID].(string) + mergedAttributes, err := transmitter.PacketMerger(packet, item[0], tps) + if err != nil { + return err + } + targetAttributes, err := attributevalue.MarshalMap(mergedAttributes) + if err != nil { + return err + } + //setup the update expression + expressionAttributeValues := make(map[string]types.AttributeValue) + expression := "set " + n_expression := len(targetAttributes) + i := 0 + for attribute, value := range targetAttributes { + expressionName := ":" + strings.ToLower(attribute) + expression += fmt.Sprintf("%s = %s", attribute, expressionName) + expressionAttributeValues[expressionName] = value + if n_expression-1 > i { + expression += ", " + } + i++ + } + expressionAttributeValues[":testID"] = &types.AttributeValueMemberS{Value: testHash} + //call update + _, err = transmitter.dynamoDbClient.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{ + TableName: aws.String(transmitter.DataBaseName), + Key: map[string]types.AttributeValue{ + "Year": &types.AttributeValueMemberN{Value: year}, + "CommitDate": &types.AttributeValueMemberN{Value: commitDate}, + }, + UpdateExpression: aws.String(expression), + ExpressionAttributeValues: expressionAttributeValues, + ConditionExpression: aws.String("#testID = :testID"), + ExpressionAttributeNames: map[string]string{ + "#testID": TEST_ID, + }, + }) + if errors.As(err, &ae) { //check if our call got overwritten + // item has changed + fmt.Println("Retrying...") + time.Sleep(randomSleepDuration) + continue + } + if err != nil { + return err + } + fmt.Println("Update Completed") + return nil + } + // if the code reaches here it means we have reach MAX_ATTEMPTS + return errors.New("ERROR: We reached max number of attempts dropping the update") +} + +/* +UpdateReleaseTag() +Desc: This function takes in a commit hash and updates the release value to true +Param: commit hash in terms of string +*/ +func (transmitter *TransmitterAPI) UpdateReleaseTag(hash string) error { + var err error + packet := make(map[string]interface{}) + packet[HASH] = hash + packet[IS_RELEASE] = true + err = transmitter.UpdateItem(packet, 0) //try to update the item + //this may be overwritten by other test threads, in that case it will return a specific error + if err != nil { + return err + } + return err +} + +func (transmitter *TransmitterAPI) Query(hash string) ([]map[string]interface{}, error) { + var err error + var packets []map[string]interface{} + out, err := transmitter.dynamoDbClient.Query(context.TODO(), &dynamodb.QueryInput{ + TableName: aws.String(transmitter.DataBaseName), + IndexName: aws.String("Hash-index"), + KeyConditionExpression: aws.String("#hash = :hash"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":hash": &types.AttributeValueMemberS{Value: hash}, + }, + ExpressionAttributeNames: map[string]string{ + "#hash": HASH, + }, + ScanIndexForward: aws.Bool(true), // true or false to sort by "date" Sort/Range key ascending or descending + }) + if err != nil { + return nil, err + } + // fmt.Println(out.Items) + attributevalue.UnmarshalListOfMaps(out.Items, &packets) + return packets, err +}
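
Reviewer note: below is a minimal usage sketch of the TransmitterAPI flow this PR adds. It is illustrative only — the module import path, the integration build tag, the metric name, and the literal values are assumptions; the packet simply mirrors the shape GetPerformanceMetrics builds, and the release step mirrors what TestUpdateCommit does from releaseTest.yml.

// Assumed module path and build tag; adjust to the real repository layout.
//go:build integration

package main

import (
	"log"
	"os"
	"time"

	"github.com/google/uuid"

	pt "github.com/aws/amazon-cloudwatch-agent/integration/test/performancetest"
)

func main() {
	db := pt.InitializeTransmitterAPI("CWAPerformanceMetrics")
	if db == nil {
		log.Fatal("could not initialize the DynamoDB transmitter")
	}

	// One item per commit; per-test results are keyed "<numberOfLogs>-<tps>"
	// under Results, mirroring what GetPerformanceMetrics produces.
	packet := map[string]interface{}{
		pt.PARTITION_KEY: time.Now().Year(),
		pt.HASH:          os.Getenv(pt.SHA_ENV),
		pt.COMMIT_DATE:   1660000000, // placeholder commit timestamp (epoch seconds)
		pt.IS_RELEASE:    false,
		pt.TEST_ID:       uuid.New().String(),
		pt.RESULTS: map[string]map[string]pt.Stats{
			"10-100": {"procstat_cpu_usage": {}}, // placeholder metric name and empty stats
		},
	}

	// SendItem adds the item if this commit has no row yet; otherwise it merges
	// the new test results into the existing row via a conditional update on TestID.
	if _, err := db.SendItem(packet, 100); err != nil {
		log.Fatalf("upload failed: %v", err)
	}

	// On release, flip the isRelease flag for the released commit.
	if err := db.UpdateReleaseTag(os.Getenv(pt.SHA_ENV)); err != nil {
		log.Fatalf("release tag update failed: %v", err)
	}
}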