
Migration tool #603

Merged
merged 3 commits into master from migration-tool on Nov 9, 2021

Conversation

octonato
Member

This is a feature branch with initial work done by @Tochemey in #501.

I will keep it as a draft until the migration-tool branch is ready to be merged into master.

Co-authored-by: Gabriele Favaretto <[email protected]>
Co-authored-by: Renato Cavalcanti <[email protected]>
@octonato octonato marked this pull request as ready for review October 25, 2021 19:56
@octonato
Member Author

Thanks a lot @Tochemey and @gabriele83 for all the efforts here.

#611 is now merged here and we have end-to-end tests. I will ask around for some extra reviews to get this into master.

We probably want to update the docs to explain how to run the migration. This can be done in another PR (before or after merging this one).

@Tochemey
Contributor

Tochemey commented Oct 25, 2021

@gabriele83 Nice job. @octonato thanks for the opportunity to contribute.

@AlixBa

AlixBa commented Oct 26, 2021

Hello guys,
This works only if you're able to parse the old payloads in the journal.

Context:
We have an akka-persistence/clustered app that relies on akka.actor.serializer to serialize events as protobuf. It has been running for some time now and we didn't enforce backward compatibility on all events. If we try to migrate using the JournalMigrator provided in this MR, with the same config the app currently has, it fails because the migrator tries to deserialize old events with the latest protobuf format.

Is that expected?
As a workaround we extracted the PersistentMessage using val persistentMessage = PersistentMessage.parseFrom(row.message) and kept the payload as-is (Array[Byte]) to avoid any transformation by the migrator.

Quick & dirty fix we did

    // from migrate(): parse the protobuf envelope directly instead of going
    // through MessageSerializer, keeping the event payload as raw bytes
    .via(Flow[JournalRow].map { row =>
      Try {
        val persistentMessage = PersistentMessage.parseFrom(row.message)
        val repr = PersistentRepr(
          persistentMessage.getPayload,
          persistentMessage.getSequenceNr,
          if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
          if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined,
          if (persistentMessage.hasDeleted) persistentMessage.getDeleted else false,
          if (persistentMessage.hasSender)
            actorRefResolver.resolveActorRef(persistentMessage.getSender).toClassic
          else Actor.noSender,
          if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined
        )
        (
          persistentMessage,
          repr,
          row.tags
            .map(_.split(readJournalConfig.pluginConfig.tagSeparator).toSet)
            .getOrElse(Set.empty[String]),
          row.ordering
        )
      }
    })

and

     // from serialize(): copy the raw payload bytes and serializer metadata
     // straight into the new columns, without deserializing the event
      writeTimestamp =
        if (persistentMessage.hasTimestamp) persistentMessage.getTimestamp else repr.timestamp,
      adapterManifest = repr.manifest,
      eventPayload = persistentMessage.getPayload.getPayload.toByteArray,
      eventSerId = persistentMessage.getPayload.getSerializerId,
      eventSerManifest = persistentMessage.getPayload.getPayloadManifest.toStringUtf8,

@Tochemey
Contributor

Tochemey commented Oct 27, 2021

> (quoting @AlixBa's comment above in full)

Hello @AlixBa.

I would like to know whether you have changed the proto definitions of your events. If you are using the same serializer, it should be fine.

Also bear in mind that this will only work with the old schema (prior to 5.0.0). If you are on the new schema it will not work.

@octonato
Member Author

@AlixBa, the migrator was not designed to fix a broken journal. The goal is to move from the old schema to the new one.

I think the workaround is what you did already, i.e. build a custom version of the migrator.

On a side note, a more generic journal migration/clean-up tool would be very useful, but it's out of scope for now. It can be added later.

@gabriele83
Contributor

@Tochemey no, no one has changed anything; the behavior did not change. We do not do event transformations, also because that is not the purpose of a migration tool, I would say.

@octonato
Member Author

@AlixBa, I realise that I didn't fully answer your question.

So, yes, it's expected that it deserializes all the events. The reason is that the old schema persisted the events wrapped in an internal class called PersistentRepr. The migrator removes that wrapper, but to do so it needs to deserialize it, take the real event payload inside it and serialize the payload back.

Your solution actually puts the wrapper back in the new schema. That's not that bad, but we wouldn't recommend it: PersistentRepr is an internal Akka Persistence class and, although it's very stable, its format can change in the future. That's why the migration takes the opportunity to unwrap it.
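For illustration, the unwrap step can be sketched with plain case classes. These are hypothetical stand-ins, not the real akka.persistence.PersistentRepr or journal row types:

```scala
// Illustrative stand-ins; all names here are hypothetical.
// Old schema: the whole serialized wrapper (metadata + payload) sat in one
// message column. New schema: payload and metadata get dedicated columns.
final case class OldWrapper(
    persistenceId: String,
    sequenceNr: Long,
    manifest: String,
    payloadBytes: Array[Byte])

final case class NewRow(
    persistenceId: String,
    sequenceNr: Long,
    eventSerManifest: String,
    eventPayload: Array[Byte])

// The migration deserializes the wrapper, lifts the payload out and stores
// it without the wrapper, so new reads never touch the internal class again.
def unwrap(old: OldWrapper): NewRow =
  NewRow(old.persistenceId, old.sequenceNr, old.manifest, old.payloadBytes)
```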

If you can't read back your events during migration, you won't be able to read them in prod either. You probably don't have issues because you have been snapshotting. That said, since you can't deserialize those events, I think you are better off not migrating them; you can't read them anyway.

That will give you a new journal with only events you can read and without the PersistentRepr wrapper.
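The "migrate only what you can read" idea can be sketched without Akka Streams, using Try to drop rows whose payload no longer deserializes. All names here are illustrative, not the real migrator API:

```scala
import scala.util.Try

// Hypothetical journal row: ordering plus the stored bytes.
final case class JournalRow(ordering: Long, message: Array[Byte])

// A deserializer that may fail on legacy events whose format changed.
// As a stand-in, it simply requires non-empty UTF-8 content.
def deserialize(row: JournalRow): Try[String] =
  Try(new String(row.message, "UTF-8")).filter(_.nonEmpty)

// Keep only the rows that still deserialize; unreadable events are
// skipped instead of failing the whole migration.
def migrateReadable(rows: Seq[JournalRow]): Seq[(Long, String)] =
  rows.flatMap(row => deserialize(row).toOption.map(p => (row.ordering, p)))
```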

@AlixBa

AlixBa commented Oct 28, 2021

I'm not sure I understand @octonato.

I got the fact that you deserialize both the PersistentRepr and the underlying payload. I just don't get why it needs to deserialize the underlying payload, since that's the part you're extracting to a new column in the new schema. Am I wrong?

What I've done, if I'm right, is take the payload out of the PersistentRepr without deserializing it, so I'd be able to migrate "broken" events (because yes, they're broken/unreadable for now).

But anyway, if the migrator isn't expected to handle this use case, that's fine with me!

@octonato
Member Author

octonato commented Nov 8, 2021

@AlixBa, when we deserialize PersistentRepr, the payload gets deserialized automatically. That's how akka.persistence.serialization.MessageSerializer does it, and we are just re-using that serializer.
But I agree that, strictly speaking, we don't need to deserialize the payload.

On the other hand, it's a good sanity check, as it reveals whether the journal is readable or not.

I'm not sure making the migration oblivious to the payload is the best solution. It's good to rely on the existing serializer so that at least we are sure we are reading the data the same way it is expected to be read in a production environment.

Eventually, we could provide two migration methods: one using the MessageSerializer by default, and one to which the user could pass their own flow. Then people could drop events, transform them, etc.
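A sketch of that two-method idea on plain collections (the real migrator works on Akka Streams stages, and all names here are hypothetical): a default path that insists on full round-trip deserialization, and a variant that accepts a user-supplied flow for dropping or transforming events:

```scala
import scala.util.Try

// Hypothetical event type for the sketch.
final case class RawEvent(sequenceNr: Long, bytes: Array[Byte])

final class MigratorSketch(rows: Seq[RawEvent]) {
  // Default path: every event must round-trip through the serializer;
  // an unreadable event fails the migration (the sanity-check behavior).
  def migrate(roundTrip: RawEvent => Try[RawEvent]): Seq[RawEvent] =
    rows.map(e => roundTrip(e).get)

  // Custom path: the caller's flow decides what to keep, drop or rewrite.
  def migrateWith(flow: RawEvent => Option[RawEvent]): Seq[RawEvent] =
    rows.flatMap(flow)
}
```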

@octonato
Member Author

octonato commented Nov 9, 2021

I think this is ready to merge.

@patriknw, could you give it a review? I can't approve it myself because I created this feature branch.

A few things to note about it:

  • We don't track progress and restart from the last processed event (like in the R2DBC one). This can be added in a follow-up PR.
  • Docs are missing, but can also be added in a follow-up PR.
  • It goes over all events, not per persistence id and in parallel (like in the R2DBC one). Again, an improvement for a follow-up PR.

The follow-up PRs I'm suggesting are nice-to-haves, but the migrator as it is can already be used in most cases.

Of course, the documentation PR is not a nice-to-have but a must. However, if we want to migrate per persistence id and with a tracking table, we should wait with the documentation.

Member

@patriknw patriknw left a comment


LGTM (reviewed briefly)
Thanks to the contributors!

@octonato octonato merged commit c56753d into master Nov 9, 2021
@octonato octonato deleted the migration-tool branch November 9, 2021 20:16