Skip to content

Commit

Permalink
Add opt to remove TypeInstance's revision, fix update mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Apr 6, 2022
1 parent 37b5d87 commit 0f9d7c7
Show file tree
Hide file tree
Showing 21 changed files with 1,100 additions and 392 deletions.
11 changes: 11 additions & 0 deletions hub-js/proto/storage_backend.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ message OnDeleteRequest {

message OnDeleteResponse {}

message OnDeleteRevisionRequest {
string type_instance_id = 1;
optional bytes context = 2;
optional string owner_id = 3;
uint32 resource_version = 4;
}

message OnDeleteRevisionResponse {}

message GetValueRequest {
string type_instance_id = 1;
uint32 resource_version = 2;
Expand Down Expand Up @@ -82,9 +91,11 @@ service StorageBackend {
rpc OnCreate(OnCreateRequest) returns (OnCreateResponse);
rpc OnUpdate(OnUpdateRequest) returns (OnUpdateResponse);
rpc OnDelete(OnDeleteRequest) returns (OnDeleteResponse);
rpc OnDeleteRevision(OnDeleteRevisionRequest) returns (OnDeleteRevisionResponse);

// lock
rpc GetLockedBy(GetLockedByRequest) returns (GetLockedByResponse);
rpc OnLock(OnLockRequest) returns (OnLockResponse);
rpc OnUnlock(OnUnlockRequest) returns (OnUnlockResponse);

}
165 changes: 165 additions & 0 deletions hub-js/src/generated/grpc/storage_backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ export interface OnDeleteRequest {

export interface OnDeleteResponse {}

export interface OnDeleteRevisionRequest {
typeInstanceId: string;
context?: Uint8Array | undefined;
ownerId?: string | undefined;
resourceVersion: number;
}

export interface OnDeleteRevisionResponse {}

export interface GetValueRequest {
typeInstanceId: string;
resourceVersion: number;
Expand Down Expand Up @@ -579,6 +588,154 @@ export const OnDeleteResponse = {
},
};

function createBaseOnDeleteRevisionRequest(): OnDeleteRevisionRequest {
return {
typeInstanceId: "",
context: undefined,
ownerId: undefined,
resourceVersion: 0,
};
}

export const OnDeleteRevisionRequest = {
encode(
message: OnDeleteRevisionRequest,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.typeInstanceId !== "") {
writer.uint32(10).string(message.typeInstanceId);
}
if (message.context !== undefined) {
writer.uint32(18).bytes(message.context);
}
if (message.ownerId !== undefined) {
writer.uint32(26).string(message.ownerId);
}
if (message.resourceVersion !== 0) {
writer.uint32(32).uint32(message.resourceVersion);
}
return writer;
},

decode(
input: _m0.Reader | Uint8Array,
length?: number
): OnDeleteRevisionRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseOnDeleteRevisionRequest();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.typeInstanceId = reader.string();
break;
case 2:
message.context = reader.bytes();
break;
case 3:
message.ownerId = reader.string();
break;
case 4:
message.resourceVersion = reader.uint32();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},

fromJSON(object: any): OnDeleteRevisionRequest {
return {
typeInstanceId: isSet(object.typeInstanceId)
? String(object.typeInstanceId)
: "",
context: isSet(object.context)
? bytesFromBase64(object.context)
: undefined,
ownerId: isSet(object.ownerId) ? String(object.ownerId) : undefined,
resourceVersion: isSet(object.resourceVersion)
? Number(object.resourceVersion)
: 0,
};
},

toJSON(message: OnDeleteRevisionRequest): unknown {
const obj: any = {};
message.typeInstanceId !== undefined &&
(obj.typeInstanceId = message.typeInstanceId);
message.context !== undefined &&
(obj.context =
message.context !== undefined
? base64FromBytes(message.context)
: undefined);
message.ownerId !== undefined && (obj.ownerId = message.ownerId);
message.resourceVersion !== undefined &&
(obj.resourceVersion = Math.round(message.resourceVersion));
return obj;
},

fromPartial(
object: DeepPartial<OnDeleteRevisionRequest>
): OnDeleteRevisionRequest {
const message = createBaseOnDeleteRevisionRequest();
message.typeInstanceId = object.typeInstanceId ?? "";
message.context = object.context ?? undefined;
message.ownerId = object.ownerId ?? undefined;
message.resourceVersion = object.resourceVersion ?? 0;
return message;
},
};

function createBaseOnDeleteRevisionResponse(): OnDeleteRevisionResponse {
return {};
}

export const OnDeleteRevisionResponse = {
encode(
_: OnDeleteRevisionResponse,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
return writer;
},

decode(
input: _m0.Reader | Uint8Array,
length?: number
): OnDeleteRevisionResponse {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseOnDeleteRevisionResponse();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},

fromJSON(_: any): OnDeleteRevisionResponse {
return {};
},

toJSON(_: OnDeleteRevisionResponse): unknown {
const obj: any = {};
return obj;
},

fromPartial(
_: DeepPartial<OnDeleteRevisionResponse>
): OnDeleteRevisionResponse {
const message = createBaseOnDeleteRevisionResponse();
return message;
},
};

function createBaseGetValueRequest(): GetValueRequest {
return { typeInstanceId: "", resourceVersion: 0, context: new Uint8Array() };
}
Expand Down Expand Up @@ -1105,6 +1262,14 @@ export const StorageBackendDefinition = {
responseStream: false,
options: {},
},
onDeleteRevision: {
name: "OnDeleteRevision",
requestType: OnDeleteRevisionRequest,
requestStream: false,
responseType: OnDeleteRevisionResponse,
responseStream: false,
options: {},
},
/** lock */
getLockedBy: {
name: "GetLockedBy",
Expand Down
2 changes: 0 additions & 2 deletions hub-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { config } from "./config";
import { logger } from "./logger";
import { ensureCoreStorageTypeInstance } from "./local/resolver/mutation/register-built-in-storage";
import DelegatedStorageService from "./local/storage/service";
import UpdateArgsContainer from "./local/storage/update-args-container";

async function main() {
logger.info("Using Neo4j database", { endpoint: config.neo4j.endpoint });
Expand Down Expand Up @@ -73,7 +72,6 @@ async function setupHttpServer(
return {
driver,
delegatedStorage,
updateArgs: new UpdateArgsContainer(),
};
},
});
Expand Down
97 changes: 8 additions & 89 deletions hub-js/src/local/resolver/field/spec-value-field.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { logger } from "../../../logger";
import { GetInput } from "../../storage/service";
import { Context } from "../mutation/context";
import { Operation } from "../../storage/update-args-container";
import _ from "lodash";
import { Mutex } from "async-mutex";

const mutex = new Mutex();

// Represents contract defined on `TypeInstanceResourceVersionSpec.Value` field cypher query.
interface InputObject {
Expand All @@ -24,89 +19,13 @@ export async function typeInstanceResourceVersionSpecValueField(
_: undefined,
context: Context
) {
// This is a field resolver, it can be called multiple times within the same `query/mutation`.
// We also perform, external calls to change the state if needed, due to that fact, we use
// mutex to ensure that we won't call backend multiple times as backends may be not thread safe.
return await mutex.runExclusive(async () => {
logger.debug("Executing custom field resolver for 'value' field", obj);
if (obj.abstract) {
logger.debug("Return data stored in built-in storage");
return obj.builtinValue;
}

switch (context.updateArgs.GetOperation()) {
case Operation.UpdateTypeInstancesMutation:
return await resolveMutationReturnValue(context, obj.fetchInput);
default: {
logger.debug("Return data stored in external storage");
const resp = await context.delegatedStorage.Get(obj.fetchInput);
return resp[obj.fetchInput.typeInstance.id];
}
}
});
}

async function resolveMutationReturnValue(
context: Context,
fetchInput: GetInput
) {
const tiId = fetchInput.typeInstance.id;
const revToResolve = fetchInput.typeInstance.resourceVersion;

let newValue = context.updateArgs.GetValue(tiId);
const lastKnownRev = context.updateArgs.GetLastKnownRev(tiId);

// During the mutation someone asked to return also:
// - `firstResourceVersion`
// - and/or `previousResourceVersion`
// - and/or `resourceVersion` with already known revision
// - and/or `resourceVersions` which holds also previous already stored revisions
if (revToResolve <= lastKnownRev) {
logger.debug(
"Fetch data from external storage for already known revision",
fetchInput
);
const resp = await context.delegatedStorage.Get(fetchInput);
return resp[tiId];
}

// If the revision is higher that the last known revision version, it means that we need to store that into delegated
// storage.

// 1. Based on our contract, if user didn't provide value, we need to fetch the old one and put it
// to the new revision.
if (!newValue) {
const previousValue: GetInput = _.cloneDeep(fetchInput);
previousValue.typeInstance.resourceVersion -= 1;

logger.debug(
"Fetching previous value from external storage",
previousValue
);
const resp = await context.delegatedStorage.Get(previousValue);
newValue = resp[tiId];
logger.debug("Executing custom field resolver for 'value' field", obj);
if (obj.abstract) {
logger.debug("Return data stored in built-in storage");
return obj.builtinValue;
} else {
logger.debug("Return data stored in external storage");
const resp = await context.delegatedStorage.Get(obj.fetchInput);
return resp[obj.fetchInput.typeInstance.id];
}

// 2. Update TypeInstance's value
const update = {
backend: fetchInput.backend,
typeInstance: {
id: fetchInput.typeInstance.id,
newResourceVersion: fetchInput.typeInstance.resourceVersion,
newValue,
ownerID: context.updateArgs.GetOwnerID(fetchInput.typeInstance.id),
},
};

logger.debug("Storing new value into external storage", update);
await context.delegatedStorage.Update(update);

// 3. Update last known revision, so if `value` resolver is called next time we won't update it once again
// and run into `ALREADY_EXISTS` error.
context.updateArgs.SetLastKnownRev(
update.typeInstance.id,
update.typeInstance.newResourceVersion
);

return newValue;
}
8 changes: 1 addition & 7 deletions hub-js/src/local/resolver/mutation/context.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Driver } from "neo4j-driver";
import DelegatedStorageService from "../../storage/service";
import UpdateArgsContainer from "../../storage/update-args-container";

export interface ContextWithDriver {
driver: Driver;
Expand All @@ -10,11 +9,6 @@ export interface ContextWithDelegatedStorage {
delegatedStorage: DelegatedStorageService;
}

export interface ContextWithUpdateArgs {
updateArgs: UpdateArgsContainer;
}

export interface Context
extends ContextWithDriver,
ContextWithDelegatedStorage,
ContextWithUpdateArgs {}
ContextWithDelegatedStorage {}
Loading

0 comments on commit 0f9d7c7

Please sign in to comment.