Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add awareness features to handle server state #170

Merged
merged 21 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Awareness
- Decoder
- Doc
- Encoder
- Map
- MapEvent
- NewTransaction
Expand Down
1 change: 1 addition & 0 deletions python/pycrdt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ._pycrdt import Subscription as Subscription
from ._pycrdt import TransactionEvent as TransactionEvent
from ._sync import Decoder as Decoder
from ._sync import Encoder as Encoder
from ._sync import YMessageType as YMessageType
from ._sync import YSyncMessageType as YSyncMessageType
from ._sync import create_sync_message as create_sync_message
Expand Down
294 changes: 118 additions & 176 deletions python/pycrdt/_awareness.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@

import json
import time
from typing import Any, Callable
from typing import Any, Callable, cast
from uuid import uuid4

from typing_extensions import deprecated

from ._doc import Doc
from ._sync import Decoder, YMessageType, read_message, write_var_uint
from ._sync import Decoder, Encoder


class Awareness:
client_id: int
_meta: dict[int, dict[str, Any]]
_states: dict[int, dict[str, Any]]
_subscriptions: dict[str, Callable[[dict[str, Any]], None]]
_subscriptions: dict[str, Callable[[str, tuple[dict[str, Any], Any]], None]]

def __init__(self, ydoc: Doc):
"""
Expand All @@ -25,8 +23,8 @@ def __init__(self, ydoc: Doc):
self.client_id = ydoc.client_id
self._meta = {}
self._states = {}

self._subscriptions = {}
self.set_local_state({})

@property
def meta(self) -> dict[int, dict[str, Any]]:
Expand All @@ -38,164 +36,148 @@ def states(self) -> dict[int, dict[str, Any]]:
"""The client states."""
return self._states

@deprecated("Use `apply_awareness_update()` instead")
def get_changes(self, message: bytes) -> dict[str, Any]:
def get_local_state(self) -> dict[str, Any] | None:
"""
Apply states update and sends the changes to subscribers.

Args:
message: The binary changes.

Returns:
A dictionary summarizing the changes.
"""
changes = self.apply_awareness_update(message, "remote")
states_changes = changes["changes"]
client_ids = [*states_changes["added"], *states_changes["filtered_updated"]]
states = [self._states[client_id] for client_id in client_ids]
states_changes["states"] = states
return states_changes

def apply_awareness_update(self, update: bytes, origin: str) -> dict[str, Any]:
The local state, if any.
"""
Apply states update and sends the changes to subscribers.

Args:
message: The binary changes.
origin: The origin of the change.
return self._states.get(self.client_id)

Returns:
A dictionary with the changes and the origin.
"""
update = read_message(update)
decoder = Decoder(update)
states = []
length = decoder.read_var_uint()
states_changes: dict[str, list[int]] = {
"added": [],
"updated": [],
"filtered_updated": [],
"removed": [],
}

for _ in range(length):
client_id = decoder.read_var_uint()
clock = decoder.read_var_uint()
state_str = decoder.read_var_string()
state = None if not state_str else json.loads(state_str)
if state is not None:
states.append(state)
self._update_states(client_id, clock, state, states_changes)

changes = {
"changes": states_changes,
"origin": origin,
}

# Do not trigger the callbacks if it is only a keep alive update
if (
states_changes["added"]
or states_changes["filtered_updated"]
or states_changes["removed"]
):
for callback in self._subscriptions.values():
callback(changes)

return changes

def get_local_state(self) -> dict[str, Any]:
"""
Returns:
The local state.
"""
return self._states.get(self.client_id, {})

def set_local_state(self, state: dict[str, Any]) -> dict[str, Any]:
def set_local_state(self, state: dict[str, Any] | None) -> None:
"""
Updates the local state and meta, and sends the changes to subscribers.

Args:
state: The new local state.

