import json
import boto3


def lambda_handler(event, context):
    """Aggregate per-task AWS Batch result files from S3, summed by strike.

    Lists every object under the S3 prefix named by ``event['outputUri']``
    (the location the AWS Batch array-job results were written to). Each
    object is expected to hold a single line of JSON of the form
    ``{"strike": ..., "pv": ...}``. PVs are summed per strike, the
    aggregate is written back to S3 as ``aggregation_<prefix>.json``, and
    the aggregate is also returned in the response body.

    Parameters
    ----------
    event : dict
        Must contain 'outputUri', an ``s3://bucket/prefix`` URI.
    context : LambdaContext
        Unused.

    Returns
    -------
    dict
        ``{'statusCode': 200, 'body': <JSON of the aggregation dict>}``.
    """
    uri = event['outputUri']  # where the AWS Batch result files were written
    # Split bucket from prefix at the FIRST '/' only: an S3 prefix may
    # itself contain '/' characters, so split('/') followed by [1] would
    # silently drop everything after the first path segment.
    bucket, _, prefix = uri.split('s3://')[1].partition('/')

    s3 = boto3.resource('s3')
    result = s3.meta.client.list_objects_v2(Bucket=bucket, Prefix=prefix)

    aggregation = {}  # strike -> summed PV
    for item in result.get("Contents", []):
        obj = s3.Object(bucket, item["Key"])

        # Each file contains a single line of JSON with the strike and PV.
        strike_pv_pair = obj.get()['Body'].read().decode('utf-8')
        pair = json.loads(strike_pv_pair)
        if not pair:
            continue
        strike = pair.get("strike")
        pv = pair.get("pv", "0")  # default "0" when "pv" is absent
        # NOTE(review): 'null' is compared as a string — presumably an
        # upstream writer can emit the literal text "null"; confirm.
        if strike and strike != 'null' and pv:
            try:
                # Accumulate via dict.get instead of an if/else membership
                # test; float() raises ValueError on non-numeric pv.
                aggregation[strike] = aggregation.get(strike, 0.0) + float(pv)
            except ValueError:
                # pv was present but not numeric — skip the record rather
                # than fail the whole aggregation.
                print(f"Skipping invalid pv value: {pv} for strike: {strike}")

    # Upload the aggregate alongside the inputs. encode() already yields
    # bytes; the original's extra bytes(...) wrapper was redundant.
    out = s3.Object(bucket, "aggregation_" + prefix + ".json")
    out.put(Body=json.dumps(aggregation).encode('utf-8'))

    return {
        'statusCode': 200,
        # default=str guards against non-JSON-serializable strike keys.
        'body': json.dumps(aggregation, default=str)
    }