Skip to content

Commit

Permalink
Merge pull request #69 from LiilyZhang/issue63
Browse files Browse the repository at this point in the history
Issue 63 - Add data verification in Sync Service SPIs
  • Loading branch information
dabooz authored Jun 3, 2021
2 parents b0b1448 + 5951359 commit 0ff96cd
Show file tree
Hide file tree
Showing 12 changed files with 603 additions and 93 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ build/*
core/base/persist
core/communications/persist
core/storage/persist
core/dataVerifier/dataURITmp
core/dataVerifier/persist
.DS_STORE
vendor/*/
7 changes: 7 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,13 @@ func BlockUntilNoRunningGoRoutines() {
waitingOnBlockChannel = false
}

func IsValidHashAlgorithm(hashAlgorithm string) bool {
if hashAlgorithm == Sha1 || hashAlgorithm == Sha256 {
return true
}
return false
}

// IsValidName checks if the string only contains letters, digits, and !@#%^*-_.~
var IsValidName = regexp.MustCompile(`^[a-zA-Z0-9|!|@|#|$|^|*|\-|_|.|~|\pL|\pN]+$`).MatchString

Expand Down
119 changes: 33 additions & 86 deletions core/base/apiModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package base

import (
"bytes"
"crypto/rsa"
"crypto/x509"
"encoding/base64"
"fmt"
"io"
Expand All @@ -17,6 +15,7 @@ import (
"github.com/open-horizon/edge-sync-service/common"
"github.com/open-horizon/edge-sync-service/core/communications"
"github.com/open-horizon/edge-sync-service/core/dataURI"
"github.com/open-horizon/edge-sync-service/core/dataVerifier"
"github.com/open-horizon/edge-sync-service/core/storage"
"github.com/open-horizon/edge-utilities/logger"
"github.com/open-horizon/edge-utilities/logger/log"
Expand Down Expand Up @@ -258,13 +257,21 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com
} else if data != nil {
// data signature verification if metadata has both publicKey and signature
// data is nil for metaOnly object. Meta-only object will not apply data verification
if metaData.HashAlgorithm != "" && metaData.PublicKey != "" && metaData.Signature != "" {
dataReader := bytes.NewReader(data)
if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
// will no store data if object metadata not exist
if success, err := VerifyAndStoreData(dataReader, orgID, metaData.ObjectType, metaData.ObjectID, metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature, false); err != nil || !success {
dataReader := bytes.NewReader(data)
dataVf := dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature)
if success, err := dataVf.VerifyDataSignature(dataReader, orgID, objectType, objectID, ""); !success || err != nil {
if trace.IsLogging(logger.ERROR) {
trace.Error("Failed to verify data for object %s %s, remove temp data\n", objectType, objectID)
}
dataVf.RemoveTempData(orgID, objectType, objectID, "")

common.ObjectLocks.Unlock(lockIndex)
return err
}
dataVf.RemoveTempData(orgID, objectType, objectID, "")

}

metaData.ObjectSize = int64(len(data))
Expand Down Expand Up @@ -533,16 +540,35 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader
return false, &common.InvalidRequest{Message: "Can't update data, the NoData flag is set to true"}
}

if metaData.HashAlgorithm != "" && metaData.PublicKey != "" && metaData.Signature != "" {
var dataVf *dataVerifier.DataVerifier
if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
//start data verification
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In PutObjectData. Start data verification %s %s\n", objectType, objectID)
}

if success, err := VerifyAndStoreData(dataReader, orgID, objectType, objectID, metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature, true); !success || err != nil {
dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature)
if success, err := dataVf.VerifyDataSignature(dataReader, orgID, objectType, objectID, ""); !success || err != nil {
if trace.IsLogging(logger.ERROR) {
trace.Error("Failed to verify data for object %s %s, remove temp data\n", objectType, objectID)
}
dataVf.RemoveTempData(orgID, objectType, objectID, "")
common.ObjectLocks.Unlock(lockIndex)
return false, &common.InvalidRequest{Message: "Failed to verify and store data, Error: " + err.Error()}
}
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In PutObjectData. data verified for object %s %s\n", objectType, objectID)
}

}

