Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqs example #763

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added sqs example
tjcoyoca committed Jun 28, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit dcc7689f2affadf7fa0595d69a427a2c673a85c6
42 changes: 42 additions & 0 deletions apps/sqs-example/bin/jobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env node
const { promisify } = require('util');
const writeFile = promisify(require('fs').writeFile);
const path = require('path');

if (!process.env.NODE_ENV) process.env.NODE_ENV = 'development';

let logger;
let JobScheduler;

if (['testing', 'production'].includes(process.env.NODE_ENV)) {
logger = require('../lib/logger').default;
JobScheduler = require('../lib').default;
} else {
require('ts-node').register({ transpileOnly: true }); // or babel-register
logger = require('../src/logger').default;
JobScheduler = require('../src').default;
}

process.title = 'jobs';

const start = async () => {
try {
await JobScheduler.runScheduledJobs();
logger.info('scheduled jobs are ready to publish messages...');
} catch (e) {
logger.error(e);
process.exit(1);
}
};

for (const signal of ['SIGTERM', 'SIGINT']) {
process.on(signal, async () => {
if (process.env.NODE_ENV === 'development') {
process.exit();
}
logger.info(`Received ${signal}, setting termination lock`);
await writeFile(path.join(process.cwd(), 'termination.lock'), signal);
});
}

start();
32 changes: 32 additions & 0 deletions apps/sqs-example/config/default.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const bunyan = require('bunyan');

