Skip to content

Commit

Permalink
Merge pull request #931 from terascope/merge-common-asset
Browse files Browse the repository at this point in the history
migrate processors from common assets
  • Loading branch information
ciorg authored Nov 13, 2024
2 parents 9b3ff4d + 6e4c95d commit 6a683b0
Show file tree
Hide file tree
Showing 92 changed files with 4,769 additions and 222 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "standard",
"version": "1.1.0",
"version": "1.2.0",
"description": "Teraslice standard processor asset bundle"
}
40 changes: 23 additions & 17 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "standard",
"displayName": "Asset",
"version": "1.1.0",
"version": "1.2.0",
"private": true,
"description": "Teraslice standard processor asset bundle",
"repository": {
Expand All @@ -21,25 +21,31 @@
"test": "yarn --cwd ../ test"
},
"dependencies": {
"@faker-js/faker": "^9.2.0",
"@terascope/job-components": "^1.5.1",
"@terascope/standard-asset-apis": "^1.0.2",
"@terascope/utils": "^1.3.2",
"@types/chance": "^1.1.4",
"@types/express": "^4.17.19",
"chance": "^1.1.12",
"express": "^4.21.1",
"mocker-data-generator": "^3.0.3",
"prom-client": "^15.1.2",
"randexp": "^0.5.3",
"short-unique-id": "^5.2.0",
"timsort": "^0.3.0",
"ts-transforms": "^1.3.2",
"tslib": "^2.8.1"
"@faker-js/faker": "~9.2.0",
"@terascope/data-mate": "~1.4.0",
"@terascope/job-components": "~1.6.0",
"@terascope/standard-asset-apis": "~1.0.2",
"@terascope/teraslice-state-storage": "~1.4.0",
"@terascope/utils": "~1.4.0",
"@types/chance": "~1.1.4",
"@types/express": "~4.17.19",
"chance": "~1.1.12",
"express": "~4.21.1",
"mocker-data-generator": "~3.0.3",
"ms": "^2.1.3",
"prom-client": "~15.1.2",
"randexp": "~0.5.3",
"short-unique-id": "~5.2.0",
"timsort": "~0.3.0",
"ts-transforms": "~1.4.0",
"tslib": "~2.8.1"
},
"engines": {
"node": ">=18.0.0",
"yarn": ">=1.22.19"
},
"terascope": {}
"terascope": {},
"devDependencies": {
"@types/ms": "^0.7.34"
}
}
13 changes: 8 additions & 5 deletions asset/src/accumulate/processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
BatchProcessor, DataEntity, Context
} from '@terascope/job-components';
import { BatchProcessor, DataEntity, Context } from '@terascope/job-components';
import { ExecutionConfig } from '@terascope/types';
import { AccumulateConfig } from './interfaces.js';
import DataWindow from '../__lib/data-window.js';
Expand All @@ -25,9 +23,14 @@ export default class Accumulate extends BatchProcessor<AccumulateConfig> {
}

