Skip to content

Commit

Permalink
ARROW-1255: [Plasma] Fix typo in plasma protocol; add DCHECK for Read…
Browse files Browse the repository at this point in the history
…XXX in plasma protocol.

Related to apache#878, add DCHECK for ReadXXX.

Author: Yeolar <[email protected]>

Closes apache#887 from Yeolar/fixtypo_plasma_and_add_DCHECK and squashes the following commits:

4df63bc [Yeolar] clang-format for too long lines.
143d254 [Yeolar] Update, compile passed.
09ff103 [Yeolar] Fix conflicts.
b951d8d [Yeolar] Merge pull request #1 from apache/master
ebae611 [Yeolar] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.
  • Loading branch information
Yeolar authored and wesm committed Jul 26, 2017
1 parent 2eeaa95 commit 676a4a9
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 96 deletions.
16 changes: 9 additions & 7 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
ObjectID id;
PlasmaObject object;
RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
int fd = recv_fd(store_conn_);
Expand Down Expand Up @@ -227,7 +227,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
std::vector<ObjectID> received_object_ids(num_objects);
std::vector<PlasmaObject> object_data(num_objects);
PlasmaObject* object;
RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(),
RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
object_data.data(), num_objects));

for (int i = 0; i < num_objects; ++i) {
Expand Down Expand Up @@ -356,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
ObjectID object_id2;
RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
RETURN_NOT_OK(
ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
}
return Status::OK();
}
Expand Down Expand Up @@ -451,7 +452,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
std::vector<uint8_t> buffer;
int64_t type;
RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
return ReadEvictReply(buffer.data(), num_bytes_evicted);
return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
}

Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
Expand Down Expand Up @@ -524,7 +525,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
return Status::OK();
}

Expand Down Expand Up @@ -564,7 +565,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
ObjectID id;
RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
ARROW_CHECK(object_id == id);
return Status::OK();
}
Expand All @@ -586,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
num_ready_objects, timeout_ms));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
RETURN_NOT_OK(
ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));

*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
Expand Down
84 changes: 56 additions & 28 deletions cpp/src/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
}

Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size,
int64_t* metadata_size) {
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
Expand All @@ -83,9 +84,11 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
}

Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
object->handle.store_fd = message->plasma_object()->segment_index();
object->handle.mmap_size = message->plasma_object()->mmap_size();
Expand All @@ -106,9 +109,11 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
}

Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
unsigned char* digest) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
ARROW_CHECK(message->digest()->size() == kDigestSize);
memcpy(digest, message->digest()->data(), kDigestSize);
Expand All @@ -122,9 +127,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
}

Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -133,13 +139,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {

Status SendReleaseRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
}

Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -151,9 +158,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
}

Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -166,9 +174,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
}

Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -180,9 +189,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
}

Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -196,9 +206,11 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
}

Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -214,16 +226,18 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
}

int64_t ReadStatusReply_num_objects(uint8_t* data) {
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
return message->object_ids()->size();
}

Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[],
int64_t num_objects) {
Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
int object_status[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -241,9 +255,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
}

Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -255,9 +270,11 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
}

Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*has_object = message->has_object();
return Status::OK();
Expand All @@ -279,9 +296,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
}

Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*memory_capacity = message->memory_capacity();
return Status::OK();
}
Expand All @@ -294,9 +312,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
}

Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -307,9 +326,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
}

Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -324,10 +344,11 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
}

Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids,
Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
int64_t* timeout_ms) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::from_binary(object_id));
Expand Down Expand Up @@ -355,10 +376,11 @@ Status SendGetReply(
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
}

Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
int64_t num_objects) {
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
PlasmaObject plasma_objects[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -383,9 +405,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
}

Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
}
Expand All @@ -410,10 +433,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
}

Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
int64_t* timeout_ms, int* num_ready_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
*timeout_ms = message->timeout();

Expand Down Expand Up @@ -443,11 +467,12 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
}

Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[],
Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
int* num_ready_objects) {
DCHECK(data);

auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
for (int i = 0; i < *num_ready_objects; i++) {
object_requests[i].object_id =
Expand Down Expand Up @@ -475,9 +500,11 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
}

Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
int* port) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::from_binary(message->object_id()->str());
*address = strdup(message->address()->c_str());
Expand All @@ -493,10 +520,11 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
}

Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size,
int64_t* metadata_size) {
Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*object_size = (int64_t)message->object_size();
*metadata_size = (int64_t)message->metadata_size();
Expand Down
Loading

0 comments on commit 676a4a9

Please sign in to comment.