Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement merge behavior to the ngsild bridge and update Scorpio Version #652

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions KafkaBridge/lib/ngsild.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,26 @@ function fiwareApi (conf) {
return rest.postBody({ options, body: data });
};

/**
* Run batch merge operation on the entities
* @param {array[Object]} entities - Array of JSON patches to merge
* @param {array[Object]} headers - additional headers
*/
this.batchMerge = function (entities, { headers }) {
headers = headers || {};
headers['Content-Type'] = 'application/ld+json';

const options = {
hostname: config.ngsildServer.hostname,
protocol: config.ngsildServer.protocol,
port: config.ngsildServer.port,
path: '/ngsi-ld/v1/entityOperations/merge',
headers: headers,
method: 'POST'
};
return rest.postBody({ options, body: entities });
};

/**
* Helpers
*/
Expand Down
22 changes: 9 additions & 13 deletions KafkaBridge/lib/ngsildUpdates.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,16 @@ module.exports = function NgsildUpdates (conf) {
try {
// update the entity - do not create it
if (op === 'update') {
// NOTE: The batch update API of Scorpio does not yet support noOverwrite options. For the time being
// the batch processing will be done sequentially - until this is fixed in Scorpio
for (const entity of entities) { // olet i = 0; i < entities.length; i ++) {
// basic health check of entity
if (entity.id === undefined || entity.id == null) {
logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity));
} else {
logger.debug('Updating: ' + JSON.stringify(entities));
result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers });
if (result.statusCode !== 204 && result.statusCode !== 207) {
logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
}
// Only batch merge is run
if (entities === undefined || entities == null) {
logger.error('Unhealthy entities - ignoring it:' + JSON.stringify(entities));
} else {
logger.debug('Updating: ' + JSON.stringify(entities));
result = await ngsild.batchMerge(entities, { headers });
if (result.statusCode !== 204 && result.statusCode !== 207) {
logger.error('Entity cannot run merge:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
}
};
}
} else if (op === 'upsert') {
// in this case, entity will be created if not existing
logger.debug('Upserting: ' + JSON.stringify(entities));
Expand Down
41 changes: 41 additions & 0 deletions KafkaBridge/test/testLibNgsild.js
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,44 @@ describe('Test updateEntities', function () {
revert();
});
});
describe('Test batchMerge', function () {
it('Should use correct options and headers', async function () {
const Logger = function () {
return logger;
};
const Rest = function () {
return rest;
};
const headers = { Authorization: 'Bearer token' };
const expectedOptions = {
hostname: 'hostname',
protocol: 'http:',
port: 1234,
method: 'POST',
path: '/ngsi-ld/v1/entityOperations/merge',
headers: {
'Content-Type': 'application/ld+json',
Authorization: 'Bearer token'
}
};
const rest = {
postBody: function (obj) {
assert.deepEqual(obj.options, expectedOptions);
assert.deepEqual(obj.body, entities);
return Promise.resolve('merged');
}
};

const entities = [
{ id: 'id1', type: 'type1', attr1: 'value1' },
{ id: 'id2', type: 'type2', attr2: 'value2' }
];

const revert = ToTest.__set__('Logger', Logger);
ToTest.__set__('Rest', Rest);
const ngsild = new ToTest(config);
const result = await ngsild.batchMerge(entities, { headers });
result.should.equal('merged');
revert();
});
});
86 changes: 46 additions & 40 deletions KafkaBridge/test/testLibNgsildUpdates.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const logger = {
const addSyncOnAttribute = function () {};

describe('Test libNgsildUpdates', function () {
it('Should post body with correct path and token for nonOverwrite update', async function () {
let updatePropertiesCalled = false;
it('Should post entities with correct path and token for nonOverwrite update using batchMerge', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -66,19 +66,20 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type' });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
assert.deepEqual(entities, body.entities);
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
statusCode: 204
});
});
},
// Stub updateProperties if needed
updateProperties: function () {},
replaceEntities: function () {

}
};
};
Expand Down Expand Up @@ -108,11 +109,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and filter out datasetId === "@none"', async function () {
let updatePropertiesCalled = false;
it('Should post entities and filter out datasetId === "@none"', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -151,19 +152,24 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { value: 'value' } });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
// Check top-level properties
assert.equal(entity.id, 'id');
assert.equal(entity.type, 'type');

// Check attribute properties
assert.isUndefined(entity.attribute.datasetId, 'datasetId should be filtered out');
assert.property(entity.attribute, 'value');
assert.equal(entity.attribute.value, 'value');
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
statusCode: 204
});
});
},
replaceEntities: function () {
}
};
};
Expand Down Expand Up @@ -192,11 +198,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and filter out datasetId === "@none" from attribute array', async function () {
let updatePropertiesCalled = false;
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -241,11 +247,12 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
assert.deepEqual(entity.id, 'id');
assert.deepEqual(entity, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -282,11 +289,11 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body and not filter out datasetId !== "@none"', async function () {
let updatePropertiesCalled = false;
it('Should post entities and not filter out datasetId !== "@none"', async function () {
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -325,11 +332,9 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { datasetId: 'https://example.com/source1', value: 'value' } });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
assert.deepEqual(entities, body.entities);
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -366,7 +371,7 @@ describe('Test libNgsildUpdates', function () {
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
const ngsildUpdates = new ToTest(config);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
it('Should post body with correct path and token for nonOverwrite upsert', async function () {
Expand Down Expand Up @@ -446,7 +451,7 @@ describe('Test libNgsildUpdates', function () {
revert();
});
it('Should post body with string entity', async function () {
let updatePropertiesCalled = false;
let batchMergeCalled = false;
const config = {
ngsildUpdates: {
clientSecretVariable: 'CLIENT_SECRET',
Expand Down Expand Up @@ -481,11 +486,12 @@ describe('Test libNgsildUpdates', function () {
};
const Ngsild = function () {
return {
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
updatePropertiesCalled = true;
id.should.equal('id');
assert.deepEqual(body, { id: 'id', type: 'type' });
isOverwrite.should.equal(false);
batchMerge: function (entities, { headers }) {
batchMergeCalled = true;
entities.forEach(entity => {
assert.deepEqual(entity.id, 'id');
assert.deepEqual(entity, { id: 'id', type: 'type' });
});
assert.deepEqual(headers, expHeaders);
return new Promise(function (resolve) {
resolve({
Expand Down Expand Up @@ -524,7 +530,7 @@ describe('Test libNgsildUpdates', function () {
const ngsildUpdates = new ToTest(config);
body.entities = JSON.stringify(body.entities);
await ngsildUpdates.ngsildUpdates(body);
updatePropertiesCalled.should.equal(true);
batchMergeCalled.should.equal(true);
revert();
});
});
Expand Down
Loading