Skip to content

Commit

Permalink
Add proper concurrency support in websockets mode (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwwillchen authored Oct 20, 2024
1 parent 660e1aa commit 3714bd5
Show file tree
Hide file tree
Showing 17 changed files with 170 additions and 34 deletions.
8 changes: 8 additions & 0 deletions docs/api/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ Allows concurrent updates to state in the same session. If this is not updated,

By default, this is not enabled. You can enable this by setting it to `true`.

### MESOP_WEB_SOCKETS_ENABLED

!!! warning "Experimental feature"

This is an experimental feature and is subject to breaking change. Please follow [https://github.com/google/mesop/issues/1028](https://github.com/google/mesop/issues/1028) for updates.

This uses WebSockets instead of HTTP Server-Sent Events (SSE) as the transport protocol for UI updates. If you set this environment variable to `true`, then [`MESOP_CONCURRENT_UPDATES_ENABLED`](#MESOP_CONCURRENT_UPDATES_ENABLED) will automatically be enabled as well.

### MESOP_STATE_SESSION_BACKEND

Sets the backend to use for caching state data server-side. This makes it so state does
Expand Down
2 changes: 1 addition & 1 deletion mesop/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
execute_module,
get_module_name_from_runfile_path,
)
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.exceptions import format_traceback
from mesop.runtime import (
enable_debug_mode,
Expand All @@ -22,7 +23,6 @@
from mesop.server.flags import port
from mesop.server.logging import log_startup
from mesop.server.server import configure_flask_app
from mesop.server.server_utils import MESOP_WEBSOCKETS_ENABLED
from mesop.server.static_file_serving import configure_static_file_serving
from mesop.utils.host_util import get_public_host
from mesop.utils.runfiles import get_runfile_location
Expand Down
10 changes: 10 additions & 0 deletions mesop/env/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
load("//build_defs:defaults.bzl", "py_library")

package(
default_visibility = ["//build_defs:mesop_internal"],
)

py_library(
name = "env",
srcs = glob(["*.py"]),
)
Empty file added mesop/env/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions mesop/env/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os

AI_SERVICE_BASE_URL = os.environ.get(
"MESOP_AI_SERVICE_BASE_URL", "http://localhost:43234"
)

MESOP_WEBSOCKETS_ENABLED = (
os.environ.get("MESOP_WEBSOCKETS_ENABLED", "false").lower() == "true"
)

MESOP_CONCURRENT_UPDATES_ENABLED = (
os.environ.get("MESOP_CONCURRENT_UPDATES_ENABLED", "false").lower() == "true"
)

if MESOP_WEBSOCKETS_ENABLED:
print("Experiment enabled: MESOP_WEBSOCKETS_ENABLED")
print("Auto-enabling MESOP_CONCURRENT_UPDATES_ENABLED")
MESOP_CONCURRENT_UPDATES_ENABLED = True
elif MESOP_CONCURRENT_UPDATES_ENABLED:
print("Experiment enabled: MESOP_CONCURRENT_UPDATES_ENABLED")

EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED = (
os.environ.get("MESOP_EXPERIMENTAL_EDITOR_TOOLBAR", "false").lower() == "true"
)

if EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED:
print("Experiment enabled: EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED")
3 changes: 3 additions & 0 deletions mesop/examples/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from mesop.examples import composite as composite
from mesop.examples import concurrency_state as concurrency_state
from mesop.examples import concurrent_updates as concurrent_updates
from mesop.examples import (
concurrent_updates_websockets as concurrent_updates_websockets,
)
from mesop.examples import custom_font as custom_font
from mesop.examples import dict_state as dict_state
from mesop.examples import docs as docs
Expand Down
35 changes: 35 additions & 0 deletions mesop/examples/concurrent_updates_websockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import time

import mesop as me


@me.page(path="/concurrent_updates_websockets")
def page():
state = me.state(State)
me.text("concurrent_updates_websockets")
me.button(label="Slow state update", on_click=slow_state_update)
me.button(label="Fast state update", on_click=fast_state_update)
me.text("Slow state: " + str(state.slow_state))
me.text("Fast state: " + str(state.fast_state))
if state.show_box:
with me.box():
me.text("Box!")


@me.stateclass
class State:
show_box: bool
slow_state: bool
fast_state: bool


def slow_state_update(e: me.ClickEvent):
time.sleep(3)
me.state(State).show_box = True
me.state(State).slow_state = True
yield


def fast_state_update(e: me.ClickEvent):
me.state(State).show_box = True
me.state(State).fast_state = True
2 changes: 2 additions & 0 deletions mesop/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ py_library(
srcs = glob(["*.py"]),
deps = [
"//mesop/dataclass_utils",
"//mesop/env",
"//mesop/events",
"//mesop/exceptions",
"//mesop/protos:ui_py_pb2",
"//mesop/security",
"//mesop/server:state_sessions",
"//mesop/utils",
"//mesop/warn",
] + THIRD_PARTY_PY_FLASK,
)
18 changes: 18 additions & 0 deletions mesop/runtime/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import copy
import threading
import types
import urllib.parse as urlparse
from typing import Any, Callable, Generator, Sequence, TypeVar, cast
Expand All @@ -10,6 +11,7 @@
serialize_dataclass,
update_dataclass_from_json,
)
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.exceptions import (
MesopDeveloperException,
MesopException,
Expand Down Expand Up @@ -42,6 +44,22 @@ def __init__(
self._theme_settings: pb.ThemeSettings | None = None
self._js_modules: set[str] = set()
self._query_params: dict[str, list[str]] = {}
if MESOP_WEBSOCKETS_ENABLED:
self._lock = threading.Lock()

def acquire_lock(self) -> None:
# No-op if websockets is not enabled because
# there shouldn't be concurrent updates to the same
# context instance.
if MESOP_WEBSOCKETS_ENABLED:
self._lock.acquire()

def release_lock(self) -> None:
# No-op if websockets is not enabled because
# there shouldn't be concurrent updates to the same
# context instance.
if MESOP_WEBSOCKETS_ENABLED:
self._lock.release()

def register_js_module(self, js_module_path: str) -> None:
self._js_modules.add(js_module_path)
Expand Down
17 changes: 16 additions & 1 deletion mesop/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
from dataclasses import dataclass
from typing import Any, Callable, Generator, Type, TypeVar, cast

from flask import g
from flask import g, request

import mesop.protos.ui_pb2 as pb
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.events import LoadEvent, MesopEvent
from mesop.exceptions import MesopDeveloperException, MesopUserException
from mesop.key import Key
from mesop.security.security_policy import SecurityPolicy
from mesop.utils.backoff import exponential_backoff
from mesop.warn import warn

from .context import Context

Expand Down Expand Up @@ -54,12 +56,25 @@ def __init__(self):
self._state_classes: list[type[Any]] = []
self._loading_errors: list[pb.ServerError] = []
self._has_served_traffic = False
self._contexts = {}

def context(self) -> Context:
if MESOP_WEBSOCKETS_ENABLED and hasattr(request, "sid"):
# flask-socketio adds sid (session id) to the request object.
sid = request.sid # type: ignore
if sid not in self._contexts:
self._contexts[sid] = self.create_context()
return self._contexts[sid]
if "_mesop_context" not in g:
g._mesop_context = self.create_context()
return g._mesop_context

def delete_context(self, sid: str) -> None:
if sid in self._contexts:
del self._contexts[sid]
else:
warn(f"Tried to delete context with sid={sid} that doesn't exist.")

def create_context(self) -> Context:
# If running in prod mode, *always* enable the has served traffic safety check.
# If running in debug mode, *disable* the has served traffic safety check.
Expand Down
1 change: 1 addition & 0 deletions mesop/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ py_library(
deps = [
"//mesop/component_helpers",
"//mesop/editor",
"//mesop/env",
"//mesop/events",
"//mesop/protos:ui_py_pb2",
"//mesop/utils",
Expand Down
22 changes: 19 additions & 3 deletions mesop/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
import mesop.protos.ui_pb2 as pb
from mesop.component_helpers import diff_component
from mesop.editor.component_configs import get_component_configs
from mesop.env.env import (
EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED,
MESOP_CONCURRENT_UPDATES_ENABLED,
MESOP_WEBSOCKETS_ENABLED,
)
from mesop.events import LoadEvent
from mesop.exceptions import format_traceback
from mesop.runtime import runtime
from mesop.server.constants import WEB_COMPONENTS_PATH_SEGMENT
from mesop.server.server_debug_routes import configure_debug_routes
from mesop.server.server_utils import (
EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED,
MESOP_CONCURRENT_UPDATES_ENABLED,
MESOP_WEBSOCKETS_ENABLED,
STREAM_END,
create_update_state_event,
is_same_site,
Expand All @@ -38,6 +40,7 @@ def render_loop(
init_request: bool = False,
) -> Generator[str, None, None]:
try:
runtime().context().acquire_lock()
runtime().run_path(path=path, trace_mode=trace_mode)
page_config = runtime().get_page_config(path=path)
title = page_config.title if page_config else "Unknown path"
Expand Down Expand Up @@ -88,6 +91,8 @@ def render_loop(
yield from yield_errors(
error=pb.ServerError(exception=str(e), traceback=format_traceback())
)
finally:
runtime().context().release_lock()

def yield_errors(error: pb.ServerError) -> Generator[str, None, None]:
if not runtime().debug_mode:
Expand Down Expand Up @@ -254,6 +259,17 @@ def teardown_clear_stale_state_sessions(error=None):

socketio = SocketIO(flask_app)

@socketio.on_error(namespace=UI_PATH)
def handle_error(e):
print("WebSocket error", e)
sid = request.sid # type: ignore
runtime().delete_context(sid)

@socketio.on("disconnect", namespace=UI_PATH)
def handle_disconnect():
sid = request.sid # type: ignore
runtime().delete_context(sid)

@socketio.on("message", namespace=UI_PATH)
def handle_message(message):
if not message:
Expand Down
2 changes: 1 addition & 1 deletion mesop/server/server_debug_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

from flask import Flask, Response, request

from mesop.env.env import AI_SERVICE_BASE_URL
from mesop.runtime import runtime
from mesop.server.server_utils import (
AI_SERVICE_BASE_URL,
check_editor_access,
make_sse_response,
sse_request,
Expand Down
28 changes: 1 addition & 27 deletions mesop/server/server_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
import json
import os
import secrets
import urllib.parse as urlparse
from typing import Any, Generator, Iterable
Expand All @@ -9,35 +8,10 @@
from flask import Response, abort, request

import mesop.protos.ui_pb2 as pb
from mesop.env.env import EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED
from mesop.runtime import runtime
from mesop.server.config import app_config

AI_SERVICE_BASE_URL = os.environ.get(
"MESOP_AI_SERVICE_BASE_URL", "http://localhost:43234"
)

MESOP_WEBSOCKETS_ENABLED = (
os.environ.get("MESOP_WEBSOCKETS_ENABLED", "false").lower() == "true"
)

MESOP_CONCURRENT_UPDATES_ENABLED = (
os.environ.get("MESOP_CONCURRENT_UPDATES_ENABLED", "false").lower() == "true"
)

if MESOP_WEBSOCKETS_ENABLED:
print("Experiment enabled: MESOP_WEBSOCKETS_ENABLED")
print("Auto-enabling MESOP_CONCURRENT_UPDATES_ENABLED")
MESOP_CONCURRENT_UPDATES_ENABLED = True
elif MESOP_CONCURRENT_UPDATES_ENABLED:
print("Experiment enabled: MESOP_CONCURRENT_UPDATES_ENABLED")

EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED = (
os.environ.get("MESOP_EXPERIMENTAL_EDITOR_TOOLBAR", "false").lower() == "true"
)

if EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED:
print("Experiment enabled: EXPERIMENTAL_EDITOR_TOOLBAR_ENABLED")

LOCALHOSTS = (
# For IPv4 localhost
"127.0.0.1",
Expand Down
2 changes: 1 addition & 1 deletion mesop/server/wsgi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from absl import flags
from flask import Flask

from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.runtime import enable_debug_mode
from mesop.server.constants import EDITOR_PACKAGE_PATH, PROD_PACKAGE_PATH
from mesop.server.flags import port
from mesop.server.logging import log_startup
from mesop.server.server import configure_flask_app
from mesop.server.server_utils import MESOP_WEBSOCKETS_ENABLED
from mesop.server.static_file_serving import configure_static_file_serving
from mesop.utils.host_util import get_local_host

Expand Down
17 changes: 17 additions & 0 deletions mesop/tests/e2e/concurrent_updates_websockets_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import {testInWebSocketsEnabledOnly} from './e2e_helpers';
import {expect} from '@playwright/test';

testInWebSocketsEnabledOnly(
'concurrent updates (websockets)',
async ({page}) => {
await page.goto('/concurrent_updates_websockets');
await page.getByRole('button', {name: 'Slow state update'}).click();
await page.getByRole('button', {name: 'Fast state update'}).click();
await expect(page.getByText('Fast state: true')).toBeVisible();
expect(await page.locator('text="Box!"').count()).toBe(1);
await expect(page.getByText('Slow state: false')).toBeVisible();
await expect(page.getByText('Slow state: true')).toBeVisible();
// Make sure there isn't a second Box from the concurrent update.
expect(await page.locator('text="Box!"').count()).toBe(1);
},
);
10 changes: 10 additions & 0 deletions mesop/tests/e2e/e2e_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,13 @@ export const testInConcurrentUpdatesEnabledOnly = base.extend({
await use(page);
},
});

export const testInWebSocketsEnabledOnly = base.extend({
// Skip this test if MESOP_WEBSOCKETS_ENABLED is not 'true'
page: async ({page}, use) => {
if (process.env.MESOP_WEBSOCKETS_ENABLED !== 'true') {
base.skip(true, 'Skipping test in websockets disabled mode');
}
await use(page);
},
});

0 comments on commit 3714bd5

Please sign in to comment.