Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proper concurrency support in websockets mode #1036

Merged
merged 6 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/api/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ Allows concurrent updates to state in the same session. If this is not updated,

By default, this is not enabled. You can enable this by setting it to `true`.

### MESOP_WEB_SOCKETS_ENABLED

!!! warning "Experimental feature"

This is an experimental feature and is subject to breaking change. Please follow [https://github.com/google/mesop/issues/1028](https://github.com/google/mesop/issues/1028) for updates.

This uses WebSockets instead of HTTP Server-Sent Events (SSE) as the transport protocol for UI updates. If you set this environment variable to `true`, then [`MESOP_CONCURRENT_UPDATES_ENABLED`](#MESOP_CONCURRENT_UPDATES_ENABLED) will automatically be enabled as well.

### MESOP_STATE_SESSION_BACKEND

Sets the backend to use for caching state data server-side. This makes it so state does
Expand Down
2 changes: 1 addition & 1 deletion mesop/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
execute_module,
get_module_name_from_runfile_path,
)
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.exceptions import format_traceback
from mesop.runtime import (
enable_debug_mode,
Expand All @@ -22,7 +23,6 @@
from mesop.server.flags import port
from mesop.server.logging import log_startup
from mesop.server.server import configure_flask_app
from mesop.server.server_utils import MESOP_WEBSOCKETS_ENABLED
from mesop.server.static_file_serving import configure_static_file_serving
from mesop.utils.host_util import get_public_host
from mesop.utils.runfiles import get_runfile_location
Expand Down
10 changes: 10 additions & 0 deletions mesop/env/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
load("//build_defs:defaults.bzl", "py_library")

package(
default_visibility = ["//build_defs:mesop_internal"],
)

py_library(
name = "env",
srcs = glob(["*.py"]),
)
Empty file added mesop/env/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions mesop/env/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os

AI_SERVICE_BASE_URL = os.environ.get(
"MESOP_AI_SERVICE_BASE_URL", "http://localhost:43234"
)

MESOP_WEBSOCKETS_ENABLED = (
os.environ.get("MESOP_WEBSOCKETS_ENABLED", "false").lower() == "true"
)

MESOP_CONCURRENT_UPDATES_ENABLED = (
os.environ.get("MESOP_CONCURRENT_UPDATES_ENABLED", "false").lower() == "true"
)

if MESOP_WEBSOCKETS_ENABLED:
print("Experiment enabled: MESOP_WEBSOCKETS_ENABLED")
print("Auto-enabling MESOP_CONCURRENT_UPDATES_ENABLED")
MESOP_CONCURRENT_UPDATES_ENABLED = True
elif MESOP_CONCURRENT_UPDATES_ENABLED:
print("Experiment enabled: MESOP_CONCURRENT_UPDATES_ENABLED")

EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED = (
os.environ.get("MESOP_EXPERIMENTAL_EDITOR_TOOLBAR", "false").lower() == "true"
)

if EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED:
print("Experiment enabled: EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED")
3 changes: 3 additions & 0 deletions mesop/examples/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from mesop.examples import composite as composite
from mesop.examples import concurrency_state as concurrency_state
from mesop.examples import concurrent_updates as concurrent_updates
from mesop.examples import (
concurrent_updates_websockets as concurrent_updates_websockets,
)
from mesop.examples import custom_font as custom_font
from mesop.examples import dict_state as dict_state
from mesop.examples import docs as docs
Expand Down
35 changes: 35 additions & 0 deletions mesop/examples/concurrent_updates_websockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import time

import mesop as me


@me.page(path="/concurrent_updates_websockets")
def page():
state = me.state(State)
me.text("concurrent_updates_websockets")
me.button(label="Slow state update", on_click=slow_state_update)
me.button(label="Fast state update", on_click=fast_state_update)
me.text("Slow state: " + str(state.slow_state))
me.text("Fast state: " + str(state.fast_state))
if state.show_box:
with me.box():
me.text("Box!")


@me.stateclass
class State:
show_box: bool
slow_state: bool
fast_state: bool


def slow_state_update(e: me.ClickEvent):
time.sleep(3)
me.state(State).show_box = True
me.state(State).slow_state = True
yield


def fast_state_update(e: me.ClickEvent):
me.state(State).show_box = True
me.state(State).fast_state = True
2 changes: 2 additions & 0 deletions mesop/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ py_library(
srcs = glob(["*.py"]),
deps = [
"//mesop/dataclass_utils",
"//mesop/env",
"//mesop/events",
"//mesop/exceptions",
"//mesop/protos:ui_py_pb2",
"//mesop/security",
"//mesop/server:state_sessions",
"//mesop/utils",
"//mesop/warn",
] + THIRD_PARTY_PY_FLASK,
)
18 changes: 18 additions & 0 deletions mesop/runtime/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import copy
import threading
import types
import urllib.parse as urlparse
from typing import Any, Callable, Generator, Sequence, TypeVar, cast
Expand All @@ -10,6 +11,7 @@
serialize_dataclass,
update_dataclass_from_json,
)
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.exceptions import (
MesopDeveloperException,
MesopException,
Expand Down Expand Up @@ -42,6 +44,22 @@ def __init__(
self._theme_settings: pb.ThemeSettings | None = None
self._js_modules: set[str] = set()
self._query_params: dict[str, list[str]] = {}
if MESOP_WEBSOCKETS_ENABLED:
self._lock = threading.Lock()

def acquire_lock(self) -> None:
# No-op if websockets is not enabled because
# there shouldn't be concurrent updates to the same
# context instance.
if MESOP_WEBSOCKETS_ENABLED:
self._lock.acquire()

def release_lock(self) -> None:
# No-op if websockets is not enabled because
# there shouldn't be concurrent updates to the same
# context instance.
if MESOP_WEBSOCKETS_ENABLED:
self._lock.release()

def register_js_module(self, js_module_path: str) -> None:
self._js_modules.add(js_module_path)
Expand Down
17 changes: 16 additions & 1 deletion mesop/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
from dataclasses import dataclass
from typing import Any, Callable, Generator, Type, TypeVar, cast

from flask import g
from flask import g, request

import mesop.protos.ui_pb2 as pb
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.events import LoadEvent, MesopEvent
from mesop.exceptions import MesopDeveloperException, MesopUserException
from mesop.key import Key
from mesop.security.security_policy import SecurityPolicy
from mesop.utils.backoff import exponential_backoff
from mesop.warn import warn

from .context import Context

Expand Down Expand Up @@ -54,12 +56,25 @@ def __init__(self):
self._state_classes: list[type[Any]] = []
self._loading_errors: list[pb.ServerError] = []
self._has_served_traffic = False
self._contexts = {}

def context(self) -> Context:
if MESOP_WEBSOCKETS_ENABLED and hasattr(request, "sid"):
# flask-socketio adds sid (session id) to the request object.
sid = request.sid # type: ignore
if sid not in self._contexts:
self._contexts[sid] = self.create_context()
wwwillchen marked this conversation as resolved.
Show resolved Hide resolved
return self._contexts[sid]
if "_mesop_context" not in g:
g._mesop_context = self.create_context()
return g._mesop_context

def delete_context(self, sid: str) -> None:
if sid in self._contexts:
del self._contexts[sid]
else:
warn(f"Tried to delete context with sid={sid} that doesn't exist.")

def create_context(self) -> Context:
# If running in prod mode, *always* enable the has served traffic safety check.
# If running in debug mode, *disable* the has served traffic safety check.
Expand Down
1 change: 1 addition & 0 deletions mesop/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ py_library(
deps = [
"//mesop/component_helpers",
"//mesop/editor",
"//mesop/env",
"//mesop/events",
"//mesop/protos:ui_py_pb2",
"//mesop/utils",
Expand Down
22 changes: 19 additions & 3 deletions mesop/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
import mesop.protos.ui_pb2 as pb
from mesop.component_helpers import diff_component
from mesop.editor.component_configs import get_component_configs
from mesop.env.env import (
EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED,
MESOP_CONCURRENT_UPDATES_ENABLED,
MESOP_WEBSOCKETS_ENABLED,
)
from mesop.events import LoadEvent
from mesop.exceptions import format_traceback
from mesop.runtime import runtime
from mesop.server.constants import WEB_COMPONENTS_PATH_SEGMENT
from mesop.server.server_debug_routes import configure_debug_routes
from mesop.server.server_utils import (
EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED,
MESOP_CONCURRENT_UPDATES_ENABLED,
MESOP_WEBSOCKETS_ENABLED,
STREAM_END,
create_update_state_event,
is_same_site,
Expand All @@ -38,6 +40,7 @@ def render_loop(
init_request: bool = False,
) -> Generator[str, None, None]:
try:
runtime().context().acquire_lock()
runtime().run_path(path=path, trace_mode=trace_mode)
page_config = runtime().get_page_config(path=path)
title = page_config.title if page_config else "Unknown path"
Expand Down Expand Up @@ -88,6 +91,8 @@ def render_loop(
yield from yield_errors(
error=pb.ServerError(exception=str(e), traceback=format_traceback())
)
finally:
runtime().context().release_lock()

def yield_errors(error: pb.ServerError) -> Generator[str, None, None]:
if not runtime().debug_mode:
Expand Down Expand Up @@ -254,6 +259,17 @@ def teardown_clear_stale_state_sessions(error=None):

socketio = SocketIO(flask_app)

@socketio.on_error(namespace=UI_PATH)
def handle_error(e):
print("WebSocket error", e)
sid = request.sid # type: ignore
runtime().delete_context(sid)

@socketio.on("disconnect", namespace=UI_PATH)
def handle_disconnect():
sid = request.sid # type: ignore
runtime().delete_context(sid)

@socketio.on("message", namespace=UI_PATH)
def handle_message(message):
if not message:
Expand Down
2 changes: 1 addition & 1 deletion mesop/server/server_debug_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

from flask import Flask, Response, request

from mesop.env.env import AI_SERVICE_BASE_URL
from mesop.runtime import runtime
from mesop.server.server_utils import (
AI_SERVICE_BASE_URL,
check_editor_access,
make_sse_response,
sse_request,
Expand Down
28 changes: 1 addition & 27 deletions mesop/server/server_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
import json
import os
import secrets
import urllib.parse as urlparse
from typing import Any, Generator, Iterable
Expand All @@ -9,35 +8,10 @@
from flask import Response, abort, request

import mesop.protos.ui_pb2 as pb
from mesop.env.env import EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED
from mesop.runtime import runtime
from mesop.server.config import app_config

AI_SERVICE_BASE_URL = os.environ.get(
"MESOP_AI_SERVICE_BASE_URL", "http://localhost:43234"
)

MESOP_WEBSOCKETS_ENABLED = (
os.environ.get("MESOP_WEBSOCKETS_ENABLED", "false").lower() == "true"
)

MESOP_CONCURRENT_UPDATES_ENABLED = (
os.environ.get("MESOP_CONCURRENT_UPDATES_ENABLED", "false").lower() == "true"
)

if MESOP_WEBSOCKETS_ENABLED:
print("Experiment enabled: MESOP_WEBSOCKETS_ENABLED")
print("Auto-enabling MESOP_CONCURRENT_UPDATES_ENABLED")
MESOP_CONCURRENT_UPDATES_ENABLED = True
elif MESOP_CONCURRENT_UPDATES_ENABLED:
print("Experiment enabled: MESOP_CONCURRENT_UPDATES_ENABLED")

EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED = (
os.environ.get("MESOP_EXPERIMENTAL_EDITOR_TOOLBAR", "false").lower() == "true"
)

if EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED:
print("Experiment enabled: EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED")

LOCALHOSTS = (
# For IPv4 localhost
"127.0.0.1",
Expand Down
2 changes: 1 addition & 1 deletion mesop/server/wsgi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from absl import flags
from flask import Flask

from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.runtime import enable_debug_mode
from mesop.server.constants import EDITOR_PACKAGE_PATH, PROD_PACKAGE_PATH
from mesop.server.flags import port
from mesop.server.logging import log_startup
from mesop.server.server import configure_flask_app
from mesop.server.server_utils import MESOP_WEBSOCKETS_ENABLED
from mesop.server.static_file_serving import configure_static_file_serving
from mesop.utils.host_util import get_local_host

Expand Down
17 changes: 17 additions & 0 deletions mesop/tests/e2e/concurrent_updates_websockets_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import {testInWebSocketsEnabledOnly} from './e2e_helpers';
import {expect} from '@playwright/test';

testInWebSocketsEnabledOnly(
wwwillchen marked this conversation as resolved.
Show resolved Hide resolved
'concurrent updates (websockets)',
async ({page}) => {
await page.goto('/concurrent_updates_websockets');
await page.getByRole('button', {name: 'Slow state update'}).click();
await page.getByRole('button', {name: 'Fast state update'}).click();
await expect(page.getByText('Fast state: true')).toBeVisible();
expect(await page.locator('text="Box!"').count()).toBe(1);
await expect(page.getByText('Slow state: false')).toBeVisible();
await expect(page.getByText('Slow state: true')).toBeVisible();
// Make sure there isn't a second Box from the concurrent update.
expect(await page.locator('text="Box!"').count()).toBe(1);
},
);
10 changes: 10 additions & 0 deletions mesop/tests/e2e/e2e_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,13 @@ export const testInConcurrentUpdatesEnabledOnly = base.extend({
await use(page);
},
});

export const testInWebSocketsEnabledOnly = base.extend({
// Skip this test if MESOP_WEBSOCKETS_ENABLED is not 'true'
page: async ({page}, use) => {
if (process.env.MESOP_WEBSOCKETS_ENABLED !== 'true') {
base.skip(true, 'Skipping test in websockets disabled mode');
}
await use(page);
},
});
Loading