From b287278c8054f778ff25870a73b990fcfe7deb5a Mon Sep 17 00:00:00 2001 From: Zhihui Du Date: Tue, 24 Oct 2023 11:10:01 -0400 Subject: [PATCH] =?UTF-8?q?fix=20the=20compiling=20errors=20in=20arkouda?= =?UTF-8?q?=5Fdevelopment=20when=20new=20arkouda=20vers=E2=80=A6=20(#64)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix the compiling errors in arkouda_development when new arkouda version released * include Oliver's code and add R before the same function --- arachne/server/BreadthFirstSearchMsg.chpl | 4 +- arachne/server/ConnectedComponentsMsg.chpl | 2 +- arachne/server/TriangleCountMsg.chpl | 4 +- arachne_development/server/Aggregators.chpl | 128 ++ .../server/BenchmarkerMsg.chpl | 0 .../server/BreadthFirstSearch.chpl | 135 ++ .../server/BreadthFirstSearchMsg.chpl | 135 ++ arachne_development/server/BuildGraphMsg.chpl | 425 ++++++ arachne_development/server/GenTrussMsg.py | 8 +- arachne_development/server/GraphArray.chpl | 184 ++- arachne_development/server/GraphInfoMsg.chpl | 111 ++ .../server/PropertyGraphMsg.chpl | 1351 +++++++++++++++++ arachne_development/server/ServerModules.cfg | 10 + arachne_development/server/SquareCount.chpl | 78 + .../server/SquareCountMsg.chpl | 71 + .../server/SubgraphIsomorphism.chpl | 193 +++ .../server/SubgraphIsomorphismMsg.chpl | 97 ++ arachne_development/server/TriangleCount.chpl | 22 + .../server/TriangleCountMsg.chpl | 583 +++++++ arachne_development/server/TrussMsg.chpl | 12 +- arachne_development/server/Utils.chpl | 273 ++-- 21 files changed, 3660 insertions(+), 166 deletions(-) create mode 100644 arachne_development/server/Aggregators.chpl create mode 100644 arachne_development/server/BenchmarkerMsg.chpl create mode 100644 arachne_development/server/BreadthFirstSearch.chpl create mode 100644 arachne_development/server/BreadthFirstSearchMsg.chpl create mode 100644 arachne_development/server/BuildGraphMsg.chpl create mode 100644 arachne_development/server/GraphInfoMsg.chpl create mode 100644 arachne_development/server/PropertyGraphMsg.chpl create mode 100644 arachne_development/server/SquareCount.chpl create mode 100644 arachne_development/server/SquareCountMsg.chpl create mode 100644 arachne_development/server/SubgraphIsomorphism.chpl create mode 100644 arachne_development/server/SubgraphIsomorphismMsg.chpl create mode 100644 arachne_development/server/TriangleCount.chpl create mode 100644 arachne_development/server/TriangleCountMsg.chpl diff --git a/arachne/server/BreadthFirstSearchMsg.chpl b/arachne/server/BreadthFirstSearchMsg.chpl index 091436bb..16f8fde3 100644 --- a/arachne/server/BreadthFirstSearchMsg.chpl +++ b/arachne/server/BreadthFirstSearchMsg.chpl @@ -104,5 +104,5 @@ module BreadthFirstSearchMsg { } use CommandMap; - registerFunction("segmentedGraphBFS", segBFSMsg, getModuleName()); -} \ No newline at end of file + registerFunction("RsegmentedGraphBFS", segBFSMsg, getModuleName()); +} diff --git a/arachne/server/ConnectedComponentsMsg.chpl b/arachne/server/ConnectedComponentsMsg.chpl index 17ca7941..ede82ad8 100644 --- a/arachne/server/ConnectedComponentsMsg.chpl +++ b/arachne/server/ConnectedComponentsMsg.chpl @@ -81,5 +81,5 @@ module CCMsg { } use CommandMap; - registerFunction("segmentedGraphCC", segCCMsg,getModuleName()); + registerFunction("RsegmentedGraphCC", segCCMsg,getModuleName()); } diff --git a/arachne/server/TriangleCountMsg.chpl b/arachne/server/TriangleCountMsg.chpl index 8f403aa9..023ef04f 100644 --- a/arachne/server/TriangleCountMsg.chpl +++ 
b/arachne/server/TriangleCountMsg.chpl @@ -579,5 +579,5 @@ module TriCntMsg { }// end of segTriMsg use CommandMap; - registerFunction("segmentedGraphTri", segTriCntMsg,getModuleName()); -} \ No newline at end of file + registerFunction("RsegmentedGraphTri", segTriCntMsg,getModuleName()); +} diff --git a/arachne_development/server/Aggregators.chpl b/arachne_development/server/Aggregators.chpl new file mode 100644 index 00000000..c43169f0 --- /dev/null +++ b/arachne_development/server/Aggregators.chpl @@ -0,0 +1,128 @@ +module Aggregators { + // Chapel modules. + use ReplicatedDist; + use Set; + + // Package modules. + use CopyAggregation; + + // Arkouda modules. + use CommAggregation; + use ServerConfig; + + /** + * Declare our frontier queues here to be sets, done globally since refs cannot be a part of + * records yet. TODO: move these straight into SetDstAggregator when refs are allowed inside of + * records. */ + var D_frontier_sets = {0..1} dmapped Replicated(); + var frontier_sets : [D_frontier_sets] set(int, parSafe=true); + var frontier_sets_idx : int; + + // Sizes of buffer and yield frequencies taken from the Arkouda server config information. + private const dstBuffSize = getEnvInt("CHPL_AGGREGATION_DST_BUFF_SIZE", 4096); + private const yieldFrequency = getEnvInt("CHPL_AGGREGATION_YIELD_FREQUENCY", 1024); + + /** + * Record for remote set aggregator to perform set additions from one locale to the next. Built + * using the aggregator from CopyAggregation but modfied a bit to make the aggregated memory + * address a set instead of an array memory location. */ + record SetDstAggregator { + type elemType; + type aggType = elemType; + const bufferSize = dstBuffSize; + const myLocaleSpace = LocaleSpace; + var opsUntilYield = yieldFrequency; + var lBuffers: [myLocaleSpace] [0..#bufferSize] aggType; + var rBuffers: [myLocaleSpace] remoteBuffer(aggType); + var bufferIdxs: [myLocaleSpace] int; + + /** + * Allocate the remote buffers on each locale allocated. */ + proc postinit() { + for loc in myLocaleSpace { + rBuffers[loc] = new remoteBuffer(aggType, bufferSize, loc); + } + } + + /** + * Flush all of the buffers during deinitialization. */ + proc deinit() { + flush(); + } + + /** + * For every locale allocated, flush their buffers. */ + proc flush() { + for loc in myLocaleSpace { + _flushBuffer(loc, bufferIdxs[loc], freeData=true); + } + } + + /** + * Make a copy of the data being passed to a remote buffer in the local buffer that + * corresponds to the remote locale. + * + * loc: id of remote locale. + * srcVal: value to be copied to the remote locale. */ + inline proc copy(const loc, const in srcVal: elemType) { + // Get our current index into the buffer for the destination locale. + ref bufferIdx = bufferIdxs[loc]; + + // Buffer the desired value. + lBuffers[loc][bufferIdx] = srcVal; + bufferIdx += 1; + + /** + * Flush our buffer if it's full. If it's been a while since we've let + * other tasks run, yield so that we're not blocking remote tasks from + * flushing their buffers. */ + if bufferIdx == bufferSize { + _flushBuffer(loc, bufferIdx, freeData=false); + opsUntilYield = yieldFrequency; + } else if opsUntilYield == 0 { + chpl_task_yield(); + opsUntilYield = yieldFrequency; + } else { + opsUntilYield -= 1; + } + } + + /** + * Helper function to peform actual buffer steps for each locale. + * + * loc: id of locale to flush. + * bufferIdx: id of buffer to PUT items into. + * freeData: did the last flush happen and have all remote buffers been freed? 
*/ + proc _flushBuffer(loc: int, ref bufferIdx, freeData) { + // Get the buffer id to extract the buffered values. + const myBufferIdx = bufferIdx; + if myBufferIdx == 0 then return; + + // Get refernece to allocated remote buffer at loc and allocate it if it wasn't already. + ref rBuffer = rBuffers[loc]; + const remBufferPtr = rBuffer.cachedAlloc(); + + // PUT local buffer to remote buffer. + rBuffer.PUT(lBuffers[loc], myBufferIdx); + + // Process remote buffer where it now exists. + on Locales[loc] { + ref f = frontier_sets[(frontier_sets_idx + 1) % 2]; + /** + * forall gives error: A standalone or leader iterator is not found for the iterable + * expression in this forall loop */ + for srcVal in rBuffer.localIter(remBufferPtr, myBufferIdx) { + f.add(srcVal); + } + // Only free remaining data at deinit. + if freeData { + rBuffer.localFree(remBufferPtr); + } + } + if freeData { + rBuffer.markFreed(); + } + bufferIdx = 0; + } + } // end of SetDstAggregator +} // end of Aggregators \ No newline at end of file diff --git a/arachne_development/server/BenchmarkerMsg.chpl b/arachne_development/server/BenchmarkerMsg.chpl new file mode 100644 index 00000000..e69de29b diff --git a/arachne_development/server/BreadthFirstSearch.chpl b/arachne_development/server/BreadthFirstSearch.chpl new file mode 100644 index 00000000..85053ee1 --- /dev/null +++ b/arachne_development/server/BreadthFirstSearch.chpl @@ -0,0 +1,135 @@ +module BreadthFirstSearch { + // Chapel modules. + use Reflection; + use Set; + use List; + + // Package modules. + use CopyAggregation; + + // Arachne modules. + use GraphArray; + use Utils; + use Aggregators; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use AryUtil; + + /** + * Breadth-first search for shared-memory (one locale) systems. Uses a Chapel set for + * + * graph: graph to run bfs on. + * + * returns: success string message. */ + proc bfs_kernel_und_smem(graph:SegGraph, root:int, depth: [?D] int):string throws { + // Extract graph data. + var src = toSymEntry(graph.getComp("SRC"),int).a; + var dst = toSymEntry(graph.getComp("DST"),int).a; + var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a; + + // Generate the frontier sets. + var frontier_sets : [{0..1}] list(int, parSafe=true); + frontier_sets[0] = new list(int, parSafe=true); + frontier_sets[1] = new list(int, parSafe=true); + + var frontier_sets_idx = 0; + var cur_level = 0; + depth[root] = cur_level; + frontier_sets[frontier_sets_idx].pushBack(root); + while true { + var pending_work:bool; + forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work) { + var adj_list_start = seg[u]; + var num_neighbors = seg[u+1] - adj_list_start; + if (num_neighbors != 0) { + var adj_list_end = adj_list_start + num_neighbors - 1; + ref neighborhood = dst.localSlice(adj_list_start..adj_list_end); + for v in neighborhood { + if (depth[v] == -1) { + pending_work = true; + depth[v] = cur_level + 1; + frontier_sets[(frontier_sets_idx + 1) % 2].pushBack(v); + } + } + } + } + frontier_sets[frontier_sets_idx].clear(); + if !pending_work { + break; + } + cur_level += 1; + frontier_sets_idx = (frontier_sets_idx + 1) % 2; + }// end while + return "success"; + }// end of bfs_kernel_und_smem + + /** + * Using a remote aggregator above for sets, we are going to perform aggregated writes to the + * locales that include the neighborhood of the vertex being processed. + * + * graph: graph to run bfs on. + * + * returns: success string message. 
*/ + proc bfs_kernel_und_dmem(graph:SegGraph, root:int, depth: [?D] int):string throws { + // Initialize the frontiers on each of the locales. + coforall loc in Locales do on loc { + frontier_sets[0] = new set(int, parSafe=true); + frontier_sets[1] = new set(int, parSafe=true); + } + frontier_sets_idx = 0; + var src = toSymEntry(graph.getComp("SRC"),int).a; + var dst = toSymEntry(graph.getComp("DST"),int).a; + var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a; + + // Add the root to the locale that owns it and update size & depth. + for lc in find_locs(root, graph) { + on lc do frontier_sets[frontier_sets_idx].add(root); + } + var cur_level = 0; + depth[root] = cur_level; + + while true { + var pending_work:bool; + coforall loc in Locales with(|| reduce pending_work) { + on loc { + var src_low = src.localSubdomain().low; + var src_high = src.localSubdomain().high; + forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work, var frontier_agg = new SetDstAggregator(int), var depth_agg = new DstAggregator(int)) { + var adj_list_start = seg[u]; + var num_neighbors = seg[u+1] - adj_list_start; + if (num_neighbors != 0) { + var adj_list_end = adj_list_start + num_neighbors - 1; + + // Only pull the part of the adjacency list that is local. + var actual_start = max(adj_list_start, src_low); + var actual_end = min(src_high, adj_list_end); + + ref neighborhood = dst.localSlice(actual_start..actual_end); + for v in neighborhood { + if (depth[v] == -1) { + pending_work = true; + // depth[v] = cur_level + 1; + depth_agg.copy(depth[v], cur_level + 1); + var locs = find_locs(v, graph); + for lc in locs { + frontier_agg.copy(lc.id, v); + } + } + } + } + } //end forall + frontier_sets[frontier_sets_idx].clear(); + } // end on loc + }// end coforall loc + if !pending_work { + break; + } + cur_level += 1; + frontier_sets_idx = (frontier_sets_idx + 1) % 2; + }// end while + return "success"; + }// end of bfs_kernel_und_dmem +}// end of BreadthFirstSearch module \ No newline at end of file diff --git a/arachne_development/server/BreadthFirstSearchMsg.chpl b/arachne_development/server/BreadthFirstSearchMsg.chpl new file mode 100644 index 00000000..85053ee1 --- /dev/null +++ b/arachne_development/server/BreadthFirstSearchMsg.chpl @@ -0,0 +1,135 @@ +module BreadthFirstSearch { + // Chapel modules. + use Reflection; + use Set; + use List; + + // Package modules. + use CopyAggregation; + + // Arachne modules. + use GraphArray; + use Utils; + use Aggregators; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use AryUtil; + + /** + * Breadth-first search for shared-memory (one locale) systems. Uses a Chapel set for + * + * graph: graph to run bfs on. + * + * returns: success string message. */ + proc bfs_kernel_und_smem(graph:SegGraph, root:int, depth: [?D] int):string throws { + // Extract graph data. + var src = toSymEntry(graph.getComp("SRC"),int).a; + var dst = toSymEntry(graph.getComp("DST"),int).a; + var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a; + + // Generate the frontier sets. 
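        /* Editor's note (illustrative sketch, not part of this patch): the two lists declared
         * below act as a double buffer for the BFS frontier. Level k is read from
         * frontier_sets[idx] while level k+1 is written into frontier_sets[(idx + 1) % 2];
         * after each level the current buffer is cleared and the index flips:
         *
         *   var idx = 0;
         *   // read frontier_sets[idx], push newly discovered vertices into frontier_sets[(idx + 1) % 2]
         *   frontier_sets[idx].clear();
         *   idx = (idx + 1) % 2;   // 0 -> 1 -> 0 -> ...
         */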
+ var frontier_sets : [{0..1}] list(int, parSafe=true); + frontier_sets[0] = new list(int, parSafe=true); + frontier_sets[1] = new list(int, parSafe=true); + + var frontier_sets_idx = 0; + var cur_level = 0; + depth[root] = cur_level; + frontier_sets[frontier_sets_idx].pushBack(root); + while true { + var pending_work:bool; + forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work) { + var adj_list_start = seg[u]; + var num_neighbors = seg[u+1] - adj_list_start; + if (num_neighbors != 0) { + var adj_list_end = adj_list_start + num_neighbors - 1; + ref neighborhood = dst.localSlice(adj_list_start..adj_list_end); + for v in neighborhood { + if (depth[v] == -1) { + pending_work = true; + depth[v] = cur_level + 1; + frontier_sets[(frontier_sets_idx + 1) % 2].pushBack(v); + } + } + } + } + frontier_sets[frontier_sets_idx].clear(); + if !pending_work { + break; + } + cur_level += 1; + frontier_sets_idx = (frontier_sets_idx + 1) % 2; + }// end while + return "success"; + }// end of bfs_kernel_und_smem + + /** + * Using a remote aggregator above for sets, we are going to perform aggregated writes to the + * locales that include the neighborhood of the vertex being processed. + * + * graph: graph to run bfs on. + * + * returns: success string message. */ + proc bfs_kernel_und_dmem(graph:SegGraph, root:int, depth: [?D] int):string throws { + // Initialize the frontiers on each of the locales. + coforall loc in Locales do on loc { + frontier_sets[0] = new set(int, parSafe=true); + frontier_sets[1] = new set(int, parSafe=true); + } + frontier_sets_idx = 0; + var src = toSymEntry(graph.getComp("SRC"),int).a; + var dst = toSymEntry(graph.getComp("DST"),int).a; + var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a; + + // Add the root to the locale that owns it and update size & depth. + for lc in find_locs(root, graph) { + on lc do frontier_sets[frontier_sets_idx].add(root); + } + var cur_level = 0; + depth[root] = cur_level; + + while true { + var pending_work:bool; + coforall loc in Locales with(|| reduce pending_work) { + on loc { + var src_low = src.localSubdomain().low; + var src_high = src.localSubdomain().high; + forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work, var frontier_agg = new SetDstAggregator(int), var depth_agg = new DstAggregator(int)) { + var adj_list_start = seg[u]; + var num_neighbors = seg[u+1] - adj_list_start; + if (num_neighbors != 0) { + var adj_list_end = adj_list_start + num_neighbors - 1; + + // Only pull the part of the adjacency list that is local. 
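                        /* Editor's note (illustrative values only, not part of this patch): the
                         * two lines below clamp vertex u's adjacency range to the slice of
                         * SRC/DST owned by this locale, e.g.
                         *   adj_list_start = 4,  adj_list_end = 12   // u's full neighbor range
                         *   src_low = 10,        src_high = 20       // this locale's subdomain
                         *   actual_start = max(4, 10)  = 10
                         *   actual_end   = min(20, 12) = 12
                         * so only dst[10..12] is touched here; other locales cover the rest. */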
+ var actual_start = max(adj_list_start, src_low); + var actual_end = min(src_high, adj_list_end); + + ref neighborhood = dst.localSlice(actual_start..actual_end); + for v in neighborhood { + if (depth[v] == -1) { + pending_work = true; + // depth[v] = cur_level + 1; + depth_agg.copy(depth[v], cur_level + 1); + var locs = find_locs(v, graph); + for lc in locs { + frontier_agg.copy(lc.id, v); + } + } + } + } + } //end forall + frontier_sets[frontier_sets_idx].clear(); + } // end on loc + }// end coforall loc + if !pending_work { + break; + } + cur_level += 1; + frontier_sets_idx = (frontier_sets_idx + 1) % 2; + }// end while + return "success"; + }// end of bfs_kernel_und_dmem +}// end of BreadthFirstSearch module \ No newline at end of file diff --git a/arachne_development/server/BuildGraphMsg.chpl b/arachne_development/server/BuildGraphMsg.chpl new file mode 100644 index 00000000..5a624747 --- /dev/null +++ b/arachne_development/server/BuildGraphMsg.chpl @@ -0,0 +1,425 @@ +module BuildGraphMsg { + // Chapel modules. + use Reflection; + use Set; + use Time; + use Sort; + use List; + use ReplicatedDist; + + // Package modules. + use CopyAggregation; + + // Arachne Modules. + use Utils; + use GraphArray; + use SegmentedString; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use ServerErrors; + use ServerErrorStrings; + use ArgSortMsg; + use AryUtil; + use Logging; + use Message; + + // Server message logger. + private config const logLevel = ServerConfig.logLevel; + private config const logChannel = ServerConfig.logChannel; + const bgmLogger = new Logger(logLevel, logChannel); + var outMsg:string; + + /** + * Convert akarrays to a graph object. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. + * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc addEdgesFromMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Parse the message from the Python front-end. + var akarray_srcS = msgArgs.getValueOf("AkArraySrc"); + var akarray_dstS = msgArgs.getValueOf("AkArrayDst"); + var akarray_vmapS = msgArgs.getValueOf("AkArrayVmap"); + var akarray_segS = msgArgs.getValueOf("AkArraySeg"); + var akarray_weightS = msgArgs.getValueOf("AkArrayWeight"); + var weightedS = msgArgs.getValueOf("Weighted"); + var directedS = msgArgs.getValueOf("Directed"); + var num_verticesS = msgArgs.getValueOf("NumVertices"); + var num_edgesS = msgArgs.getValueOf("NumEdges"); + + var propertied:bool; + if msgArgs.contains("IsPropGraph") { + propertied = true; + } + + // Extract the names of the arrays and the data for the non-array variables. + var src_name:string = (akarray_srcS:string); + var dst_name:string = (akarray_dstS:string); + var vmap_name:string = (akarray_vmapS:string); + var seg_name:string = (akarray_segS:string); + var weight_name:string = (akarray_weightS:string); + + var weighted:bool; + weightedS = weightedS.toLower(); + weighted = weightedS:bool; + + var directed:bool; + directedS = directedS.toLower(); + directed = directedS:bool; + + var num_vertices:int; + num_vertices = num_verticesS:int; + + var num_edges:int; + num_edges = num_edgesS:int; + + // Timer for populating the graph data structure. + var timer:stopwatch; + timer.start(); + + // Get the symbol table entries for the edge, weight, and node map arrays. 
+ var akarray_src_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(src_name, st); + var akarray_dst_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(dst_name, st); + var akarray_vmap_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(vmap_name, st); + var akarray_seg_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(seg_name, st); + var akarray_weight_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(weight_name, st); + + // Extract the data for use. + var akarray_src_sym = toSymEntry(akarray_src_entry,int); + var src = akarray_src_sym.a; + + var akarray_dst_sym = toSymEntry(akarray_dst_entry,int); + var dst = akarray_dst_sym.a; + + var akarray_vmap_sym = toSymEntry(akarray_vmap_entry, int); + var vmap = akarray_vmap_sym.a; + + var akarray_seg_sym = toSymEntry(akarray_seg_entry, int); + var segments = akarray_seg_sym.a; + + var graph = new shared SegGraph(num_vertices, num_edges, directed, weighted, propertied); + graph.withComp(new shared SymEntry(src):GenSymEntry, "SRC") + .withComp(new shared SymEntry(dst):GenSymEntry, "DST") + .withComp(new shared SymEntry(segments):GenSymEntry, "SEGMENTS") + .withComp(new shared SymEntry(vmap):GenSymEntry, "NODE_MAP"); + + if weighted { + select akarray_weight_entry.dtype { + when DType.Int64 { + var akarray_weight_sym = toSymEntry(akarray_weight_entry, int); + var e_weight = akarray_weight_sym.a; + graph.withComp(new shared SymEntry(e_weight):GenSymEntry, "EDGE_WEIGHT"); + } + when DType.UInt64 { + var akarray_weight_sym = toSymEntry(akarray_weight_entry, uint); + var e_weight = akarray_weight_sym.a; + graph.withComp(new shared SymEntry(e_weight):GenSymEntry, "EDGE_WEIGHT"); + } + when DType.Float64 { + var akarray_weight_sym = toSymEntry(akarray_weight_entry, real); + var e_weight = akarray_weight_sym.a; + graph.withComp(new shared SymEntry(e_weight):GenSymEntry, "EDGE_WEIGHT"); + } + otherwise { + var errorMsg = notImplementedError(pn, akarray_weight_entry.dtype); + bgmLogger.error(getModuleName(), getRoutineName(), getLineNumber(), errorMsg); + return new MsgTuple(errorMsg, MsgType.ERROR); + } + } + } + + // Create the ranges array that keeps track of the vertices the edge arrays store on each + // locale. + var D_sbdmn = {0..numLocales-1} dmapped Replicated(); + var ranges : [D_sbdmn] (int,locale); + + // Write the local subdomain low value to the ranges array. + coforall loc in Locales { + on loc { + var low_vertex = src[src.localSubdomain().low]; + + coforall rloc in Locales do on rloc { + ranges[loc.id] = (low_vertex,loc); + } + } + } + graph.withComp(new shared SymEntry(ranges):GenSymEntry, "RANGES"); + + // Add graph to the specific symbol table entry. + var graphEntryName = st.nextName(); + var graphSymEntry = new shared GraphSymEntry(graph); + st.addEntry(graphEntryName, graphSymEntry); + var repMsg = graphEntryName; + + // Print out the length of time it takes to read in and build a known graph file. + timer.stop(); + outMsg = "Building graph from two edge arrays took " + timer.elapsed():string + " sec"; + + // Print out debug information to arkouda server output. + bgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + bgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of addEdgesFromMsg + + /** + * Convert akarrays to a graph object so it can be compatible with functionality that used the + * original graph data structure. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. 
+ * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc addEdgesFromCompatMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Parse the message from the Python front-end. + var akarray_srcS = msgArgs.getValueOf("AkArraySrc"); + var akarray_dstS = msgArgs.getValueOf("AkArrayDst"); + var akarray_srcRS = msgArgs.getValueOf("AkArraySrcR"); + var akarray_dstRS = msgArgs.getValueOf("AkArrayDstR"); + var akarray_neiS = msgArgs.getValueOf("AkArrayNei"); + var akarray_neiRS = msgArgs.getValueOf("AkArrayNeiR"); + var akarray_start_iS = msgArgs.getValueOf("AkArrayStartIdx"); + var akarray_start_iRS = msgArgs.getValueOf("AkArrayStartIdxR"); + var akarray_vmapS = msgArgs.getValueOf("AkArrayVmap"); + var weightedS = msgArgs.getValueOf("Weighted"); + var directedS = msgArgs.getValueOf("Directed"); + var num_verticesS = msgArgs.getValueOf("NumVertices"); + var num_edgesS = msgArgs.getValueOf("NumEdges"); + + var propertied:bool; + if msgArgs.contains("IsPropGraph") { + propertied = true; + } + + // Extract the names of the arrays and the data for the non-array variables. + var src_name:string = (akarray_srcS:string); + var dst_name:string = (akarray_dstS:string); + var srcR_name:string = (akarray_srcRS:string); + var dstR_name:string = (akarray_dstRS:string); + var nei_name:string = (akarray_neiS:string); + var neiR_name:string = (akarray_neiRS:string); + var start_i_name:string = (akarray_start_iS:string); + var start_iR_name:string = (akarray_start_iRS:string); + var vmap_name:string = (akarray_vmapS:string); + + var weighted:bool; + weightedS = weightedS.toLower(); + weighted = weightedS:bool; + + var directed:bool; + directedS = directedS.toLower(); + directed = directedS:bool; + + var num_vertices:int; + num_vertices = num_verticesS:int; + + var num_edges:int; + num_edges = num_edgesS:int; + + // Timer for populating the graph data structure. + var timer:stopwatch; + timer.start(); + + // Get the symbol table entries for the edge, weight, and node map arrays. + var akarray_src_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(src_name, st); + var akarray_dst_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(dst_name, st); + var akarray_srcR_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(srcR_name, st); + var akarray_dstR_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(dstR_name, st); + var akarray_nei_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(nei_name, st); + var akarray_neiR_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(neiR_name, st); + var akarray_start_i_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(start_i_name, st); + var akarray_start_iR_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(start_iR_name, st); + var akarray_vmap_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(vmap_name, st); + + // Extract the data for use. 
+ var akarray_src_sym = toSymEntry(akarray_src_entry,int); + var src = akarray_src_sym.a; + + var akarray_dst_sym = toSymEntry(akarray_dst_entry,int); + var dst = akarray_dst_sym.a; + + var akarray_srcR_sym = toSymEntry(akarray_srcR_entry,int); + var srcR = akarray_srcR_sym.a; + + var akarray_dstR_sym = toSymEntry(akarray_dstR_entry,int); + var dstR = akarray_dstR_sym.a; + + var akarray_nei_sym = toSymEntry(akarray_nei_entry,int); + var nei = akarray_nei_sym.a; + + var akarray_neiR_sym = toSymEntry(akarray_neiR_entry,int); + var neiR = akarray_neiR_sym.a; + + var akarray_start_i_sym = toSymEntry(akarray_start_i_entry,int); + var start_i = akarray_start_i_sym.a; + + var akarray_start_iR_sym = toSymEntry(akarray_start_iR_entry,int); + var start_iR = akarray_start_iR_sym.a; + + var akarray_vmap_sym = toSymEntry(akarray_vmap_entry, int); + var vmap = akarray_vmap_sym.a; + + var graph = new shared SegGraph(num_vertices, num_edges, directed, weighted, propertied); + graph.reversed = true; + graph.withComp(new shared SymEntry(src):GenSymEntry, "SRC") + .withComp(new shared SymEntry(dst):GenSymEntry, "DST") + .withComp(new shared SymEntry(srcR):GenSymEntry, "SRC_R") + .withComp(new shared SymEntry(dstR):GenSymEntry, "DST_R") + .withComp(new shared SymEntry(nei):GenSymEntry, "NEIGHBOR") + .withComp(new shared SymEntry(neiR):GenSymEntry, "NEIGHBOR_R") + .withComp(new shared SymEntry(start_i):GenSymEntry, "START_IDX") + .withComp(new shared SymEntry(start_iR):GenSymEntry, "START_IDX_R") + .withComp(new shared SymEntry(vmap):GenSymEntry, "NODE_MAP"); + + // Add graph to the specific symbol table entry. + var graphEntryName = st.nextName(); + var graphSymEntry = new shared GraphSymEntry(graph); + st.addEntry(graphEntryName, graphSymEntry); + var repMsg = graphEntryName; + + // Print out the length of time it takes to read in and build a known graph file. + timer.stop(); + outMsg = "Building graph from two edge arrays COMPAT took " + timer.elapsed():string + " sec"; + + // Print out debug information to arkouda server output. + bgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + bgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of addEdgesFromCompatMsg + + /** + * Read in a matrix market file to pdarrays to eventually build a graph. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. + * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc readMatrixMarketFileMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + // Parse the message from Python to extract needed data. + var pathS = msgArgs.getValueOf("Path"); + var directedS = msgArgs.getValueOf("Directed"); + + // Converted parsed messages to the needed data types for Chapel operations. + var path:string = (pathS:string); + + var directed:bool; + directedS = directedS.toLower(); + directed = (directedS:bool); + + // Check to see if the file can be opened correctly. + try { + var f = open(path, ioMode.r); + f.close(); + } catch { + smLogger.error(getModuleName(),getRoutineName(),getLineNumber(), "Error opening file."); + } + + // Start parsing through the file. + var f = open(path, ioMode.r); + var r = f.reader(kind = ionative); + var line:string; + var a,b,c:string; + + // Prase through the matrix market file header to get number of rows, columns, and entries. 
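        /* Editor's note: an illustrative Matrix Market file of the shape this reader assumes
         * (example data, not taken from the patch). Lines starting with '%' are skipped, the
         * first non-comment line gives "<rows> <cols> <entries>", and each remaining line is
         * "src dst" or "src dst weight":
         *
         *   %%MatrixMarket matrix coordinate real general
         *   % optional comments ...
         *   5 5 3
         *   1 2 0.5
         *   2 3 1.0
         *   4 5 2.5
         */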
+ while (r.readLine(line)) { + if (line[0] == "%") { + continue; + } + else { + var temp = line.split(); + a = temp[0]; + b = temp[1]; + c = temp[2]; + break; + } + } + var rows = a:int; + var cols = b:int; + var entries = c:int; + + // Make the src and dst arrays to build the graph out of, they will all be of size entries. + var src = makeDistArray(entries, int); + var dst = makeDistArray(entries, int); + var wgt = makeDistArray(entries, real); + + // Read the next line to see if there are three columns, if so the graph is weighted. + r.readLine(line); + var temp = line.split(); + var weighted = false; + var ind = 0; + if temp.size != 3 { + src[ind] = temp[0]:int; + dst[ind] = temp[1]:int; + } else { + src[ind] = temp[0]:int; + dst[ind] = temp[1]:int; + wgt[ind] = temp[2]:real; + weighted = true; + } + ind += 1; + + // Now, read the rest of the file. The reading will be carried out by the head locale, which + // will typically be locale0. Therefore, we will create some aggregators for when locale0 + // has to write to remote data. + var edge_agg = new DstAggregator(int); + var wgt_agg = new DstAggregator(real); + while (r.readLine(line)) { + var temp = line.split(); + if !weighted { + edge_agg.copy(src[ind], temp[0]:int); + edge_agg.copy(dst[ind], temp[1]:int); + + } else { + edge_agg.copy(src[ind], temp[0]:int); + edge_agg.copy(dst[ind], temp[1]:int); + wgt_agg.copy(wgt[ind], temp[2]:real); + } + ind += 1; + } + + // Add the read arrays into the symbol table. + var src_name = st.nextName(); + var src_entry = new shared SymEntry(src); + st.addEntry(src_name, src_entry); + + var dst_name = st.nextName(); + var dst_entry = new shared SymEntry(dst); + st.addEntry(dst_name, dst_entry); + + var wgt_name = st.nextName(); + var wgt_entry = new shared SymEntry(wgt); + st.addEntry(wgt_name, wgt_entry); + + // Write the reply message back to Python. 
+ var repMsg = "created " + st.attrib(src_name) + "+ created " + st.attrib(dst_name); + if weighted { + repMsg += "+ created " + st.attrib(wgt_name); + } else { + repMsg += "+ nil"; + } + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of readMatrixMarketFileMsg + + + + use CommandMap; + registerFunction("addEdgesFrom", addEdgesFromMsg, getModuleName()); + registerFunction("addEdgesFromCompat", addEdgesFromCompatMsg, getModuleName()); + registerFunction("readMatrixMarketFile", readMatrixMarketFileMsg, getModuleName()); +} \ No newline at end of file diff --git a/arachne_development/server/GenTrussMsg.py b/arachne_development/server/GenTrussMsg.py index d634c2e7..f6e16f33 100644 --- a/arachne_development/server/GenTrussMsg.py +++ b/arachne_development/server/GenTrussMsg.py @@ -29,19 +29,19 @@ ParametersBool='''(kvalue:int,nei:[?D1] int, start_i:[?D2] int,src:[?D3] int, dst:[?D4] int, neiR:[?D11] int, start_iR:[?D12] int,srcR:[?D13] int, dstR:[?D14] int, - TriCount:[?D5] int, EdgeDeleted:[?D6] int ):bool{ ''' + TriCount:[?D5] int, EdgeDeleted:[?D6] int ):bool throws{ ''' ParametersInt='''(kvalue:int,nei:[?D1] int, start_i:[?D2] int,src:[?D3] int, dst:[?D4] int, neiR:[?D11] int, start_iR:[?D12] int,srcR:[?D13] int, dstR:[?D14] int, - TriCount:[?D5] int, EdgeDeleted:[?D6] int ):int{ ''' + TriCount:[?D5] int, EdgeDeleted:[?D6] int ):int throws{ ''' ParametersBoolAtomic='''(kvalue:int,nei:[?D1] int, start_i:[?D2] int,src:[?D3] int, dst:[?D4] int, neiR:[?D11] int, start_iR:[?D12] int,srcR:[?D13] int, dstR:[?D14] int, - TriCount:[?D5] atomic int, EdgeDeleted:[?D6] int ):bool{ ''' + TriCount:[?D5] atomic int, EdgeDeleted:[?D6] int ):bool throws{ ''' ParametersIntAtomic='''(kvalue:int,nei:[?D1] int, start_i:[?D2] int,src:[?D3] int, dst:[?D4] int, neiR:[?D11] int, start_iR:[?D12] int,srcR:[?D13] int, dstR:[?D14] int, - TriCount:[?D5] atomic int, EdgeDeleted:[?D6] int ):int{ ''' + TriCount:[?D5] atomic int, EdgeDeleted:[?D6] int ):int throws{ ''' ConditionEdgeRemove=''' if (EdgeDeleted[i]==-1) { diff --git a/arachne_development/server/GraphArray.chpl b/arachne_development/server/GraphArray.chpl index 02793d0d..5935f6ec 100644 --- a/arachne_development/server/GraphArray.chpl +++ b/arachne_development/server/GraphArray.chpl @@ -1,35 +1,59 @@ module GraphArray { - use AryUtil; + // Chapel modules. + use Map; + use Reflection; + use Utils; + + // Arkouda modules. + use Logging; use MultiTypeSymEntry; use MultiTypeSymbolTable; + + use AryUtil; use ServerConfig; use Reflection; - use Logging; use ServerErrors; use NumPyDType; - use Map; - + // Server message logger. 
private config const logLevel = LogLevel.DEBUG; const graphLogger = new Logger(logLevel); - // These are the component Key names stored in our components map + // Component key names to be stored stored in the components map for future retrieval enum Component { - SRC=1, // The source of every edge in the graph,array value - SRC_R=2, // Reverse of SRC - DST=3, // The destination of every vertex in the graph,array value - DST_R=4, // Reverse of DST - START_IDX, // The starting index of every vertex in src and dst - START_IDX_R, // Reverse of START_IDX - NEIGHBOR, // Numer of neighbors for a vertex - NEIGHBOR_R, // Numer of neighbors for a vertex based on the reverse array - A_START_IDX, // The starting index of every vertex in src and dst, aligned array based on src - A_START_IDX_R, // Reverse of START_IDX, aligned array based on src - A_NEIGHBOR, // Numer of neighbors for a vertex, aligned array based on src - A_NEIGHBOR_R, // Numer of neighbors for a vertex based on the reverse array, aligned array based on src + SRC, // The source array of every edge in the graph + DST, // The destination array of every edge in the graph + SEGMENTS, // The segments of adjacency lists for each vertex in DST + RANGES, // Keeps the range of the vertices the edge list stores per locale + EDGE_WEIGHT, // Stores the edge weights of the graph, if applicable + NODE_MAP, // Doing an index of NODE_MAP[u] gives you the original value of u + VERTEX_LABELS, // Any labels that belong to a specific node + VERTEX_LABELS_MAP, // Sorted array of vertex labels to integer id (array index) + EDGE_RELATIONSHIPS, // The relationships that belong to specific edges + EDGE_RELATIONSHIPS_MAP, // Sorted array of edge relationships to integer id (array index) + VERTEX_PROPS, // Any properties that belong to a specific node + VERTEX_PROPS_COL_MAP, // Sorted array of vertex property to integer id (array index) + VERTEX_PROPS_DTYPE_MAP, // Sorted array of column datatype to integer id (array index) + VERTEX_PROPS_COL2DTYPE, // Map of column names to the datatype of the column + EDGE_PROPS, // Any properties that belong to a specific edge + EDGE_PROPS_COL_MAP, // Sorted array of edge property to integer id (array index) + EDGE_PROPS_DTYPE_MAP, // Sorted array of column datatype to integer id (array index) + EDGE_PROPS_COL2DTYPE, // Map of column names to the datatype of the column + + // TEMPORARY COMPONENTS BELOW FOR UNDIRECTED GRAPHS TO ENSURE COMPATIBILTIY WITH OLD CODE. + // We want to phase out the need for reversed arrays in undirected graph algorithms. + // Includes: connected components, triangle counting, k-truss, and triangle centrality. 
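        // Editor's note (illustrative, not part of this patch): for an undirected edge (2, 5),
        // SRC/DST store it once as src=2, dst=5, while the reversed arrays SRC_R/DST_R store the
        // swapped copy src=5, dst=2, so either endpoint can reach all incident edges without
        // scanning both columns.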
+ SRC_R, // DST array + DST_R, // SRC array + START_IDX, // Starting indices of vertices in SRC + START_IDX_R, // Starting indices of vertices in SRC_R + NEIGHBOR, // Number of neighbors for a given vertex based off SRC and DST + NEIGHBOR_R, // Number of neighbors for a given vertex based off SRC_R and DST_R + + + A_SRC_R, // Reverse of SRC, aligned array based on srcR A_DST_R, // Reverse of DST, aligned array based on dstR - EDGE_WEIGHT, // Edge weight VERTEX_WEIGHT, // Vertex weight VTrack, // track the vertex ID from the normalized ID to the original ID VP1, // The first vertex property @@ -70,11 +94,31 @@ module GraphArray { /* Total number of edges */ var n_edges : int; - /* The graph is directed (True) or undirected (False)*/ + + // The graph is directed (true) or undirected (false) var directed : bool; - /* The graph is weighted (True) or unweighted (False)*/ - //var weighted : bool; + // The graph is weighted (true) or unweighted (false) + var weighted: bool; + + // The graph is a property graph (true) or not (false) + var propertied: bool; + + // Undirected graphs are in the old format (true) or not (false) + var reversed: bool = false; + + /** + * Init the basic graph object, we'll compose the pieces using the withComp method. + */ + proc init(num_v:int, num_e:int, directed:bool, weighted:bool, propertied:bool) { + this.n_vertices = num_v; + this.n_edges = num_e; + this.directed = directed; + this.weighted = weighted; + this.propertied = propertied; + } + + /** * Init the basic graph object, we'll compose the pieces in @@ -86,7 +130,17 @@ module GraphArray { this.directed = directed; } + + proc isDirected():bool { return this.directed; } + proc isWeighted():bool { return this.weighted; } + proc isPropertied():bool { return this.propertied; } + + proc withComp(a:shared GenSymEntry, atrname:string):SegGraph throws { components.add(atrname:Component, a); return this; } + proc hasComp(atrname:string):bool throws { return components.contains(atrname:Component); } + proc getComp(atrname:string):GenSymEntry throws { return components[atrname:Component]; } + + /* Use the withCOMPONENT methods to compose the graph object */ proc withEnumCom(a:shared GenSymEntry, atrname:Component):SegGraph { components.add(atrname, a); return this; } @@ -94,16 +148,6 @@ module GraphArray { proc getEnumCom( atrname:Component){return components.getBorrowed(atrname); } proc withATR(a:shared GenSymEntry, atrname:int):SegGraph { components.add(atrname, a); -/* - select atrname { - when 1 do - components.add(Component.SRC, a); - when 2 do - components.add(Component.SRC_R, a); - when 3 do - components.add(Component.DST, a); - } -*/ return this; } proc withSRC(a:shared GenSymEntry):SegGraph { components.add(Component.SRC, a); return this; } @@ -213,6 +257,7 @@ module GraphArray { * GraphSymEntry is the wrapper class around SegGraph * so it may be stored in the Symbol Table (SymTab) */ + /* class GraphSymEntry:CompositeSymEntry { //var dtype = NumPyDType.DType.UNDEF; //type etype = int; @@ -232,14 +277,66 @@ module GraphArray { } + */ + /** - * Convenience proc to retrieve GraphSymEntry from SymTab - * Performs conversion from AbstractySymEntry to GraphSymEntry - */ + * GraphSymEntry is the wrapper class around SegGraph so it may be stored in + * the Symbol Table (SymTab). 
+ */ + class GraphSymEntry : CompositeSymEntry { + var graph: shared SegGraph; + + proc init(segGraph: shared SegGraph) { + super.init(); + this.entryType = SymbolEntryType.CompositeSymEntry; + assignableTypes.add(this.entryType); + this.graph = segGraph; + } + override proc getSizeEstimate(): int { + return 1; + } + } + + + class SymEntryAD : GenSymEntry { + var aD: domain(int); + var a: [aD] int; + + proc init(associative_array: [?associative_domain] int) { + super.init(int); + this.aD = associative_domain; + this.a = associative_array; + } + } + + class MapSymEntry : GenSymEntry { + var stored_map: map(string, string); + + proc init(ref map_to_store: map(string, string)) { + super.init(string); + this.stored_map = map_to_store; + } + } + + + proc toMapSymEntry(e) { + return try! e : borrowed MapSymEntry; + } + + proc toSymEntryAD(e) { + return try! e : borrowed SymEntryAD(); + } + + + + /** + * Convenience proc to retrieve GraphSymEntry from SymTab. + * Performs conversion from AbstractySymEntry to GraphSymEntry. + */ proc getGraphSymEntry(name:string, st: borrowed SymTab): borrowed GraphSymEntry throws { var abstractEntry:borrowed AbstractSymEntry = st.lookup(name); - if ! abstractEntry.isAssignableTo(SymbolEntryType.CompositeSymEntry) { + if !abstractEntry.isAssignableTo(SymbolEntryType.CompositeSymEntry) { var errorMsg = "Error: SymbolEntryType %s is not assignable to CompositeSymEntry".format(abstractEntry.entryType); graphLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg); throw new Error(errorMsg); @@ -247,6 +344,16 @@ module GraphArray { return (abstractEntry: borrowed GraphSymEntry); } + /** + * Helper proc to cast AbstractSymEntry to GraphSymEntry. + */ + proc toGraphSymEntry(entry: borrowed AbstractSymEntry): borrowed GraphSymEntry throws { + return (entry: borrowed GraphSymEntry); + } + + + + /** * Convenience proc to retrieve DomArraySymEntry from SymTab * Performs conversion from AbstractySymEntry to DomArraySymEntry @@ -268,12 +375,5 @@ module GraphArray { return (entry: borrowed DomArraySymEntry); } - /** - * Helper proc to cat AbstractSymEntry to GraphSymEntry - */ - proc toGraphSymEntry(entry: borrowed AbstractSymEntry): borrowed GraphSymEntry throws { - return (entry: borrowed GraphSymEntry); - } - } diff --git a/arachne_development/server/GraphInfoMsg.chpl b/arachne_development/server/GraphInfoMsg.chpl new file mode 100644 index 00000000..f7c8953e --- /dev/null +++ b/arachne_development/server/GraphInfoMsg.chpl @@ -0,0 +1,111 @@ +module GraphInfoMsg { + // Chapel modules. + use Reflection; + use Set; + use Time; + use Sort; + use List; + + // Arachne Modules. + use Utils; + use GraphArray; + use SegmentedString; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use ArgSortMsg; + use AryUtil; + use Logging; + use Message; + + // Server message logger. + private config const logLevel = LogLevel.DEBUG; + const smLogger = new Logger(logLevel); + var outMsg:string; + + /** + * Return the edge arrays for a particular graph for further analysis. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. + * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc edgesMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + // Parse the message from Python to extract needed data. 
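        /* Editor's note (assumed flow, not stated explicitly in the patch): "GraphName" here is
         * expected to be the symbol-table name that addEdgesFromMsg / addEdgesFromCompatMsg
         * returned to the client (the graphEntryName produced by st.nextName()), so the lookup
         * below resolves the same GraphSymEntry that was registered when the graph was built. */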
+ var graphEntryName = msgArgs.getValueOf("GraphName"); + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var ag = gEntry.graph; + + // Extract the edge arrays. + var timer:stopwatch; + timer.start(); + var src = toSymEntry(ag.getComp("SRC"), int).a; + var dst = toSymEntry(ag.getComp("DST"), int).a; + + // Add new copies of each to the symbol table. + var repMsg = ""; + var attrNameSrc = st.nextName(); + var attrEntrySrc = new shared SymEntry(src); + st.addEntry(attrNameSrc, attrEntrySrc); + repMsg += "created " + st.attrib(attrNameSrc) + "+ "; + + var attrNameDst = st.nextName(); + var attrEntryDst = new shared SymEntry(dst); + st.addEntry(attrNameDst, attrEntryDst); + repMsg += "created " + st.attrib(attrNameDst); + + timer.stop(); + outMsg = "Extracting edges takes " + timer.elapsed():string; + + // Print out debug information to arkouda server output. + smLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + smLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of edgesMsg + + /** + * Return the nodes of a graph. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. + * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc nodesMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + // Parse the message from Python to extract needed data. + var graphEntryName = msgArgs.getValueOf("GraphName"); + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var ag = gEntry.graph; + + // Extract the edge arrays. + var timer:stopwatch; + timer.start(); + var nodes = toSymEntry(ag.getComp("NODE_MAP"), int).a; + + // Add new copies of each to the symbol table. + var repMsg = ""; + var attrName = st.nextName(); + var attrEntry = new shared SymEntry(nodes); + st.addEntry(attrName, attrEntry); + repMsg += "created " + st.attrib(attrName) + "+ "; + + timer.stop(); + outMsg = "Extracting nodes takes " + timer.elapsed():string; + + // Print out debug information to arkouda server output. + smLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + smLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of nodesMsg + + use CommandMap; + registerFunction("edges", edgesMsg, getModuleName()); + registerFunction("nodes", nodesMsg, getModuleName()); +} \ No newline at end of file diff --git a/arachne_development/server/PropertyGraphMsg.chpl b/arachne_development/server/PropertyGraphMsg.chpl new file mode 100644 index 00000000..c7b98cc2 --- /dev/null +++ b/arachne_development/server/PropertyGraphMsg.chpl @@ -0,0 +1,1351 @@ +module DipSLLPropertyGraphMsg { + // Chapel modules. + use Reflection; + use Time; + use Sort; + use Map; + use BlockDist; + use CommDiagnostics; + + // Arachne Modules. + use Utils; + use GraphArray; + use SymEntry2D; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use NumPyDType; + use ServerConfig; + use ServerErrors; + use ServerErrorStrings; + use SegmentedString; + use ArgSortMsg; + use AryUtil; + use Logging; + use Message; + + // Server message logger. + private config const logLevel = LogLevel.DEBUG; + const pgmLogger = new Logger(logLevel); + var outMsg:string; + + /* Wrapper concrete class for generic class. 
*/ + class GenProperty { + var dataType: int; + } + + /* Wrapped generic class to hold arrays of variable size and type. */ + class Property: GenProperty { + type etype; + var propertyIdentifier: domain(int); + var propertyValue: [propertyIdentifier] etype; + } + + /** + * Adds node labels to the nodes of a property graph. + * + * :arg cmd: operation to perform. + * :type cmd: string + * :arg msgArgs: arguments passed to backend. + * :type msgArgs: borrowed MessageArgs + * :arg st: symbol table used for storage. + * :type st: borrowed SymTab + * + * :returns: MsgTuple + */ + proc addNodeLabelsMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + // Parse the message from Python to extract needed data. + var graphEntryName = msgArgs.getValueOf("GraphName"); + var arrays = msgArgs.getValueOf("Arrays"); + + // Extract the names of the arrays storing the vertices and their labels. + var arrays_list = arrays.split(); + var input_vertices_name = arrays_list[0]; + var input_labels_name = arrays_list[1]; + var label_mapper_name = arrays_list[2]; + + // Extract the vertices containing labels to be inputted. + var input_vertices_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(input_vertices_name, st); + var input_vertices_sym = toSymEntry(input_vertices_entry, int); + var input_vertices = input_vertices_sym.a; + + // Extract the labels to be inputted for each of the vertices. + var input_labels_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(input_labels_name, st); + var input_labels_sym = toSymEntry(input_labels_entry, int); + var input_labels = input_labels_sym.a; + + // Extract the label mapper to be sent to each locale. + var label_mapper:SegString = getSegString(label_mapper_name, st); + + // Extract the graph we are operating with from the symbol table. + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var graph = gEntry.graph; + + // Extract the node_map array to get the internal vertex values for our graph. + var node_map = toSymEntry(graph.getComp("NODE_MAP"), int).a; + + // Create the array of domains that will store the labels for our vertices. + var vertex_labels: [node_map.domain] domain(int); + + var timer:stopwatch; + timer.start(); + forall i in input_vertices.domain { // for each input vertex, update its label list. + var lbl = input_labels[i]; // local + var u = input_vertices[i]; // local + vertex_labels[u] += lbl; // remote + } + timer.stop(); + + // Add the component for the node labels for the graph. + graph.withComp(new shared SymEntry(vertex_labels):GenSymEntry, "VERTEX_LABELS"); + graph.withComp(new shared SegStringSymEntry(label_mapper.offsets, label_mapper.values, string):GenSymEntry, "VERTEX_LABELS_MAP"); + var repMsg = "labels added"; + outMsg = "DipSLLaddNodeLabels took " + timer.elapsed():string + " sec "; + + // Print out debug information to arkouda server output. + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of addNodeLabelsMsg + + /** + * Adds node properties to the nodes of a property graph. + * + * :arg cmd: operation to perform. + * :type cmd: string + * :arg msgArgs: arguments passed to backend. + * :type msgArgs: borrowed MessageArgs + * :arg st: symbol table used for storage. 
+ * :type st: borrowed SymTab + * + * :returns: MsgTuple + */ + proc addNodePropertiesMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Parse the message from Python to extract needed data. + var graphEntryName = msgArgs.getValueOf("GraphName"); + var vertexIdsName = msgArgs.getValueOf("VertexIdsName"); + var propertyMapperName = msgArgs.getValueOf("PropertyMapperName"); + var dataArrayNames = msgArgs.getValueOf("DataArrayNames"); + + // Extract the graph we are operating with from the symbol table. + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var graph = gEntry.graph; + var node_map = toSymEntry(graph.getComp("NODE_MAP"), int).a; + + // Extract the vertices containing labels to be inputted. + var inputVerticesEntry: borrowed GenSymEntry = getGenericTypedArrayEntry(vertexIdsName, st); + var inputVerticesSym = toSymEntry(inputVerticesEntry, int); + var inputVertices = inputVerticesSym.a; + + // Extract property mappers from message, the first one contains column names in their + // regular order, the second contains the internal mapping for the property names. + var columns:SegString = getSegString(propertyMapperName, st); + + // Create map of column name to its datatype. + var col2dtype = new map(string, string); + + // Extract the data array names and the data types for those arrays. + var dataArrays = getSegString(dataArrayNames, st); + var dataTypeSet: domain(string); + for i in 0.. 0 { + var label_set_here = labels_to_find_set_dist[here.id]; + var intersection = u_label_set & label_set_here; + if intersection.size > 0 then return_array[u] = true; + } + } + } + when "and" { + forall (u, u_label_set) in zip(node_labels.domain, node_labels) with (ref return_array) { + var label_set_here = labels_to_find_set_dist[here.id]; + if u_label_set.size > 0 { + if u_label_set.contains(label_set_here) { + return_array[u] = true; + } + } + } + } + otherwise { + var errorMsg = notImplementedError(pn, op); + pgmLogger.error(getModuleName(), getRoutineName(), getLineNumber(), errorMsg); + return new MsgTuple(errorMsg, MsgType.ERROR); + } + } + timer.stop(); + var time_msg = "label query DIP-SLL took " + timer.elapsed():string + " sec"; + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),time_msg); + + var retName = st.nextName(); + var retEntry = new shared SymEntry(return_array); + st.addEntry(retName, retEntry); + var repMsg = 'created ' + st.attrib(retName); + + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + return new MsgTuple(repMsg, MsgType.NORMAL); + } //end of queryLabelsMsg + + /** + * Queries the property graph and returns a boolean array indicating which nodes match the query + * operation. + * + * :arg cmd: operation to perform. + * :type cmd: string + * :arg msgArgs: arguments passed to backend. + * :type msgArgs: borrowed MessageArgs + * :arg st: symbol table used for storage. + * :type st: borrowed SymTab + * + * :returns: MsgTuple + */ + proc queryNodePropertiesMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Parse the message from Python to extract needed data. + var graphEntryName = msgArgs.getValueOf("GraphName"); + var column = msgArgs.getValueOf("Column"); + var value = msgArgs.getValueOf("Value"); + var op = msgArgs.getValueOf("Op"); + + // Extract graph data for usage in this function. 
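        /* Editor's reading of the layout used below (inferred from the query code; treat as an
         * assumption): VERTEX_PROPS is a 2D array indexed by (vertex, dtype group). Each cell
         * holds a Property(etype) whose propertyValue is indexed by the column's position within
         * that dtype group, so a lookup is roughly
         *   var p = vertex_props[u, dtypeId].borrow() : (borrowed Property(int));
         *   var x = p.propertyValue[colId];
         * with dtypeId derived from VERTEX_PROPS_DTYPE_MAP and colId from VERTEX_PROPS_COL_MAP. */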
+ var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var graph = gEntry.graph; + var vertex_props = toSymEntry2D(graph.getComp("VERTEX_PROPS"), shared GenProperty?).a; + const ref entry = toSegStringSymEntry(graph.getComp("VERTEX_PROPS_COL_MAP")); + var vertex_props_col_map = assembleSegStringFromParts(entry.offsetsEntry, entry.bytesEntry, st); + var vertex_props_dtype_map = toSymEntry(graph.getComp("VERTEX_PROPS_DTYPE_MAP"), string).a; + var vertex_props_col2dtype = toMapSymEntry(graph.getComp("VERTEX_PROPS_COL2DTYPE")).stored_map; + var return_array : [makeDistDom(graph.n_vertices)] bool; + var dtype = vertex_props_col2dtype[column]; + + var colId = 0; + for i in 0.." { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:real then return_array[u] = true; + } + } + } + when "<" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:real then return_array[u] = true; + } + } + } + when "<=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:real then return_array[u] = true; + } + } + } + when ">=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:real then return_array[u] = true; + } + } + } + when "==" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:real then return_array[u] = true; + } + } + } + when "<>" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:real then return_array[u] = true; + } + } + } + } + } + when "int64" { + select op { + when ">" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:int then return_array[u] = true; + } + } + } + when "<" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:int then return_array[u] = true; + } + } + } + when "<=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] 
with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:int then return_array[u] = true; + } + } + } + when ">=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:int then return_array[u] = true; + } + } + } + when "==" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:int then return_array[u] = true; + } + } + } + when "<>" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:int then return_array[u] = true; + } + } + } + } + } + when "uint64" { + select op { + when ">" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:uint then return_array[u] = true; + } + } + } + when "<" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:uint then return_array[u] = true; + } + } + } + when "<=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:uint then return_array[u] = true; + } + } + } + when ">=" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:uint then return_array[u] = true; + } + } + } + when "==" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:uint then return_array[u] = true; + } + } + } + when "<>" { + forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = vertex_props[u,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:uint then return_array[u] = true; + } + } + } + } + } + when "bool" { + var inner_value = value.toLower():bool; + select op { + when "==" { + forall (u,d) in vertex_props.domain[.., 
dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) {
+                            var currentProperty = vertex_props[u,d].borrow():(borrowed Property(bool));
+                            if currentProperty.propertyValue.size > 0 {
+                                if currentProperty.propertyValue[colId] == inner_value then return_array[u] = true;
+                            }
+                        }
+                    }
+                    when "<>" {
+                        forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) {
+                            var currentProperty = vertex_props[u,d].borrow():(borrowed Property(bool));
+                            if currentProperty.propertyValue.size > 0 {
+                                if currentProperty.propertyValue[colId] != inner_value then return_array[u] = true;
+                            }
+                        }
+                    }
+                }
+            }
+            when "str" {
+                select op {
+                    when "contains" {
+                        forall (u,d) in vertex_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) {
+                            var currentProperty = vertex_props[u,d].borrow():(borrowed Property(string));
+                            if currentProperty.propertyValue.size > 0 {
+                                if currentProperty.propertyValue[colId].find(value:string) != -1 then return_array[u] = true;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        timer.stop();
+        var time_msg = "node properties query took " + timer.elapsed():string + " sec";
+        pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),time_msg);
+
+        var retName = st.nextName();
+        var retEntry = new shared SymEntry(return_array);
+        st.addEntry(retName, retEntry);
+        var repMsg = 'created ' + st.attrib(retName);
+
+        pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg);
+        return new MsgTuple(repMsg, MsgType.NORMAL);
+    } //end of queryNodePropertiesMsg
+
+    /**
+     * Queries the property graph and returns a boolean array indicating which edges contain the
+     * given relationships.
+     *
+     * :arg cmd: operation to perform.
+     * :type cmd: string
+     * :arg msgArgs: arguments passed to backend.
+     * :type msgArgs: borrowed MessageArgs
+     * :arg st: symbol table used for storage.
+     * :type st: borrowed SymTab
+     *
+     * :returns: MsgTuple
+     */
+    proc queryRelationshipsMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws {
+        param pn = Reflection.getRoutineName();
+
+        // Parse the message from Python to extract needed data.
+        var graphEntryName = msgArgs.getValueOf("GraphName");
+        var relationshipsToFindName = msgArgs.getValueOf("RelationshipsToFindName");
+        var op = msgArgs.getValueOf("Op");
+
+        // Extract graph data for usage in this function.
+        var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st);
+        var graph = gEntry.graph;
+        var edge_relationships = toSymEntry(graph.getComp("EDGE_RELATIONSHIPS"), domain(int)).a;
+
+        // Extract the array that contains the relationships we are looking for.
+        var relationships_to_find_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(relationshipsToFindName, st);
+        var relationships_to_find_sym = toSymEntry(relationships_to_find_entry, int);
+        var relationships_to_find = relationships_to_find_sym.a;
+
+        // Convert array to associative domain to maintain the relationships to find.
+        var relationships_to_find_set : domain(int);
+        forall rel_id in relationships_to_find with (ref relationships_to_find_set) do relationships_to_find_set += rel_id;
+        var return_array : [edge_relationships.domain] bool;
+
+        // Distribute the relationships_to_find_set to each locale.
+        var relationships_to_find_set_dist = makeDistArray(numLocales, domain(int));
+        coforall loc in Locales do on loc {
+            relationships_to_find_set_dist[here.id] = relationships_to_find_set;
+        }
+
+        // Search in parallel for the edges that have the relationships to find.
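+        // Behavior of the two modes handled below, as read from the set operations used: "or"
+        // marks an edge whose relationship set intersects the requested set, while "and"
+        // presumably marks an edge only when its set contains every requested relationship.
+        // For example, with relationships_to_find_set = {1, 2}, an edge labeled {1, 3} should
+        // satisfy "or" but not "and", and an edge labeled {1, 2, 5} should satisfy both.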
+ var timer:stopwatch; + timer.start(); + select op { + when "or" { + forall (u, u_relationship_set) in zip(edge_relationships.domain, edge_relationships) with (ref return_array) { + if u_relationship_set.size != 0 { + var relationship_set_here = relationships_to_find_set_dist[here.id]; + var intersection = u_relationship_set & relationship_set_here; + if intersection.size > 0 then return_array[u] = true; + } + } + } + when "and" { + forall (u, u_relationship_set) in zip(edge_relationships.domain, edge_relationships) with (ref return_array) { + var relationship_set_here = relationships_to_find_set_dist[here.id]; + if u_relationship_set.size > 0 { + if u_relationship_set.contains(relationship_set_here) { + return_array[u] = true; + } + } + } + } + otherwise { + var errorMsg = notImplementedError(pn, op); + pgmLogger.error(getModuleName(), getRoutineName(), getLineNumber(), errorMsg); + return new MsgTuple(errorMsg, MsgType.ERROR); + } + } + timer.stop(); + var time_msg = "relationship query DIP-SLL took " + timer.elapsed():string + " sec"; + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),time_msg); + + var retName = st.nextName(); + var retEntry = new shared SymEntry(return_array); + st.addEntry(retName, retEntry); + var repMsg = 'created ' + st.attrib(retName); + + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + return new MsgTuple(repMsg, MsgType.NORMAL); + } //end of queryRelationshipsMsg + + /** + * Queries the property graph and returns a boolean array indicating which edges match the query + * operation. + * + * :arg cmd: operation to perform. + * :type cmd: string + * :arg msgArgs: arguments passed to backend. + * :type msgArgs: borrowed MessageArgs + * :arg st: symbol table used for storage. + * :type st: borrowed SymTab + * + * :returns: MsgTuple + */ + proc queryEdgePropertiesMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Parse the message from Python to extract needed data. + var graphEntryName = msgArgs.getValueOf("GraphName"); + var column = msgArgs.getValueOf("Column"); + var value = msgArgs.getValueOf("Value"); + var op = msgArgs.getValueOf("Op"); + + // Extract graph data for usage in this function. + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var graph = gEntry.graph; + var edge_props = toSymEntry2D(graph.getComp("EDGE_PROPS"), shared GenProperty?).a; + const ref entry = toSegStringSymEntry(graph.getComp("EDGE_PROPS_COL_MAP")); + var edge_props_col_map = assembleSegStringFromParts(entry.offsetsEntry, entry.bytesEntry, st); + var edge_props_dtype_map = toSymEntry(graph.getComp("EDGE_PROPS_DTYPE_MAP"), string).a; + var edge_props_col2dtype = toMapSymEntry(graph.getComp("EDGE_PROPS_COL2DTYPE")).stored_map; + var return_array : [makeDistDom(graph.n_edges)] bool; + var dtype = edge_props_col2dtype[column]; + + var colId = 0; + for i in 0.." 
{ + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:real then return_array[e] = true; + } + } + } + when "<" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:real then return_array[e] = true; + } + } + } + when "<=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:real then return_array[e] = true; + } + } + } + when ">=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:real then return_array[e] = true; + } + } + } + when "==" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:real then return_array[e] = true; + } + } + } + when "<>" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(real)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:real then return_array[e] = true; + } + } + } + } + } + when "int64" { + select op { + when ">" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:int then return_array[e] = true; + } + } + } + when "<" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:int then return_array[e] = true; + } + } + } + when "<=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:int then return_array[e] = true; + } + } + } + when ">=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:int then return_array[e] = true; + } + } + } + when "==" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var 
currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:int then return_array[e] = true; + } + } + } + when "<>" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(int)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:int then return_array[e] = true; + } + } + } + } + } + when "uint64" { + select op { + when ">" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] > value:uint then return_array[e] = true; + } + } + } + when "<" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] < value:uint then return_array[e] = true; + } + } + } + when "<=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] <= value:uint then return_array[e] = true; + } + } + } + when ">=" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] >= value:uint then return_array[e] = true; + } + } + } + when "==" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == value:uint then return_array[e] = true; + } + } + } + when "<>" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(uint)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != value:uint then return_array[e] = true; + } + } + } + } + } + when "bool" { + var inner_value = value.toLower():bool; + select op { + when "==" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(bool)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] == inner_value then return_array[e] = true; + } + } + } + when "<>" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(bool)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId] != inner_value then return_array[e] = true; + } + } + } + } + } + when "str" { + select op { + when "contains" { + forall (e,d) in edge_props.domain[.., dtypeId..dtypeId] with (ref return_array, ref dtypeId, ref 
colId) { + var currentProperty = edge_props[e,d].borrow():(borrowed Property(string)); + if currentProperty.propertyValue.size > 0 { + if currentProperty.propertyValue[colId].find(value:string) != -1 then return_array[e] = true; + } + } + } + } + } + } + timer.stop(); + var time_msg = "edge properties query took " + timer.elapsed():string + " sec"; + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),time_msg); + + var retName = st.nextName(); + var retEntry = new shared SymEntry(return_array); + st.addEntry(retName, retEntry); + var repMsg = 'created ' + st.attrib(retName); + + pgmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + return new MsgTuple(repMsg, MsgType.NORMAL); + } //end of queryEdgePropertiesMsg + + use CommandMap; + registerFunction("addNodeLabels", addNodeLabelsMsg, getModuleName()); + registerFunction("addNodeProperties", addNodePropertiesMsg, getModuleName()); + registerFunction("addEdgeRelationships", addEdgeRelationshipsMsg, getModuleName()); + registerFunction("addEdgeProperties", addEdgePropertiesMsg, getModuleName()); + registerFunction("getNodeLabels", getNodeLabelsMsg, getModuleName()); + registerFunction("getNodeProperties", getNodePropertiesMsg, getModuleName()); + registerFunction("getEdgeRelationships", getEdgeRelationshipsMsg, getModuleName()); + registerFunction("getEdgeProperties", getEdgePropertiesMsg, getModuleName()); + registerFunction("queryLabels", queryLabelsMsg, getModuleName()); + registerFunction("queryNodeProperties", queryNodePropertiesMsg, getModuleName()); + registerFunction("queryRelationships", queryRelationshipsMsg, getModuleName()); + registerFunction("queryEdgeProperties", queryEdgePropertiesMsg, getModuleName()); +} \ No newline at end of file diff --git a/arachne_development/server/ServerModules.cfg b/arachne_development/server/ServerModules.cfg index 6d675bb7..e50c8fe3 100644 --- a/arachne_development/server/ServerModules.cfg +++ b/arachne_development/server/ServerModules.cfg @@ -8,3 +8,13 @@ GraphMsg TriCtrMsg JaccardMsg LCSMsg + + +BreadthFirstSearchMsg +BuildGraphMsg +GraphInfoMsg +PropertyGraphMsg +SquareCountMsg + +TriangleCountMsg +ConnectedComponentsMsg diff --git a/arachne_development/server/SquareCount.chpl b/arachne_development/server/SquareCount.chpl new file mode 100644 index 00000000..a234a60a --- /dev/null +++ b/arachne_development/server/SquareCount.chpl @@ -0,0 +1,78 @@ +module SquareCount { + // Arachne modules. + use GraphArray; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use AryUtil; + + /** + * Total degree order operator u << v compares the degrees of the two nodes and returns + * true if the degree of u is less than the degree of v, or if equal, if the integer specifier + * of u is less than that of v. + * + * :arg u: vertex u + * :type u: int + * :arg v: vertex v + * :type v: int + * :arg degree: array containing degrees + * :type degree: [?D] int + * + * :returns: bool */ + inline proc nodeCompare(u: int, v: int, ref degree): bool { + if degree[u] < degree[v] then return true; + else if degree[u] == degree[v] && u < v then return true; + else return false; + } + + /** + * Sequential square counting for an undirected graph. + * + * :arg graph: SegGraph to run square counting on. + * :type graph: SegGraph + * :arg degree: degree sequence for each vertex u in the graph. 
+ * :type degree: [?D] int + * + * :returns: int */ + proc squareCountSequential(graph:SegGraph, degree:[?D1] int):int throws { + var src = toSymEntry(graph.getComp("SRC"),int).a; + var dst = toSymEntry(graph.getComp("DST"),int).a; + var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a; + + var square_count:int = 0; + var L : [0.. 0 { // If u in H is mapped to some vertex in G + // Check if there is an edge from u to v in H + var adj_list_of_u_from_H_start = segH[u]; + var adj_list_of_u_from_H_end = segH[u+1]; + var v_found = bin_search_v(dstH, adj_list_of_u_from_H_start, adj_list_of_u_from_H_end, v); + if v_found { + // Check if there is an edge from mapping[u] to mapping[v] in G + // And check if the edge labels are the same + var um = mapping[u]; + var vm = mapping[v]; + + var adj_list_of_um_from_G_start = segG[um]; + var adj_list_of_um_from_G_end = segG[um+1]; + var vm_found = bin_search_v(dstG, adj_list_of_um_from_G_start, adj_list_of_um_from_G_end, vm); + + if !vm_found || edgeRelationshipsH[v_found] != edgeRelationshipsG[vm_found] { + return false; + } + } + + // Check if there is an edge from v to u in H + var adj_list_of_v_from_H_start = segH[v]; + var adj_list_of_v_from_H_end = segH[v+1]; + var u_found = bin_search_v(dstH, adj_list_of_v_from_H_start, adj_list_of_v_from_H_end, u); + if u_found { + // Check if there is an edge from mapping[u] to mapping[v] in G + // And check if the edge labels are the same + var um = mapping[u]; + var vm = mapping[v]; + + var adj_list_of_vm_from_G_start = segG[vm]; + var adj_list_of_vm_from_G_end = segG[vm+1]; + var um_found = bin_search_v(dstG, adj_list_of_vm_from_G_start, adj_list_of_vm_from_G_end, um); + + if !um_found || edgeRelationshipsH[u_found] != edgeRelationshipsG[um_found] { + return false; + } + } + } + } + return true; + } + + // Recursive function for Ullmann's subgraph isomorphism algorithm + proc ullmannSubgraphIsomorphism11Helper(G: SegGraph, H: SegGraph, v: int, visited: [?D1] bool, mapping: [?D2] int, graphDegree: [?D3] int): list([1..H.n_vertices] int) throws { + var isomorphismList: list(list(int)); + + var localIsoList: list([1..H.n_vertices] int); // List to store the isomorphisms found in the current branch + var localIsoCounter = 0; // Count the number of isomorphisms found in the current branch + + writeln("$$$$$$ WE GET HERE 3"); + + for i in 0..G.n_vertices-1 { + writeln("$$$$$$ WE GET HERE 4"); + if (!visited[i] && graphDegree[i] >= 1) { + visited[i] = true; // Mark the vertex as visited + mapping[v] = i; // Add the vertex to the current mapping + // If the vertex can be added to the current mapping + if (isIsomorphic(G, H, v, mapping)) { + // If all vertices in H have been visited + if (v >= H.n_vertices-1) { + var isComplete = true; // Check if all vertices in the subgraph have been mapped + for j in 0..H.n_vertices-1 { + if (mapping[j] < 1) { + isComplete = false; + break; + } + } + // If the mapping is complete, add the current mapping to the isomorphism list + if (isComplete) { + localIsoList.pushBack(mapping); + } + } + else { + // Recursively call the function for the next vertex + var subIsoList = ullmannSubgraphIsomorphism11Helper(G, H, v+1, visited, mapping, graphDegree); + if (subIsoList.size > 0) { + // Add isomorphisms found in the sub-branch to the current isomorphism list + for isoMapping in subIsoList { + localIsoList.pushBack(isoMapping); + } + } + } + } + writeln("$$$$$$ WE GET HERE 5"); + // Backtrack: unvisit the vertex and remove it from the mapping + visited[i] = false; + mapping[v] = -1; + } + } 
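+        // Everything accumulated in localIsoList at this point is a complete candidate mapping:
+        // an array over H's vertices whose entries name the matched vertices of G, gathered from
+        // this branch and from the recursive calls above.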
+        return localIsoList; // Return the list of isomorphisms found in the current branch
+    } // end of ullmannSubgraphIsomorphism11Helper
+
+    // Ullmann's subgraph isomorphism algorithm
+    proc ullmannSubgraphIsomorphism11(G: SegGraph, H: SegGraph, subGraphVerticesSortedByDegree: [?D1] int, graphDegree: [?D2] int) throws {
+        // // Create an array to hold the vertices sorted by degree
+        // var subGraphVerticesSortedByDegree: [1..H.numVertices] int;
+        // for v in 1..H.numVertices {
+        //     subGraphVerticesSortedByDegree[v] = v;
+        // }
+
+        // // Sort the array based on degrees in descending order
+        // for i in 1..H.numVertices {
+        //     for j in i+1..H.numVertices {
+        //         if H.nodeDegree[subGraphVerticesSortedByDegree[i]] < H.nodeDegree[subGraphVerticesSortedByDegree[j]] {
+        //             var tmp = subGraphVerticesSortedByDegree[i];
+        //             subGraphVerticesSortedByDegree[i] = subGraphVerticesSortedByDegree[j];
+        //             subGraphVerticesSortedByDegree[j] = tmp;
+        //         }
+        //     }
+        // }
+
+        // Parallelize over the vertices of subGraph based on degree order!
+        // Check whether this really changes the running time; the benefit is uncertain because of the parallelism.
+        coforall idx in 0..H.n_vertices-1 {
+            var v = subGraphVerticesSortedByDegree[idx];
+            var visited: [0..G.n_vertices-1] bool; // Array to keep track of visited vertices
+            var mapping: [0..H.n_vertices-1] int; // Array for the current mapping
+            mapping = -1; // Initialize the mapping array to -1 (indicating no mapping)
+            visited = false; // Initialize all vertices as unvisited
+            // Find isomorphisms for the current vertex v
+            writeln("$$$$$$ WE GET HERE 1");
+            var subIsoList = ullmannSubgraphIsomorphism11Helper(G, H, v, visited, mapping, graphDegree);
+            writeln("$$$$$$ WE GET HERE 2");
+            if (subIsoList.size > 0) {
+                // Print isomorphisms found by the current task without merging
+                //writeln("Isomorphisms found by task ", v, ":");
+                var taskIsoCounter = 0;
+                for isoMapping in subIsoList {
+                    taskIsoCounter += 1;
+                    writeln("Isomorphism #", taskIsoCounter, ":");
+                    for k in 0..H.n_vertices-1 {
+                        var mappedVertex = isoMapping[k];
+                        if (mappedVertex > 0) {
+                            writeln("Subgraph vertex ", k, " -> Graph vertex ", mappedVertex);
+                        }
+                    }
+                    //writeln("----");
+                }
+
+                // Add the number of isomorphisms found by the current task to the global counter
+                globalIsoCounter.add(taskIsoCounter);
+            }
+        }
+
+        // Print the total number of isomorphisms found
+        writeln("Total isomorphisms found: ", globalIsoCounter.read());
+    } // end of ullmannSubgraphIsomorphism11
+} // end of module
\ No newline at end of file
diff --git a/arachne_development/server/SubgraphIsomorphismMsg.chpl b/arachne_development/server/SubgraphIsomorphismMsg.chpl
new file mode 100644
index 00000000..e5dc63e7
--- /dev/null
+++ b/arachne_development/server/SubgraphIsomorphismMsg.chpl
@@ -0,0 +1,97 @@
+module SubgraphIsomorphismMsg {
+    // Chapel modules.
+    use Reflection;
+    use Time;
+
+    // Arachne modules.
+    use GraphArray;
+    use SubgraphIsomorphism;
+
+    // Arkouda modules.
+    use MultiTypeSymbolTable;
+    use MultiTypeSymEntry;
+    use ServerConfig;
+    use ServerErrors;
+    use ServerErrorStrings;
+    use AryUtil;
+    use Logging;
+    use Message;
+
+    // Server message logger.
+    private config const logLevel = ServerConfig.logLevel;
+    private config const logChannel = ServerConfig.logChannel;
+    const siLogger = new Logger(logLevel, logChannel);
+
+    /**
+     * Run subgraph isomorphism with input graphs G and H, where we search for H inside of G.
+     *
+     * cmd: operation to perform.
+     * msgArgs: arguments passed to backend.
+     * SymTab: symbol table used for storage.
+ * + * returns: message back to Python. + */ + proc subgraphIsomorphismMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + param pn = Reflection.getRoutineName(); + + // Info messages to print stuff to the Chapel Server. + var repMsg:string; + var outMsg:string; + + // Extract messages send from Python. + var graphEntryName = msgArgs.getValueOf("MainGraphName"); + var subgraphEntryName = msgArgs.getValueOf("SubGraphName"); + var typeN = msgArgs.getValueOf("Type"); + var graphDegreeName = msgArgs.getValueOf("GraphDegreeName"); + var subGraphDegreeName = msgArgs.getValueOf("SubGraphDegreeName"); + var subGraphInternalVerticesSortedName = msgArgs.getValueOf("SubGraphInternalVerticesSortedName"); + + writeln("$$$ graphEntryName = ", graphEntryName); + writeln("$$$ subgraphEntryName = ", subgraphEntryName); + writeln("$$$ typeN = ", typeN); + writeln("$$$ subGraphDegreeName = ", subGraphDegreeName); + writeln("$$$ subGraphInternalVerticesSortedName = ", subGraphInternalVerticesSortedName); + + var graph_degree_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(graphDegreeName, st); + var graph_degree_sym = toSymEntry(graph_degree_entry, int); + var graph_degree = graph_degree_sym.a; + + var subgraph_degree_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(subGraphDegreeName, st); + var subgraph_degree_sym = toSymEntry(subgraph_degree_entry, int); + var subgraph_degree = subgraph_degree_sym.a; + + var subgraph_internal_vertices_degree_sorted_entry: borrowed GenSymEntry = getGenericTypedArrayEntry(subGraphInternalVerticesSortedName, st); + var subgraph_internal_vertices_degree_sorted_sym = toSymEntry(subgraph_internal_vertices_degree_sorted_entry, int); + var subgraph_internal_vertices_degree_sorted = subgraph_internal_vertices_degree_sorted_sym.a; + + writeln("$$$$$ subgraph_degree = ", subgraph_degree); + writeln("$$$$$ degree sorted subgraph = ", subgraph_internal_vertices_degree_sorted); + + // Pull out our graph from the symbol table. + var gEntry: borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var g = gEntry.graph; + + // Pull out our subgraph from the symbol table. + var hEntry: borrowed GraphSymEntry = getGraphSymEntry(subgraphEntryName, st); + var h = hEntry.graph; + + var timer:stopwatch; + timer.start(); + ullmannSubgraphIsomorphism11(g, h, subgraph_internal_vertices_degree_sorted, graph_degree); + timer.stop(); + outMsg = "Subgraph Isomorphism took " + timer.elapsed():string + " sec"; + + var subgraphs = makeDistArray(1, bool); // Temporary for now, should be "array of graphs". + var subgraphsName = st.nextName(); + var subgraphsEntry = new shared SymEntry(subgraphs); + st.addEntry(subgraphsName, subgraphsEntry); + + repMsg = 'created ' + st.attrib(subgraphsName); + siLogger.info(getModuleName(),getRoutineName(),getLineNumber(),outMsg); + siLogger.info(getModuleName(),getRoutineName(),getLineNumber(),repMsg); + return new MsgTuple(repMsg, MsgType.NORMAL); + } // end of subgraphIsomorphismMsg + + use CommandMap; + registerFunction("subgraphIsomorphism", subgraphIsomorphismMsg, getModuleName()); +} // end of module \ No newline at end of file diff --git a/arachne_development/server/TriangleCount.chpl b/arachne_development/server/TriangleCount.chpl new file mode 100644 index 00000000..faca4b9f --- /dev/null +++ b/arachne_development/server/TriangleCount.chpl @@ -0,0 +1,22 @@ +module TriangleCount { + // Chapel modules. + use Reflection; + use Set; + use List; + + // Package modules. 
+ use CopyAggregation; + + // Arachne modules. + use GraphArray; + use Utils; + use Aggregators; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use AryUtil; + + +}// end of TriangleCount module \ No newline at end of file diff --git a/arachne_development/server/TriangleCountMsg.chpl b/arachne_development/server/TriangleCountMsg.chpl new file mode 100644 index 00000000..8f403aa9 --- /dev/null +++ b/arachne_development/server/TriangleCountMsg.chpl @@ -0,0 +1,583 @@ +module TriCntMsg { + // Chapel modules. + use Reflection; + use Time; + + // Arachne modules. + use GraphArray; + use Utils; + use Aggregators; + use TriangleCount; + + // Arkouda modules. + use MultiTypeSymbolTable; + use MultiTypeSymEntry; + use ServerConfig; + use AryUtil; + use Logging; + use Message; + + // Server message logger. + private config const logLevel = ServerConfig.logLevel; + private config const logChannel = ServerConfig.logChannel; + const tricntLogger = new Logger(logLevel, logChannel); + + /** + * Run triangle counting on an undirected and (un)weighted graph. + * + * cmd: operation to perform. + * msgArgs: arugments passed to backend. + * SymTab: symbol table used for storage. + * + * returns: message back to Python. + */ + proc segTriCntMsg(cmd: string, msgArgs: borrowed MessageArgs, st: borrowed SymTab): MsgTuple throws { + var repMsg: string; + var n_verticesN=msgArgs.getValueOf("NumOfVertices"); + var n_edgesN=msgArgs.getValueOf("NumOfEdges"); + var directedN=msgArgs.getValueOf("Directed"); + var weightedN=msgArgs.getValueOf("Weighted"); + var graphEntryName=msgArgs.getValueOf("GraphName"); + + var vertexArrayName=msgArgs.getValueOf("VertexArray"); + var gEnt: borrowed GenSymEntry = getGenericTypedArrayEntry(vertexArrayName, st); + var e = toSymEntry(gEnt, int); + var vertexArray = e.a; + var returnary=vertexArray; + + var Nv=n_verticesN:int; + var Ne=n_edgesN:int; + var Directed=false:bool; + var Weighted=false:bool; + if (directedN:int) == 1 { + Directed=true; + } + if (weightedN:int) == 1 { + Weighted=true; + } + var countName:string; + var timer:stopwatch; + timer.start(); + + var TotalCnt:[0..0] int; + var subTriSum: [0..numLocales-1] int; + var StartVerAry: [0..numLocales-1] int; + var EndVerAry: [0..numLocales-1] int; + var RemoteAccessTimes: [0..numLocales-1] int; + var LocalAccessTimes: [0..numLocales-1] int; + + TotalCnt=0; + subTriSum=0; + StartVerAry=-1; + EndVerAry=-1; + RemoteAccessTimes=0; + LocalAccessTimes=0; + + var srcN, dstN, startN, neighbourN,vweightN,eweightN, rootN :string; + var srcRN, dstRN, startRN, neighbourRN:string; + + var gEntry:borrowed GraphSymEntry = getGraphSymEntry(graphEntryName, st); + var ag = gEntry.graph; + + proc triCtr_kernelMST(nei:[?D1] int, start_i:[?D2] int,src:[?D3] int, dst:[?D4] int, + neiR:[?D11] int, start_iR:[?D12] int,srcR:[?D13] int, dstR:[?D14] int) { + var timer:stopwatch; + TotalCnt=0; + subTriSum=0; + timer.start(); + proc binSearchE(ary:[?D] int,l:int,h:int,key:int):int { + + if ( (l>h) || ((l==h) && ( ary[l]!=key))) { + return -1; + } + if (ary[l]==key){ + return l; + } + if (ary[h]==key){ + return h; + } + var m= (l+h)/2:int; + if ((m==l) ) { + return -1; + } + if (ary[m]==key ){ + return m; + } else { + if (ary[m] or e= + proc findEdge(u:int,v:int):int { + //given the destinontion arry ary, the edge range [l,h], return the edge ID e where ary[e]=key + + var beginE=start_i[u]; + var eid=-1:int; + if (nei[u]>0) { + if ( (beginE>=0) && (v>=dst[beginE]) && (v<=dst[beginE+nei[u]-1]) ) { + 
eid=binSearchE(dst,beginE,beginE+nei[u]-1,v); + // search in undirect edges + } + } + if (eid==-1) {// if b + beginE=start_i[v]; + if (nei[v]>0) { + if ( (beginE>=0) && (u>=dst[beginE]) && (u<=dst[beginE+nei[v]-1]) ) { + eid=binSearchE(dst,beginE,beginE+nei[v]-1,u); + // search in undirect edges + } + } + }// end of if b + return eid; + }// end of proc findEdge(u:int,v:int) + + + + // given vertces u and v, return the edge ID e= + proc exactEdge(u:int,v:int):int { + //given the destinontion arry ary, the edge range [l,h], return the edge ID e where ary[e]=key + + var beginE=start_i[u]; + var eid=-1:int; + if (nei[u]>0) { + if ( (beginE>=0) && (v>=dst[beginE]) && (v<=dst[beginE+nei[u]-1]) ) { + eid=binSearchE(dst,beginE,beginE+nei[u]-1,v); + // search in undirect edges + } + } + return eid; + }// end of proc exatEdge(u:int,v:int) + + var tmptimer:stopwatch; + tmptimer.start(); + coforall loc in Locales { + on loc { + var ld = src.localSubdomain(); + var startEdge = ld.lowBound; + var endEdge = ld.highBound; + var triCount=0:int; + forall i in startEdge..endEdge with(+ reduce triCount){ + var v1=src[i]; + var v2=dst[i]; + var dv1=nei[v1]+neiR[v1]; + var dv2=nei[v2]+neiR[v2]; + var sv1:int; + var lv2:int; + var sdv1:int; + var ldv2:int; + if (dv1<=dv2) { + sv1=v1; + lv2=v2; + sdv1=dv1; + ldv2=dv2; + } else { + sv1=v2; + lv2=v1; + sdv1=dv2; + ldv2=dv1; + } + { + var nextStart=start_i[sv1]; + var nextEnd=start_i[sv1]+nei[sv1]-1; + if (nei[sv1]>0) { + forall j in nextStart..nextEnd with (+ reduce triCount){ + var v3=src[j];//v3==sv1 + var v4=dst[j]; + var tmpe:int; + if ( ( lv2!=v4 ) ) { + var dv4=nei[v4]+neiR[v4]; + if (ldv21 + + nextStart=start_iR[sv1]; + nextEnd=start_iR[sv1]+neiR[sv1]-1; + if (neiR[sv1]>0) { + forall j in nextStart..nextEnd with (+ reduce triCount ){ + var v3=srcR[j];//sv1==v3 + var v4=dstR[j]; + var e1=exactEdge(v4,v3);// we need the edge ID in src instead of srcR + var tmpe:int; + if (e1!=-1) { + if ( ( lv2!=v4 ) ) { + // we first check if the two different vertices can be the third edge + var dv4=nei[v4]+neiR[v4]; + if ldv2= ranges[i-1][0] && val <= ranges[i][0]) { + locs.pushBack(ranges[i-1][1]); + } + if (i == numLocales - 1) { + if val >= ranges[i][0] { + locs.pushBack(ranges[i][1]); + } + } + } + return locs.toArray(); + } - //############################################################################### - //############################################################################### - //############################################################################### - /* - Ragged array + /** + * Binary search for a given key, original version by Dr. Du. 
+ * + * ary: int array + * l: low index bound + * h: high index bound + * key: value we are searching for + * + * returns: index if key is found, -1 if not found */ - pragma "default intent is ref" - record RagArray { - var DO = {0..0}; - var A : [DO] int; - - proc new_dom(new_d : domain(1)) { - this.DO = new_d; + proc bin_search_v(ary: [?D] int, l: int, h: int, key: int): int throws { + if ( (l < D.lowBound) || (h > D.highBound) || (l < 0)) { + return -1; } - - proc size() { - return A.size; + if ( (l > h) || ((l == h) && (ary[l] != key))) { + return -1; } - } - - private const dstBuffSize = getEnvInt("CHPL_AGGREGATION_DST_BUFF_SIZE", 4096); - private const yieldFrequency = getEnvInt("CHPL_AGGREGATION_YIELD_FREQUENCY", 1024); - - var block_locale_D : domain(1) dmapped Block(boundingBox = LocaleSpace) = LocaleSpace; - var curr_frontiers : [block_locale_D] domain(int); - var next_frontiers : [block_locale_D] domain(int); - - record DynamicAssociativeDomainDstAggregator { - const bufferSize = dstBuffSize; - const myLocaleSpace = LocaleSpace; - var opsUntilYield = yieldFrequency; - var lBuffers: [myLocaleSpace][0..#bufferSize] int; - var rBuffers: [myLocaleSpace] remoteBuffer(int); - var bufferIdxs: [myLocaleSpace] int; - - proc postinit() - { - for loc in myLocaleSpace { - rBuffers[loc] = new remoteBuffer(int, bufferSize, loc); - } + if (ary[l] == key) { + return l; } - - proc deinit() - { - flush(); + if (ary[h] == key) { + return h; } - - proc flush() - { - for loc in myLocaleSpace { - _flushBuffer(loc, bufferIdxs[loc], freeData=true); - } + + var m = (l + h) / 2: int; + + if ((m == l) ) { + return -1; } - - inline proc copy(const loc, const in srcVal: int) - { - // Get our current index into the buffer for dst's locale - ref bufferIdx = bufferIdxs[loc]; - - // Buffer the desired value - lBuffers[loc][bufferIdx] = srcVal; - bufferIdx += 1; - - // Flush our buffer if it's full. If it's been a while since we've let - // other tasks run, yield so that we're not blocking remote tasks from - // flushing their buffers. - if bufferIdx == bufferSize { - _flushBuffer(loc, bufferIdx, freeData=false); - opsUntilYield = yieldFrequency; - } - else if opsUntilYield == 0 { - chpl_task_yield(); - opsUntilYield = yieldFrequency; - } + if (ary[m] == key ) { + return m; + } else { + if (ary[m] < key) { + return bin_search_v(ary, m+1, h, key); + } else { - opsUntilYield -= 1; + return bin_search_v(ary, l, m-1, key); } } - - // Flushes the buffer. This means doing a big append to the - // associative domain on that locale. - proc _flushBuffer(loc: int, ref bufferIdx, freeData) - { - const myBufferIdx = bufferIdx; - if myBufferIdx == 0 then return; - - // Allocate a remote buffer - ref rBuffer = rBuffers[loc]; - const remBufferPtr = rBuffer.cachedAlloc(); - - // Copy local buffer to remote buffer - rBuffer.PUT(lBuffers[loc], myBufferIdx); - - // Process remote buffer - on Locales[loc] { - ref q = next_frontiers[loc]; + }// end bin_search_v + + /** + * Non-recursive, distributed-memory binary search for a given key. + * + * arr: int array + * key: value we are searching for + * + * returns: index if key is found, -1 if not found + */ + proc bin_search(arr: [?D] int, key: int, lo: int, hi: int, comparator:?rec=defaultComparator): int throws { + var found:int = -1; // index of found key, -1 if not found. 
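+        // Strategy sketch for the search below: every locale inspects its local subdomain of
+        // arr, skips itself when that block lies outside [lo, hi], otherwise clamps l and h to
+        // the overlap and runs an ordinary binary search on its local block; whichever locale
+        // locates the key writes that index into the shared `found` variable.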
+ coforall loc in Locales with (ref found) do on loc { + var start_loc:bool, end_loc:bool, mid_loc:bool, skip_loc:bool; + var l:int, h:int, local_lo:int, local_hi:int; + local_lo = arr.localSubdomain().lowBound; + local_hi = arr.localSubdomain().highBound; + + // Check to see if loc is either a starting locale or an end locale. + if arr.localSubdomain().contains(lo) then start_loc = true; + else if arr.localSubdomain().contains(hi) then end_loc = true; + else if !start_loc && !end_loc && local_lo > lo && local_hi < hi then mid_loc = true; + else skip_loc = true; + + if !skip_loc { + // Start the search from the actual lo index stored on start_loc. + if start_loc { + l = if arr.localSubdomain().lowBound < lo then lo + else arr.localSubdomain().lowBound; + } else l = arr.localSubdomain().lowBound; - for srcVal in rBuffer.localIter(remBufferPtr, myBufferIdx) { - q += srcVal; - } - - if freeData { - rBuffer.localFree(remBufferPtr); + // End the search from the actual hi index stored on end_loc. + if end_loc { + h = if arr.localSubdomain().highBound > hi then hi + else arr.localSubdomain().highBound; + } else h = arr.localSubdomain().highBound; + + // Actual binary search steps. + while(l <= h) { + if arr[l] == key {found = l; break;} + if arr[h] == key {found = h; break;} + + const m = (l + h) / 2 : int; + + if m == l then break; + if arr[m] == key {found = m; break;} + + if chpl_compare(key, arr[m], comparator=comparator) > 0 then l = m + 1; + else h = m - 1; } } - if freeData { - rBuffer.markFreed(); + } // end of coforall + return found; + }// end bin_search + + /** + * Print graph data structure server-side to visualize the raw array data. + * + * G: graph we want to print out. + * + * returns: message back to Python. + */ + proc print_graph_serverside(G: borrowed SegGraph) throws { + for comp in Component { + var curr_comp = comp:string; + if G.hasComp(curr_comp) { + select curr_comp { + when "RELATIONSHIPS", "NODE_LABELS" { + var X = toSymEntry(G.getComp(comp:string), list(string, parSafe=true)).a; + writeln(comp:string, " = ", X); + } + when "NODE_PROPS", "EDGE_PROPS" { + var X = toSymEntry(G.getComp(comp:string), list((string,string), parSafe=true)).a; + writeln(comp:string, " = ", X); + } + when "EDGE_WEIGHT", "EDGE_WEIGHT_R" { + var X = toSymEntry(G.getComp(comp:string), real).a; + writeln(comp:string, " = ", X); + } + when "NODE_MAP_R" { + var X = toSymEntryAD(G.getComp(comp:string)).a; + writeln(comp:string, " = ", X); + } + otherwise { + var X = toSymEntry(G.getComp(comp:string), int).a; + writeln(comp:string, " = ", X); + } + } } - bufferIdx = 0; } - } + } // end of print_graph_serverside } \ No newline at end of file