-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
123 lines (105 loc) · 2.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
'use strict'
const assert = require('assert')
const AbortController = require('abort-controller')
/**
 * Lazily maps `func` over `iterator`, keeping up to `n` calls in flight
 * while yielding the results in source order.
 *
 * @param {AsyncIterable|Iterable} iterator - Source of items; consumed with `for await`.
 * @param {Function} func - Called as `func(item, { signal })`. May return a
 *   promise, a thenable or a plain value. `signal` is aborted when this
 *   generator finishes, whether normally, by error or by early return.
 * @param {number} [n=16] - Maximum number of queued results before the
 *   source stops being read. Should be at least 1.
 * @yields {*} The settled result of `func` for each item, in source order.
 * @throws The first rejection from `func` (once all earlier results have
 *   been yielded), or the error raised by the source iterator (once all
 *   queued results have been yielded).
 */
async function * mapIterator (iterator, func, n = 16) {
  // This works by creating two separate "processes": one that
  // reads from the source iterator and enqueues tasks into the
  // promises queue, and another that waits for tasks in the queue
  // to finish and yields them back to the caller.
  const promises = []
  const ac = new AbortController()

  // Resolver of whichever side is currently parked. Only one side can be
  // waiting at a time: the pump when the queue is full, the reader when
  // the queue is empty.
  let next
  let done = false
  let error

  // pump reads from the source and invokes the transform
  // func so that the promises queue always has up to n items.
  async function pump () {
    try {
      for await (const item of iterator) {
        if (done) {
          return
        }

        let p
        try {
          // Normalise the result so plain values and thenables are
          // accepted too; a native promise is passed through as-is.
          p = Promise.resolve(func(item, { signal: ac.signal }))
        } catch (err) {
          // A synchronous throw is queued like any other rejection so it
          // surfaces in order.
          p = Promise.reject(err)
        }

        promises.push(p)
        // Stop scheduling new work after the first failure. This also
        // marks the rejection as handled; the reader rethrows it when it
        // awaits the queued promise.
        p.catch(() => {
          done = true
        })

        // Wake the reader if it is waiting for the queue to fill.
        if (next) {
          next()
          next = null
        }

        // Back-pressure: park until the reader frees a slot.
        if (!done && promises.length >= n) {
          await new Promise(resolve => {
            next = resolve
          })
          assert(done || promises.length < n)
          if (done) {
            // Iteration ended while parked; leave now so the source is
            // closed without pulling another item from it.
            return
          }
        }
      }
    } catch (err) {
      // Handed to the reader, which throws it after draining the queue.
      error = err
    } finally {
      done = true
      if (next) {
        next()
        next = null
      }
    }
  }

  // Runs concurrently with the reader below. It cannot reject because
  // every failure is captured in `error`.
  pump()

  try {
    // sequentially read and resolve each item in
    // the promise list
    while (true) {
      while (promises.length > 0) {
        yield await promises[0]
        promises.shift()
        // A slot was freed; wake the pump if it is parked.
        if (next) {
          next()
          next = null
        }
      }

      if (error) {
        throw error
      }

      if (done) {
        return
      }

      // Queue is empty but the source is not exhausted: wait for the pump.
      await new Promise(resolve => {
        next = resolve
      })
      assert(done || promises.length > 0)
    }
  } finally {
    // Cancel in-flight work and release the pump however we exit.
    ac.abort()
    done = true
    if (next) {
      next()
      next = null
    }
  }
}
/**
 * Runs `func` over every item of `iterator` through `mapIterator` and
 * gathers the yielded values into an array.
 *
 * @param {AsyncIterable|Iterable} iterator - Source of items.
 * @param {Function} func - Mapper forwarded to `mapIterator`.
 * @param {number} [n=16] - Concurrency limit forwarded to `mapIterator`.
 * @returns {Promise<Array>} The values in the order `mapIterator` yields them.
 */
async function map (iterator, func, n = 16) {
  const mapped = mapIterator(iterator, func, n)
  const collected = []
  for await (const value of mapped) {
    collected.push(value)
  }
  return collected
}
/**
 * Runs `func` over every item of `iterator` through `mapIterator`,
 * discarding the results.
 *
 * @param {AsyncIterable|Iterable} iterator - Source of items.
 * @param {Function} func - Callback forwarded to `mapIterator`.
 * @param {number} [n=16] - Concurrency limit forwarded to `mapIterator`.
 * @returns {Promise<void>} Settles once the mapped iterator is exhausted.
 */
async function forEach (iterator, func, n = 16) {
  const mapped = mapIterator(iterator, func, n)
  // eslint-disable-next-line no-unused-vars
  for await (const ignored of mapped) {
    // Results are intentionally dropped; the loop only drives the iterator.
  }
}
// Public API: the three helpers defined above, attached to the existing
// exports object in the same order as before.
Object.assign(module.exports, { forEach, mapIterator, map })

// Curried form of mapIterator: fix `func` and `n` first, supply the
// source iterator later.
module.exports.mapper = (func, n = 16) => {
  return iterator => mapIterator(iterator, func, n)
}