async onBatch(dataArray: DataEntity[]): Promise<DataEntity[]> {
if (dataArray.length === 0) this.accumulator.emptySlice();
else this.accumulator.add(dataArray);
if (dataArray.length === 0) {
this.accumulator.emptySlice();
} else {
this.accumulator.add(dataArray);
}

let results: DataEntity[] = [];

if ((this.accumulator.readyToEmpty() || this.flushData) && this.accumulator.size > 0) {
results = DataWindow.make(
this.opConfig.data_window_key,
Expand Down
16 changes: 10 additions & 6 deletions asset/src/accumulate_by_key/processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
BatchProcessor, Context, DataEntity
} from '@terascope/job-components';
import { BatchProcessor, Context, DataEntity } from '@terascope/job-components';
import { ExecutionConfig } from '@terascope/types';
import { AccumulateByKeyConfig } from './interfaces.js';
import AccumulatorByKey from '../__lib/accumulator-key.js';
Expand All @@ -27,10 +25,16 @@ export default class AccumulateByKey extends BatchProcessor<AccumulateByKeyConfi

async onBatch(dataArray: DataEntity[]): Promise<DataEntity[]> {
// on shutdown event return accumulated data
if (dataArray.length === 0) this.accumulator.emptySlice();
else this.accumulator.add(dataArray);
if (dataArray.length === 0) {
this.accumulator.emptySlice();
} else {
this.accumulator.add(dataArray);
}

if (this.accumulator.readyToEmpty() || this.flushData) {
return this.accumulator.flush();
}

if (this.accumulator.readyToEmpty() || this.flushData) return this.accumulator.flush();
return [];
}
}
26 changes: 6 additions & 20 deletions asset/src/add_key/processor.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,12 @@
import { BatchProcessor, DataEntity, AnyObject } from '@terascope/job-components';
import {
BatchProcessor,
DataEntity,
AnyObject
} from '@terascope/job-components';
import {
get,
isObjectEntity,
isEmpty,
isString,
toNumber,
geoHash,
setPrecision,
isGeoShapePoint,
isPlainObject,
get, isObjectEntity, isEmpty,
isString, toNumber, geoHash,
setPrecision, isGeoShapePoint, isPlainObject,
flatten
} from '@terascope/utils';
import {
GeoShapePoint,
GeoShapeType
} from '@terascope/types';

import crypto from 'crypto';
import { GeoShapePoint, GeoShapeType } from '@terascope/types';
import crypto from 'node:crypto';
import DataWindow from '../__lib/data-window.js';

export default class AddKey extends BatchProcessor {
Expand Down
6 changes: 1 addition & 5 deletions asset/src/add_short_id/processor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import {
MapProcessor,
DataEntity,
Context
} from '@terascope/job-components';
import { MapProcessor, DataEntity, Context } from '@terascope/job-components';
import { ExecutionConfig, OpConfig } from '@terascope/types';
import ShortUniqueId from 'short-unique-id';
import DataWindow from '../__lib/data-window.js';
Expand Down
7 changes: 7 additions & 0 deletions asset/src/copy_field/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { OpConfig } from '@terascope/types';

/**
 * Operation configuration for the copy_field processor.
 *
 * NOTE(review): the per-field notes below are inferred from the property
 * names only; confirm them against the copy_field schema and processor.
 */
export interface CopyFieldConfig extends OpConfig {
// presumably the record field to copy from — TODO confirm
source: string;
// presumably the record field to copy to — TODO confirm
destination: string;
// presumably whether the source field is removed after copying — TODO confirm
delete_source: boolean;
}
9 changes: 3 additions & 6 deletions asset/src/copy_field/processor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import {
MapProcessor,
OpConfig,
DataEntity,
} from '@terascope/job-components';
import { MapProcessor, DataEntity } from '@terascope/job-components';
import { get, set } from '@terascope/utils';
import { CopyFieldConfig } from './interfaces.js';
import DataWindow from '../__lib/data-window.js';

export default class CopyField extends MapProcessor<OpConfig> {
export default class CopyField extends MapProcessor<CopyFieldConfig> {
map(doc: DataEntity): DataEntity {
if (doc instanceof DataWindow) {
return this.handleDataWindow(doc);
Expand Down
4 changes: 2 additions & 2 deletions asset/src/copy_field/schema.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConvictSchema } from '@terascope/job-components';
import { DateRouterConfig } from '@terascope/standard-asset-apis';
import { CopyFieldConfig } from './interfaces.js';

export default class Schema extends ConvictSchema<DateRouterConfig> {
export default class Schema extends ConvictSchema<CopyFieldConfig> {
build(): Record<string, any> {
return {
source: {
Expand Down
6 changes: 6 additions & 0 deletions asset/src/copy_metadata_field/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { OpConfig } from '@terascope/types';

/**
 * Operation configuration for the copy_metadata_field processor.
 */
export interface CopyMetadataFieldConfig extends OpConfig {
/** Name of the record property that receives the copied metadata value. */
destination: string;
/** DataEntity metadata key whose value is copied (the schema default is `_key`). */
meta_key: string;
}
10 changes: 10 additions & 0 deletions asset/src/copy_metadata_field/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { MapProcessor, DataEntity } from '@terascope/job-components';
import { CopyMetadataFieldConfig } from './interfaces.js';

/**
 * Map processor that copies a DataEntity metadata value onto the record.
 *
 * For each record, the metadata value stored under `opConfig.meta_key` is
 * read and assigned to the record property named by `opConfig.destination`.
 */
export default class CopyMetadataField extends MapProcessor<CopyMetadataFieldConfig> {
    /**
     * @param doc record to update; it is mutated in place
     * @returns the same record instance that was passed in
     */
    map(doc: DataEntity) {
        const { destination, meta_key: metaKey } = this.opConfig;
        const metadataValue = doc.getMetadata(metaKey);

        doc[destination] = metadataValue;

        return doc;
    }
}
19 changes: 19 additions & 0 deletions asset/src/copy_metadata_field/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { ConvictSchema } from '@terascope/job-components';
import { CopyMetadataFieldConfig } from './interfaces.js';

/**
 * Configuration schema for the copy_metadata_field operation.
 */
export default class Schema extends ConvictSchema<CopyMetadataFieldConfig> {
    /**
     * Builds the schema definition for the operation's two settings,
     * `destination` and `meta_key`, both using the `required_String` format.
     *
     * @returns the schema definition keyed by setting name
     */
    build() {
        // No usable default: the target property must be supplied by the job.
        const destination = {
            doc: 'The property to copy to',
            format: 'required_String',
            default: null
        };

        const metaKey = {
            doc: 'The Dataentity metadata key to copy',
            format: 'required_String',
            default: '_key'
        };

        return { destination, meta_key: metaKey };
    }
}
4 changes: 0 additions & 4 deletions asset/src/count_by_field/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import { OpConfig } from '@terascope/types';

export interface CountByField {
op_name: string;
}

export interface CountByFieldConfig extends OpConfig {
field: string;
collect_metrics: boolean;
Expand Down
8 changes: 5 additions & 3 deletions asset/src/count_by_field/processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
MapProcessor, DataEntity, isPromAvailable
} from '@terascope/job-components';
import { MapProcessor, DataEntity, isPromAvailable } from '@terascope/job-components';
import { CountByFieldConfig } from './interfaces.js';

type Counters = {
Expand All @@ -9,15 +7,19 @@ type Counters = {
field: string;
};
};

export default class CountByField extends MapProcessor<CountByFieldConfig> {
static counters: Counters = {};

async initialize(): Promise<void> {
const { opConfig, context } = this;

if (opConfig.collect_metrics && isPromAvailable(context)) {
const defaultLabels = context.apis.foundation.promMetrics.getDefaultLabels();
const name = `${this.opConfig._op}_count_total`;
const help = `${this.opConfig._op} value field count`;
const labelNames = [...Object.keys(defaultLabels), 'value', 'field', 'op_name'];

await this.context.apis.foundation.promMetrics.addCounter(
name,
help,
Expand Down
7 changes: 7 additions & 0 deletions asset/src/count_unique/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { OpConfig } from '@terascope/types';

/**
 * Operation configuration for the count_unique processor.
 */
export interface CountUniqueConfig extends OpConfig {
/** Fields whose last seen non-null values are added to each result in addition to the count. */
preserve_fields: string[];
/** Field that is counted; the schema default is the metadata `_key`. */
field: string;
/** When true, `field` is read from the DataEntity metadata; otherwise from the record itself. */
is_meta_field: boolean;
}
37 changes: 37 additions & 0 deletions asset/src/count_unique/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { BatchProcessor, DataEntity, has } from '@terascope/job-components';
import { CountUniqueConfig } from './interfaces.js';

/**
 * Batch processor that counts how many records in a slice share the same
 * identifier and emits one summary record per distinct identifier.
 *
 * Each summary record carries `count` (number of records seen for that
 * identifier) and `_key` (the identifier, also set as the `_key` metadata),
 * plus the last non-null value seen for every field in
 * `opConfig.preserve_fields`.
 */
export default class CountUnique extends BatchProcessor<CountUniqueConfig> {
    /**
     * @param dataArray records of the current slice
     * @returns one summary record per distinct identifier found in the slice
     */
    async onBatch(dataArray: DataEntity[]): Promise<DataEntity[]> {
        // The dictionary is keyed by values taken from the data, so it must not
        // inherit from Object.prototype: on a plain `{}` an identifier such as
        // "__proto__" is assigned to the prototype instead of becoming an own
        // property (and is then missing from Object.values), and names such as
        // "constructor" can be mistaken for already-seen keys.
        const results: Record<string, DataEntity> = Object.create(null);

        for (const doc of dataArray) {
            const key = this._getIdentifier(doc);

            if (!has(results, key)) {
                results[key] = DataEntity.make({
                    count: 0,
                    _key: key
                }, { _key: key });
            }

            const summary = results[key];
            summary.count++;

            // Later records overwrite earlier ones, so the last non-null value wins.
            for (const field of this.opConfig.preserve_fields) {
                if (doc[field] != null) {
                    summary[field] = doc[field];
                }
            }
        }

        return Object.values(results);
    }

    /**
     * Reads the identifier used to group records: from the DataEntity
     * metadata when `opConfig.is_meta_field` is set, otherwise from the
     * record itself.
     */
    private _getIdentifier(doc: DataEntity): any {
        if (this.opConfig.is_meta_field) {
            return doc.getMetadata(this.opConfig.field);
        }

        return doc[this.opConfig.field];
    }
}
28 changes: 28 additions & 0 deletions asset/src/count_unique/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ConvictSchema, isString } from '@terascope/job-components';
import { CountUniqueConfig } from './interfaces.js';

/**
 * Configuration schema for the count_unique operation.
 */
export default class Schema extends ConvictSchema<CountUniqueConfig> {
    /**
     * Builds the schema definition for `preserve_fields`, `field` and
     * `is_meta_field`.
     *
     * @returns the schema definition keyed by setting name
     */
    build() {
        // Rejects anything that is not an array made up solely of strings.
        const validatePreserveFields = (input: unknown) => {
            const isStringArray = Array.isArray(input)
                && input.every((val) => isString(val));

            if (!isStringArray) {
                throw new Error('Parameter "preserve_fields" must be an array of strings');
            }
        };

        const preserveFields = {
            doc: 'A list of fields whose last seen values are added to the result in addition to the count',
            default: [],
            format: validatePreserveFields
        };

        const field = {
            doc: 'Field that is counted, defaults to metadata _key',
            default: '_key',
            format: 'required_String'
        };

        const isMetaField = {
            doc: 'determines if the field to count on lives as a DataEntity meta field or on the record itself',
            default: true,
            format: Boolean
        };

        return {
            preserve_fields: preserveFields,
            field,
            is_meta_field: isMetaField
        };
    }
}
9 changes: 2 additions & 7 deletions asset/src/data_generator/schema.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import {
ConvictSchema,
ValidatedJobConfig,
getOpConfig,
AnyObject,
isNotNil,
getTypeOf,
isString
ConvictSchema, ValidatedJobConfig, getOpConfig,
AnyObject, isNotNil, getTypeOf, isString
} from '@terascope/job-components';
import { DataGenerator, IDType, DateOptions } from './interfaces.js';

Expand Down
5 changes: 1 addition & 4 deletions asset/src/debug_routes/processor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
BatchProcessor,
DataEntity,
} from '@terascope/job-components';
import { BatchProcessor, DataEntity } from '@terascope/job-components';
import { inspect } from 'util';

export default class DebugRoutesProcessor extends BatchProcessor {
Expand Down
19 changes: 19 additions & 0 deletions asset/src/filter/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { OpConfig } from '@terascope/types';

/**
 * A single exception rule, referenced by `FilterConfig.exception_rules`.
 *
 * NOTE(review): the consumer of this type is not visible here; the notes
 * below are inferred from the property names — confirm against the filter
 * processor.
 */
export interface ExceptionRule {
// presumably the record field the rule inspects — TODO confirm
field: string;
// presumably the value (or pattern, when `regex` is set) to compare against — TODO confirm
value: any;
// presumably marks `value` as a regular expression — TODO confirm
regex?: boolean;
}

/**
 * Operation configuration for the filter processor.
 *
 * NOTE(review): the filter schema and processor are not visible here, so
 * the notes below are inferred from the property names only — confirm
 * against the filter schema before relying on them.
 */
export interface FilterConfig extends OpConfig {
// one field name or a list of field names — presumably the field(s) the filter inspects
field: string | string [];
// presumably the value the field is compared against — TODO confirm
value?: any;
// presumably inverts the filter outcome — TODO confirm
invert: boolean;
// presumably selects an element when the field holds an array — TODO confirm
array_index: number;
// presumably names the comparison strategy to use — TODO confirm allowed values
filter_by: string;
// presumably the name of a validation function used by one of the `filter_by` modes — TODO confirm
validation_function?: string;
// presumably arguments passed to `validation_function` — TODO confirm
validation_function_args?: any;
// presumably routes filtered-out records to the dead letter queue — TODO confirm
filtered_to_dead_letter_queue: boolean;
// optional list of exception rules; how they affect filtering is not visible here
exception_rules?: ExceptionRule[];
}
Loading

0 comments on commit 6a683b0

Please sign in to comment.