module.exports = {
nodeEnv: process.env.NODE_ENV || 'development',
sandbox: process.env.SANDBOX || false,
logger: {
level: bunyan.INFO,
},
serviceName: 'netsuite',
// Token for segment analytics
segmentToken: process.env.SEGMENT_TOKEN || '0000',
useSegment: process.env.USE_SEGMENT || false,
jwtSecret: process.env.JWT_SECRET || 'testsecret',
steveoWorkerCount: process.env.STEVEO_WORKER_COUNT || 1,
sentryDSN: process.env.SENTRY_DSN || 'netsuite',
steveoPollInterval: process.env.STEVEO_POLL_INTERVAL || 1000,
awsAccessKey: process.env.AWS_ACCESS_KEY,
awsSecretKey: process.env.AWS_SECRET_ACCESS_KEY,
awsRegion: process.env.AWS_REGION || 'us-east-1',
kafka: {
bootstrapServers: process.env.KAFKA_BOOTSTRAP_SERVERS || '',
consumerGroupId:
process.env.KAFKA_CONSUMER_GROUP_ID || 'netsuite_CONSUMERS',
defaultPartitions: 3,
},
db: {
uri: process.env.DATABASE_URI,
initializationUri: process.env.DATABASE_INITIALIZATION_URI,
name: 'netsuite',
},
defaultJobRunInterval: process.env.DEFAULT_JOB_LAG || 5000, // seconds
};
1 change: 1 addition & 0 deletions apps/sqs-example/lib/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export {};
37 changes: 37 additions & 0 deletions apps/sqs-example/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const logger_1 = require("./logger");
const steveo_sqs_1 = __importDefault(require("./steveo_sqs"));
const fifo_example_task_1 = __importDefault(require("./tasks/fifo_example_task"));
// const activeTask = process.env.TASK;
const tasks = {
fifo: fifo_example_task_1.default,
};
let active = true;
process.on('SIGINT', async () => {
active = false;
await steveo_sqs_1.default.stop();
});
process.on('unhandledRejection', (reason, promise) => {
steveo_sqs_1.default.logger.error('Unhandled Rejection', { reason, promise });
});
const start = async () => {
try {
await steveo_sqs_1.default.start();
logger_1.logger.info('consumers ready to process messages...');
while (active) {
const id = crypto.randomUUID();
await tasks.fifo.publish({ message: `Hello ${id}` });
steveo_sqs_1.default.logger.info('Published message', { id });
await setTimeout(500);
}
}
catch (e) {
logger_1.logger.error(e);
process.exit(1);
}
};
start();
3 changes: 3 additions & 0 deletions apps/sqs-example/lib/logger.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import bunyan from 'bunyan';
export declare const logger: bunyan;
export default logger;
9 changes: 9 additions & 0 deletions apps/sqs-example/lib/logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.logger = void 0;
const bunyan_1 = __importDefault(require("bunyan"));
exports.logger = bunyan_1.default.createLogger({ name: 'test-sequelize' });
exports.default = exports.logger;
2 changes: 2 additions & 0 deletions apps/sqs-example/lib/steveo_sqs.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
declare const steveo: import("steveo").Steveo;
export default steveo;
66 changes: 66 additions & 0 deletions apps/sqs-example/lib/steveo_sqs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const steveo_1 = __importDefault(require("steveo"));
const https_1 = __importDefault(require("https"));
const config_1 = __importDefault(require("config"));
const path_1 = __importDefault(require("path"));
const logger_1 = __importDefault(require("./logger"));
const workerCount = config_1.default.get('steveoWorkerCount');
const steveoPollInterval = config_1.default.get('steveoPollInterval');
const nodeEnv = config_1.default.get('nodeEnv');
const awsAccessKey = config_1.default.has('awsAccessKey')
? config_1.default.get('awsAccessKey')
: undefined;
const awsSecretKey = config_1.default.has('awsSecretKey')
? config_1.default.get('awsSecretKey')
: undefined;
const awsRegion = config_1.default.get('awsRegion');
const sandbox = config_1.default.get('sandbox');
const sqsEndpoint = config_1.default.has('sqsEndpoint')
? config_1.default.get('sqsEndpoint')
: undefined;
const steveoConfig = {
region: awsRegion,
apiVersion: '2012-11-05',
receiveMessageWaitTimeSeconds: '20',
messageRetentionPeriod: '604800',
engine: 'sqs',
queuePrefix: sandbox ? 'testing' : `${nodeEnv}`,
accessKeyId: awsAccessKey,
secretAccessKey: awsSecretKey,
shuffleQueue: false,
endpoint: sqsEndpoint,
maxNumberOfMessages: 1,
workerConfig: {
max: workerCount,
},
visibilityTimeout: 180,
waitTimeSeconds: 2,
consumerPollInterval: steveoPollInterval,
httpOptions: nodeEnv === 'development'
? {
agent: new https_1.default.Agent({
rejectUnauthorized: false,
}),
}
: undefined,
tasksPath: path_1.default.resolve(__dirname, './tasks'),
upperCaseNames: true,
};
const steveo = (0, steveo_1.default)(steveoConfig, logger_1.default);
steveo.events.on('runner_failure', async (topic, ex, params) => {
logger_1.default.error(ex, { tags: { topic }, params });
});
steveo.events.on('producer_failure', async (topic, ex, params) => {
logger_1.default.error(ex, { tags: { topic }, params });
});
steveo.events.on('producer_success', async (topic, data) => {
logger_1.default.info('Message succesfully produced', topic, data);
});
steveo.events.on('task_failure', async (topic, ex, params) => {
logger_1.default.error(ex, { tags: { topic }, params });
});
exports.default = steveo;
3 changes: 3 additions & 0 deletions apps/sqs-example/lib/tasks/fifo_example_task.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export declare const callback: (payload: any) => Promise<void>;
declare const _default: import("steveo/lib/common").ITask<any, any>;
export default _default;
12 changes: 12 additions & 0 deletions apps/sqs-example/lib/tasks/fifo_example_task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.callback = void 0;
const steveo_sqs_1 = __importDefault(require("../steveo_sqs"));
const callback = async (payload) => {
console.log('Running fifo task', payload);
};
exports.callback = callback;
exports.default = steveo_sqs_1.default.task('fifo_example', exports.callback, { fifo: true });
60 changes: 60 additions & 0 deletions apps/sqs-example/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"name": "@ordermentum/sqs-example",
"version": "3.0.4",
"description": "A playground for steveo SQS",
"main": "src/index.js",
"directories": {
"example": "example",
"test": "test"
},
"files": [
"src/*"
],
"scripts": {
"start": "node ./lib/index.js",
"lint": "yarn eslint 'src/**/*.{ts,js}'",
"build:coverage": "nyc check-coverage --statements 74 --branches 60 --functions 66 --lines 72",
"test": "NODE_ENV=test nyc npm run spec",
"report": "./node_modules/.bin/nyc report --reporter=html",
"spec": "mocha -R spec -r ts-node/register/transpile-only test/*.*",
"spec:runner": "mocha -R spec -r ts-node/register/transpile-only",
"build": "yarn clean && yarn tsc",
"prepublish": "yarn run build && yarn spec",
"clean": "rm -rf lib",
"reporter": "nyc --reporter=html yarn run test",
"typecheck": "tsc --noEmit"
},
"repository": {
"type": "git",
"url": "https://github.com/ordermentum/steveo.git"
},
"keywords": [
"sqs",
"queue"
],
"license": "Apache-2.0",
"dependencies": {
"bunyan": "^1.8.15",
"config": "^3.3.7",
"moment-timezone": "^0.5.33",
"steveo": "*",
"uuid": "^8.3.2"
},
"devDependencies": {
"@types/bunyan": "1.8.8",
"@types/chai": "4.3.16",
"@types/config": "0.0.41",
"@types/mocha": "8.2.3",
"@types/node": "17.0.45",
"@types/qs": "6.9.15",
"@types/sinon": "10.0.20",
"@types/uuid": "8.3.4",
"@types/validator": "13.11.10",
"chai": "4.4.1",
"cross-env": "7.0.3",
"eslint-config-ordermentum": "1.0.6",
"nyc": "15.1.0",
"ts-node": "10.9.2",
"typescript": "4.9.5"
}
}
40 changes: 40 additions & 0 deletions apps/sqs-example/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// @ts-nocheck
import config from 'config';
import { logger } from './logger';
import steveo from './steveo_sqs';
import fifoTask from './tasks/fifo_example_task';

