Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-1255: [Plasma] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol. #887

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
ObjectID id;
PlasmaObject object;
RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
int fd = recv_fd(store_conn_);
Expand Down Expand Up @@ -227,7 +227,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
std::vector<ObjectID> received_object_ids(num_objects);
std::vector<PlasmaObject> object_data(num_objects);
PlasmaObject* object;
RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(),
RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
object_data.data(), num_objects));

for (int i = 0; i < num_objects; ++i) {
Expand Down Expand Up @@ -356,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
ObjectID object_id2;
RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
RETURN_NOT_OK(
ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
}
return Status::OK();
}
Expand Down Expand Up @@ -451,7 +452,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
std::vector<uint8_t> buffer;
int64_t type;
RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
return ReadEvictReply(buffer.data(), num_bytes_evicted);
return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
}

Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
Expand Down Expand Up @@ -524,7 +525,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
return Status::OK();
}

Expand Down Expand Up @@ -564,7 +565,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
ObjectID id;
RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
ARROW_CHECK(object_id == id);
return Status::OK();
}
Expand All @@ -586,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
num_ready_objects, timeout_ms));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
RETURN_NOT_OK(
ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));

*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
Expand Down
84 changes: 56 additions & 28 deletions cpp/src/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
}

Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size,
int64_t* metadata_size) {
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want int64_t here for the size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll check it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use int64_t for size type? The ReadXXX was called with parameter from std::vector<>, then the size is size_t type correspondingly. I see mostly using int64_t for size in the project, the reason is?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I thought these were arrow::Buffer objects. @pcmoritz have you considered moving all your heap allocations to an Arrow memory pool?

We prefer to use signed integer types unless we are dealing with system calls (like malloc) or the STL, basically following Google's guidance on this https://google.github.io/styleguide/cppguide.html#Integer_Types

I think it's fine to leave this as is and we can refactor later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it, thanks.

int64_t* data_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
Expand All @@ -83,9 +84,11 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
}

Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
object->handle.store_fd = message->plasma_object()->segment_index();
object->handle.mmap_size = message->plasma_object()->mmap_size();
Expand All @@ -106,9 +109,11 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
}

Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
unsigned char* digest) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
ARROW_CHECK(message->digest()->size() == kDigestSize);
memcpy(digest, message->digest()->data(), kDigestSize);
Expand All @@ -122,9 +127,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
}

Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -133,13 +139,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {

Status SendReleaseRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
}

Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -151,9 +158,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
}

Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -166,9 +174,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
}

Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -180,9 +189,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
}

Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -196,9 +206,11 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
}

Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -214,16 +226,18 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
}

int64_t ReadStatusReply_num_objects(uint8_t* data) {
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
return message->object_ids()->size();
}

Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[],
int64_t num_objects) {
Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
int object_status[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -241,9 +255,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
}

Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -255,9 +270,11 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
}

Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*has_object = message->has_object();
return Status::OK();
Expand All @@ -279,9 +296,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
}

Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*memory_capacity = message->memory_capacity();
return Status::OK();
}
Expand All @@ -294,9 +312,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
}

Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -307,9 +326,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
}

Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -324,10 +344,11 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
}

Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids,
Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
int64_t* timeout_ms) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::from_binary(object_id));
Expand Down Expand Up @@ -355,10 +376,11 @@ Status SendGetReply(
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
}

Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
int64_t num_objects) {
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
PlasmaObject plasma_objects[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -383,9 +405,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
}

Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
}
Expand All @@ -410,10 +433,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
}

Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
int64_t* timeout_ms, int* num_ready_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
*timeout_ms = message->timeout();

Expand Down Expand Up @@ -443,11 +467,12 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
}

Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[],
Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
int* num_ready_objects) {
DCHECK(data);

auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
for (int i = 0; i < *num_ready_objects; i++) {
object_requests[i].object_id =
Expand Down Expand Up @@ -475,9 +500,11 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
}

Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
int* port) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::from_binary(message->object_id()->str());
*address = strdup(message->address()->c_str());
Expand All @@ -493,10 +520,11 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
}

Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size,
int64_t* metadata_size) {
Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*object_size = (int64_t)message->object_size();
*metadata_size = (int64_t)message->metadata_size();
Expand Down
Loading