Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase lock duration and paginate in getBsdsIdentifiers #3246

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 86 additions & 8 deletions back/src/bsds/indexation/__tests__/bulkIndexBsds.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,89 @@ describe("processDbIdentifiersByChunk", () => {
describe("getBsdIdentifiers", () => {
afterEach(resetDatabase);

it("should return all identifiers of a bsd type", async () => {
const user = await userFactory();

const form = await formFactory({ ownerId: user.id });
const ids = await getBsdIdentifiers("bsdd");
expect(ids).toEqual([form.id]);
});
it.each([1, 2, 3, 4, 5, 100])(
"should return all BSDD's identifiers when paginating by %p",
async paginateBy => {
const user = await userFactory();

const form1 = await formFactory({ ownerId: user.id });
const form2 = await formFactory({ ownerId: user.id });
const form3 = await formFactory({ ownerId: user.id });
const form4 = await formFactory({ ownerId: user.id });
const form5 = await formFactory({ ownerId: user.id });

const ids = await getBsdIdentifiers("bsdd", { paginateBy });
expect(ids).toEqual([form1.id, form2.id, form3.id, form4.id, form5.id]);
}
);

it.each([1, 2, 3, 4, 5, 100])(
"should return all BSDA's identifiers when paginating by %p",
async paginateBy => {
const bsda1 = await bsdaFactory({});
const bsda2 = await bsdaFactory({});
const bsda3 = await bsdaFactory({});
const bsda4 = await bsdaFactory({});
const bsda5 = await bsdaFactory({});

const ids = await getBsdIdentifiers("bsda", { paginateBy });
expect(ids).toEqual([bsda1.id, bsda2.id, bsda3.id, bsda4.id, bsda5.id]);
}
);

it.each([1, 2, 3, 4, 5, 100])(
"should return all BSDASRI's identifiers when paginating by %p",
async paginateBy => {
const bsdasri1 = await bsdasriFactory({});
const bsdasri2 = await bsdasriFactory({});
const bsdasri3 = await bsdasriFactory({});
const bsdasri4 = await bsdasriFactory({});
const bsdasri5 = await bsdasriFactory({});

const ids = await getBsdIdentifiers("bsdasri", { paginateBy });
expect(ids).toEqual([
bsdasri1.id,
bsdasri2.id,
bsdasri3.id,
bsdasri4.id,
bsdasri5.id
]);
}
);

it.each([1, 2, 3, 4, 5, 100])(
"should return all BSFF's identifiers when paginating by %p",
async paginateBy => {
const bsff1 = await createBsff();
const bsff2 = await createBsff({});
const bsff3 = await createBsff({});
const bsff4 = await createBsff({});
const bsff5 = await createBsff({});

const ids = await getBsdIdentifiers("bsff", { paginateBy });
expect(ids).toEqual([bsff1.id, bsff2.id, bsff3.id, bsff4.id, bsff5.id]);
}
);

it.each([1, 2, 3, 4, 5, 100])(
"should return all BSVHU's identifiers when paginating by %p",
async paginateBy => {
const bsvhu1 = await bsvhuFactory({});
const bsvhu2 = await bsvhuFactory({});
const bsvhu3 = await bsvhuFactory({});
const bsvhu4 = await bsvhuFactory({});
const bsvhu5 = await bsvhuFactory({});

const ids = await getBsdIdentifiers("bsvhu", { paginateBy });
expect(ids).toEqual([
bsvhu1.id,
bsvhu2.id,
bsvhu3.id,
bsvhu4.id,
bsvhu5.id
]);
}
);

it("should not return identifiers updated before since paramater", async () => {
const user = await userFactory();
Expand All @@ -54,7 +130,9 @@ describe("getBsdIdentifiers", () => {
ownerId: user.id,
opt: { updatedAt: new Date("2023-02-01") }
});
const ids = await getBsdIdentifiers("bsdd", new Date("2023-02-01"));
const ids = await getBsdIdentifiers("bsdd", {
since: new Date("2023-02-01")
});
expect(ids).toEqual([form2.id]);
});
});
Expand Down
61 changes: 50 additions & 11 deletions back/src/bsds/indexation/bulkIndexBsds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,24 +265,63 @@ export async function isIndexMappingsVersionChanged(
}
}

type GetBsdIdentifiersOpt = {
since?: Date;
paginateBy?: number;
};

/**
* Retrieves all BSD identifiers for a given BSD type
*/
export async function getBsdIdentifiers(
bsdName: string,
since?: Date
{ since, paginateBy }: GetBsdIdentifiersOpt = {}
): Promise<string[]> {
const prismaModelDelegate = prismaModels[bsdName];

const bsds = await prismaModelDelegate.findMany({
where: {
isDeleted: false,
...(since ? { updatedAt: { gte: since } } : {})
},
select: { id: true }
});
const defaultPaginateBy = 500000;
const take = paginateBy ?? defaultPaginateBy;

// Renvoie <take> bordereaux après le bordereau
// identifié par son curseur
async function nextPage(after: string | null) {
const bsds = await prismaModelDelegate.findMany({
take,
...(after
? {
// Cf https://www.prisma.io/docs/orm/prisma-client/queries/pagination#cursor-based-pagination
skip: 1,
cursor: {
rowNumber: after
}
}
: {}),
where: {
isDeleted: false,
...(since ? { updatedAt: { gte: since } } : {})
},
select: { id: true, rowNumber: true },
orderBy: { rowNumber: "asc" }
});

return bsds;
}

// Récupère tous les identifiants en paginant la liste des bordereaux
// de manière récursive grâce à une pagination par curseur qui utilise `rowNumber`
async function paginate(after: string | null = null, ids: string[] = []) {
const bsds = await nextPage(after);
const length = bsds.length;
if (length === 0) {
return ids;
} else {
const bsdIds = bsds.map(bsd => bsd.id) as string[];
const nextCursor = bsds[length - 1].rowNumber;
return paginate(nextCursor, [...ids, ...bsdIds]);
}
}

return bsds.map(bsd => bsd.id);
return paginate();
}

export async function processDbIdentifiersByChunk(
Expand All @@ -305,7 +344,7 @@ export async function indexAllBsdTypeSync({
since,
indexConfig
}: IndexAllFnSignature): Promise<void> {
const ids = await getBsdIdentifiers(bsdName, since);
const ids = await getBsdIdentifiers(bsdName, { since });

logger.info(`Starting synchronous indexation of ${ids.length} ${bsdName}`);

Expand All @@ -331,7 +370,7 @@ export async function indexAllBsdTypeConcurrentJobs({
}: IndexAllFnSignature) {
const jobs: Job<string>[] = [];
const data: { name: string; data: string; opts?: JobOptions }[] = [];
const ids = await getBsdIdentifiers(bsdName, since);
const ids = await getBsdIdentifiers(bsdName, { since });
logger.info(`Starting indexation of ${ids.length} ${bsdName}`);

// Prepare Job data payload to call indexQueue.addBulk
Expand Down
13 changes: 13 additions & 0 deletions back/src/queue/producers/elastic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,23 @@ export const bulkIndexMasterQueue = new Queue<string>(
// 24h pour prendre de la marge, les jobs de cette queue attendent que toutes les chunks
// soit process lors d'un reindex global
timeout: 24 * 3600 * 1000
},
settings: {
// https://github.com/OptimalBits/bull?tab=readme-ov-file#important-notes
// https://github.com/OptimalBits/bull/issues/1591
lockDuration: 2 * 60 * 1000, // 2 minutes - default 3000 (30s)
// prevent the job from being run twice if it is stalled
maxStalledCount: 0 // default is 1
}
}
);

bulkIndexMasterQueue.on("stalled", opt => {
logger.warn(
`The job ${opt.id} in queue bulkIndexMasterQueue has been stalled`
);
});

indexQueue.on("completed", async job => {
const id = job.data;

Expand Down
Loading