diff --git a/common/common.go b/common/common.go index 5ffdb93..3fcb462 100644 --- a/common/common.go +++ b/common/common.go @@ -502,10 +502,6 @@ type MetaData struct { // Optional field, default is false (not visiable to all users) Public bool `json:"public" bson:"public"` - // DataVerified is an internal field set by ESS after ESS downloads data from CSS or by CSS after ESS uploads data - // Data can be obtained only when DataVerified field is true - DataVerified bool `json:"dataVerified" bson:"data-verified"` - // OwnerID is an internal field indicating who creates the object // This field should not be set by users OwnerID string `json:"ownerID" bson:"owner-id"` @@ -678,6 +674,10 @@ type DestinationRequestInQueue struct { Destination Destination } +type ObjectInVerifyQueue struct { + Object MetaData +} + // ACLentry contains ACL information about each user type ACLentry struct { Username string @@ -687,14 +687,20 @@ type ACLentry struct { // Object status const ( + // status at sender side NotReadyToSend = "notReady" // The object is not ready to be sent to the other side + Verifying = "verifying" // The object data is in the process of verification + VerificationFailed = "verificationFailed" // The data verification is failed ReadyToSend = "ready" // The object is ready to be sent to the other side - PartiallyReceived = "partiallyreceived" // Received the object from the other side, waiting for its data - CompletelyReceived = "completelyReceived" // The object was received completely from the other side - ObjConsumed = "objconsumed" // The object was consumed by the app - ObjDeleted = "objdeleted" // The object was deleted by the other side - ObjReceived = "objreceived" // The object was received by the app - ConsumedByDest = "consumedByDest" // The object was consumed by the other side (ESS only) + // status at receiver side + PartiallyReceived = "partiallyreceived" // Received the object from the other side, waiting for its data + ReceiverVerifying = "receiverVerifying" // The object data at receiver side is in the process of verification + ReceiverVerificationFailed = "receiverVerificationFailed" // The data verification is failed at receiver side + CompletelyReceived = "completelyReceived" // The object was received completely from the other side + ObjConsumed = "objconsumed" // The object was consumed by the app + ObjDeleted = "objdeleted" // The object was deleted by the other side + ObjReceived = "objreceived" // The object was received by the app + ConsumedByDest = "consumedByDest" // The object was consumed by the other side (ESS only) ) // Notification status and type diff --git a/common/config.go b/common/config.go index 9a100ef..df16ebf 100644 --- a/common/config.go +++ b/common/config.go @@ -117,6 +117,10 @@ type Config struct { // For the ESS, default value is 2 ObjectQueueBufferSize uint64 `env:"OBJECT_QUEUE_BUFFER_SIZE"` + // Buffer size of Object Queue to verify object data + // Default size is 500 + VerifyQueueBufferSize uint64 `env:"VERIFY_QUEUE_BUFFER_SIZE"` + // CommunicationProtocol is a comma separated list of protocols to be used for communication between CSS and ESS // The elements of the list can be 'http', 'mqtt', and 'wiotp' // wiotp indicates MQTT communication via the Watson IoT Platform and mqtt indicates direct MQTT communication to a broker @@ -690,6 +694,10 @@ func ValidateConfig() error { } } + if Configuration.VerifyQueueBufferSize == 0 { + Configuration.VerifyQueueBufferSize = 500 + } + return nil } @@ -705,6 +713,7 @@ func SetDefaultConfig(config *Config) { config.SecureListeningPort = 8443 config.UnsecureListeningPort = 8080 config.LeadershipTimeout = 30 + config.VerifyQueueBufferSize = 500 config.AuthenticationHandler = "dummy" config.CSSOnWIoTP = false config.UsingEdgeConnector = false diff --git a/core/base/apiModule.go b/core/base/apiModule.go index 925052f..e081bbf 100644 --- a/core/base/apiModule.go +++ b/core/base/apiModule.go @@ -233,7 +233,7 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com common.ObjectLocks.Lock(lockIndex) existingObject, existingObjStatus, _ := store.RetrieveObjectAndStatus(orgID, objectType, objectID) - if existingObjStatus != "" && existingObjStatus != common.ReadyToSend && existingObjStatus != common.NotReadyToSend { + if existingObjStatus != "" && existingObjStatus != common.ReadyToSend && existingObjStatus != common.NotReadyToSend && existingObjStatus != common.Verifying && existingObjStatus != common.VerificationFailed { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Can't update object of the receiving side"} @@ -241,10 +241,18 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com // Store the object in the storage module status := common.NotReadyToSend - metaData.DataVerified = false if metaData.Link != "" || metaData.NoData || metaData.SourceDataURI != "" { status = common.ReadyToSend + if metaData.NoData { + data = nil + metaData.Link = "" + metaData.SourceDataURI = "" + metaData.PublicKey = "" + metaData.Signature = "" + } } else if metaData.MetaOnly { + // data is nil + data = nil reader, err := store.RetrieveObjectData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, false) if err != nil { common.ObjectLocks.Unlock(lockIndex) @@ -261,75 +269,83 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com metaData.PublicKey = existingObject.PublicKey metaData.Signature = existingObject.Signature } - - metaData.DataVerified = true //don't need to verify data again - } - - if metaData.NoData { - data = nil - metaData.Link = "" - metaData.SourceDataURI = "" - metaData.PublicKey = "" - metaData.Signature = "" - metaData.DataVerified = true - } - - if !metaData.DataVerified && !common.NeedDataVerification(metaData) { - metaData.DataVerified = true - } - - if data != nil && metaData.DataVerified { - metaData.ObjectSize = int64(len(data)) - status = common.ReadyToSend + // If no data in the database, then status is notReady (data == nil, status == notReady) } metaData.ChunkSize = common.Configuration.MaxDataChunkSize - - // Store metadata and data, with correct verified status - deletedDestinations, err := store.StoreObject(metaData, data, status) - if err != nil { - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - return err + if data != nil { + metaData.ObjectSize = int64(len(data)) + if !common.NeedDataVerification(metaData) { + status = common.ReadyToSend + } } // Verify - if data != nil { - // data signature verification if metadata has both publicKey and signature - // data is nil for metaOnly object. Meta-only object will not apply data verification - if common.NeedDataVerification(metaData) { - // will no store data if object metadata not exist - dataReader := bytes.NewReader(data) - dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) - if success, err := dataVf.VerifyDataSignature(dataReader, orgID, objectType, objectID, ""); !success || err != nil { - if trace.IsLogging(logger.ERROR) { - if err != nil { - trace.Error("Failed to verify data for object %s %s, Error: %s\n", objectType, objectID, err.Error()) - - } - - } + // data signature verification if metadata has both publicKey and signature + // data is nil for metaOnly object. Meta-only object will not apply data verification + var deletedDestinations []common.StoreDestinationStatus + var err common.SyncServiceError + if data != nil && common.NeedDataVerification(metaData) { + // Store metadata, with correct verified status + deletedDestinations, err = store.StoreObject(metaData, nil, status) + if err != nil { + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return err + } - dataVf.RemoveUnverifiedData(metaData) - store.UpdateObjectStatus(orgID, objectType, objectID, common.NotReadyToSend) + // Set object status from "notReady" to "verifying" + status = common.Verifying + if err = store.UpdateObjectStatus(orgID, objectType, objectID, status); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", status, orgID, objectType, objectID) + } + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return err + } + // will no store data if object metadata not exist + dataReader := bytes.NewReader(data) + dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) + if success, err := dataVf.VerifyDataSignature(dataReader, orgID, objectType, objectID, ""); !success || err != nil { + if log.IsLogging(logger.ERROR) && err != nil { + log.Error("Failed to verify data for object %s %s, Error: %s\n", objectType, objectID, err.Error()) + } - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - return err + dataVf.RemoveUnverifiedData(metaData) + status = common.VerificationFailed + if updateStatusErr := store.UpdateObjectStatus(orgID, objectType, objectID, status); updateStatusErr != nil && log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for %s %s, Error: %s\n", status, objectType, objectID, updateStatusErr.Error()) } - if err = store.UpdateObjectDataVerifiedStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, true); err != nil { + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return err + } else { + // verified, update object status to "ready" + status = common.ReadyToSend + if err = store.UpdateObjectStatus(orgID, objectType, objectID, status); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", status, orgID, objectType, objectID) + } common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return err } - status = common.ReadyToSend + } + } else { + // Store metadata and data, with correct verified status + deletedDestinations, err = store.StoreObject(metaData, data, status) + if err != nil { + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return err } } store.DeleteNotificationRecords(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, "", "") - if status == common.NotReadyToSend || metaData.Inactive { + if status == common.NotReadyToSend || status == common.VerificationFailed || status == common.Verifying || metaData.Inactive { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return nil @@ -525,11 +541,7 @@ func GetObjectData(orgID string, objectType string, objectID string) (io.Reader, if err != nil { return nil, err } - if metaData == nil || status == common.NotReadyToSend || status == common.PartiallyReceived { - return nil, nil - } - - if !metaData.DataVerified { + if metaData == nil || status == common.NotReadyToSend || status == common.Verifying || status == common.VerificationFailed || status == common.ReceiverVerifying || status == common.ReceiverVerificationFailed || status == common.PartiallyReceived { return nil, nil } if metaData.DestinationDataURI != "" && status == common.CompletelyReceived { @@ -588,7 +600,7 @@ func PutObjectAllData(orgID string, objectType string, objectID string, dataRead apiObjectLocks.Unlock(lockIndex) return false, nil } - if status != common.ReadyToSend && status != common.NotReadyToSend { + if status != common.ReadyToSend && status != common.NotReadyToSend && status != common.Verifying && status != common.VerificationFailed { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return false, &common.InvalidRequest{Message: "Can't update data of the receiving side"} @@ -601,35 +613,55 @@ func PutObjectAllData(orgID string, objectType string, objectID string, dataRead var dataVf *dataVerifier.DataVerifier if common.NeedDataVerification(*metaData) { - //start data verification + // Set object status from "notReady" to "verifying" + if err := store.UpdateObjectStatus(orgID, objectType, objectID, common.Verifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.Verifying, orgID, objectType, objectID) + } + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return false, &common.InternalError{Message: "Failed to update object status to verifying"} + } + if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. Start data verification %s %s\n", objectType, objectID) + trace.Debug("In PutObjectAllData. Start data verification %s %s\n", objectType, objectID) } + //start data verification. After verified, the data is stored. object size is updated, dataID is incremented dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) if success, err := dataVf.VerifyDataSignature(dataReader, orgID, objectType, objectID, ""); !success || err != nil { - if trace.IsLogging(logger.ERROR) { - trace.Error("Failed to verify data for object %s %s, remove unverified data\n", objectType, objectID) + errMessage := "" + if log.IsLogging(logger.ERROR) { + if err != nil { + errMessage = err.Error() + } + log.Error("Failed to verify data for object %s %s, remove unverified data. Error: %s\n", objectType, objectID, errMessage) } dataVf.RemoveUnverifiedData(*metaData) + + if updateErr := store.UpdateObjectStatus(orgID, objectType, objectID, common.VerificationFailed); updateErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s, Error: %s", common.VerificationFailed, orgID, objectType, objectID, updateErr.Error()) + } + } common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) - return false, &common.InvalidRequest{Message: "Failed to verify and store data, Error: " + err.Error()} - } - if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. data verified for object %s %s\n", objectType, objectID) + return false, &common.InternalError{Message: "Failed to verify and store data, Error: " + errMessage} } - } - - if dataVf != nil { - // If the data has been verified, then set metadata.DataVerified to true - if err = store.UpdateObjectDataVerifiedStatus(orgID, objectType, objectID, true); err != nil { + if err := store.UpdateObjectStatus(orgID, objectType, objectID, common.ReadyToSend); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.ReadyToSend, orgID, objectType, objectID) + } common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) - return false, err + return false, &common.InternalError{Message: "Failed to updated object status to " + common.ReadyToSend} + } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In PutObjectAllData. data verified for object %s %s\n", objectType, objectID) } } else { + // After the data is stored. object size is updated, dataID is incremented, object status is set to "ready" if exists, err := store.StoreObjectData(orgID, objectType, objectID, dataReader); err != nil || !exists { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) @@ -664,7 +696,7 @@ func PutObjectAllData(orgID string, objectType string, objectID string, dataRead // Should be in antoher thread if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. Send object to objectQueue %s %s\n", objectType, objectID) + trace.Debug("In PutObjectAllData. Send object to objectQueue %s %s\n", objectType, objectID) } common.ObjectLocks.Unlock(lockIndex) @@ -673,7 +705,7 @@ func PutObjectAllData(orgID string, objectType string, objectID string, dataRead objectQueue.SendObjectToQueue(objectInQueue) if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. Return response for PutObjectData %s %s\n", objectType, objectID) + trace.Debug("In PutObjectAllData. Return response for PutObjectData %s %s\n", objectType, objectID) } return true, nil } @@ -718,7 +750,7 @@ func PutObjectChunkData(orgID string, objectType string, objectID string, dataRe apiObjectLocks.Unlock(lockIndex) return false, nil } - if status != common.ReadyToSend && status != common.NotReadyToSend { + if status != common.ReadyToSend && status != common.NotReadyToSend && status != common.Verifying && status != common.VerificationFailed { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return false, &common.InvalidRequest{Message: "Can't update data of the receiving side"} @@ -758,57 +790,33 @@ func PutObjectChunkData(orgID string, objectType string, objectID string, dataRe apiObjectLocks.Unlock(lockIndex) return true, nil - } else { - // Is lastChunk - if isTempData { - // Verify data - if common.NeedDataVerification(*metaData) { - //start data verification - if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. Start data verification %s %s\n", objectType, objectID) - } - // verify data - dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) - if dr, err := dataVf.GetTempData(*metaData); err != nil { - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - return false, &common.InvalidRequest{Message: "Failed to get temp data for data verify, Error: " + err.Error()} - } else if success, err := dataVf.VerifyDataSignature(dr, orgID, objectType, objectID, ""); !success || err != nil { - if log.IsLogging(logger.ERROR) { - log.Error("Failed to verify data for object %s %s, remove unverified data\n", objectType, objectID) - } - dataVf.RemoveUnverifiedData(*metaData) - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - errMsg := "" - if err != nil && trace.IsLogging(logger.ERROR) { - errMsg = err.Error() - log.Error("Failed to verify data for object %s %s, Error: %s\n", objectType, objectID, errMsg) - } - - return false, &common.InvalidRequest{Message: "Failed to verify and store data, Error: " + errMsg} - } - if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectChunkData. data verified for object %s %s\n", objectType, objectID) - } - - // If the data has been verified, then set metadata.DataVerified to true - if err = store.UpdateObjectDataVerifiedStatus(orgID, objectType, objectID, true); err != nil { - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - return false, err - } + } else if isTempData { + // Is lastChunk and need data verification: + // send object to queue for data verification, and return + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In PutObjectChunkData, send object %s %s to data verification queue\n", objectType, objectID) + } + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + objectForDataVerification := common.ObjectInVerifyQueue{Object: *metaData} + objectDataVerifyQueue.SendObjectToQueue(objectForDataVerification) - } - } else { - // handle object info (update metadata.ObjectSize, metadata.InstanceId, metaData.DataId and object status from notReady to Ready), because Store.AppendObjectData will not modify those object info - if _, err := store.HandleObjectInfoForLastDataChunk(orgID, objectType, objectID, false, totalSize); err != nil { - common.ObjectLocks.Unlock(lockIndex) - apiObjectLocks.Unlock(lockIndex) - return false, err - } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In PutObjectChunkData. Return response for PutObjectChunkData %s %s\n", objectType, objectID) } + return true, nil + } else { + // It is last chunk, and no data verification is needed + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In PutObjectChunkData, received last chunk for %s %s. Data verificaton is not applied\n", objectType, objectID) + } + // handle object info (update metadata.ObjectSize, metadata.InstanceId, metaData.DataId and object status from notReady to Ready), because Store.AppendObjectData will not modify those object info + if _, err := store.HandleObjectInfoForLastDataChunk(orgID, objectType, objectID, false, totalSize); err != nil { + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return false, err + } var updatedMetaData *common.MetaData // StoreObject increments the instance id if this is a data update, we need to fetch the updated meta data // Also, StoreObjectData updates the ObjectSize, so we need to fetch the updated meta data @@ -1161,7 +1169,7 @@ func DeleteObject(orgID string, objectType string, objectID string) common.SyncS apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Object not found"} } - if status != common.NotReadyToSend && status != common.ReadyToSend { + if status != common.NotReadyToSend && status != common.ReadyToSend && status != common.Verifying && status != common.VerificationFailed { // This node is not the originator of the object being deleted. // ESS is not allowed to remove such objects if common.Configuration.NodeType == common.ESS { @@ -1230,7 +1238,7 @@ func ActivateObject(orgID string, objectType string, objectID string) common.Syn apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Object not found"} } - if status != common.NotReadyToSend && status != common.ReadyToSend { + if status != common.NotReadyToSend && status != common.ReadyToSend && status != common.Verifying && status != common.VerificationFailed { common.ObjectLocks.Unlock(lockIndex) apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Can't activate object on the receiving side"} diff --git a/core/base/apiModule_test.go b/core/base/apiModule_test.go index 281725e..3a6601f 100644 --- a/core/base/apiModule_test.go +++ b/core/base/apiModule_test.go @@ -77,7 +77,7 @@ func setupDataSignature(data []byte, hashAlgo string) (string, string, error) { } func setupObjectQueue() { - objectQueue = communications.NewObjectWorkQueue(40) + objectQueue = NewObjectWorkQueue(40) } func teardownObjectQueue() { diff --git a/core/base/apiServer_test.go b/core/base/apiServer_test.go index c17e34c..3ad1ea5 100644 --- a/core/base/apiServer_test.go +++ b/core/base/apiServer_test.go @@ -1415,7 +1415,7 @@ func testAPIServerSetup(nodeType string, storageType string) string { } common.Configuration.NodeType = nodeType - objectQueue = communications.NewObjectWorkQueue(40) + objectQueue = NewObjectWorkQueue(40) return "" } diff --git a/core/base/base.go b/core/base/base.go index 2b688fb..9c46a8e 100644 --- a/core/base/base.go +++ b/core/base/base.go @@ -43,8 +43,9 @@ var started bool var waitersForStartChannel chan chan int -var objectQueue *communications.ObjectWorkQueue +var objectQueue *ObjectWorkQueue var destReqQueue *communications.DestinationRequestQueue +var objectDataVerifyQueue *ObjectVerifyQueue func init() { blockChannel = make(chan int, 1) @@ -153,15 +154,22 @@ func Start(swaggerFile string, registerHandlers bool) common.SyncServiceError { common.InitObjectDownloadSemaphore() // storage, lock should be setup before initialize objectQueue - queueBufferSize := common.Configuration.ObjectQueueBufferSize - objectQueue = communications.NewObjectWorkQueue(queueBufferSize) + objectQueueBufferSize := common.Configuration.ObjectQueueBufferSize + objectDataVerifyQueueBufferSize := common.Configuration.VerifyQueueBufferSize + + objectQueue = NewObjectWorkQueue(objectQueueBufferSize) if trace.IsLogging(logger.INFO) { - trace.Info("ObjectQueue initialzed with buffer size %d", queueBufferSize) + trace.Info("ObjectQueue initialzed with buffer size %d", objectQueueBufferSize) } - destReqQueue = communications.NewDestinationRequestQueue(queueBufferSize) + objectDataVerifyQueue = NewObjectVerifyQueue(objectDataVerifyQueueBufferSize) if trace.IsLogging(logger.INFO) { - trace.Info("DestinationRequestQueue initialzed with buffer size %d", queueBufferSize) + trace.Info("ObjectVerifyQueue initialzed with buffer size %d", objectDataVerifyQueueBufferSize) + } + + destReqQueue = communications.NewDestinationRequestQueue(objectQueueBufferSize) + if trace.IsLogging(logger.INFO) { + trace.Info("DestinationRequestQueue initialzed with buffer size %d", objectQueueBufferSize) } communications.DestReqQueue = destReqQueue @@ -344,6 +352,16 @@ func Stop(quiesceTime int, unregisterSelf bool) { } } + if objectDataVerifyQueue != nil { + if trace.IsLogging(logger.INFO) { + trace.Info("Closing objectVerifyQueue...") + } + objectDataVerifyQueue.Close() + if trace.IsLogging(logger.INFO) { + trace.Info("ObjectVerifyQueue closed") + } + } + if destReqQueue != nil { if trace.IsLogging(logger.INFO) { trace.Info("Closing destReqQueue...") diff --git a/core/base/objectVerifyQueue.go b/core/base/objectVerifyQueue.go new file mode 100644 index 0000000..5ae5d78 --- /dev/null +++ b/core/base/objectVerifyQueue.go @@ -0,0 +1,148 @@ +package base + +import ( + "github.com/open-horizon/edge-sync-service/common" + "github.com/open-horizon/edge-sync-service/core/dataVerifier" + "github.com/open-horizon/edge-utilities/logger" + "github.com/open-horizon/edge-utilities/logger/log" + "github.com/open-horizon/edge-utilities/logger/trace" +) + +type ObjectVerifyQueue struct { + objectDataVerifyQueue chan common.ObjectInVerifyQueue + bufferSize uint64 +} + +// Only support doing data verification at sender side +func NewObjectVerifyQueue(bufferSize uint64) *ObjectVerifyQueue { + q := &ObjectVerifyQueue{ + objectDataVerifyQueue: make(chan common.ObjectInVerifyQueue, bufferSize), + bufferSize: bufferSize, + } + + go q.run() + return q +} + +func (q *ObjectVerifyQueue) run() { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Check object verify queue to process object data verification") + } + + for { + select { + case i, ok := <-q.objectDataVerifyQueue: + if ok { + meta := i.Object + if trace.IsLogging(logger.TRACE) { + trace.Trace("Get an object %s/%s/%s from object verification Queue", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + if common.NeedDataVerification(meta) { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Start data verification for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + lockIndex := common.HashStrings(meta.DestOrgID, meta.ObjectType, meta.ObjectID) + common.ObjectLocks.Lock(lockIndex) + + // Set object status from "notReady" to "verifying" + if err := store.UpdateObjectStatus(meta.DestOrgID, meta.ObjectType, meta.ObjectID, common.Verifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.Verifying, meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + common.ObjectLocks.Unlock(lockIndex) + continue + } + status := common.VerificationFailed + dataVf := dataVerifier.NewDataVerifier(meta.HashAlgorithm, meta.PublicKey, meta.Signature) + if dr, err := dataVf.GetTempData(meta); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to get temp data for data verify for object %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + // Set object status from "verifying" to "verificationFailed" + } else if success, err := dataVf.VerifyDataSignature(dr, meta.DestOrgID, meta.ObjectType, meta.ObjectID, ""); !success || err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + if err != nil { + log.Error("Error: %s", err.Error()) + } + } + dataVf.RemoveUnverifiedData(meta) + // Set object status from "verifying" to "verification_failed" + } else { + status = common.ReadyToSend + } + + // Data is verified, object status is set to "ready" during VerifyDataSignature(StoreObjectData) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Data verification is done for object %s/%s/%s, updating object status to %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, status) + } + + if err := store.UpdateObjectStatus(meta.DestOrgID, meta.ObjectType, meta.ObjectID, status); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", status, meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + common.ObjectLocks.Unlock(lockIndex) + continue + } + + if status == common.VerificationFailed { + // Verification failed, will return without sending out notificaitons to destinations + if log.IsLogging(logger.ERROR) { + log.Error("Verification failed for object %s/%s/%s, return", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + common.ObjectLocks.Unlock(lockIndex) + continue + } + + // Data is verified, object status is set to "ready" during VerifyDataSignature(StoreObjectData) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Data verified for object %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + // StoreObject increments the instance id if this is a data update, we need to fetch the updated meta data + // Also, StoreObjectData updates the ObjectSize, so we need to fetch the updated meta data + updatedMetaData, err := store.RetrieveObject(meta.DestOrgID, meta.ObjectType, meta.ObjectID) + if err != nil { + common.ObjectLocks.Unlock(lockIndex) + continue + } + + if updatedMetaData.Inactive { + // Don't send inactive objects to the other side + common.ObjectLocks.Unlock(lockIndex) + continue + } + common.ObjectLocks.Unlock(lockIndex) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating object status to ready for %s/%s/%s\n", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Send object to objectQueue %s/%s/%s\n", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeObject, Object: *updatedMetaData, Destinations: []common.StoreDestinationStatus{}} + objectQueue.SendObjectToQueue(objectInQueue) + + } + } else { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Nothing from object verify Queue") + } + + } + } + } +} + +func (q *ObjectVerifyQueue) Close() { + close(q.objectDataVerifyQueue) +} + +func (q *ObjectVerifyQueue) SendObjectToQueue(objectInVerifyQueue common.ObjectInVerifyQueue) { + q.objectDataVerifyQueue <- objectInVerifyQueue +} diff --git a/core/communications/objectWorkQueue.go b/core/base/objectWorkQueue.go similarity index 82% rename from core/communications/objectWorkQueue.go rename to core/base/objectWorkQueue.go index ed5545e..d13447f 100644 --- a/core/communications/objectWorkQueue.go +++ b/core/base/objectWorkQueue.go @@ -1,7 +1,8 @@ -package communications +package base import ( "github.com/open-horizon/edge-sync-service/common" + "github.com/open-horizon/edge-sync-service/core/communications" "github.com/open-horizon/edge-utilities/logger" "github.com/open-horizon/edge-utilities/logger/trace" ) @@ -41,13 +42,13 @@ func (q *ObjectWorkQueue) run() { if trace.IsLogging(logger.TRACE) { trace.Trace("Prepare update notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) } - notificationsInfo, _ = PrepareObjectNotifications(meta) + notificationsInfo, _ = communications.PrepareObjectNotifications(meta) } else if i.NotificationType == common.TypeDestination { if trace.IsLogging(logger.TRACE) { trace.Trace("For object %s/%s/%s, prepare update destination notifications for destinations %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destinations) } if len(i.Destinations) > 0 { - notificationsInfo, _ = PrepareNotificationsForDestinations(meta, i.Destinations, common.Update) + notificationsInfo, _ = communications.PrepareNotificationsForDestinations(meta, i.Destinations, common.Update) } } @@ -56,18 +57,18 @@ func (q *ObjectWorkQueue) run() { if trace.IsLogging(logger.TRACE) { trace.Trace("Prepare delete notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) } - notificationsInfo, _ = PrepareDeleteNotifications(meta) + notificationsInfo, _ = communications.PrepareDeleteNotifications(meta) } else if i.NotificationType == common.TypeDestination { if trace.IsLogging(logger.TRACE) { trace.Trace("For object %s/%s/%s, prepare delete destination notifications for destinations %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destinations) } if len(i.Destinations) > 0 { - notificationsInfo, _ = PrepareNotificationsForDestinations(meta, i.Destinations, common.Delete) + notificationsInfo, _ = communications.PrepareNotificationsForDestinations(meta, i.Destinations, common.Delete) } } } - SendNotifications(notificationsInfo) + communications.SendNotifications(notificationsInfo) if trace.IsLogging(logger.TRACE) { trace.Trace("Sent notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) diff --git a/core/communications/httpCommunication.go b/core/communications/httpCommunication.go index e99f89f..1b6ad43 100644 --- a/core/communications/httpCommunication.go +++ b/core/communications/httpCommunication.go @@ -797,12 +797,29 @@ func (communication *HTTP) GetAllData(metaData common.MetaData, offset int64) co var dataVf *dataVerifier.DataVerifier if common.NeedDataVerification(metaData) { + // Set object status from "partiallyReceived" to "receiverVerifying" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating ESS object status to %s for %s %s %s...", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + common.ObjectLocks.Unlock(lockIndex) + return err + } dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) if dataVerified, err := dataVf.VerifyDataSignature(response.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil { if log.IsLogging(logger.ERROR) { log.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID) } dataVf.RemoveUnverifiedData(metaData) + + if updateErr := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerificationFailed); updateErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s. Error: %s", common.ReceiverVerificationFailed, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, updateErr.Error()) + } + } common.ObjectLocks.Unlock(lockIndex) return err } @@ -825,17 +842,7 @@ func (communication *HTTP) GetAllData(metaData common.MetaData, offset int64) co } } - // set metadata.DataVerified = true - if err = Store.UpdateObjectDataVerifiedStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, true); err != nil { - if log.IsLogging(logger.ERROR) { - log.Error("Failed to update metadata.DataVerified to true for object %s %s\n", metaData.ObjectType, metaData.ObjectID) - } - common.ObjectLocks.Unlock(lockIndex) - return err - } - if trace.IsLogging(logger.DEBUG) { - trace.Debug("Updated object DataVerified to true for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) trace.Debug("Updating ESS object status to completelyReceived for %s %s %s...", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) } if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.CompletelyReceived); err != nil { @@ -1053,26 +1060,55 @@ func (communication *HTTP) GetDataByChunk(metaData common.MetaData, offset int64 if isLastChunk && isTempData { // verify data - dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) - if dr, err := dataVf.GetTempData(metaData); err != nil { + // Set object status from "partiallyReceived" to "receiverVerifying" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating ESS object status to %s for %s %s %s...", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if err = Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } common.ObjectLocks.Unlock(lockIndex) return err - } else if success, err := dataVf.VerifyDataSignature(dr, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !success || err != nil { + } + objectVerifyStatus := "" + dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) + if dr, getDataErr := dataVf.GetTempData(metaData); getDataErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to get temp data for data verify for object %s/%s/%s. Error: %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, getDataErr.Error()) + } + objectVerifyStatus = common.ReceiverVerificationFailed + err = getDataErr + } else if success, verifyErr := dataVf.VerifyDataSignature(dr, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !success || verifyErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + if verifyErr != nil { + log.Error("Error: %s", verifyErr.Error()) + } + } // remove temp data dataVf.RemoveUnverifiedData(metaData) - common.ObjectLocks.Unlock(lockIndex) - return err + objectVerifyStatus = common.ReceiverVerificationFailed + err = verifyErr } - // set metadata.DataVerified = true - if trace.IsLogging(logger.DEBUG) { - trace.Debug("Updated object DataVerified to true for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) - } - - if err = Store.UpdateObjectDataVerifiedStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, true); err != nil { + if objectVerifyStatus == common.ReceiverVerificationFailed { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating ESS object status to %s for %s %s %s", objectVerifyStatus, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if updateErr := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, objectVerifyStatus); updateErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s. Error: %s", objectVerifyStatus, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, updateErr.Error()) + } + common.ObjectLocks.Unlock(lockIndex) + return updateErr + } common.ObjectLocks.Unlock(lockIndex) return err } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Data verified for object %s/%s/%s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } } } @@ -1481,24 +1517,39 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object func (communication *HTTP) handlePutAllData(metaData common.MetaData, request *http.Request) (bool, common.SyncServiceError) { var dataVf *dataVerifier.DataVerifier if common.NeedDataVerification(metaData) { + // Set object status from "partiallyReceived" to "receiverVerifying" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Need data verification, set object status to %s for %s %s %s...", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + return true, err + } + dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) if dataVerified, err := dataVf.VerifyDataSignature(request.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil { if log.IsLogging(logger.ERROR) { - log.Error("Failed to verify data for object %s %s, remove unverified data\n", metaData.ObjectType, metaData.ObjectID) + log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) } dataVf.RemoveUnverifiedData(metaData) + if updateErr := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerificationFailed); updateErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s. Error: %s", common.ReceiverVerificationFailed, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, updateErr.Error()) + } + } + if err == nil { + return true, &common.InternalError{Message: "Failed to verify object data"} + } return true, err } - } - - if dataVf != nil { - if err := Store.UpdateObjectDataVerifiedStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, true); err != nil { + } else { + if found, err := Store.StoreObjectData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, request.Body); err != nil { // No data verification applied, then store data directly return true, err + } else if !found { + return true, &common.InvalidRequest{Message: "Failed to find object to set data"} } - } else if found, err := Store.StoreObjectData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, request.Body); err != nil { // No data verification applied, then store data directly - return true, err - } else if !found { - return true, &common.InvalidRequest{Message: "Failed to find object to set data"} } if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.CompletelyReceived); err != nil { @@ -1554,28 +1605,61 @@ func (communication *HTTP) handlePutChunkedData(metaData common.MetaData, reques } if isLastChunk && isTempData { + // Set object status from "partiallyReceived" to "receiverVerifying" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating object status to %s for %s %s %s...", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.ReceiverVerifying); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s", common.ReceiverVerifying, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + return isLastChunk, err + } + if trace.IsLogging(logger.DEBUG) { trace.Debug("Start data verification for %s %s %s\n", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) } // verify + objectVerifyStatus := "" dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) - if dr, err := dataVf.GetTempData(metaData); err != nil { - return isLastChunk, err - } else if success, err := dataVf.VerifyDataSignature(dr, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !success || err != nil { + if dr, getDataErr := dataVf.GetTempData(metaData); getDataErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to get temp data for data verify for object %s/%s/%s. Error: %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, getDataErr.Error()) + } + objectVerifyStatus = common.ReceiverVerificationFailed + err = getDataErr + } else if success, verifyErr := dataVf.VerifyDataSignature(dr, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !success || verifyErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + if verifyErr != nil { + log.Error("Error: %s", verifyErr.Error()) + } + } // remove temp data dataVf.RemoveUnverifiedData(metaData) + objectVerifyStatus = common.ReceiverVerificationFailed + err = verifyErr + } + + // Failed during verification, err could be nil + if objectVerifyStatus == common.ReceiverVerificationFailed { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating object status to %s for %s %s %s", objectVerifyStatus, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + if updateErr := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, objectVerifyStatus); updateErr != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update object status to %s for object %s/%s/%s. Error: %s", objectVerifyStatus, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, updateErr.Error()) + } + return isLastChunk, updateErr + } return isLastChunk, err } - // set metadata.DataVerified = true if trace.IsLogging(logger.DEBUG) { - trace.Debug("Updated object DataVerified to true for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + trace.Debug("Data verified for object %s/%s/%s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) } - if err := Store.UpdateObjectDataVerifiedStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, true); err != nil { - return isLastChunk, err - } } if isLastChunk { diff --git a/core/communications/notificationHandler.go b/core/communications/notificationHandler.go index 77dc727..05fd1be 100644 --- a/core/communications/notificationHandler.go +++ b/core/communications/notificationHandler.go @@ -418,17 +418,11 @@ func handleUpdate(metaData common.MetaData, maxInflightChunks int) common.SyncSe // check if object is already exist, and if it is created from other side, return with error _, existingObjStatus, _ := Store.RetrieveObjectAndStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) - if existingObjStatus == common.ReadyToSend || existingObjStatus == common.NotReadyToSend { + if existingObjStatus == common.ReadyToSend || existingObjStatus == common.NotReadyToSend || existingObjStatus == common.Verifying || existingObjStatus == common.VerificationFailed { common.ObjectLocks.Unlock(lockIndex) return ¬ificationHandlerError{"Error in handleUpdate: cannot update object from the receiver side."} } - // If has data, and need to verifiy data, set DataVerified to false - metaData.DataVerified = true - if status == common.PartiallyReceived && common.NeedDataVerification(metaData) { - metaData.DataVerified = false - } - // Store the object. Now change the receiver status to "PartiallyReceived" or "CompletelyReceived" if _, err := Store.StoreObject(metaData, nil, status); err != nil { common.ObjectLocks.Unlock(lockIndex) diff --git a/core/dataVerifier/dataVerifier_test.go b/core/dataVerifier/dataVerifier_test.go index 7f7962d..f8ad7b6 100644 --- a/core/dataVerifier/dataVerifier_test.go +++ b/core/dataVerifier/dataVerifier_test.go @@ -99,7 +99,7 @@ func testVerifyDataSignature(hashAlgo string, t *testing.T) { HashAlgorithm: hashAlgo, PublicKey: publicKey, Signature: signature, - DataVerified: false, + //DataVerified: false, } // Store object metadata @@ -121,10 +121,6 @@ func testVerifyDataSignature(hashAlgo string, t *testing.T) { t.Errorf("Error verifying data, data should pass verification. verified: %t, error: %s\n", verified, err.Error()) } - if err = Store.UpdateObjectDataVerifiedStatus(orgID, objectType, objectID, true); err != nil { - t.Errorf("Failed to update DataVerified to true, error: %s\n", err.Error()) - } - var reader io.Reader if reader, err = Store.RetrieveObjectData(orgID, objectType, objectID, false); err != nil { t.Errorf("Error retrieve verified data for %s %s %s, error: %s\n", orgID, objectType, objectID, err.Error()) @@ -219,7 +215,6 @@ func setupObjectForVerify(objectID string, publicKey string, signature string, h HashAlgorithm: hashAlgo, PublicKey: publicKey, Signature: signature, - DataVerified: false, DestinationDataURI: destinationURI, } diff --git a/core/leader/leader.go b/core/leader/leader.go index 97a959a..1e3e0ce 100644 --- a/core/leader/leader.go +++ b/core/leader/leader.go @@ -95,33 +95,41 @@ func SetUnsubcribeCallback(callback func() common.SyncServiceError) { } func startLeadershipPeriodicUpdate() { - leaderTicker = time.NewTicker(time.Second * time.Duration(common.Configuration.LeadershipTimeout) / 2) + leaderTicker = time.NewTicker(time.Second * time.Duration(common.Configuration.LeadershipTimeout) / 6) go func() { common.GoRoutineStarted() keepRunning := true + leaderFailureLimit := 3 + leaderFailureCnt := 0 for keepRunning { select { case <-leaderTicker.C: if isLeader { ok, err := store.LeaderPeriodicUpdate(leaderID.String()) if err != nil || !ok { - isLeader = false - if changeLeadership != nil { - changeLeadership(false) - } - if err != nil { - if unsubscribe != nil { - unsubscribe() + + leaderFailureCnt++ + if leaderFailureCnt >= leaderFailureLimit { + isLeader = false + leaderFailureCnt = 0 + if changeLeadership != nil { + changeLeadership(false) } - if log.IsLogging(logger.ERROR) { - log.Error("%s\n", err) + if err != nil { + if unsubscribe != nil { + unsubscribe() + } + if log.IsLogging(logger.ERROR) { + log.Error("%s\n", err) + } + } + if trace.IsLogging(logger.TRACE) { + trace.Trace("Have lost the leadership") } - } - if trace.IsLogging(logger.TRACE) { - trace.Trace("Have lost the leadership") } } else { lastTimestamp = time.Now() + leaderFailureCnt = 0 } } else { _, heartbeatTimeout, lastHeartbeatTS, version, err := store.RetrieveLeader() diff --git a/core/storage/boltStorage.go b/core/storage/boltStorage.go index 8c878b3..d675ade 100644 --- a/core/storage/boltStorage.go +++ b/core/storage/boltStorage.go @@ -179,7 +179,7 @@ func (store *BoltStorage) PerformMaintenance() { function := func(object boltObject) bool { if object.Meta.Expiration != "" && object.Meta.Expiration <= currentTime && - (object.Status == common.ReadyToSend || object.Status == common.NotReadyToSend) { + (object.Status == common.ReadyToSend || object.Status == common.NotReadyToSend || object.Status == common.Verifying || object.Status == common.VerificationFailed) { return true } return false @@ -236,10 +236,10 @@ func (store *BoltStorage) StoreObject(metaData common.MetaData, data []byte, sta var dests []common.StoreDestinationStatus var deletedDests []common.StoreDestinationStatus - // If the object was receieved from a service (status NotReadyToSend/ReadyToSend), i.e. this node is the origin of the object, + // If the object was receieved from a service (status NotReadyToSend/ReadyToSend/Verifying/VerificationFailed), i.e. this node is the origin of the object, // set instance id. If the object was received from the other side, this node is the receiver of the object: // keep the instance id of the meta data. - if status == common.NotReadyToSend || status == common.ReadyToSend { + if status == common.NotReadyToSend || status == common.ReadyToSend || status == common.Verifying || status == common.VerificationFailed { newID := store.getInstanceID() metaData.InstanceID = newID if data != nil && !metaData.NoData && !metaData.MetaOnly { @@ -345,10 +345,12 @@ func (store *BoltStorage) StoreObjectData(orgID string, objectType string, objec } function := func(object boltObject) (boltObject, common.SyncServiceError) { + // If it is called by dataVerifier, the status is verifying. The object status will not changed to "ready". + // This is because at this moment, the data is not yet verified. if object.Status == common.NotReadyToSend { object.Status = common.ReadyToSend } - if object.Status == common.NotReadyToSend || object.Status == common.ReadyToSend { + if object.Status == common.NotReadyToSend || object.Status == common.ReadyToSend || object.Status == common.Verifying { newID := store.getInstanceID() object.Meta.InstanceID = newID object.Meta.DataID = newID @@ -777,7 +779,7 @@ func (store *BoltStorage) GetObjectsToActivate() ([]common.MetaData, common.Sync currentTime := time.Now().UTC().Format(time.RFC3339) result := make([]common.MetaData, 0) function := func(object boltObject) { - if (object.Status == common.NotReadyToSend || object.Status == common.ReadyToSend) && + if (object.Status == common.NotReadyToSend || object.Status == common.ReadyToSend || object.Status == common.Verifying || object.Status == common.VerificationFailed) && object.Meta.Inactive && object.Meta.ActivationTime != "" && object.Meta.ActivationTime <= currentTime { result = append(result, object.Meta) } @@ -812,7 +814,7 @@ func (store *BoltStorage) AppendObjectData(orgID string, objectType string, obje return dataURI.AppendData(dataPath, dataReader, dataLength, offset, total, isFirstChunk, isLastChunk, isTempData) } -// Handles the last data chunk +// Handles the last data chunk when no data verification needed func (store *BoltStorage) HandleObjectInfoForLastDataChunk(orgID string, objectType string, objectID string, isTempData bool, dataSize int64) (bool, common.SyncServiceError) { //dataPath := createDataPath(store.localDataPath, orgID, objectType, objectID) function := func(object boltObject) (boltObject, common.SyncServiceError) { @@ -851,15 +853,6 @@ func (store *BoltStorage) UpdateObjectStatus(orgID string, objectType string, ob return store.updateObjectHelper(orgID, objectType, objectID, function) } -// UpdateObjectDataVerifiedStatus updates object's dataVerified field -func (store *BoltStorage) UpdateObjectDataVerifiedStatus(orgID string, objectType string, objectID string, verified bool) common.SyncServiceError { - function := func(object boltObject) (boltObject, common.SyncServiceError) { - object.Meta.DataVerified = verified - return object, nil - } - return store.updateObjectHelper(orgID, objectType, objectID, function) -} - // UpdateObjectSourceDataURI pdates object's source data URI func (store *BoltStorage) UpdateObjectSourceDataURI(orgID string, objectType string, objectID string, sourceDataURI string) common.SyncServiceError { function := func(object boltObject) (boltObject, common.SyncServiceError) { @@ -1241,7 +1234,7 @@ func (store *BoltStorage) UpdateObjectDelivering(orgID string, objectType string func (store *BoltStorage) GetNumberOfStoredObjects() (uint32, common.SyncServiceError) { var count uint32 function := func(object boltObject) { - if object.Status == common.ReadyToSend || object.Status == common.NotReadyToSend { + if object.Status == common.ReadyToSend || object.Status == common.NotReadyToSend || object.Status == common.Verifying || object.Status == common.VerificationFailed { count++ } } diff --git a/core/storage/cache.go b/core/storage/cache.go index c318294..8973810 100644 --- a/core/storage/cache.go +++ b/core/storage/cache.go @@ -99,11 +99,6 @@ func (store *Cache) UpdateObjectStatus(orgID string, objectType string, objectID return store.Store.UpdateObjectStatus(orgID, objectType, objectID, status) } -// UpdateObjectDataVerifiedStatus updates object's dataVerified field -func (store *Cache) UpdateObjectDataVerifiedStatus(orgID string, objectType string, objectID string, verified bool) common.SyncServiceError { - return store.Store.UpdateObjectDataVerifiedStatus(orgID, objectType, objectID, verified) -} - // UpdateObjectSourceDataURI pdates object's source data URI func (store *Cache) UpdateObjectSourceDataURI(orgID string, objectType string, objectID string, sourceDataURI string) common.SyncServiceError { return store.Store.UpdateObjectSourceDataURI(orgID, objectType, objectID, sourceDataURI) diff --git a/core/storage/inMemoryStorage.go b/core/storage/inMemoryStorage.go index ac0647b..7271b23 100644 --- a/core/storage/inMemoryStorage.go +++ b/core/storage/inMemoryStorage.go @@ -90,10 +90,10 @@ func (store *InMemoryStorage) StoreObject(metaData common.MetaData, data []byte, defer store.unLock() id := getObjectCollectionID(metaData) - // If the object was receieved from a service (status NotReadyToSend/ReadyToSend), i.e. this node is the origin of the object, + // If the object was receieved from a service (status NotReadyToSend/ReadyToSend/Verifying/VerificationFailed), i.e. this node is the origin of the object, // set instance id. If the object was received from the other side, this node is the receiver of the object: // keep the instance id of the meta data. - if status == common.NotReadyToSend || status == common.ReadyToSend { + if status == common.NotReadyToSend || status == common.ReadyToSend || status == common.Verifying || status == common.VerificationFailed { newID := store.getInstanceID() metaData.InstanceID = newID if data != nil && !metaData.NoData && !metaData.MetaOnly { @@ -147,7 +147,7 @@ func (store *InMemoryStorage) StoreObjectData(orgID string, objectType string, o if object.status == common.NotReadyToSend { object.status = common.ReadyToSend } - if object.status == common.NotReadyToSend || object.status == common.ReadyToSend { + if object.status == common.NotReadyToSend || object.status == common.ReadyToSend || object.status == common.Verifying { newID := store.getInstanceID() object.meta.InstanceID = newID object.meta.DataID = newID @@ -320,21 +320,6 @@ func (store *InMemoryStorage) UpdateObjectStatus(orgID string, objectType string return &NotFound{"Object not found"} } -// UpdateObjectDataVerifiedStatus updates object's dataVerified field -func (store *InMemoryStorage) UpdateObjectDataVerifiedStatus(orgID string, objectType string, objectID string, verified bool) common.SyncServiceError { - store.lock() - defer store.unLock() - - id := createObjectCollectionID(orgID, objectType, objectID) - if object, ok := store.objects[id]; ok { - object.meta.DataVerified = verified - store.objects[id] = object - return nil - } - - return &NotFound{"Object not found"} -} - // UpdateObjectSourceDataURI updates object's source data URI func (store *InMemoryStorage) UpdateObjectSourceDataURI(orgID string, objectType string, objectID string, sourceDataURI string) common.SyncServiceError { store.lock() @@ -635,7 +620,7 @@ func (store *InMemoryStorage) GetObjectsToActivate() ([]common.MetaData, common. currentTime := time.Now().UTC().Format(time.RFC3339) result := make([]common.MetaData, 0) for _, obj := range store.objects { - if (obj.status == common.NotReadyToSend || obj.status == common.ReadyToSend) && + if (obj.status == common.NotReadyToSend || obj.status == common.ReadyToSend || obj.status == common.Verifying || obj.status == common.VerificationFailed) && obj.meta.Inactive && obj.meta.ActivationTime != "" && obj.meta.ActivationTime <= currentTime { result = append(result, obj.meta) } @@ -737,7 +722,7 @@ func (store *InMemoryStorage) DeleteObjectDestinations(orgID string, objectType func (store *InMemoryStorage) GetNumberOfStoredObjects() (uint32, common.SyncServiceError) { var count uint32 for _, object := range store.objects { - if object.status == common.ReadyToSend || object.status == common.NotReadyToSend { + if object.status == common.ReadyToSend || object.status == common.NotReadyToSend || object.status == common.Verifying || object.status == common.VerificationFailed { count++ } } diff --git a/core/storage/mongoStorage.go b/core/storage/mongoStorage.go index 13c6db4..800dde1 100644 --- a/core/storage/mongoStorage.go +++ b/core/storage/mongoStorage.go @@ -292,7 +292,9 @@ func (store *MongoStorage) GetObjectsToActivate() ([]common.MetaData, common.Syn currentTime := time.Now().UTC().Format(time.RFC3339) query := bson.M{"$or": []bson.M{ bson.M{"status": common.NotReadyToSend}, - bson.M{"status": common.ReadyToSend}}, + bson.M{"status": common.ReadyToSend}, + bson.M{"status": common.Verifying}, + bson.M{"status": common.VerificationFailed}}, "metadata.inactive": true, "$and": []bson.M{ bson.M{"metadata.activation-time": bson.M{"$ne": ""}}, @@ -328,7 +330,7 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st var dests []common.StoreDestinationStatus var deletedDests []common.StoreDestinationStatus - if status == common.NotReadyToSend || status == common.ReadyToSend { + if status == common.NotReadyToSend || status == common.ReadyToSend || status == common.Verifying || status == common.VerificationFailed { // The object was receieved from a service, i.e. this node is the origin of the object: // set its instance id and create destinations array newID := store.getInstanceID() @@ -1084,10 +1086,12 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje } } + // If it is called by dataVerifier, the status is verifying. The object status will not changed to "ready". + // This is because at this moment, the data is not yet verified. if result.Status == common.NotReadyToSend { store.UpdateObjectStatus(orgID, objectType, objectID, common.ReadyToSend) } - if result.Status == common.NotReadyToSend || result.Status == common.ReadyToSend { + if result.Status == common.NotReadyToSend || result.Status == common.ReadyToSend || result.Status == common.Verifying { newID := store.getInstanceID() if err := store.update(objects, bson.M{"_id": id}, bson.M{ @@ -1097,7 +1101,6 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje return false, &Error{fmt.Sprintf("Failed to set instance id. Error: %s.", err)} } } - _, size, err := store.copyDataToFile(id, dataReader, true, true) if err != nil { return false, err @@ -1256,7 +1259,7 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj return updatedLastChunk, nil } -// Handles the last data chunk +// Handles the last data chunk when no data verification needed func (store *MongoStorage) HandleObjectInfoForLastDataChunk(orgID string, objectType string, objectID string, isTempData bool, dataSize int64) (bool, common.SyncServiceError) { if isTempData { return false, nil @@ -1309,19 +1312,6 @@ func (store *MongoStorage) UpdateObjectStatus(orgID string, objectType string, o return nil } -// UpdateObjectDataVerifiedStatus updates object's dataVerified field -func (store *MongoStorage) UpdateObjectDataVerifiedStatus(orgID string, objectType string, objectID string, verified bool) common.SyncServiceError { - id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.update(objects, bson.M{"_id": id}, - bson.M{ - "$set": bson.M{"metadata.data-verified": verified}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, - }); err != nil { - return &Error{fmt.Sprintf("Failed to update object's data-verified status. Error: %s.", err)} - } - return nil -} - // UpdateObjectSourceDataURI updates object's source data URI func (store *MongoStorage) UpdateObjectSourceDataURI(orgID string, objectType string, objectID string, sourceDataURI string) common.SyncServiceError { return nil @@ -1405,6 +1395,8 @@ func (store *MongoStorage) GetNumberOfStoredObjects() (uint32, common.SyncServic "$or": []bson.M{ bson.M{"status": common.ReadyToSend}, bson.M{"status": common.NotReadyToSend}, + bson.M{"status": common.Verifying}, + bson.M{"status": common.VerificationFailed}, }} return store.count(objects, query) } diff --git a/core/storage/mongoStorageHelpers.go b/core/storage/mongoStorageHelpers.go index 7f561a1..737ee53 100644 --- a/core/storage/mongoStorageHelpers.go +++ b/core/storage/mongoStorageHelpers.go @@ -37,7 +37,9 @@ func (store *MongoStorage) checkObjects() { bson.M{"metadata.expiration": bson.M{"$lte": currentTime}}, bson.M{"$or": []bson.M{ bson.M{"status": common.NotReadyToSend}, - bson.M{"status": common.ReadyToSend}}}}, + bson.M{"status": common.ReadyToSend}, + bson.M{"status": common.Verifying}, + bson.M{"status": common.VerificationFailed}}}}, } selector := bson.M{"metadata": bson.ElementDocument, "last-update": bson.ElementTimestamp} diff --git a/core/storage/storage.go b/core/storage/storage.go index c35def4..de30ec1 100644 --- a/core/storage/storage.go +++ b/core/storage/storage.go @@ -60,9 +60,6 @@ type Storage interface { // Update object's status UpdateObjectStatus(orgID string, objectType string, objectID string, status string) common.SyncServiceError - // UpdateObjectDataVerifiedStatus updates object's dataVerified field - UpdateObjectDataVerifiedStatus(orgID string, objectType string, objectID string, verified bool) common.SyncServiceError - // Update object's source data URI UpdateObjectSourceDataURI(orgID string, objectType string, objectID string, sourceDataURI string) common.SyncServiceError