// If the data has been verified, then we retrieve the temp data, store in DB, and delete temp data
if dataVf != nil {
if err := dataVf.StoreVerifiedData(orgID, objectType, objectID, ""); err != nil {
dataVf.RemoveTempData(orgID, objectType, objectID, "")
common.ObjectLocks.Unlock(lockIndex)
return false, err
}
} else {
if exists, err := store.StoreObjectData(orgID, objectType, objectID, dataReader); err != nil || !exists {
common.ObjectLocks.Unlock(lockIndex)
Expand Down Expand Up @@ -1269,82 +1295,3 @@ func RetrieveACLsInOrg(aclType string, orgID string) ([]string, common.SyncServi
defer apiLock.Unlock()
return store.RetrieveACLsInOrg(aclType, orgID)
}

func VerifyAndStoreData(data io.Reader, orgID string, objectType string, objectID string, hashAlgo string, publicKey string, signature string, storeData bool) (bool, common.SyncServiceError) {
if hashAlgo == "" || publicKey == "" || signature == "" {
message := fmt.Sprintf("hash algorithm, public key or signature is empty")
return false, &common.InvalidRequest{Message: message}

}

var dataReader io.Reader
var err error
if publicKeyBytes, err := base64.StdEncoding.DecodeString(publicKey); err != nil {
return false, &common.InvalidRequest{Message: "PublicKey is not base64 encoded. Error: " + err.Error()}
} else if signatureBytes, err := base64.StdEncoding.DecodeString(signature); err != nil {
return false, &common.InvalidRequest{Message: "Signature is not base64 encoded. Error: " + err.Error()}
} else {
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In VerifyAndStoreData, starting data hash\n")
}
dataHash, cryptoHash, err := common.GetHash(hashAlgo)
if err != nil {
return false, &common.InvalidRequest{Message: "Failed to get hash. Error: " + err.Error()}
}

dr := io.TeeReader(data, dataHash)

if trace.IsLogging(logger.DEBUG) {
trace.Debug("In VerifyAndStoreData, storing temp data for object %s %s\n", objectType, objectID)
}

if exists, err := store.StoreObjectTempData(orgID, objectType, objectID, dr); err != nil || !exists {
return false, err
}

dataHashSum := dataHash.Sum(nil)

if pubKey, err := x509.ParsePKIXPublicKey(publicKeyBytes); err != nil {
return false, &common.InvalidRequest{Message: "Failed to parse public key, Error: " + err.Error()}
} else {
pubKeyToUse := pubKey.(*rsa.PublicKey)
if err = rsa.VerifyPSS(pubKeyToUse, cryptoHash, dataHashSum, signatureBytes, nil); err != nil {
store.RemoveObjectTempData(orgID, objectType, objectID)
return false, &common.InvalidRequest{Message: "Failed to verify data with public key and data signature, Error: " + err.Error()}
}
}
}

if trace.IsLogging(logger.DEBUG) {
trace.Debug("In VerifyAndStoreData, data verification is done, retrieve temp data for object %s %s\n", objectType, objectID)
}

if storeData {
dataReader, err = store.RetrieveTempObjectData(orgID, objectType, objectID)
if err != nil {
return false, &common.InvalidRequest{Message: "Failed to read temp data fro, Error: " + err.Error()}
} else if dataReader == nil {
return false, &common.InvalidRequest{Message: "Read empty temp data, Error: " + err.Error()}
}

if trace.IsLogging(logger.DEBUG) {
trace.Debug("In VerifyAndStoreData, storing data for object %s %s\n", objectType, objectID)
}

if exists, err := store.StoreObjectData(orgID, objectType, objectID, dataReader); err != nil || !exists {
return false, err
}
store.CloseDataReader(dataReader)
}

if trace.IsLogging(logger.DEBUG) {
trace.Debug("In VerifyAndStoreData, remove temp data for object %s %s\n", objectType, objectID)
}

if err = store.RemoveObjectTempData(orgID, objectType, objectID); err != nil {
if trace.IsLogging(logger.ERROR) {
trace.Error("In VerifyAndStoreData. Failed to remove temp data for object\n")
}
}
return true, nil
}
4 changes: 4 additions & 0 deletions core/base/apiModule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/open-horizon/edge-sync-service/common"
"github.com/open-horizon/edge-sync-service/core/communications"
"github.com/open-horizon/edge-sync-service/core/dataVerifier"
"github.com/open-horizon/edge-sync-service/core/storage"
)

Expand Down Expand Up @@ -81,6 +82,8 @@ func TestObjectAPI(t *testing.T) {

func testObjectAPI(store storage.Storage, t *testing.T) {
communications.Store = store
dataVerifier.Store = store

common.InitObjectLocks()

dests := []string{"device:dev1", "device2:dev", "device2:dev1"}
Expand Down Expand Up @@ -654,6 +657,7 @@ func TestESSObjectDeletedAPI(t *testing.T) {

func testESSObjectDeletedAPI(store storage.Storage, t *testing.T) {
communications.Store = store
dataVerifier.Store = store
common.InitObjectLocks()

if err := store.Init(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/open-horizon/edge-sync-service/common"
"github.com/open-horizon/edge-sync-service/core/communications"
"github.com/open-horizon/edge-sync-service/core/dataVerifier"
"github.com/open-horizon/edge-sync-service/core/leader"
"github.com/open-horizon/edge-sync-service/core/security"
"github.com/open-horizon/edge-sync-service/core/storage"
Expand Down Expand Up @@ -112,6 +113,7 @@ func Start(swaggerFile string, registerHandlers bool) common.SyncServiceError {
}
communications.Store = store
security.Store = store
dataVerifier.Store = store

leader.StartLeaderDetermination(store)

Expand Down
56 changes: 54 additions & 2 deletions core/communications/httpCommunication.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/open-horizon/edge-sync-service/common"
"github.com/open-horizon/edge-sync-service/core/dataURI"
"github.com/open-horizon/edge-sync-service/core/dataVerifier"
"github.com/open-horizon/edge-sync-service/core/security"
"github.com/open-horizon/edge-utilities/logger"
"github.com/open-horizon/edge-utilities/logger/log"
Expand Down Expand Up @@ -628,7 +629,26 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo
lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID)
common.ObjectLocks.Lock(lockIndex)

if metaData.DestinationDataURI != "" {
var dataVf *dataVerifier.DataVerifier
if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature)
if dataVerified, err := dataVf.VerifyDataSignature(response.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil {
if trace.IsLogging(logger.ERROR) {
trace.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID)
}
dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI)
common.ObjectLocks.Unlock(lockIndex)
return err
}
}

if dataVf != nil {
if err := dataVf.StoreVerifiedData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); err != nil {
dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI)
common.ObjectLocks.Unlock(lockIndex)
return err
}
} else if metaData.DestinationDataURI != "" {
if _, err := dataURI.StoreData(metaData.DestinationDataURI, response.Body, 0); err != nil {
common.ObjectLocks.Unlock(lockIndex)
return err
Expand All @@ -643,6 +663,7 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo
return &Error{"Failed to store object's data."}
}
}

