-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: sync storage manager #2561
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General direction looks good but I hate strings...
See my other style comments.
You can find the image built from this PR at
Built from 568c3af |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good direction!
#TODO: Do we need to check if this message has already been ingessed? | ||
# because what if messages is received via gossip and sync as well? | ||
# Might 2 entries to be inserted into storage which is inefficient. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The system should be idempotent, so i would say yes. Is the negentropy storage insert func not idempotent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Negentropy doesn't seem to be, because I wrote a small test to verify and it shows as a diff. Hence this needs to be handled separately at higher layer.
|
||
proc sync*( | ||
self: WakuSync | ||
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} = | ||
let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: | ||
return err("No suitable peer found for sync") | ||
|
||
let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not more concise this way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is concise but leading to a bug where the handler gets invoked twice. i referred the issue in the pr description.
self.periodicSyncFut = self.periodicSync() | ||
|
||
proc stopWait*(self: WakuSync) {.async.} = | ||
await self.periodicSyncFut.cancelAndWait() | ||
|
||
proc storageSize*(self: WakuSync): int = | ||
return self.storage.size() | ||
#[ TODO:Fetch from storageManager?? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would make sense to me.
type SyncSessionType* = enum | ||
CLIENT = 1 | ||
SERVER = 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we expect more session type in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so
Session State Machine | ||
1. negotiate sync params | ||
2. start negentropy sync | ||
3. find out local needhashes | ||
4. If client, share peer's needhashes to peer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love state machine!
self: SyncSession, conn: Connection, storageMgr: WakuSyncStorageManager | ||
) {.async, gcsafe.} = | ||
#TODO: Find matching storage based on sync range and continue?? | ||
#TODO: Return error rather than closing stream abruptly? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding standard error code and desc. to the payload would be the way I think
Description
Introducing storage manager .
Changes
Implementing sync protocol flow in Sync session as per waku-org/research#80 (comment) can be taken up in a separate PR.
cc @SionoiS , I don't think i had time to start on this. Maybe you can take it up from here after merging this PR.