Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13416] Introduce Schema provider for AWS model classes extending SdkPojo #16947

Merged
merged 1 commit into from
Sep 19, 2022

Conversation

mosche
Copy link
Member

@mosche mosche commented Feb 25, 2022

@echauchot @aromanenko-dev Not really a PR (yet), I was just looking to understand the usage of schemas vs coders better.
The (most) interesting part here is probably SdkPojoSchema that works for all the generated AWS models in the SDK v2 (though, in some cases recursive types are used which obviously impose a challenge).

I wasn't able to find a lot of good technical documentation (design decisions) for schemas in Beam, though definitely see the value over low level coders! It seems to make sense to bother users as little as possible with coders and not expose these on any public API (M11). Currently coders are all over the place for AWS IOs, with this it seems feasible to drop them (or deprecate).

Though, I was wondering a couple of things:

  • Looks like I cannot register a provider for an interface (SdkPojo), that means I have to "announce" it globally using an auto service.
  • Is there a way a Sink/ WriteIO can set the coder for it's input like that can be done for a Source on the output? Looks like it's only possible to register such a coder globally, regardless if that sink is used or not.
  • I noticed that a RowCoder cannot be automatically inferred for a row despite every row carrying its schema. That makes sense given runners may use the schema ahead of time for query planning. But it made me wonder if there's any relationship between the schema of the row coder and the runtime schema enforced...

If you have any good resources on the topic, I'd be happy to hear ...


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@mosche
Copy link
Member Author

mosche commented Feb 25, 2022

@pabloem I remember you made related changes on your recent PR to JdbcIO to add support for generic partitioners. I wonder if you have pointers to documentation or simply advice on the topic :)

@aromanenko-dev
Copy link
Contributor

@reuvenlax Could you advise on this please?

@reuvenlax
Copy link
Contributor

Can you explain what SdkPojoSchema is? We already have an optimized Pojo schema provider (JavaFieldSchema)

@mosche
Copy link
Member Author

mosche commented Feb 28, 2022

@reuvenlax JavaFieldSchema doesn't work for AWS models, the sdk v2 uses immutable objects based on builders / final fields. SdkPojo is the common interface of all of these models.

@mosche
Copy link
Member Author

mosche commented Feb 28, 2022

SdkPojoSchema uses the metadata exposed by SdkPojos to read fields and write them via an intermediate builder.

@reuvenlax
Copy link
Contributor

reuvenlax commented Feb 28, 2022 via email

@mosche
Copy link
Member Author

mosche commented Feb 28, 2022

Thanks, I'll have a closer look

@mosche
Copy link
Member Author

mosche commented Mar 2, 2022

@reuvenlax I had a closer look why JavaFieldSchema does not work. There's quite some issues, and this is by far not an exhaustive list:

  • Only public, non-static, non-transient fields are considered. Generated models in the AWS SDK are private, however. Using some kind of Field filter in JavaFieldSchema, that can then be overwritten, this could be addressed. But this obviously also means to generate different FieldValueGetter using ByteBuddy.
  • Next, more involved I suppose, such AWS SDK models contain final fields only, but there's no known creator. The annotation based approach isn't feasible for such "external" classes.
  • An annoyance, but has to be handled as well: These model classes may contain maps with non primitive types. This is not supported. Also, in some cases there might be recursive type hierarchies that can't be supported and have to be dealt with.

Also, I've seen all the optimisations done in JavaFieldSchema and JavaBeanSchema. Honestly, at this point I don't bother much about that yet. I'm more interested to understand what the right approach forward is giving all the AWS IOs currently use hand crafted low level coders.

@mosche
Copy link
Member Author

mosche commented Mar 2, 2022

Also, for context, here's a stripped down example how such a SdkPojo may look like:

public interface SdkPojo {
    List<SdkField<?>> sdkFields();
}

@Generated("software.amazon.awssdk:codegen")
public final class DeleteMessageRequest extends SqsRequest implements SdkPojo {
    private static final SdkField<String> QUEUE_URL_FIELD = SdkField.<String> builder(MarshallingType.STRING)
            .memberName("QueueUrl")
            .getter(DeleteMessageRequest::queueUrl)
            .setter(Builder::queueUrl)
            .build();

    private static final SdkField<String> RECEIPT_HANDLE_FIELD = SdkField.<String> builder(MarshallingType.STRING)
            .memberName("ReceiptHandle")
            .getter(DeleteMessageRequest::receiptHandle)
            .setter(Builder::receiptHandle)
            .build();

    private static final List<SdkField<?>> SDK_FIELDS = Collections.unmodifiableList(Arrays.asList(QUEUE_URL_FIELD,
            RECEIPT_HANDLE_FIELD));

    private final String queueUrl;

    private final String receiptHandle;

