Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add point store and candidate set #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
6 changes: 6 additions & 0 deletions example/random/small_schema.ngql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE SPACE `random_small`( partition_num=8, replica_factor=1, vid_type=INT64);

create tag point(vector string);
create edge e();

insert vertex point(vector) VALUES 0:(""))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an extra bracket in the end?

64 changes: 64 additions & 0 deletions merak/candidate_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python3

from asyncio import futures
from concurrent.futures.thread import _worker
from merak.point import Point
from typing import List, Tuple
from merak.point_store import PointStore
import heapq
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import concurrent

executor = ThreadPoolExecutor(max_workers=40)

class CandidateSet(object):
def __init__(self, target: Point, points: List[Point], max_size: int, point_store: PointStore) -> None:
global executor
self.target = target
self.max_size = max_size
self.point_store = point_store
self.executor = executor
self.points: List[Tuple(float, Point)] = [(
p.distance(self.target), p) for p in points]
self.points.sort(key=lambda x: x[0])
self.visited = set([p.id for p in points])
self.furthest = self.points[-1]

def pop(self) -> Point:

# if len(self.futures) != 0:
# ret = concurrent.futures.wait(
# self.futures, return_when=concurrent.futures.FIRST_COMPLETED)
# done = ret.done
# self.futures = ret.not_done
# for f in done:
# point: Point = f.result()
# if point.id in self.visited:
# continue
# if len(self.points) < self.max_size:
# heapq.heappush(
# self.points, (point.distance(self.target), point))
# else:
# heapq.heappushpop(
# self.points, point.distance(self.target), point)
if len(self.points) == 0:
return None
ret = heapq.heappop(self.points)
return ret[1]

def add(self, id_list: List[int]):
points = self.executor.map(self.worker,id_list)
for point in points:
if point.id in self.visited:
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue?

self.visited.add(point.id)
x = (point.distance(self.target), point)
self.points.append(x)
self.points.sort(key=lambda x:x[0])
self.points = self.points[0:self.max_size]



def worker(self, id: int) -> Point:
point = self.point_store.get_point(id, True)
return point
14 changes: 10 additions & 4 deletions merak/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def get_neighbors(self, vid) -> Tuple[str, Dict]:
'''
# todo: replace t1 as tag, col1 as property
# todo: use int id?
query = "FETCH PROP ON t1 \'{}\' YIELD properties(vertex).col1".format(vid)
query = "FETCH PROP ON t1 \'{}\' YIELD properties(vertex).col1".format(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need slice to escape

vid)
result = self.session.execute(query)
if not result.is_succeeded():
raise RuntimeError("fetch failed")
Expand All @@ -54,7 +55,8 @@ def get_neighbors(self, vid) -> Tuple[str, Dict]:
raise RuntimeError("fetch no result")

# todo: replace e1 as edge
query = "GO FROM \'{}\' OVER e1 YIELD rank(edge) as rank, dst(edge) as dst".format(vid)
query = "GO FROM \'{}\' OVER e1 YIELD rank(edge) as rank, dst(edge) as dst".format(
vid)
result = self.session.execute(query)
if not result.is_succeeded():
raise RuntimeError("go failed")
Expand All @@ -76,7 +78,8 @@ def get_neighbors(self, vid) -> Tuple[str, Dict]:
return (vec, neighbors)

def insert_vertex(self, vid, vector):
query = "INSERT VERTEX t1(col1) VALUES \'{}\': (\'{}\')".format(vid, vector)
query = "INSERT VERTEX t1(col1) VALUES \'{}\': (\'{}\')".format(
vid, vector)
result = self.session.execute(query)
if not result.is_succeeded():
raise RuntimeError("insert vertex failed")
Expand All @@ -97,7 +100,10 @@ def insert(self, batch: InsertBatch):
raise RuntimeError(f"insert batch {batchStr} failed")

def execute(self, query):
return self.session.execute(query)
session = self.pool.get_session('root', 'nebula')
result = session.execute(query)
session.release()
return result

def close(self):
self.session.release()
Expand Down
Loading