Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Stream Setup Working? #3867

Merged
merged 10 commits into from
Feb 9, 2025
510 changes: 296 additions & 214 deletions src/libs/ck-libs/stream/stream.C

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions src/libs/ck-libs/stream/stream.ci
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module CkStream_impl
namespace Ck{
namespace Stream{


// how the user gets the result from a createNewStream
message StreamIdMessage {
size_t id;
};
Expand All @@ -39,37 +39,36 @@ module CkStream_impl
mainchare[migratable] Starter
{
entry Starter(CkArgMsg* m);
entry void starterHello(); // this is a dummy function
entry[reductiontarget] void streamCreated(int stream_token);
entry [reductiontarget] void streamCreated(int stream_token);
entry void createNewStream(CkCallback cb) {
serial {
size_t token_to_use = _curr_stream_token_id++;
size_t coordinator_to_use = (_curr_stream_coordinator++) % (CkNumPes());
stream_managers.initializeStream(token_to_use, coordinator_to_use);
}
when streamCreated(int stream_token) serial {
CkPrintf("We made it to the streamCreated part baby\n");
StreamIdMessage* msg = new StreamIdMessage();
msg -> id = stream_token;
cb.send(msg);
}
};
entry void startWriteStreamClose(StreamToken token);
entry void addRegisteredPE(StreamToken, size_t);
entry void tellManagersExpectedReceives(CkReductionMsg* msg);
entry [reductiontarget] void processBufferedCloseStream(StreamToken);

}

group[migratable] StreamManager
{
entry StreamManager();
entry void initializeStream(int id, size_t coordinator_pe);
entry void recvData(DeliverStreamBytesMsg* in_msg);
entry void addRegisteredPE(StreamToken, size_t pe);
entry void recvPutBufferFromPE(DeliverStreamBytesMsg* in_msg);
entry void addRegisteredPE(StreamToken, size_t pe, bool);
entry void clearBufferedDeliveryMsg(StreamToken token);
entry void registerPEWithCoordinator(StreamToken, size_t);
entry void startCloseWriteStream(StreamToken token);
//entry void startCloseWriteStream(StreamToken token);
entry void initiateWriteStreamClose(StreamToken token);
entry void ackWrites(StreamToken, size_t);

entry void expectedReceivesUponClose(StreamToken token, size_t);
}
}
}
Expand Down
49 changes: 34 additions & 15 deletions src/libs/ck-libs/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,36 @@ namespace Ck { namespace Stream {
void dummyFunction();
// API to insert into the stream
void put(StreamToken stream, void* data, size_t elem_size, size_t num_elems);

void putRecord(StreamToken stream, void* data, size_t data_size);
// create a new stream, with the callback taking a message returning the StreamToken
void createNewStream(CkCallback cb);
// flush the buffer of the local stream
void flushLocalStream(StreamToken stream);
// extract data from stream
void get(StreamToken stream, size_t elem_size, size_t num_elems, CkCallback cb);

void getRecord(StreamToken stream, CkCallback cb);
// closing the write side of a stream
void closeWriteStream(StreamToken);
namespace impl {
// used when buffering the request
struct GetRequest {
// when this flag is true, we request sizeof(size_t) bytes first, then invoke a new fulfill request
bool get_record = false;
size_t requested_bytes;
CkCallback cb;
GetRequest(size_t, CkCallback);
};

// returned when extracting data from the get buffer on get requests
struct ExtractedData {
char* buffer = 0;
size_t num_bytes_copied;
};
// keep this struct here in case we need to trakc more metadata in the future
struct StreamMetaData {
std::vector<size_t> _registered_pes;
bool close_buffered;
};
class DeliverStreamBytesMsg;
class CMessage_DeliverStreamBytesMsg;
Expand Down Expand Up @@ -62,20 +74,23 @@ namespace Ck { namespace Stream {
};
// used by StreamManagers to organize the data of multiple streams
class StreamBuffers {
char* _in_buffer; // the buffer for incoming data; once filled, just drop extra data; (_in_buffer is used for the data going out; I should rename this at some point
std::deque<InData> _out_buffer; // the buffer for outgoing data
std::deque<GetRequest> _buffered_reqs;
std::deque<DeliverStreamBytesMsg*> _msg_out_buffer;
size_t _in_buffer_capacity= 4 * 1024 * 1024;
size_t _out_buffer_capacity= 4 * 1024 * 1024;
size_t _in_buffer_size = 0;
size_t _out_buffer_size = 0;
size_t _stream_id= 0;
char* _put_buffer;
std::deque<InData> _get_buffer; // the buffer for outgoing data
std::deque<GetRequest> _buffered_gets; // buffered get requests
// the messages that need to be sent
std::deque<DeliverStreamBytesMsg*> _buffered_msg_to_deliver;
// size of buffer for data to be sent out to OTHER PEs (from puts)
size_t _put_buffer_capacity= 4 * 1024 * 1024; // Note: Old code had in = put and out = get
size_t _get_buffer_capacity= 4 * 1024 * 1024;
size_t _put_buffer_size = 0;
size_t _get_buffer_size = 0;
StreamToken _stream_id= 0;
size_t _coordinator_pe = 0;
std::vector<size_t> _registered_pes;
bool _registered_pe = false;
StreamMessageCounter _counter;
void _sendOutBuffer(char* data, size_t size);
void _sendOutBuffer(DeliverStreamBytesMsg*);
ssize_t _pickTargetPE();

public:
Expand All @@ -89,16 +104,20 @@ namespace Ck { namespace Stream {
void flushOutBuffer(char* extra_data, size_t extra_bytes);
void addToRecvBuffer(DeliverStreamBytesMsg* data);
void fulfillRequest(GetRequest& gr);
void handleGetRequest(GetRequest gr);
ExtractedData extractFromGetBuffer(char* ret_buffer, size_t bytes_to_copy);
void handleGetRequest(GetRequest& gr);
void pushBackRegisteredPE(size_t pe);
size_t numBufferedDeliveryMsg();
// just create the message from given info
DeliverStreamBytesMsg* createDeliverBytesStreamMsg();
DeliverStreamBytesMsg* createDeliverBytesStreamMsg(char* extra_data, size_t extra_bytes);
void clearBufferedDeliveryMsg();
void popFrontMsgOutBuffer();
size_t coordinator();
bool isStreamClosed();
void setStreamClosed();
void insertAck(size_t);
bool allAcked();
CkReductionMsg* setStreamClosed();
void clearBufferedGetRequests();
void setExpectedReceivesUponClose(size_t num_messages_to_receive);
void setCloseFlag();
};

class StreamCoordinator {
Expand Down
46 changes: 33 additions & 13 deletions src/libs/ck-libs/stream/stream_msg_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,44 @@
#include <unordered_map>

#include "streamtoken.h"
namespace Ck { namespace Stream { namespace impl {
class StreamMessageCounter{
std::unordered_map<size_t, size_t> _counter;
size_t _num_sent_messages = 0;
size_t _write_acks = 0;
bool _stream_write_closed = false;
namespace Ck {
namespace Stream {
namespace impl {
class StreamMessageCounter {
// keep track of how many bytes were sent to each PE in the stream
std::unordered_map<size_t, size_t> _sent_counter;
// keep track of how many bytes were received by each PE in the stream
std::unordered_map<size_t, size_t> _received_counter;
StreamToken _stream = 0;
public:
size_t _num_bytes_get = 0;
size_t _number_of_expected_receives = 0;
bool _close_initiated = false;

public:
StreamMessageCounter();
StreamMessageCounter(StreamToken);
bool isStreamClosed();
void setStreamWriteClosed();
void processIncomingMessage(size_t);
void addSentMessage();
void addWriteAck(size_t);
bool allAcked();
// given a source PE and the number of bytes, track the incoming message
void processIncomingMessage(size_t, size_t);
// given a destination PE and the number of bytes, track the outgoing message
void processOutgoingMessage(size_t, size_t);
// gets the sent counters in array format
u_long* getSentCounterArray();
// gets the received counters in array format
u_long* getReceivedCounterArray();
// method that sums everything in receive array
size_t totalReceivedMessages();
// determines if we have received all the data we should && stream is trying to be closed
bool receivedAllData();

size_t getNumberOfExpectedReceives();

// set the number of receive messages we should expect after all messages on this pe have arrived
void setExpectedReceives(size_t);
bool isCloseFlagSet();
void setCloseFlag();
};

}}}
} // namespace impl
} // namespace Stream
} // namespace Ck
Binary file removed tests/charm++/stream/streamtest
Binary file not shown.
4 changes: 3 additions & 1 deletion tests/charm++/stream/streamtest.C
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class Producers : public CBase_Producers {
public:
Producers(StreamToken stream){
_stream = stream;
CkPrintf("Producer[%d] is on PE=%d\n", thisIndex, CkMyPe());
for(int i = 0; i < 10; ++i){
size_t brudda = i + 10 * thisIndex;
Ck::Stream::put(_stream, &brudda, sizeof(size_t), 1);
}
Ck::Stream::flushLocalStream(_stream);
CkPrintf("Producer %d has written %d size_t to the stream...\n", thisIndex, 10);
CkPrintf("Producer %d has written %d size_t to the stream, which is %d total bytes...\n", thisIndex, 10, 10 * sizeof(size_t));
contribute(CkCallback(CkReductionTarget(Producers, doneWriting), thisProxy[0]));
}

Expand All @@ -39,6 +40,7 @@ public:
Consumers_SDAG_CODE
Consumers(StreamToken stream) {
_stream = stream;
CkPrintf("DEBUG CONSUMERS: CONSUMER[%d] is on PE[%zu]\n", thisIndex, CkMyPe());
Ck::Stream::get(_stream, sizeof(size_t), 1, CkCallback(CkIndex_Consumers::recvData(0), thisProxy[thisIndex]));
}
};
Expand Down
15 changes: 12 additions & 3 deletions tests/charm++/stream/streamtest.ci
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mainmodule streamtest
size_t id = msg -> id;
CkPrintf("Got the stream id: %d\n", id);
// CkExit();
producers = CProxy_Producers::ckNew(id, 4);
producers = CProxy_Producers::ckNew(id, 10);
consumers = CProxy_Consumers::ckNew(id, 2);
}
};
Expand All @@ -28,16 +28,25 @@ mainmodule streamtest
size_t num_ints = msg -> num_bytes / sizeof(size_t);
_num_ints_received += num_ints;
for(int i = 0; i < num_ints; ++i){
CkPrintf("%d,", data[i]);
CkPrintf("PE %d, Index: %d: Received: %d,", CkMyPe(), thisIndex, data[i]);
}
CkPrintf("\n");
if(msg -> status == Ck::Stream::StreamStatus::STREAM_OK) {
CkPrintf("issuing another get request...\n");
CkPrintf("issuing another get request on PE #%d, index %d...\n", CkMyPe(), thisIndex);
Ck::Stream::get(_stream, sizeof(size_t), 1, CkCallback(CkIndex_Consumers::recvData(0), thisProxy[thisIndex]));
} else {
CkPrintf("Consumer %d has received the done signal and consumed %d size_t...\n", thisIndex, _num_ints_received);
CkCallback cb = CkCallback(CkReductionTarget(Consumers, finishReading), thisProxy[0]);
contribute(cb);
}
}
};

entry [reductiontarget] void finishReading() {
serial {
CkPrintf("All consumers done reading!\n");
CkExit(0);
}
}
}
}
30 changes: 30 additions & 0 deletions tests/charm++/stream_records/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-include ../../common.mk
CHARMC=../../../bin/charmc $(OPTS)

FILENAME = "readtest.txt"
FILESIZE = 121
N_BUFFER_CHARES = 3
N_READERS = 11

all: streamtest

streamtest: streamtest.ci streamtest.C
$(CHARMC) streamtest.ci
$(CHARMC) streamtest.C -o $@ -module CkStream

# test: iotest
# #dd if=/dev/urandom of=large_test.txt bs=64M count=16 iflag=fullblock
# $(call run, ./iotest +p2 $(N_BUFFER_CHARES) $(FILESIZE) $(N_READERS) $(FILENAME))
# $(call run, ./iotest +p4 $(N_BUFFER_CHARES) $(FILESIZE) $(N_READERS) $(FILENAME))
#
# testp: iotest
# $(call run, ./iotest +p$(P) $(N_BUFFER_CHARES) $(FILESIZE) $(N_READERS) $(FILENAME))
# $(call run, ./iotest +p$(P) $$((2 * $(N_BUFFER_CHARES))) $(FILESIZE) $$((4 * $(N_READERS))) $(FILENAME))
#
# smptest: iotest
# $(call run, ./iotest +p2 ++ppn 2 $(N_BUFFER_CHARES) $(FILESIZE) $(N_READERS) $(FILENAME))
# $(call run, ./iotest +p4 ++ppn 2 $(N_BUFFER_CHARES) $(FILESIZE) $(N_READERS) $(FILENAME))
#
clean:
rm -f *.o *.decl.h *.def.h streamtest test*

Binary file added tests/charm++/stream_records/streamtest
Binary file not shown.
62 changes: 62 additions & 0 deletions tests/charm++/stream_records/streamtest.C
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "streamtest.decl.h"
#include <iostream>
#include <time.h>

class Main : public CBase_Main {
CProxy_Producers producers;
CProxy_Consumers consumers;
public:
Main_SDAG_CODE
Main(CkArgMsg* m){
delete m;
Ck::Stream::createNewStream(CkCallback(CkIndex_Main::streamMade(0), thisProxy));
}
};

class Producers : public CBase_Producers {
StreamToken _stream;
public:
Producers(StreamToken stream){
_stream = stream;

for(int i = 0; i < 10; ++i){
std::string record = generateRandomString();
CkPrintf("Created: %s\n", record.c_str());
Ck::Stream::putRecord(_stream, (void*)record.c_str(), sizeof(char) * record.size() + 1);
}
Ck::Stream::flushLocalStream(_stream);
CkPrintf("Producer %d has written %d size_t to the stream...\n", thisIndex, 10);
contribute(CkCallback(CkReductionTarget(Producers, doneWriting), thisProxy[0]));
}

void doneWriting(){
Ck::Stream::closeWriteStream(_stream);
}

std::string generateRandomString() {
const std::string characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<> lengthDistribution(5, 20);
std::uniform_int_distribution<> distribution(0, characters.size() - 1);
size_t length = lengthDistribution(generator);
std::string randomString = "";
for (size_t i = 0; i < length; ++i) {
randomString += characters[distribution(generator)];
}
return randomString;
// CkPrintf("i'm here lmao\n");
}
};

class Consumers : public CBase_Consumers {
StreamToken _stream;
size_t _num_bytes_received = 0;
public:
Consumers_SDAG_CODE
Consumers(StreamToken stream) {
_stream = stream;
Ck::Stream::getRecord(_stream, CkCallback(CkIndex_Consumers::recvData(0), thisProxy[thisIndex]));
}
};
#include "streamtest.def.h"
Loading
Loading