    private DeleteMessageRequest(BuilderImpl builder) {
        super(builder);
        this.queueUrl = builder.queueUrl;
        this.receiptHandle = builder.receiptHandle;
    }

    public final String queueUrl() {
        return queueUrl;
    }

    public final String receiptHandle() {
        return receiptHandle;
    }

    public static Builder builder() {
        return new BuilderImpl();
    }

    public final List<SdkField<?>> sdkFields() {
        return SDK_FIELDS;
    }

    public interface Builder extends SdkPojo {
        Builder queueUrl(String queueUrl);

        Builder receiptHandle(String receiptHandle);
    }

    static final class BuilderImpl implements Builder {
        private String queueUrl;

        private String receiptHandle;

        private BuilderImpl() {}

        public final Builder queueUrl(String queueUrl) {
            this.queueUrl = queueUrl;
            return this;
        }

        public final Builder receiptHandle(String receiptHandle) {
            this.receiptHandle = receiptHandle;
            return this;
        }

        public DeleteMessageRequest build() {
            return new DeleteMessageRequest(this);
        }

        public List<SdkField<?>> sdkFields() {
            return SDK_FIELDS;
        }
    }
}

@reuvenlax
Copy link
Contributor

reuvenlax commented Mar 2, 2022 via email

@mosche
Copy link
Member Author

mosche commented Mar 2, 2022

But the getter methods are public, right?

There's public getters of course, but not following the "getX" naming convention.

I've also looked briefly at just extending GetterBasedSchemaProvider, that works really well actually. The read part is rather trivial. The field metadata of each SDK model provides an easy way to read fields (without any reflection involved).

static class SdkFieldValueGetter<ObjectT, ValueT> implements FieldValueGetter<ObjectT, ValueT> {
    private final SdkField<ValueT> field;

    public SdkFieldValueGetter(SdkField<ValueT> field) {
      this.field = field;
    }

    @Override
    @Nullable
    public ValueT get(ObjectT object) {
      return field.getValueOrDefault(object);
    }

    @Override
    public String name() {
      return field.memberName();
    }
  }

Recursive type hierarchies are also definitely support.

Problem with recursive types is the immutable Schema. I'm not aware of any way to generate such a schema for a recursive hierarchy ... same problem exists for FieldValueTypeInformation.
Just verified it quickly, I get a stackoverflow as expected ... so recursive types are definitely not supported.

@reuvenlax
Copy link
Contributor

reuvenlax commented Mar 2, 2022 via email

@reuvenlax
Copy link
Contributor

reuvenlax commented Mar 2, 2022 via email

@mosche mosche force-pushed the BEAM-13416-AwsSchemas branch from 2f09257 to fb0f27f Compare March 4, 2022 19:12
@mosche mosche changed the title [BEAM-13416] WIP [BEAM-13416] Introduce Schema provider for AWS model classes extending SdkPojo Mar 4, 2022
@mosche
Copy link
Member Author

mosche commented Mar 4, 2022

@reuvenlax Trying to implement this on the existing Schema providers, I kept running into various issues. Also RowWithGetters proved to be difficult for this use case.

I introduce a similar wrapper row delegating to an underlying object very similar to RowWithGetters and optimized things a bit. I would be very grateful for your feedback. Though no rush, I'm off for the next week anyways ...

@mosche mosche force-pushed the BEAM-13416-AwsSchemas branch from fb0f27f to c0790f6 Compare March 15, 2022 09:54
@TheNeuralBit TheNeuralBit self-requested a review March 21, 2022 15:43
Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mosche thanks for this! I can help review, I have a few initial questions.

FYI @apilloud (relevant for schema proliferation), and @chamikaramj (could enable cross-language AWS IOs)