// const activeTask = process.env.TASK;

const tasks = {
fifo: fifoTask,
};

let active = true;
process.on('SIGINT', async () => {
active = false;
await steveo.stop();
});

process.on('unhandledRejection', (reason, promise) => {
steveo.logger.error('Unhandled Rejection', { reason, promise });
});

const start = async () => {
try {
await steveo.start();
logger.info('consumers ready to process messages...');

while (active) {
const id = crypto.randomUUID();
await tasks.fifo.publish({ message: `Hello ${id}` });
steveo.logger.info('Published message', { id });
await setTimeout(500);
}
} catch (e) {
logger.error(e);
process.exit(1);
}
};

start();
5 changes: 5 additions & 0 deletions apps/sqs-example/src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import bunyan from 'bunyan';

export const logger = bunyan.createLogger({ name: 'test-sequelize' });

export default logger;
Loading

Unchanged files with check annotations Beta

if (
job.failures < maxRestartsOnFailure &&
(!jobsSafeToRestart || jobsSafeToRestart.includes(job.name))

Check warning on line 202 in packages/scheduler-prisma/src/helpers.ts

GitHub Actions / lint

Unnecessary conditional, value is always falsy
) {
const backoff = retryDelay(job.failures, backOffMs);
const nextRunAt = moment()
task: TaskCallback<T, R>
) =>
async (args: T, context: JobContext): Promise<any> => {
const jobId = args?.context?.job?.id ?? context?.job?.id;

Check warning on line 304 in packages/scheduler-prisma/src/helpers.ts

GitHub Actions / lint

Unnecessary optional chain on a non-nullish value

Check warning on line 304 in packages/scheduler-prisma/src/helpers.ts

GitHub Actions / lint

Unnecessary optional chain on a non-nullish value
if (!jobId) {
try {
}
}
const jobInstance = await client?.job?.findUnique({

Check warning on line 314 in packages/scheduler-prisma/src/helpers.ts

GitHub Actions / lint

Unnecessary optional chain on a non-nullish value

Check warning on line 314 in packages/scheduler-prisma/src/helpers.ts

GitHub Actions / lint

Unnecessary optional chain on a non-nullish value
where: jobScheduler.namespace
? // @ts-expect-error namespace is an optional feature
{ id_namespace: { id: jobId, namespace: jobScheduler.namespace } }
publishMessages = async (rows: JobSet[]) => {
if (!rows || !rows.length) {
return false;

Check warning on line 336 in packages/scheduler-prisma/src/index.ts

GitHub Actions / lint

Unnecessary conditional, value is always falsy
}
this.processing = true;
const { name, items } = batch;
const task = this.tasks[name];
if (task) {
for (const item of items) {

Check warning on line 345 in packages/scheduler-prisma/src/index.ts

GitHub Actions / lint

Unnecessary conditional, value is always truthy
try {
// @ts-ignore
await task(item.data, {
: [];
if (laggyJobs && laggyJobs.length > 0) {
const notifyJobs: Job[] = [];

Check warning on line 127 in packages/scheduler-prisma/src/maintenance.ts

GitHub Actions / lint

Unnecessary conditional, value is always truthy
const resetJobs: Job[] = [];
// Final automated reset check for jobs that we consider safe to do so, otherwise notify
laggyJobs.forEach(laggyJob => {
const restartAfter = jobsCustomRestart[laggyJob.name];
if (restartAfter) {
if (

Check warning on line 134 in packages/scheduler-prisma/src/maintenance.ts

GitHub Actions / lint

Unnecessary conditional, value is always truthy
moment(laggyJob.acceptedAt).add(moment.duration(restartAfter)) >
moment()
) {
) => Math.round((jitter ? Math.random() : 1) * backoff * factor ** attempt);
const updateFailure = async (job: JobInstance, jobScheduler: JobScheduler) => {
if (!job) {

Check warning on line 140 in packages/scheduler-sequelize/src/helpers.ts

GitHub Actions / lint

Unnecessary conditional, value is always falsy
return;
}
const {