diff --git a/apps/web/inngest/functions.ts b/apps/web/inngest/functions.ts index ef4428bf..e51d9fcc 100644 --- a/apps/web/inngest/functions.ts +++ b/apps/web/inngest/functions.ts @@ -65,7 +65,7 @@ export const syncPipeline = inngest.createFunction( ...contextFactory.fromViewer({role: 'system'}), remoteResourceId: null, }) - .syncPipeline([pipelineId, {}]) + .syncPipeline({id: pipelineId}) console.log('did sync pipeline', pipelineId) return pipelineId }, @@ -90,7 +90,7 @@ export const syncResource = inngest.createFunction( ...contextFactory.fromViewer({role: 'system'}), remoteResourceId: null, }) - .syncResource([resourceId, {}]) + .syncResource({id: resourceId}) console.log('did sync pipeline', resourceId) return resourceId diff --git a/apps/web/vcommands/vcommand-definitions.ts b/apps/web/vcommands/vcommand-definitions.ts index 897f38c7..eb8915d0 100644 --- a/apps/web/vcommands/vcommand-definitions.ts +++ b/apps/web/vcommands/vcommand-definitions.ts @@ -34,7 +34,7 @@ export const pipelineCommands = { icon: 'RefreshCw', execute: ({params: {pipeline}, ctx}) => { void ctx.withToast(() => - ctx.trpcCtx.client.syncPipeline.mutate([pipeline.id, {}]), + ctx.trpcCtx.client.syncPipeline.mutate({id: pipeline.id}), ) }, }), @@ -85,7 +85,7 @@ export const resourceCommands = { icon: 'RefreshCw', execute: ({params: {resource}, ctx}) => { void ctx.withToast(() => - ctx.trpcCtx.client.syncResource.mutate([resource.id, {}]), + ctx.trpcCtx.client.syncResource.mutate({id: resource.id}), ) }, }), diff --git a/kits/sdk/package.json b/kits/sdk/package.json index 02f2cf37..bee57f68 100644 --- a/kits/sdk/package.json +++ b/kits/sdk/package.json @@ -8,9 +8,9 @@ "scripts": { "build": "tsc -p ./tsconfig.json", "clean": "rm -rf ./dist", - "generate": "pnpm generate:schema && pnpm generate:types", - "generate:schema": "NEXT_PUBLIC_SERVER_URL=https://app.venice.is npx tsx ../../apps/web/lib-server/appRouter.ts > ./venice.oas.json", - "generate:types": "openapi-typescript ./venice.oas.json --output ./venice.oas.d.ts", + "gen": "pnpm gen:schema && pnpm gen:types", + "gen:schema": "NEXT_PUBLIC_SERVER_URL=https://app.venice.is npx tsx ../../apps/web/lib-server/appRouter.ts > ./venice.oas.json", + "gen:types": "openapi-typescript ./venice.oas.json --output ./venice.oas.d.ts", "pub": "pnpm publish --no-git-checks --access public" }, "dependencies": { diff --git a/kits/sdk/venice.oas.d.ts b/kits/sdk/venice.oas.d.ts index b7319dd1..4bcaabc6 100644 --- a/kits/sdk/venice.oas.d.ts +++ b/kits/sdk/venice.oas.d.ts @@ -95,6 +95,7 @@ export interface paths { } get?: never put?: never + /** @description Return records that would have otherwise been emitted during a sync and return it instead */ post: operations['sourceSync'] delete?: never options?: never @@ -134,6 +135,22 @@ export interface paths { patch: operations['updateResource'] trace?: never } + '/core/resource/{id}/_sync': { + parameters: { + query?: never + header?: never + path?: never + cookie?: never + } + get?: never + put?: never + post: operations['syncResource'] + delete?: never + options?: never + head?: never + patch?: never + trace?: never + } '/core/connector_config': { parameters: { query?: never @@ -279,6 +296,22 @@ export interface paths { patch: operations['updatePipeline'] trace?: never } + '/core/pipeline/{id}/_sync': { + parameters: { + query?: never + header?: never + path?: never + cookie?: never + } + get?: never + put?: never + post: operations['syncPipeline'] + delete?: never + options?: never + head?: never + patch?: never + trace?: never + } '/verticals/accounting/account': { parameters: { query?: never @@ -1307,6 +1340,55 @@ export interface operations { } } } + syncResource: { + parameters: { + query?: never + header?: never + path: { + id: string + } + cookie?: never + } + requestBody: { + content: { + 'application/json': { + metaOnly?: boolean | null + fullResync?: boolean | null + todo_upstreamRefresh?: boolean | null + todo_removeUnsyncedData?: boolean | null + } + } + } + responses: { + /** @description Successful response */ + 200: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': unknown + } + } + /** @description Invalid input data */ + 400: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': components['schemas']['error.BAD_REQUEST'] + } + } + /** @description Internal server error */ + 500: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': components['schemas']['error.INTERNAL_SERVER_ERROR'] + } + } + } + } adminListConnectorConfigs: { parameters: { query?: never @@ -1987,6 +2069,55 @@ export interface operations { } } } + syncPipeline: { + parameters: { + query?: never + header?: never + path: { + id: string + } + cookie?: never + } + requestBody: { + content: { + 'application/json': { + metaOnly?: boolean | null + fullResync?: boolean | null + todo_upstreamRefresh?: boolean | null + todo_removeUnsyncedData?: boolean | null + } + } + } + responses: { + /** @description Successful response */ + 200: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': unknown + } + } + /** @description Invalid input data */ + 400: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': components['schemas']['error.BAD_REQUEST'] + } + } + /** @description Internal server error */ + 500: { + headers: { + [name: string]: unknown + } + content: { + 'application/json': components['schemas']['error.INTERNAL_SERVER_ERROR'] + } + } + } + } 'verticals-accounting-account_list': { parameters: { query?: { diff --git a/kits/sdk/venice.oas.json b/kits/sdk/venice.oas.json index acffd296..0169a8e9 100644 --- a/kits/sdk/venice.oas.json +++ b/kits/sdk/venice.oas.json @@ -293,7 +293,8 @@ "/core/resource/{id}/source_sync": { "post": { "operationId": "sourceSync", - "tags": ["Core"], + "description": "Return records that would have otherwise been emitted during a sync and return it instead", + "tags": ["Internal"], "parameters": [ { "in": "path", @@ -740,6 +741,77 @@ } } }, + "/core/resource/{id}/_sync": { + "post": { + "operationId": "syncResource", + "tags": ["Core"], + "parameters": [ + { + "in": "path", + "name": "id", + "schema": { + "type": "string", + "description": "Must start with 'reso_'" + }, + "required": true + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "metaOnly": { + "type": ["boolean", "null"] + }, + "fullResync": { + "type": ["boolean", "null"] + }, + "todo_upstreamRefresh": { + "type": ["boolean", "null"] + }, + "todo_removeUnsyncedData": { + "type": ["boolean", "null"] + } + } + } + } + } + }, + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": {} + } + } + }, + "400": { + "description": "Invalid input data", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/error.BAD_REQUEST" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/error.INTERNAL_SERVER_ERROR" + } + } + } + } + } + } + }, "/core/connector_config": { "get": { "operationId": "adminListConnectorConfigs", @@ -1633,6 +1705,77 @@ } } }, + "/core/pipeline/{id}/_sync": { + "post": { + "operationId": "syncPipeline", + "tags": ["Core"], + "parameters": [ + { + "in": "path", + "name": "id", + "schema": { + "type": "string", + "description": "Must start with 'pipe_'" + }, + "required": true + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "metaOnly": { + "type": ["boolean", "null"] + }, + "fullResync": { + "type": ["boolean", "null"] + }, + "todo_upstreamRefresh": { + "type": ["boolean", "null"] + }, + "todo_removeUnsyncedData": { + "type": ["boolean", "null"] + } + } + } + } + } + }, + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": {} + } + } + }, + "400": { + "description": "Invalid input data", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/error.BAD_REQUEST" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/error.INTERNAL_SERVER_ERROR" + } + } + } + } + } + } + }, "/verticals/accounting/account": { "get": { "operationId": "verticals-accounting-account_list", diff --git a/packages/engine-backend/router/pipelineRouter.ts b/packages/engine-backend/router/pipelineRouter.ts index 6d1273c0..52ede0d6 100644 --- a/packages/engine-backend/router/pipelineRouter.ts +++ b/packages/engine-backend/router/pipelineRouter.ts @@ -1,6 +1,7 @@ import type {ZRaw} from '@usevenice/cdk' import {extractId, zEndUserId, zId, zRaw, zStandard} from '@usevenice/cdk' import {R, z} from '@usevenice/util' +import {inngest} from '../events' import {zSyncOptions} from '../types' import {protectedProcedure, trpc} from './_base' import {zListParams} from './_schemas' @@ -125,11 +126,19 @@ export const pipelineRouter = trpc.router({ }) }), syncPipeline: protectedProcedure - .input(z.tuple([zId('pipe'), zSyncOptions.optional()])) - .mutation(async function syncPipeline({input: [pipeId, opts], ctx}) { + .meta({openapi: {method: 'POST', path: '/core/pipeline/{id}/_sync', tags}}) + .input(z.object({id: zId('pipe')}).merge(zSyncOptions)) + .output(z.void()) + .mutation(async function syncPipeline({input: {id: pipeId, ...opts}, ctx}) { if (ctx.viewer.role === 'end_user') { await ctx.services.getPipelineOrFail(pipeId) // Authorization } + if (opts?.async) { + await inngest.send('sync/pipeline-requested', { + data: {pipelineId: pipeId}, + }) + return + } const pipeline = await ctx.asOrgIfNeeded.getPipelineExpandedOrFail(pipeId) console.log('[syncPipeline]', pipeline) return ctx.services._syncPipeline(pipeline, opts) diff --git a/packages/engine-backend/router/resourceRouter.ts b/packages/engine-backend/router/resourceRouter.ts index 3adde0a7..e43a0017 100644 --- a/packages/engine-backend/router/resourceRouter.ts +++ b/packages/engine-backend/router/resourceRouter.ts @@ -11,6 +11,7 @@ import { zRaw, } from '@usevenice/cdk' import {joinPath, makeUlid, Rx, rxjs, z} from '@usevenice/util' +import {inngest} from '../events' import {parseWebhookRequest} from '../parseWebhookRequest' import {zSyncOptions} from '../types' import {protectedProcedure, remoteProcedure, trpc} from './_base' @@ -51,7 +52,9 @@ export const resourceRouter = trpc.router({ openapi: { method: 'POST', path: '/core/resource/{id}/source_sync', - tags, + tags: ['Internal'], + description: + 'Return records that would have otherwise been emitted during a sync and return it instead', }, }) .input( @@ -259,11 +262,19 @@ export const resourceRouter = trpc.router({ // MARK: - Sync syncResource: protectedProcedure - .input(z.tuple([zId('reso'), zSyncOptions.optional()])) - .mutation(async function syncResource({input: [resoId, opts], ctx}) { + .meta({openapi: {method: 'POST', path: '/core/resource/{id}/_sync', tags}}) + .input(z.object({id: zId('reso')}).merge(zSyncOptions)) + .output(z.void()) + .mutation(async function syncResource({input: {id: resoId, ...opts}, ctx}) { if (ctx.viewer.role === 'end_user') { await ctx.services.getResourceOrFail(resoId) } + if (opts?.async) { + await inngest.send('sync/resource-requested', { + data: {resourceId: resoId}, + }) + return + } const reso = await ctx.asOrgIfNeeded.getResourceExpandedOrFail(resoId) console.log('[syncResource]', reso, opts) // No need to checkResource here as sourceSync should take care of it diff --git a/packages/engine-backend/types.ts b/packages/engine-backend/types.ts index 85c01913..c75f4a2e 100644 --- a/packages/engine-backend/types.ts +++ b/packages/engine-backend/types.ts @@ -55,22 +55,33 @@ export interface PipelineInput< } export const zSyncOptions = z.object({ - /** Only sync resource metadata and skip pipelines */ - metaOnly: z.boolean().nullish(), - /** - * Remove `state` of resource and trigger a full resync - */ - fullResync: z.boolean().nullish(), + async: z + .boolean() + .nullish() + .describe( + 'Run sync in the background, not compatible with other options for now...', + ), - /** - * Triggers provider to refresh data from its source - * https://plaid.com/docs/api/products/transactions/#transactionsrefresh - * This may also load historical transactions. For example, - * Finicity treats historical transaction as premium service. - */ - todo_upstreamRefresh: z.boolean().nullish(), + metaOnly: z + .boolean() + .nullish() + .describe('Only sync resource metadata and skip pipelines '), + + fullResync: z + .boolean() + .nullish() + .describe('Remove `state` of pipeline and trigger a full resync'), + + todo_upstreamRefresh: z.boolean().nullish().describe(` + Triggers provider to refresh data from its source + https://plaid.com/docs/api/products/transactions/#transactionsrefresh + This may also load historical transactions. For example, + Finicity treats historical transaction as premium service. + `), // See coda's implmementation. Requires adding a new message to the sync protocol // to remove all data from a particular source_id - todo_removeUnsyncedData: z.boolean().nullish(), + todo_removeUnsyncedData: z.boolean().nullish().describe(` + See coda's implmementation. Requires adding a new message to the sync protocol + to remove all data from a particular source_id`), })