Returns:
A dictionary with the changes and the origin (="local").
state: The new local state, if any.
"""
clock = self._meta.get(self.client_id, {}).get("clock", 0) + 1
states_changes: dict[str, list[int]] = {
"added": [],
"updated": [],
"filtered_updated": [],
"removed": [],
}
self._update_states(self.client_id, clock, state, states_changes)

changes = {
"changes": states_changes,
"origin": "local",
}

if (
states_changes["added"]
or states_changes["filtered_updated"]
or states_changes["removed"]
):
client_id = self.client_id
curr_local_meta = self._meta.get(client_id)
clock = 0 if curr_local_meta is None else curr_local_meta["clock"] + 1
prev_state = self._states.get(client_id)
if state is None:
del self._states[client_id]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably raise an error if the local state has not been defined and we try to set it as None.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising an exception is a feature IMO (rather than keep it silent), but maybe you mean that we should raise a more meaningful exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to raise an exception, if we want to set it to null but it is already the case, it might be silent, like here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in set_local_state we are responsible for what we do, in apply_awareness_update less so. But I don't have a strong opinion on this, I will add the check back 👍

else:
self._states[client_id] = state
timestamp = int(time.time() * 1000)
self._meta[client_id] = {"clock": clock, "lastUpdated": timestamp}
added = []
updated = []
filtered_updated = []
removed = []
if state is None:
removed.append(client_id)
elif prev_state is None:
if state is not None:
added.append(client_id)
else:
updated.append(client_id)
if prev_state != state:
filtered_updated.append(client_id)
if added or filtered_updated or removed:
for callback in self._subscriptions.values():
callback(changes)

return changes
callback(
"change",
({"added": added, "updated": filtered_updated, "removed": removed}, "local"),
)
for callback in self._subscriptions.values():
callback("update", ({"added": added, "updated": updated, "removed": removed}, "local"))

def set_local_state_field(self, field: str, value: Any) -> dict[str, Any]:
def set_local_state_field(self, field: str, value: Any) -> None:
"""
Sets a local state field, and optionally returns the encoded new state.
Sets a local state field.

Args:
field: The field of the local state to set.
value: The value associated with the field.

Returns:
A dictionary with the changes and the origin (="local").
"""
current_state = self.get_local_state()
current_state[field] = value
return self.set_local_state(current_state)
state = self.get_local_state()
if state is not None:
state[field] = value
self.set_local_state(state)

def encode_awareness_update(self, client_ids: list[int]) -> bytes | None:
def encode_awareness_update(self, client_ids: list[int]) -> bytes:
"""
Encode the states of the client ids.
Creates an encoded awareness update of the clients given by their IDs.

Args:
client_ids: The list of clients' state to update.
client_ids: The list of client IDs for which to create an update.

Returns:
The encoded clients' state.
The encoded awareness update.
"""
messages = []
encoder = Encoder()
encoder.write_var_uint(len(client_ids))
for client_id in client_ids:
if client_id not in self._states:
continue
state = self._states[client_id]
meta = self._meta[client_id]
update = json.dumps(state, separators=(",", ":")).encode()
client_msg = [update]
client_msg.insert(0, write_var_uint(len(update)))
client_msg.insert(0, write_var_uint(meta.get("clock", 0)))
client_msg.insert(0, write_var_uint(client_id))
messages.append(b"".join(client_msg))
state = self._states.get(client_id)
clock = cast(int, self._meta.get(client_id, {}).get("clock"))
encoder.write_var_uint(client_id)
encoder.write_var_uint(clock)
encoder.write_var_string(json.dumps(state, separators=(",", ":")))
return encoder.to_bytes()

if not messages:
return None

messages.insert(0, write_var_uint(len(client_ids)))
encoded_messages = b"".join(messages)
def apply_awareness_update(self, update: bytes, origin: Any) -> None:
"""
Applies the binary update and notifies subscribers with changes.

