
[Feature]: Add stream context to request authenticator #1028

Closed
mchudoba opened this issue Oct 3, 2022 · 6 comments

mchudoba commented Oct 3, 2022

Feature scope

Taps (catalog, state, stream maps, etc.)

Description

I'm proposing passing the stream/partition context when creating an authenticator in REST streams. We have a use case where we would like to use partitions within the same stream run to extract data for multiple clients, each of which requires different credentials. We would like to put those credentials in the stream partition and access them when creating the authenticator object.

The SDK already supports passing the context object when building the request URL, parameters, and body, so this would add it to one more part of the request.
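For reference, these are the existing context-aware hooks on RESTStream (signatures abbreviated from memory; see the SDK source for the authoritative versions):

def get_url(self, context: dict | None) -> str:
    ...

def get_url_params(self, context: dict | None, next_page_token) -> dict:
    ...

def prepare_request_payload(self, context: dict | None, next_page_token) -> dict | None:
    ...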

Proposal

Current authenticator property:

@property
def authenticator(self) -> APIAuthenticatorBase | None:
    ...

Add/replace with:

def get_authenticator(self, context: dict | None) -> APIAuthenticatorBase | None:
    ...

Then call self.get_authenticator(context) instead of self.authenticator inside RESTStream.build_prepared_request.

Questions

If accepted, my only question would be whether to deprecate the authenticator property in favor of get_authenticator or to support both. By default, get_authenticator could simply return self.authenticator.
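For example, the backward-compatible default could be a thin wrapper over the existing property (a sketch, assuming the property is kept):

def get_authenticator(self, context: dict | None) -> APIAuthenticatorBase | None:
    # Default: ignore the context and delegate to the existing property,
    # so streams that only override `authenticator` keep working unchanged.
    return self.authenticator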

@edgarrmondragon
Collaborator

edgarrmondragon commented Oct 7, 2022

Hi @mchudoba, thanks for logging!

> We would like to put those credentials in the stream partition and access them when creating the authenticator object.

It's generally not safe to put credentials in the partition context because it ends up in a number of potentially insecure places, like the tap state dictionary and as a tag in metric logs.

> We have a use case where we would like to use partitions within the same stream run to extract data for multiple clients, which each require different credentials.

Is "within the same stream run" a hard requirement? I can think of a solution that instantiates multiple instances of the stream class with different configurations:

from singer_sdk import RESTStream, Tap


class MyStream(RESTStream):
    def __init__(self, client_creds, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Each instance holds its own set of credentials.
        self.client_creds = client_creds

    @property
    def authenticator(self):
        return MyAuthenticator(self.client_creds)


class MyTap(Tap):
    def discover_streams(self):
        # One stream instance per set of client credentials in the config.
        return [
            MyStream(client_creds, self)
            for client_creds in self.config.get("client_creds", [])
        ]
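For reference, the snippet above assumes a config shape roughly like this (the client_creds key and its contents are illustrative, not an SDK convention):

config = {
    "client_creds": [
        {"client_id": "acme", "api_key": "<secret>"},
        {"client_id": "globex", "api_key": "<secret>"},
    ],
}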

Still, the proposal might be interesting for other folks too, so I've added it to our Office Hours board 🙂

@mchudoba
Author

@edgarrmondragon Thank you for the suggestion! Great point about storing secrets in the context; I didn't fully think that through. An alternative approach would be to store only a client_id in the partition context and look up the credentials in the config.

Something like this:

def get_authenticator(self, context: dict | None) -> APIAuthenticatorBase | None:
    # Only the non-secret client_id lives in the partition context; the
    # actual credentials are looked up from the tap config.
    client_id = context["client_id"]
    creds = self.config["client_creds_map"].get(client_id)
    return MyAuthenticator(creds)
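The partition context itself could then be populated from the same config via the SDK's partitions property; a minimal sketch, with client_creds_map assumed as above:

@property
def partitions(self) -> list[dict]:
    # Emit one partition per client, carrying only the non-secret ID.
    return [{"client_id": cid} for cid in self.config["client_creds_map"]]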

However, your suggestion about using multiple stream instances is really interesting and might do exactly what I need. Running within the same stream run is definitely not a hard requirement, so I'll experiment with that and see if it works for me.

How does Meltano send data from multiple instances of the same stream to its target in one stream run? Does it batch data across instances? Essentially I'm trying to send larger, less frequent batches of data to our Postgres target.

@edgarrmondragon
Collaborator

> Running within the same stream run is definitely not a hard requirement, so I'll experiment with that and see if it works for me.

👍

> How does Meltano send data from multiple instances of the same stream to its target in one stream run? Does it batch data across instances? Essentially I'm trying to send larger, less frequent batches of data to our Postgres target.

@mchudoba Each instance will emit a schema message at the start of the sync, which for most targets will trigger a drain (more or less costly, depending on the target). So if you have a lot of partitions with few records in each, the loading process could be inefficient.

In my opinion, targets should be able to detect that the schema has not changed, so they don't need to drain the current record batch. No targets are implemented like that, AFAICT.

cc @kgpayne @aaronsteers this 👆 is something that could be interesting/useful to have for targets.

@aaronsteers
Contributor

aaronsteers commented Oct 11, 2022

> In my opinion, targets should be able to detect that the schema has not changed, so they don't need to drain the current record batch. No targets are implemented like that, AFAICT.

We actually did intend to implement exactly this check (and also a check on key-properties) within the SDK for targets:

> A new sink will be created if schema is provided and if either schema or
> key_properties has changed. If so, the old sink becomes archived and held
> until the next drain_all() operation.

https://github.com/meltano/sdk/blob/main/singer_sdk/target_base.py#L131-L133

If this is not working as expected for SDK targets (meaning, an identical schema message is sent and it causes the Sink to drain), I think we would want to log that as a defect. The conditional is here:

https://github.com/meltano/sdk/blob/main/singer_sdk/target_base.py#L160-L171
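Roughly, the check described in that docstring amounts to something like the following (a paraphrased sketch, not the SDK's actual code; names like _sinks_active and add_sink are illustrative):

def get_sink(self, stream_name, schema, key_properties):
    existing_sink = self._sinks_active.get(stream_name)
    if existing_sink is None:
        # No sink yet for this stream: create one.
        return self.add_sink(stream_name, schema, key_properties)
    if existing_sink.schema != schema or existing_sink.key_properties != key_properties:
        # Schema or key properties changed: archive the old sink until the
        # next drain_all() operation, then start a fresh one.
        self._sinks_to_clear.append(self._sinks_active.pop(stream_name))
        return self.add_sink(stream_name, schema, key_properties)
    # Identical schema message: reuse the existing sink; no drain triggered.
    return existing_sink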

@aaronsteers
Contributor

Logged a related issue, because I think SDK-based taps may be noisy/inefficient for non-SDK targets:

Also, I don't think this is related, but I should call out (just in case) that we currently have an overloaded meaning of "batch", which we will resolve and/or merge in the future: BatchSink is a subclass of Sink that loads its items in bulk, while "batch messages" are a means for taps and targets to communicate by sending files. See #963.
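To illustrate the first meaning: a BatchSink subclass buffers records and loads them in bulk once a batch fills up. A minimal sketch (max_size and the context["records"] buffer follow the SDK's documented BatchSink interface, but double-check the docs):

from singer_sdk.sinks import BatchSink

class MySink(BatchSink):
    max_size = 10000  # records buffered before process_batch is called

    def process_batch(self, context: dict) -> None:
        # Bulk-load the buffered records into the destination.
        records = context["records"]
        ...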

@aaronsteers aaronsteers moved this from To Discuss to Up Next in Office Hours Oct 19, 2022
@aaronsteers aaronsteers moved this from Up Next to To Discuss in Office Hours Oct 19, 2022
@stale

stale bot commented Jul 18, 2023

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

@stale stale bot added the stale label Jul 18, 2023
@edgarrmondragon edgarrmondragon closed this as not planned Jul 20, 2023