Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support S3Event aws lambda #13608

Merged
merged 1 commit into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.quarkus.runtime.Application;
import io.quarkus.runtime.ShutdownContext;
Expand Down Expand Up @@ -144,9 +143,9 @@ public void run() {
protected abstract void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context)
throws Exception;

protected abstract ObjectReader getInputReader();
protected abstract LambdaInputReader getInputReader();

protected abstract ObjectWriter getOutputWriter();
protected abstract LambdaOutputWriter getOutputWriter();

protected AmazonLambdaContext createContext(HttpURLConnection requestConnection) throws IOException {
return new AmazonLambdaContext(requestConnection, cognitoIdReader, clientCtxReader);
Expand Down Expand Up @@ -202,4 +201,4 @@ boolean abortGracefully(Exception ex) {
return graceful;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.databind.ObjectReader;

public class JacksonInputReader implements LambdaInputReader {
final private ObjectReader reader;

public JacksonInputReader(ObjectReader reader) {
this.reader = reader;
}

@Override
public Object readValue(InputStream is) throws IOException {
return reader.readValue(is);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.databind.ObjectWriter;

public class JacksonOutputWriter implements LambdaOutputWriter {
final private ObjectWriter writer;

public JacksonOutputWriter(ObjectWriter writer) {
this.writer = writer;
}

@Override
public void writeValue(OutputStream os, Object obj) throws IOException {
writer.writeValue(os, obj);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.InputStream;

public interface LambdaInputReader<T> {
T readValue(InputStream is) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.IOException;
import java.io.OutputStream;

public interface LambdaOutputWriter {
void writeValue(OutputStream os, Object obj) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -32,8 +32,8 @@ public class AmazonLambdaRecorder {
private static Class<? extends RequestHandler<?, ?>> handlerClass;
private static Class<? extends RequestStreamHandler> streamHandlerClass;
private static BeanContainer beanContainer;
private static ObjectReader objectReader;
private static ObjectWriter objectWriter;
private static LambdaInputReader objectReader;
private static LambdaOutputWriter objectWriter;

public void setStreamHandlerClass(Class<? extends RequestStreamHandler> handler, BeanContainer container) {
streamHandlerClass = handler;
Expand All @@ -45,8 +45,12 @@ public void setHandlerClass(Class<? extends RequestHandler<?, ?>> handler, BeanC
beanContainer = container;
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper;
Method handlerMethod = discoverHandlerMethod(handlerClass);
objectReader = objectMapper.readerFor(handlerMethod.getParameterTypes()[0]);
objectWriter = objectMapper.writerFor(handlerMethod.getReturnType());
if (handlerMethod.getParameterTypes()[0].equals(S3Event.class)) {
objectReader = new S3EventInputReader(objectMapper);
} else {
objectReader = new JacksonInputReader(objectMapper.readerFor(handlerMethod.getParameterTypes()[0]));
}
objectWriter = new JacksonOutputWriter(objectMapper.writerFor(handlerMethod.getReturnType()));
}

/**
Expand Down Expand Up @@ -149,12 +153,12 @@ protected Object processRequest(Object input, AmazonLambdaContext context) throw
}

@Override
protected ObjectReader getInputReader() {
protected LambdaInputReader getInputReader() {
return objectReader;
}

@Override
protected ObjectWriter getOutputWriter() {
protected LambdaOutputWriter getOutputWriter() {
return objectWriter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.amazon.lambda.runtime.handlers;

import com.fasterxml.jackson.databind.JsonNode;

public class JacksonUtil {
public static String getText(String name, JsonNode node) {
JsonNode e = node.get(name);
return e == null ? null : e.asText();
}

public static Long getLong(String name, JsonNode node) {
JsonNode e = node.get(name);
return e == null ? null : e.asLong();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.quarkus.amazon.lambda.runtime.handlers;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.LambdaInputReader;

public class S3EventInputReader implements LambdaInputReader<S3Event> {
final ObjectMapper mapper;

public S3EventInputReader(ObjectMapper mapper) {
this.mapper = mapper;
}

@Override
public S3Event readValue(InputStream is) throws IOException {
JsonNode json = mapper.readTree(is);
JsonNode records = json.get("Records");
if (records == null || !records.isArray()) {
return new S3Event(Collections.EMPTY_LIST);
}
List<S3EventNotification.S3EventNotificationRecord> list = new ArrayList<>();

for (int i = 0; i < records.size(); i++) {
JsonNode record = records.get(i);
String awsRegion = JacksonUtil.getText("awsRegion", record);
String eventName = JacksonUtil.getText("eventName", record);
String eventSource = JacksonUtil.getText("eventSource", record);
String eventTime = JacksonUtil.getText("eventTime", record);
String eventVersion = JacksonUtil.getText("eventVersion", record);
JsonNode params = record.get("requestParameters");
S3EventNotification.RequestParametersEntity requestParameters = null;
if (params != null) {
requestParameters = new S3EventNotification.RequestParametersEntity(
JacksonUtil.getText("sourceIPAddress", params));
}
JsonNode elems = record.get("responseElements");
S3EventNotification.ResponseElementsEntity responseElements = null;
if (elems != null) {
String requestId = JacksonUtil.getText("x-amz-request-id", elems);
String id = JacksonUtil.getText("x-amz-id-2", elems);
responseElements = new S3EventNotification.ResponseElementsEntity(id, requestId);
}
JsonNode userIdentity = record.get("userIdentity");
S3EventNotification.UserIdentityEntity userId = null;
if (userIdentity != null) {
String principalId = JacksonUtil.getText("principalId", userIdentity);
userId = new S3EventNotification.UserIdentityEntity(principalId);
}

JsonNode s3 = record.get("s3");
S3EventNotification.S3Entity s3Entity = null;
if (s3 != null) {
String configurationId = JacksonUtil.getText("configurationId", s3);
String schemaVersion = JacksonUtil.getText("s3SchemaVersion", s3);
JsonNode bucketNode = s3.get("bucket");
S3EventNotification.S3BucketEntity bucket = null;
if (bucketNode != null) {
String name = JacksonUtil.getText("name", bucketNode);
JsonNode ownerIdentity = bucketNode.get("ownerIdentity");
S3EventNotification.UserIdentityEntity owner = null;
if (ownerIdentity != null) {
String principalId = JacksonUtil.getText("principalId", ownerIdentity);
owner = new S3EventNotification.UserIdentityEntity(principalId);
}
String arn = JacksonUtil.getText("arn", bucketNode);
bucket = new S3EventNotification.S3BucketEntity(name, owner, arn);
}
JsonNode object = s3.get("object");
S3EventNotification.S3ObjectEntity obj = null;
if (object != null) {
String key = JacksonUtil.getText("key", object);
Long size = JacksonUtil.getLong("size", object);
String eTag = JacksonUtil.getText("eTag", object);
String versionId = JacksonUtil.getText("versionId", object);
String sequencer = JacksonUtil.getText("sequencer", object);
obj = new S3EventNotification.S3ObjectEntity(key, size, eTag, versionId, sequencer);
}
s3Entity = new S3EventNotification.S3Entity(configurationId, bucket, obj, schemaVersion);
}
S3EventNotification.S3EventNotificationRecord r = new S3EventNotification.S3EventNotificationRecord(awsRegion,
eventName, eventSource, eventTime,
eventVersion, requestParameters,
responseElements, s3Entity, userId);
list.add(r);
}
return new S3Event(list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.quarkus.amazon.lambda.runtime;

import java.io.ByteArrayInputStream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.amazon.lambda.runtime.handlers.S3EventInputReader;

public class S3EventTest {

static String json = "{\n" +
" \"Records\":[\n" +
" {\n" +
" \"eventVersion\":\"2.0\",\n" +
" \"eventSource\":\"aws:s3\",\n" +
" \"awsRegion\":\"us-west-2\",\n" +
" \"eventTime\":\"1970-01-01T00:00:00.000Z\",\n" +
" \"eventName\":\"ObjectCreated:Put\",\n" +
" \"userIdentity\":{\n" +
" \"principalId\":\"AIDAJDPLRKLG7UEXAMPLE\"\n" +
" },\n" +
" \"requestParameters\":{\n" +
" \"sourceIPAddress\":\"127.0.0.1\"\n" +
" },\n" +
" \"responseElements\":{\n" +
" \"x-amz-request-id\":\"C3D13FE58DE4C810\",\n" +
" \"x-amz-id-2\":\"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD\"\n" +
" },\n" +
" \"s3\":{\n" +
" \"s3SchemaVersion\":\"1.0\",\n" +
" \"configurationId\":\"testConfigRule\",\n" +
" \"bucket\":{\n" +
" \"name\":\"sourcebucket\",\n" +
" \"ownerIdentity\":{\n" +
" \"principalId\":\"A3NL1KOZZKExample\"\n" +
" },\n" +
" \"arn\":\"arn:aws:s3:::sourcebucket\"\n" +
" },\n" +
" \"object\":{\n" +
" \"key\":\"HappyFace.jpg\",\n" +
" \"size\":1024,\n" +
" \"eTag\":\"d41d8cd98f00b204e9800998ecf8427e\",\n" +
" \"versionId\":\"096fKKXTRTtl3on89fVO.nfljtsv6qko\"\n" +
" }\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";

@Test
public void testParse() throws Exception {
ObjectMapper mapper = new ObjectMapper();
S3EventInputReader reader = new S3EventInputReader(mapper);
ByteArrayInputStream is = new ByteArrayInputStream(json.getBytes());
S3Event event = reader.readValue(is);
Assertions.assertEquals(1, event.getRecords().size());
S3EventNotification.S3EventNotificationRecord record = event.getRecords().get(0);
Assertions.assertEquals("2.0", record.getEventVersion());
Assertions.assertEquals("aws:s3", record.getEventSource());
Assertions.assertEquals("us-west-2", record.getAwsRegion());
Assertions.assertEquals("1970-01-01T00:00:00.000Z", record.getEventTime().toString());
Assertions.assertEquals("ObjectCreated:Put", record.getEventName());
Assertions.assertEquals("AIDAJDPLRKLG7UEXAMPLE", record.getUserIdentity().getPrincipalId());
Assertions.assertEquals("127.0.0.1", record.getRequestParameters().getSourceIPAddress());
Assertions.assertEquals("C3D13FE58DE4C810", record.getResponseElements().getxAmzRequestId());
Assertions.assertEquals("FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
record.getResponseElements().getxAmzId2());

Assertions.assertEquals("1.0", record.getS3().getS3SchemaVersion());
Assertions.assertEquals("testConfigRule", record.getS3().getConfigurationId());
Assertions.assertEquals("sourcebucket", record.getS3().getBucket().getName());
Assertions.assertEquals("arn:aws:s3:::sourcebucket", record.getS3().getBucket().getArn());
Assertions.assertEquals("A3NL1KOZZKExample", record.getS3().getBucket().getOwnerIdentity().getPrincipalId());
Assertions.assertEquals("HappyFace.jpg", record.getS3().getObject().getKey());
Assertions.assertEquals(1024, record.getS3().getObject().getSizeAsLong());
Assertions.assertEquals("d41d8cd98f00b204e9800998ecf8427e", record.getS3().getObject().geteTag());
Assertions.assertEquals("096fKKXTRTtl3on89fVO.nfljtsv6qko", record.getS3().getObject().getVersionId());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder;
import io.quarkus.amazon.lambda.runtime.JacksonInputReader;
import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter;
import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.funqy.runtime.FunctionConstructor;
Expand All @@ -34,8 +38,8 @@ public class FunqyLambdaBindingRecorder {

private static FunctionInvoker invoker;
private static BeanContainer beanContainer;
private static ObjectReader reader;
private static ObjectWriter writer;
private static LambdaInputReader reader;
private static LambdaOutputWriter writer;

public void init(BeanContainer bc) {
beanContainer = bc;
Expand Down Expand Up @@ -69,10 +73,10 @@ public void chooseInvoker(FunqyConfig config) {
invoker = FunctionRecorder.registry.invokers().iterator().next();
}
if (invoker.hasInput()) {
reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()));
}
if (invoker.hasOutput()) {
writer = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName());
writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName()));
}

}
Expand Down Expand Up @@ -111,12 +115,12 @@ protected Object processRequest(Object input, AmazonLambdaContext context) throw
}

@Override
protected ObjectReader getInputReader() {
protected LambdaInputReader getInputReader() {
return reader;
}

@Override
protected ObjectWriter getOutputWriter() {
protected LambdaOutputWriter getOutputWriter() {
return writer;
}

Expand Down
Loading