if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.CompletelyReceived); err != nil {
common.ObjectLocks.Unlock(lockIndex)
return &Error{fmt.Sprintf("Error in GetData: %s\n", err)}
Expand Down Expand Up @@ -936,13 +957,44 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object
lockIndex := common.HashStrings(orgID, objectType, objectID)
common.ObjectLocks.Lock(lockIndex)

if found, err := Store.StoreObjectData(orgID, objectType, objectID, request.Body); err != nil {
// retrieve metadata and check if this data need to be verified
metaData, err := Store.RetrieveObject(orgID, objectType, objectID)
if metaData == nil {
common.ObjectLocks.Unlock(lockIndex)
return &common.InvalidRequest{Message: "Failed to find object to set data"}
}
if err != nil {
common.ObjectLocks.Unlock(lockIndex)
return err
}

var dataVf *dataVerifier.DataVerifier
if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature)
if dataVerified, err := dataVf.VerifyDataSignature(request.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil {
if trace.IsLogging(logger.ERROR) {
trace.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID)
}
dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI)
common.ObjectLocks.Unlock(lockIndex)
return err
}
}

if dataVf != nil {
if err := dataVf.StoreVerifiedData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); err != nil {
dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI)
common.ObjectLocks.Unlock(lockIndex)
return err
}
} else if found, err := Store.StoreObjectData(orgID, objectType, objectID, request.Body); err != nil { // No data verification applied, then store data directly
common.ObjectLocks.Unlock(lockIndex)
return err
} else if !found {
common.ObjectLocks.Unlock(lockIndex)
return &common.InvalidRequest{Message: "Failed to find object to set data"}
}

if err := Store.UpdateObjectStatus(orgID, objectType, objectID, common.CompletelyReceived); err != nil {
common.ObjectLocks.Unlock(lockIndex)
return err
Expand Down
50 changes: 50 additions & 0 deletions core/dataURI/dataURI.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,56 @@ func StoreData(uri string, dataReader io.Reader, dataLength uint32) (int64, comm
return written, nil
}

// StoreTempData writes the data to the tmp file stored at the given URI
func StoreTempData(uri string, dataReader io.Reader, dataLength uint32) (int64, common.SyncServiceError) {
if trace.IsLogging(logger.TRACE) {
trace.Trace("Storing data at %s", uri)
}
dataURI, err := url.Parse(uri)
if err != nil || !strings.EqualFold(dataURI.Scheme, "file") {
return 0, &Error{"Invalid data URI"}
}

filePath := dataURI.Path + ".tmp"
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return 0, common.CreateError(err, fmt.Sprintf("Failed to open file %s to write data. Error: ", dataURI.Path))
}
defer file.Close()

if _, err = file.Seek(0, io.SeekStart); err != nil {
return 0, &common.IOError{Message: "Failed to seek to the start of a file. Error: " + err.Error()}
}

written, err := io.Copy(file, dataReader)
if err != nil && err != io.EOF {
return 0, &common.IOError{Message: "Failed to write to file. Error: " + err.Error()}
}
if written != int64(dataLength) && dataLength != 0 {
return 0, &common.IOError{Message: "Failed to write all the data to file."}
}
return written, nil
}

// StoreDataFromTempData rename {dataURI.Path}.tmp to {dataURI.Path}
func StoreDataFromTempData(uri string) common.SyncServiceError {
if trace.IsLogging(logger.TRACE) {
trace.Trace("Storing data from temp data at %s", uri)
}
dataURI, err := url.Parse(uri)
if err != nil || !strings.EqualFold(dataURI.Scheme, "file") {
return &Error{"Invalid data URI"}
}

tmpFilePath := dataURI.Path + ".tmp"

if err := os.Rename(tmpFilePath, dataURI.Path); err != nil {
return &common.IOError{Message: "Failed to rename data file. Error: " + err.Error()}
}

return nil
}

// GetData retrieves the data stored at the given URI.
// After reading, the reader has to be closed.
func GetData(uri string) (io.Reader, common.SyncServiceError) {
Expand Down
Loading

0 comments on commit 0ff96cd

Please sign in to comment.