/**
* Custom Coder for WrappedSnsResponse.
*
* @deprecated Coder of deprecated {@link SnsResponse}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the deprecation plan? Should users be opting in to using the SchemaCoder based approach, or are we keeping these coders here so that they can opt-out of schemas and use these instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to turn on schema coders by default by removing all "legacy" CoderProviderRegistrars and instead adding the generalSchemaProviderRegistrar for all AWS models. Users can opt-out using schemas using the deprecated withCoder setting.

@aromanenko-dev
Copy link
Contributor

Ping. What is a state of this PR? What are the next steps?

@mosche
Copy link
Member Author

mosche commented Apr 27, 2022

This is blocked / waiting for #17172

@TheNeuralBit
Copy link
Member

Hi @mosche does this need a rebase or anything now that #17172 is merged, or is it ready to review?

@mosche
Copy link
Member Author

mosche commented Jun 20, 2022

Just wrapping up some other work, then I'll pick this up again 👍 There's a bit of work to be done to rebase this onto master.
Thanks for checking in @TheNeuralBit .

@mosche mosche force-pushed the BEAM-13416-AwsSchemas branch from c0790f6 to 2c189b1 Compare June 24, 2022 16:06
@mosche
Copy link
Member Author

mosche commented Jun 27, 2022

@TheNeuralBit I've migrated this PR onto GetterBasedSchemaProvider and RowWithGetters to take advantage of changes in #17172.
I also decided to remove support for self-recursive AWS models (e.g. DynamoDB's AttributeValue) for this initial version. Supporting that is a bit problematic with Beam schemas. In such a case a "traditional" coder is still required.

@TheNeuralBit
Copy link
Member

retest this please

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few high-level comments and nits, this looks good overall. I haven't been able to look at the implementation closely though and probably won't be able to before I leave for vacation at the end of today. If you'd like to keep making progress on this while I'm gone maybe @reuvenlax or @lukecwik could take a look.

*
* <p>Note: Beam doesn't support self-recursive schemas. Some AWS models, such as {@link
* software.amazon.awssdk.services.dynamodb.model.AttributeValue} are self-recursive and cannot be
* used with this schema provider.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Do we recommend continuing to use AwsCoders for this case?
  • It could also be helpful to link to the issue tracking recursive schemas.
  • nit: I'd say either "recursive" or "self-referential" instead of "self-recursive"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed to self-referential 👍 DynamoDB AttributeValue is the only self-referential type I'm aware of and has a dedicated coder independent of AwsCoders.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of support of self-referential types in Beam was apparently by design, so there's no such issue. My initial idea for AWS was to "fake" it by just skipping over self-referential fields at some depth. But that is obviously not correct and potentially dangerous as it might silently ignore some data.

import software.amazon.awssdk.services.sqs.model.Message;

/** Custom Coder for handling SendMessageRequest for using in Write. */
public class MessageCoder extends AtomicCoder<Message> implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public and not @Internal so technically we shouldn't just remove it outright. That being said this and SendMessageRequestCoder, MessageCoder are pretty clearly internal coders for use in their respective IOs, so it's probably fine.

I'll leave it up to you - if you want you could mark these coders deprecated and schedule them for removal in a later release.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right pointing this out 👍 Considering MessageCoder.of() / SendMessageRequestCoder.of() are both package private I think direct removal is justifiable in this case.

}

private static byte[] toBytes(Object sdkBytes) {
return ((SdkBytes) sdkBytes).asByteArrayUnsafe(); // TODO copy or use unsafe?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these TODOs something we can address now, or should there be an issue tracking it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a look at what GetterBasedSchemaProvider does for byte[] / ByteBuffer. It's exposing the mutable bytes as well, so I'll keep the unsafe operations here. 👍

* sdkHttpMetadata with the HTTP response headers.
*
* @deprecated Writes fail fast in case of errors, no need to check headers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Is this a change in behavior that we need to communicate to users?
  • Will these functions be removed later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a clarification why there's no need for a coder that includes HTTP response metadata, there's no change in behavior.
Plans is to remove these after a few releases.

return str.getBytes(UTF_8);
}

private <T> Condition<Row> field(String name, T value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handy, maybe it should be in SchemaTestUtils for discoverability?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code depends on assertj-core and probably doesn't fit into SchemaTestUtils for that reason.
I could rewrite it as Matcher if you think it's worth it,e.g.:

public static Matcher<Row> containsFieldValues(Map<String, Object> expectedFieldValues) 

import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class AwsSchemaProviderTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this exercise all the types in the mapping from AwsTypes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact not, thanks for checking ... I'll make sure to add more test cases to cover everything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ✅

@TheNeuralBit
Copy link
Member

retest this please

@TheNeuralBit
Copy link
Member

Running tests to get a run of the newly forked integration tests (Java_Amazon-Web-Services2_IO_Direct)

@mosche
Copy link
Member Author

mosche commented Jul 12, 2022

Thanks a lot for your review @TheNeuralBit 🙏
Enjoy your time off!

@mosche mosche force-pushed the BEAM-13416-AwsSchemas branch from 2a95375 to 38d9641 Compare September 5, 2022 12:55
@mosche
Copy link
Member Author

mosche commented Sep 5, 2022

@TheNeuralBit Could you take a final look, I just rebased to account for the byte buddy changes on master ...

@mosche
Copy link
Member Author

mosche commented Sep 19, 2022

@TheNeuralBit Kind ping, are you ok merging this?

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

You might consider creating an issue for removing the deprecated functionality, and linking it to a future release milestone. That will help us remember to actually remove it.

@TheNeuralBit
Copy link
Member

Sorry for the delay

@mosche
Copy link
Member Author

mosche commented Sep 21, 2022

Thanks a lot @TheNeuralBit :)

@mosche mosche deleted the BEAM-13416-AwsSchemas branch September 21, 2022 08:31
@mosche mosche mentioned this pull request May 16, 2023
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants