Skip to content
This repository has been archived by the owner on Sep 30, 2020. It is now read-only.

Commit

Permalink
asrv: Move database management logic to a new DbTracker class.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwkmwkmwk committed Apr 12, 2017
1 parent 3909603 commit 508a093
Show file tree
Hide file tree
Showing 13 changed files with 690 additions and 610 deletions.
1 change: 1 addition & 0 deletions python/requirements-py2.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
enum34==1.1.6
backports.functools_lru_cache==1.3
16 changes: 3 additions & 13 deletions python/veles/async_client/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import weakref

from veles.proto import messages
from veles.proto.node import PosFilter
from veles.async_conn.conn import AsyncConnection
Expand All @@ -22,12 +20,13 @@
QueryHandler,
BroadcastHandler,
)
from veles.async_conn.node import AsyncNode
from veles.async_conn.subscriber import (
from veles.db.subscriber import (
BaseSubscriberNode,
BaseSubscriberData,
BaseSubscriberBinData,
BaseSubscriberList,
)
from veles.async_conn.subscriber import (
BaseSubscriberQuery,
)
from .runner import (
Expand All @@ -53,21 +52,12 @@ class AsyncRemoteConnection(AsyncConnection):
def __init__(self, loop, proto):
self.loop = loop
self.proto = proto
self.objs = weakref.WeakValueDictionary()
self.conns = {}
self.next_cid = 0
self.proto.conn = self
self.subs = {}
super().__init__()

def get_node_norefresh(self, obj_id):
try:
return self.objs[obj_id]
except KeyError:
res = AsyncNode(self, obj_id, None)
self.objs[obj_id] = res
return res

def new_conn(self, conn):
cid = self.next_cid
self.next_cid += 1
Expand Down
13 changes: 11 additions & 2 deletions python/veles/async_conn/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import weakref

import asyncio

from veles.proto.node import PosFilter
from veles.schema.nodeid import NodeID
from .plugin import MethodHandler, QueryHandler, BroadcastHandler
from .node import AsyncNode


class AsyncConnection:
Expand All @@ -25,16 +28,22 @@ class AsyncConnection:
"""

def __init__(self):
self.anodes = weakref.WeakValueDictionary()
self.root = self.get_node_norefresh(NodeID.root_id)

def get_node_norefresh(self, id):
def get_node_norefresh(self, nid):
"""
Returns an AsyncNode with the given id immediately. Does not fetch it
from the server - it may contain invalid data until refresh is called.
Such nodes may still be useful for some operations (eg. listing
children).
"""
raise NotImplementedError
try:
return self.anodes[nid]
except KeyError:
res = AsyncNode(self, nid, None)
self.anodes[nid] = res
return res

def get_node(self, id):
"""
Expand Down
97 changes: 1 addition & 96 deletions python/veles/async_conn/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,79 +13,10 @@
# limitations under the License.


from veles.proto.node import PosFilter
from veles.db.subscriber import BaseSubscriber
from veles.proto.exceptions import VelesException


class BaseSubscriber:
def __init__(self, tracker):
self.active = True
self.tracker = tracker
self.tracker.register_subscriber(self)

def cancel(self):
if self.active:
self.active = False
self.tracker.unregister_subscriber(self)


class BaseSubscriberNode(BaseSubscriber):
"""
A subscriber of node modifications. To watch for node modifications,
create a subclass of this, override node_changed and error callbacks,
and create an instance. When the subscription is no longer used, call
cancel().
"""

def __init__(self, tracker, node):
self.node = node
super().__init__(tracker)

def node_changed(self, node):
raise NotImplementedError

def error(self, err):
raise NotImplementedError


class BaseSubscriberData(BaseSubscriber):
"""
A subscriber of node data modifications. ``data_changed`` will be called
whenever the data value is changed.
"""

def __init__(self, tracker, node, key):
self.node = node
self.key = key
super().__init__(tracker)

def data_changed(self, data):
raise NotImplementedError

def error(self, err):
raise NotImplementedError


class BaseSubscriberBinData(BaseSubscriber):
"""
A subscriber of node binary data modifications. ``bindata_changed`` will
be called whenever the given bindara range is changed.
"""

def __init__(self, tracker, node, key, start, end):
self.node = node
self.key = key
self.start = start
self.end = end
super().__init__(tracker)

def bindata_changed(self, data):
raise NotImplementedError

def error(self, err):
raise NotImplementedError


class BaseSubscriberQueryRaw(BaseSubscriber):
"""
A subscriber of query results. ``raw_result_changed`` is called whenever
Expand Down Expand Up @@ -125,29 +56,3 @@ def raw_result_changed(self, result, checks):
self.result_changed(self.sig.result.load(result))
except VelesException as e:
self.error(e, checks)


class BaseSubscriberList(BaseSubscriber):
def __init__(self, tracker, parent, tags=frozenset(),
pos_filter=PosFilter()):
self.parent = parent
self.tags = tags
self.pos_filter = pos_filter
super().__init__(tracker)

def matches(self, node):
if node is None:
return False
if node.parent != self.parent:
return False
if not self.tags <= node.tags:
return False
if not self.pos_filter.matches(node):
return False
return True

def list_changed(self, changed, gone):
raise NotImplementedError

def error(self, err):
raise NotImplementedError
3 changes: 2 additions & 1 deletion python/veles/db/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,8 @@ def list(self, parent, tags=frozenset(), pos_filter=PosFilter()):
return {NodeID(bytes(x)) for x, in c.fetchall()}

def begin(self):
assert not self.db.in_transaction
if six.PY3:
assert not self.db.in_transaction

def commit(self):
self.db.commit()
Expand Down
39 changes: 19 additions & 20 deletions python/veles/server/node.py → python/veles/db/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from veles.async_conn.node import AsyncNode
from veles.proto.exceptions import (
VelesException,
ObjectGoneError,
)
from veles.schema.nodeid import NodeID


class AsyncLocalNode(AsyncNode):
def __init__(self, conn, id, node, parent):
super().__init__(conn, id, node)
class DbNode(object):
def __init__(self, tracker, id, node, parent):
self.tracker = tracker
self.id = id
self.node = node
self.parent = parent
self.subs = set()
self.data_subs = {}
Expand All @@ -32,25 +33,23 @@ def __init__(self, conn, id, node, parent):

def _add_sub(self, sub):
self.subs.add(sub)
self._send_sub(sub)

def _del_sub(self, sub):
self.subs.remove(sub)

def _send_sub(self, sub):
if self.node is not None:
sub.node_changed(self.node)
else:
sub.error(ObjectGoneError())

def _del_sub(self, sub):
self.subs.remove(sub)

def _add_sub_list(self, sub):
obj_ids = self.conn.db.list(self.id, sub.tags, sub.pos_filter)
objs = {self.conn.get_node_norefresh(x) for x in obj_ids}
self.list_subs[sub] = objs
if self.node is not None or self.id == NodeID.root_id:
sub.list_changed([obj.node for obj in objs], [])
else:
sub.error(ObjectGoneError())
try:
nids = self.tracker.get_list_raw(self.id, sub.tags, sub.pos_filter)
dbnodes = {self.tracker.get_cached_node(x) for x in nids}
self.list_subs[sub] = dbnodes
sub.list_changed([dbnode.node for dbnode in dbnodes], [])
except VelesException as e:
self.list_subs[sub] = None
sub.error(e)

def _del_sub_list(self, sub):
del self.list_subs[sub]
Expand All @@ -64,7 +63,7 @@ def _add_sub_data(self, sub):
if self.node is None:
sub.error(ObjectGoneError())
else:
sub.data_changed(self.conn.db.get_data(self.id, sub.key))
sub.data_changed(self.tracker.get_data(self.id, sub.key))

def _del_sub_data(self, sub):
self.data_subs[sub.key].remove(sub)
Expand All @@ -80,7 +79,7 @@ def _add_sub_bindata(self, sub):
if self.node is None:
sub.error(ObjectGoneError())
else:
sub.bindata_changed(self.conn.db.get_bindata(
sub.bindata_changed(self.tracker.get_bindata(
self.id, sub.key, sub.start, sub.end))

def _del_sub_bindata(self, sub):
Expand Down
Loading

0 comments on commit 508a093

Please sign in to comment.