Skip to content

Commit

Permalink
fix the compiling errors in arkouda_development when new arkouda vers… (
Browse files Browse the repository at this point in the history
#64)

* fix the compiling errors in arkouda_development when new arkouda version released

* include Oliver's code and add R before the same function
  • Loading branch information
zhihuidu authored Oct 24, 2023
1 parent a9ee846 commit b287278
Show file tree
Hide file tree
Showing 21 changed files with 3,660 additions and 166 deletions.
4 changes: 2 additions & 2 deletions arachne/server/BreadthFirstSearchMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,5 @@ module BreadthFirstSearchMsg {
}

use CommandMap;
registerFunction("segmentedGraphBFS", segBFSMsg, getModuleName());
}
registerFunction("RsegmentedGraphBFS", segBFSMsg, getModuleName());
}
2 changes: 1 addition & 1 deletion arachne/server/ConnectedComponentsMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ module CCMsg {
}

use CommandMap;
registerFunction("segmentedGraphCC", segCCMsg,getModuleName());
registerFunction("RsegmentedGraphCC", segCCMsg,getModuleName());
}
4 changes: 2 additions & 2 deletions arachne/server/TriangleCountMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -579,5 +579,5 @@ module TriCntMsg {
}// end of segTriMsg

use CommandMap;
registerFunction("segmentedGraphTri", segTriCntMsg,getModuleName());
}
registerFunction("RsegmentedGraphTri", segTriCntMsg,getModuleName());
}
128 changes: 128 additions & 0 deletions arachne_development/server/Aggregators.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
module Aggregators {
// Chapel modules.
use ReplicatedDist;
use Set;

// Package modules.
use CopyAggregation;

// Arkouda modules.
use CommAggregation;
use ServerConfig;

/**
* Declare our frontier queues here to be sets, done globally since refs cannot be a part of
* records yet. TODO: move these straight into SetDstAggregator when refs are allowed inside of
* records. */
var D_frontier_sets = {0..1} dmapped Replicated();
var frontier_sets : [D_frontier_sets] set(int, parSafe=true);
var frontier_sets_idx : int;

// Sizes of buffer and yield frequencies taken from the Arkouda server config information.
private const dstBuffSize = getEnvInt("CHPL_AGGREGATION_DST_BUFF_SIZE", 4096);
private const yieldFrequency = getEnvInt("CHPL_AGGREGATION_YIELD_FREQUENCY", 1024);

/**
* Record for remote set aggregator to perform set additions from one locale to the next. Built
* using the aggregator from CopyAggregation but modfied a bit to make the aggregated memory
* address a set instead of an array memory location. */
record SetDstAggregator {
type elemType;
type aggType = elemType;
const bufferSize = dstBuffSize;
const myLocaleSpace = LocaleSpace;
var opsUntilYield = yieldFrequency;
var lBuffers: [myLocaleSpace] [0..#bufferSize] aggType;
var rBuffers: [myLocaleSpace] remoteBuffer(aggType);
var bufferIdxs: [myLocaleSpace] int;

/**
* Allocate the remote buffers on each locale allocated. */
proc postinit() {
for loc in myLocaleSpace {
rBuffers[loc] = new remoteBuffer(aggType, bufferSize, loc);
}
}

/**
* Flush all of the buffers during deinitialization. */
proc deinit() {
flush();
}

/**
* For every locale allocated, flush their buffers. */
proc flush() {
for loc in myLocaleSpace {
_flushBuffer(loc, bufferIdxs[loc], freeData=true);
}
}

/**
* Make a copy of the data being passed to a remote buffer in the local buffer that
* corresponds to the remote locale.
*
* loc: id of remote locale.
* srcVal: value to be copied to the remote locale. */
inline proc copy(const loc, const in srcVal: elemType) {
// Get our current index into the buffer for the destination locale.
ref bufferIdx = bufferIdxs[loc];

// Buffer the desired value.
lBuffers[loc][bufferIdx] = srcVal;
bufferIdx += 1;

/**
* Flush our buffer if it's full. If it's been a while since we've let
* other tasks run, yield so that we're not blocking remote tasks from
* flushing their buffers. */
if bufferIdx == bufferSize {
_flushBuffer(loc, bufferIdx, freeData=false);
opsUntilYield = yieldFrequency;
} else if opsUntilYield == 0 {
chpl_task_yield();
opsUntilYield = yieldFrequency;
} else {
opsUntilYield -= 1;
}
}

/**
* Helper function to peform actual buffer steps for each locale.
*
* loc: id of locale to flush.
* bufferIdx: id of buffer to PUT items into.
* freeData: did the last flush happen and have all remote buffers been freed? */
proc _flushBuffer(loc: int, ref bufferIdx, freeData) {
// Get the buffer id to extract the buffered values.
const myBufferIdx = bufferIdx;
if myBufferIdx == 0 then return;

// Get refernece to allocated remote buffer at loc and allocate it if it wasn't already.
ref rBuffer = rBuffers[loc];
const remBufferPtr = rBuffer.cachedAlloc();

// PUT local buffer to remote buffer.
rBuffer.PUT(lBuffers[loc], myBufferIdx);

// Process remote buffer where it now exists.
on Locales[loc] {
ref f = frontier_sets[(frontier_sets_idx + 1) % 2];
/**
* forall gives error: A standalone or leader iterator is not found for the iterable
* expression in this forall loop */
for srcVal in rBuffer.localIter(remBufferPtr, myBufferIdx) {
f.add(srcVal);
}
// Only free remaining data at deinit.
if freeData {
rBuffer.localFree(remBufferPtr);
}
}
if freeData {
rBuffer.markFreed();
}
bufferIdx = 0;
}
} // end of SetDstAggregator
} // end of Aggregators
Empty file.
135 changes: 135 additions & 0 deletions arachne_development/server/BreadthFirstSearch.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
module BreadthFirstSearch {
// Chapel modules.
use Reflection;
use Set;
use List;

// Package modules.
use CopyAggregation;

// Arachne modules.
use GraphArray;
use Utils;
use Aggregators;

// Arkouda modules.
use MultiTypeSymbolTable;
use MultiTypeSymEntry;
use ServerConfig;
use AryUtil;

/**
* Breadth-first search for shared-memory (one locale) systems. Uses a Chapel set for
*
* graph: graph to run bfs on.
*
* returns: success string message. */
proc bfs_kernel_und_smem(graph:SegGraph, root:int, depth: [?D] int):string throws {
// Extract graph data.
var src = toSymEntry(graph.getComp("SRC"),int).a;
var dst = toSymEntry(graph.getComp("DST"),int).a;
var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a;

// Generate the frontier sets.
var frontier_sets : [{0..1}] list(int, parSafe=true);
frontier_sets[0] = new list(int, parSafe=true);
frontier_sets[1] = new list(int, parSafe=true);

var frontier_sets_idx = 0;
var cur_level = 0;
depth[root] = cur_level;
frontier_sets[frontier_sets_idx].pushBack(root);
while true {
var pending_work:bool;
forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work) {
var adj_list_start = seg[u];
var num_neighbors = seg[u+1] - adj_list_start;
if (num_neighbors != 0) {
var adj_list_end = adj_list_start + num_neighbors - 1;
ref neighborhood = dst.localSlice(adj_list_start..adj_list_end);
for v in neighborhood {
if (depth[v] == -1) {
pending_work = true;
depth[v] = cur_level + 1;
frontier_sets[(frontier_sets_idx + 1) % 2].pushBack(v);
}
}
}
}
frontier_sets[frontier_sets_idx].clear();
if !pending_work {
break;
}
cur_level += 1;
frontier_sets_idx = (frontier_sets_idx + 1) % 2;
}// end while
return "success";
}// end of bfs_kernel_und_smem

/**
* Using a remote aggregator above for sets, we are going to perform aggregated writes to the
* locales that include the neighborhood of the vertex being processed.
*
* graph: graph to run bfs on.
*
* returns: success string message. */
proc bfs_kernel_und_dmem(graph:SegGraph, root:int, depth: [?D] int):string throws {
// Initialize the frontiers on each of the locales.
coforall loc in Locales do on loc {
frontier_sets[0] = new set(int, parSafe=true);
frontier_sets[1] = new set(int, parSafe=true);
}
frontier_sets_idx = 0;
var src = toSymEntry(graph.getComp("SRC"),int).a;
var dst = toSymEntry(graph.getComp("DST"),int).a;
var seg = toSymEntry(graph.getComp("SEGMENTS"),int).a;

// Add the root to the locale that owns it and update size & depth.
for lc in find_locs(root, graph) {
on lc do frontier_sets[frontier_sets_idx].add(root);
}
var cur_level = 0;
depth[root] = cur_level;

while true {
var pending_work:bool;
coforall loc in Locales with(|| reduce pending_work) {
on loc {
var src_low = src.localSubdomain().low;
var src_high = src.localSubdomain().high;
forall u in frontier_sets[frontier_sets_idx] with (|| reduce pending_work, var frontier_agg = new SetDstAggregator(int), var depth_agg = new DstAggregator(int)) {
var adj_list_start = seg[u];
var num_neighbors = seg[u+1] - adj_list_start;
if (num_neighbors != 0) {
var adj_list_end = adj_list_start + num_neighbors - 1;

// Only pull the part of the adjacency list that is local.
var actual_start = max(adj_list_start, src_low);
var actual_end = min(src_high, adj_list_end);

ref neighborhood = dst.localSlice(actual_start..actual_end);
for v in neighborhood {
if (depth[v] == -1) {
pending_work = true;
// depth[v] = cur_level + 1;
depth_agg.copy(depth[v], cur_level + 1);
var locs = find_locs(v, graph);
for lc in locs {
frontier_agg.copy(lc.id, v);
}
}
}
}
} //end forall
frontier_sets[frontier_sets_idx].clear();
} // end on loc
}// end coforall loc
if !pending_work {
break;
}
cur_level += 1;
frontier_sets_idx = (frontier_sets_idx + 1) % 2;
}// end while
return "success";
}// end of bfs_kernel_und_dmem
}// end of BreadthFirstSearch module
Loading

0 comments on commit b287278

Please sign in to comment.