message = [
write_var_uint(YMessageType.AWARENESS),
write_var_uint(len(encoded_messages)),
encoded_messages,
]
return b"".join(message)
Args:
update: The binary update.
origin: The origin of the update.
"""
decoder = Decoder(update)
timestamp = int(time.time() * 1000)
added = []
updated = []
filtered_updated = []
removed = []
length = decoder.read_var_uint()
for _ in range(length):
client_id = decoder.read_var_uint()
clock = decoder.read_var_uint()
state_str = decoder.read_var_string()
state = None if not state_str else json.loads(state_str)
client_meta = self._meta.get(client_id)
prev_state = self._states.get(client_id)
curr_clock = 0 if client_meta is None else client_meta["clock"]
if curr_clock < clock or (
curr_clock == clock and state is None and client_id in self._states
):
if state is None:
# Never let a remote client remove this local state.
if client_id == self.client_id and self.get_local_state() is not None:
# Remote client removed the local state. Do not remove state.
# Broadcast a message indicating that this client still exists by increasing
# the clock.
clock += 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be

clock = curr_clock + 1

to update the local clock ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mimics the JavaScript implementation. Should it be different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstood the clock, but I understood that each client has its own.
If this is the case, why should we rely on a value coming from a client to update the local clock ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to open an issue in https://github.com/yjs/y-protocols to discuss this, or maybe @dmonad can comment here?

else:
if client_id in self._states:
del self._states[client_id]
else:
self._states[client_id] = state
self._meta[client_id] = {
"clock": clock,
"lastUpdated": timestamp,
}
if client_meta is None and state is not None:
added.append(client_id)
elif client_meta is not None and state is None:
removed.append(client_id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we not fallback here if a remote client try to remove the local state ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the translation of this. Do you mean that there is an issue in the JavaScript implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe...
We should test it, but it seems to me that if a remote client try to set the local awareness to null, this condition is fulfilled.
The client_meta is the local meta (and may be not null), and the state is the state to apply, which is null in that case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But a remote client cannot remove the local state, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that, but it will send a wrong information to other client, that the local state has been removed, no ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, maybe open a PR in https://github.com/yjs/y-protocols? It should be fixed there too, and they will confirm if it's a bug.

elif state is not None:
if state != prev_state:
filtered_updated.append(client_id)
updated.append(client_id)
if added or filtered_updated or removed:
for callback in self._subscriptions.values():
callback(
"change",
({"added": added, "updated": filtered_updated, "removed": removed}, origin),
)
if added or updated or removed:
for callback in self._subscriptions.values():
callback(
"update", ({"added": added, "updated": updated, "removed": removed}, origin)
)

def observe(self, callback: Callable[[dict[str, Any]], None]) -> str:
def observe(self, callback: Callable[[str, tuple[dict[str, Any], Any]], None]) -> str:
"""
Registers the given callback to awareness changes.

Expand All @@ -217,43 +199,3 @@ def unobserve(self, id: str) -> None:
id: The subscription ID to unregister.
"""
del self._subscriptions[id]

def _update_states(
self, client_id: int, clock: int, state: Any, states_changes: dict[str, list[int]]
) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function could be kept, using an origin to know if we should remove the local state or not.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's slightly different between setLocalState and applyAwarenessUpdate, right? In particular setLocalState doesn't handle the clock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure to understand "handle the clock". It does update it too, no ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what about this, that _update_states() was doing inconditionally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it was wrong in _update_states(), it should also have tested that origin is not "local".
Anyway, the current implementation works, it was just to avoid duplicating code.

"""
Update the states of the clients, and the states_changes dictionary.

Args:
client_id: The client's state to update.
clock: The clock of this client.
state: The updated state.
states_changes: The changes to updates.
"""
timestamp = int(time.time() * 1000)
client_meta = self._meta.get(client_id)
prev_state = self._states.get(client_id)
curr_clock = 0 if client_meta is None else client_meta["clock"]
if curr_clock < clock or (
curr_clock == clock and state is None and client_id in self._states
):
if state is None:
if client_id == self.client_id and self._states.get(client_id) is not None:
clock += 1
else:
if client_id in self._states:
del self._states[client_id]
else:
self._states[client_id] = state
self._meta[client_id] = {
"clock": clock,
"last_updated": timestamp,
}
if client_meta is None and state is not None:
states_changes["added"].append(client_id)
elif client_meta is not None and state is None:
states_changes["removed"].append(client_id)
elif state is not None:
if state != prev_state:
states_changes["filtered_updated"].append(client_id)
states_changes["updated"].append(client_id)
Loading
Loading