Skip to content

Commit

Permalink
Merge pull request #13608 from patriot1burke/lambda-s3event
Browse files Browse the repository at this point in the history
Support S3Event aws lambda
  • Loading branch information
patriot1burke authored Dec 3, 2020
2 parents 18796ff + 2c82c60 commit b092340
Show file tree
Hide file tree
Showing 17 changed files with 556 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.quarkus.runtime.Application;
import io.quarkus.runtime.ShutdownContext;
Expand Down Expand Up @@ -144,9 +143,9 @@ public void run() {
protected abstract void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context)
throws Exception;

protected abstract ObjectReader getInputReader();
protected abstract LambdaInputReader getInputReader();

protected abstract ObjectWriter getOutputWriter();
protected abstract LambdaOutputWriter getOutputWriter();

protected AmazonLambdaContext createContext(HttpURLConnection requestConnection) throws IOException {
return new AmazonLambdaContext(requestConnection, cognitoIdReader, clientCtxReader);
Expand Down Expand Up @@ -202,4 +201,4 @@ boolean abortGracefully(Exception ex) {
return graceful;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.databind.ObjectReader;

public class JacksonInputReader implements LambdaInputReader {
final private ObjectReader reader;

public JacksonInputReader(ObjectReader reader) {
this.reader = reader;
}

@Override
public Object readValue(InputStream is) throws IOException {
return reader.readValue(is);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.databind.ObjectWriter;

public class JacksonOutputWriter implements LambdaOutputWriter {
final private ObjectWriter writer;

public JacksonOutputWriter(ObjectWriter writer) {
this.writer = writer;
}

@Override
public void writeValue(OutputStream os, Object obj) throws IOException {
writer.writeValue(os, obj);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.InputStream;

public interface LambdaInputReader<T> {
T readValue(InputStream is) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.OutputStream;

public interface LambdaOutputWriter {
void writeValue(OutputStream os, Object obj) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -32,8 +32,8 @@ public class AmazonLambdaRecorder {
private static Class<? extends RequestHandler<?, ?>> handlerClass;
private static Class<? extends RequestStreamHandler> streamHandlerClass;
private static BeanContainer beanContainer;
private static ObjectReader objectReader;
private static ObjectWriter objectWriter;
private static LambdaInputReader objectReader;
private static LambdaOutputWriter objectWriter;

public void setStreamHandlerClass(Class<? extends RequestStreamHandler> handler, BeanContainer container) {
streamHandlerClass = handler;
Expand All @@ -45,8 +45,12 @@ public void setHandlerClass(Class<? extends RequestHandler<?, ?>> handler, BeanC
beanContainer = container;
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper;
Method handlerMethod = discoverHandlerMethod(handlerClass);
objectReader = objectMapper.readerFor(handlerMethod.getParameterTypes()[0]);
objectWriter = objectMapper.writerFor(handlerMethod.getReturnType());
if (handlerMethod.getParameterTypes()[0].equals(S3Event.class)) {
objectReader = new S3EventInputReader(objectMapper);
} else {
objectReader = new JacksonInputReader(objectMapper.readerFor(handlerMethod.getParameterTypes()[0]));
}
objectWriter = new JacksonOutputWriter(objectMapper.writerFor(handlerMethod.getReturnType()));
}

/**
Expand Down Expand Up @@ -149,12 +153,12 @@ protected Object processRequest(Object input, AmazonLambdaContext context) throw
}

@Override
protected ObjectReader getInputReader() {
protected LambdaInputReader getInputReader() {
return objectReader;
}

@Override
protected ObjectWriter getOutputWriter() {
protected LambdaOutputWriter getOutputWriter() {
return objectWriter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.amazon.lambda.runtime.handlers;

import com.fasterxml.jackson.databind.JsonNode;

public class JacksonUtil {
public static String getText(String name, JsonNode node) {
JsonNode e = node.get(name);
return e == null ? null : e.asText();
}

public static Long getLong(String name, JsonNode node) {
JsonNode e = node.get(name);
return e == null ? null : e.asLong();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.quarkus.amazon.lambda.runtime.handlers;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.LambdaInputReader;

public class S3EventInputReader implements LambdaInputReader<S3Event> {
final ObjectMapper mapper;

public S3EventInputReader(ObjectMapper mapper) {
this.mapper = mapper;
}

@Override
public S3Event readValue(InputStream is) throws IOException {
JsonNode json = mapper.readTree(is);
JsonNode records = json.get("Records");
if (records == null || !records.isArray()) {
return new S3Event(Collections.EMPTY_LIST);
}
List<S3EventNotification.S3EventNotificationRecord> list = new ArrayList<>();

for (int i = 0; i < records.size(); i++) {
JsonNode record = records.get(i);
String awsRegion = JacksonUtil.getText("awsRegion", record);
String eventName = JacksonUtil.getText("eventName", record);
String eventSource = JacksonUtil.getText("eventSource", record);
String eventTime = JacksonUtil.getText("eventTime", record);
String eventVersion = JacksonUtil.getText("eventVersion", record);
JsonNode params = record.get("requestParameters");
S3EventNotification.RequestParametersEntity requestParameters = null;
if (params != null) {
requestParameters = new S3EventNotification.RequestParametersEntity(
JacksonUtil.getText("sourceIPAddress", params));
}
JsonNode elems = record.get("responseElements");
S3EventNotification.ResponseElementsEntity responseElements = null;
if (elems != null) {
String requestId = JacksonUtil.getText("x-amz-request-id", elems);
String id = JacksonUtil.getText("x-amz-id-2", elems);
responseElements = new S3EventNotification.ResponseElementsEntity(id, requestId);
}
JsonNode userIdentity = record.get("userIdentity");
S3EventNotification.UserIdentityEntity userId = null;
if (userIdentity != null) {
String principalId = JacksonUtil.getText("principalId", userIdentity);
userId = new S3EventNotification.UserIdentityEntity(principalId);
}

JsonNode s3 = record.get("s3");
S3EventNotification.S3Entity s3Entity = null;
if (s3 != null) {
String configurationId = JacksonUtil.getText("configurationId", s3);
String schemaVersion = JacksonUtil.getText("s3SchemaVersion", s3);
JsonNode bucketNode = s3.get("bucket");
S3EventNotification.S3BucketEntity bucket = null;
if (bucketNode != null) {
String name = JacksonUtil.getText("name", bucketNode);
JsonNode ownerIdentity = bucketNode.get("ownerIdentity");
S3EventNotification.UserIdentityEntity owner = null;
if (ownerIdentity != null) {
String principalId = JacksonUtil.getText("principalId", ownerIdentity);
owner = new S3EventNotification.UserIdentityEntity(principalId);
}
String arn = JacksonUtil.getText("arn", bucketNode);
bucket = new S3EventNotification.S3BucketEntity(name, owner, arn);
}
JsonNode object = s3.get("object");
S3EventNotification.S3ObjectEntity obj = null;
if (object != null) {
String key = JacksonUtil.getText("key", object);
Long size = JacksonUtil.getLong("size", object);
String eTag = JacksonUtil.getText("eTag", object);
String versionId = JacksonUtil.getText("versionId", object);
String sequencer = JacksonUtil.getText("sequencer", object);
obj = new S3EventNotification.S3ObjectEntity(key, size, eTag, versionId, sequencer);
}
s3Entity = new S3EventNotification.S3Entity(configurationId, bucket, obj, schemaVersion);
}
S3EventNotification.S3EventNotificationRecord r = new S3EventNotification.S3EventNotificationRecord(awsRegion,
eventName, eventSource, eventTime,
eventVersion, requestParameters,
responseElements, s3Entity, userId);
list.add(r);
}
return new S3Event(list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.ByteArrayInputStream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader;

public class S3EventTest {

static String json = "{\n" +
" \"Records\":[\n" +
" {\n" +
" \"eventVersion\":\"2.0\",\n" +
" \"eventSource\":\"aws:s3\",\n" +
" \"awsRegion\":\"us-west-2\",\n" +
" \"eventTime\":\"1970-01-01T00:00:00.000Z\",\n" +
" \"eventName\":\"ObjectCreated:Put\",\n" +
" \"userIdentity\":{\n" +
" \"principalId\":\"AIDAJDPLRKLG7UEXAMPLE\"\n" +
" },\n" +
" \"requestParameters\":{\n" +
" \"sourceIPAddress\":\"127.0.0.1\"\n" +
" },\n" +
" \"responseElements\":{\n" +
" \"x-amz-request-id\":\"C3D13FE58DE4C810\",\n" +
" \"x-amz-id-2\":\"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD\"\n" +
" },\n" +
" \"s3\":{\n" +
" \"s3SchemaVersion\":\"1.0\",\n" +
" \"configurationId\":\"testConfigRule\",\n" +
" \"bucket\":{\n" +
" \"name\":\"sourcebucket\",\n" +
" \"ownerIdentity\":{\n" +
" \"principalId\":\"A3NL1KOZZKExample\"\n" +
" },\n" +
" \"arn\":\"arn:aws:s3:::sourcebucket\"\n" +
" },\n" +
" \"object\":{\n" +
" \"key\":\"HappyFace.jpg\",\n" +
" \"size\":1024,\n" +
" \"eTag\":\"d41d8cd98f00b204e9800998ecf8427e\",\n" +
" \"versionId\":\"096fKKXTRTtl3on89fVO.nfljtsv6qko\"\n" +
" }\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";

@Test
public void testParse() throws Exception {
ObjectMapper mapper = new ObjectMapper();
S3EventInputReader reader = new S3EventInputReader(mapper);
ByteArrayInputStream is = new ByteArrayInputStream(json.getBytes());
S3Event event = reader.readValue(is);
Assertions.assertEquals(1, event.getRecords().size());
S3EventNotification.S3EventNotificationRecord record = event.getRecords().get(0);
Assertions.assertEquals("2.0", record.getEventVersion());
Assertions.assertEquals("aws:s3", record.getEventSource());
Assertions.assertEquals("us-west-2", record.getAwsRegion());
Assertions.assertEquals("1970-01-01T00:00:00.000Z", record.getEventTime().toString());
Assertions.assertEquals("ObjectCreated:Put", record.getEventName());
Assertions.assertEquals("AIDAJDPLRKLG7UEXAMPLE", record.getUserIdentity().getPrincipalId());
Assertions.assertEquals("127.0.0.1", record.getRequestParameters().getSourceIPAddress());
Assertions.assertEquals("C3D13FE58DE4C810", record.getResponseElements().getxAmzRequestId());
Assertions.assertEquals("FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
record.getResponseElements().getxAmzId2());

Assertions.assertEquals("1.0", record.getS3().getS3SchemaVersion());
Assertions.assertEquals("testConfigRule", record.getS3().getConfigurationId());
Assertions.assertEquals("sourcebucket", record.getS3().getBucket().getName());
Assertions.assertEquals("arn:aws:s3:::sourcebucket", record.getS3().getBucket().getArn());
Assertions.assertEquals("A3NL1KOZZKExample", record.getS3().getBucket().getOwnerIdentity().getPrincipalId());
Assertions.assertEquals("HappyFace.jpg", record.getS3().getObject().getKey());
Assertions.assertEquals(1024, record.getS3().getObject().getSizeAsLong());
Assertions.assertEquals("d41d8cd98f00b204e9800998ecf8427e", record.getS3().getObject().geteTag());
Assertions.assertEquals("096fKKXTRTtl3on89fVO.nfljtsv6qko", record.getS3().getObject().getVersionId());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder;
import io.quarkus.amazon.lambda.runtime.JacksonInputReader;
import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter;
import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.funqy.runtime.FunctionConstructor;
Expand All @@ -34,8 +38,8 @@ public class FunqyLambdaBindingRecorder {

private static FunctionInvoker invoker;
private static BeanContainer beanContainer;
private static ObjectReader reader;
private static ObjectWriter writer;
private static LambdaInputReader reader;
private static LambdaOutputWriter writer;

public void init(BeanContainer bc) {
beanContainer = bc;
Expand Down Expand Up @@ -69,10 +73,10 @@ public void chooseInvoker(FunqyConfig config) {
invoker = FunctionRecorder.registry.invokers().iterator().next();
}
if (invoker.hasInput()) {
reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()));
}
if (invoker.hasOutput()) {
writer = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName());
writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName()));
}

}
Expand Down Expand Up @@ -111,12 +115,12 @@ protected Object processRequest(Object input, AmazonLambdaContext context) throw
}

@Override
protected ObjectReader getInputReader() {
protected LambdaInputReader getInputReader() {
return reader;
}

@Override
protected ObjectWriter getOutputWriter() {
protected LambdaOutputWriter getOutputWriter() {
return writer;
}

Expand Down
Loading

0 comments on commit b092340

Please sign in to comment.