TODO
Regression:
Code introduced during FindBugs refactoring makes \f hang the console.
Most important new features:
-- range intervals over rows, not just time
-- chunking on the fly (not rolling windows)
-- ability to have these intervals start exactly on the hour/minute/etc
-- persistent flow metastore
-- JSON event input format
-- dependencies: LIST<T>, MAP<KT,VT>
-- multi-threading, distribution, scalability
-- udf api
Types:
-- Add a CHARACTER type, which holds a single char.
-- Need MAP<T1, T2> types.
-- more list functions:
prepend/append(list<t> lst, t val) -> returns lst @ [val] ?
reverse(list<t> lst) -> reverses the order of elements in lst
map_fn(fn<t> f, list<t> lst) -> applies f to each value in lst and returns the result list.
exists(fn<t> f, list<t> lst) -> true if f returns true for some element of lst
foldl(fn<x, t> f, x initial, list<t> lst) -> folds f over elements of lst, using
initial as the first accumulator value.
-- BINARY type should be able to specify encoding when converting to/from STRING.
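As a sketch of how the proposed list builtins above might behave, here are plain-Java equivalents. The class and method names are hypothetical; the real builtins would go through the UDF API rather than static helpers:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch of the proposed list builtins as plain Java helpers. */
public final class ListFns {
  /** append(lst, val): returns a new list equal to lst @ [val]. */
  public static <T> List<T> append(List<T> lst, T val) {
    List<T> out = new ArrayList<>(lst);
    out.add(val);
    return out;
  }

  /** reverse(lst): returns a new list with the elements of lst in reverse order. */
  public static <T> List<T> reverse(List<T> lst) {
    List<T> out = new ArrayList<>(lst);
    Collections.reverse(out);
    return out;
  }

  /** map_fn(f, lst): applies f to each value in lst and returns the result list. */
  public static <T, R> List<R> mapFn(Function<T, R> f, List<T> lst) {
    List<R> out = new ArrayList<>(lst.size());
    for (T t : lst) out.add(f.apply(t));
    return out;
  }

  /** exists(f, lst): true if f returns true for some element of lst. */
  public static <T> boolean exists(Predicate<T> f, List<T> lst) {
    for (T t : lst) if (f.test(t)) return true;
    return false;
  }

  /** foldl(f, initial, lst): folds f left-to-right over lst, starting from initial. */
  public static <X, T> X foldl(BiFunction<X, T, X> f, X initial, List<T> lst) {
    X acc = initial;
    for (T t : lst) acc = f.apply(acc, t);
    return acc;
  }
}
```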
Aggregation:
-- unit test, unit test, unit test, baby.
... for a bit later:
-- Agg funcs cannot be used in broader expressions (Cannot select 1 + COUNT(x)).
In cases where they are used like this, assign the function's output a new
anonymous name, promote the aggfunc to the aggregate functions list,
and replace the call to the agg func in the expression with an IdentExpr to
look up the value of this function's output.
-- Allow use of WINDOW clauses inside AliasedExprs, rather than having a single
global GROUP BY?
-- Implement SQL:2003 windowing functions like LEAD, LAG, RANK. See slides at
http://wwwlgis.informatik.uni-kl.de/cms/fileadmin/courses/SS2008/NEDM/RDDM.Chapter.06.Windows_and_Query_Functions_in_SQL.pdf
-- Also new scalar functions listed there.
-- Allow additional expr arguments (where isConstant() is true) to parameterize calls
to aggregate functions. e.g., LEAD(x, 2), LAG(y, 3), etc.
-- CHECK: Do we ever need non-ConstExpr arguments? Do we ever need to aggregate over
two columns at once?
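The promotion rewrite described above (hoist the agg call, replace it in place with an IdentExpr bound to an anonymous name) might look roughly like this. The Expr classes here are simplified stand-ins, not flumebase's actual AST types:

```java
import java.util.List;

/** Hypothetical sketch of agg-func promotion over a toy expression AST. */
abstract class Expr {
  /** Rewrites this subtree, hoisting aggregate calls into 'promoted'. */
  abstract Expr promoteAggs(List<FnCallExpr> promoted);
}

class IdentExpr extends Expr {
  final String name;
  IdentExpr(String name) { this.name = name; }
  Expr promoteAggs(List<FnCallExpr> promoted) { return this; }
  public String toString() { return name; }
}

class ConstExpr extends Expr {
  final Object val;
  ConstExpr(Object val) { this.val = val; }
  Expr promoteAggs(List<FnCallExpr> promoted) { return this; }
  public String toString() { return String.valueOf(val); }
}

class FnCallExpr extends Expr {
  final String fnName; final Expr arg; final boolean isAggregate;
  FnCallExpr(String fnName, Expr arg, boolean isAggregate) {
    this.fnName = fnName; this.arg = arg; this.isAggregate = isAggregate;
  }
  Expr promoteAggs(List<FnCallExpr> promoted) {
    if (isAggregate) {
      // Assign the agg call an anonymous output name, hoist it onto the
      // aggregate functions list, and substitute an IdentExpr lookup.
      String anonName = "__agg_" + promoted.size();
      promoted.add(this);
      return new IdentExpr(anonName);
    }
    return this;
  }
  public String toString() { return fnName + "(" + arg + ")"; }
}

class BinExpr extends Expr {
  Expr lhs, rhs; final String op;
  BinExpr(Expr lhs, String op, Expr rhs) { this.lhs = lhs; this.op = op; this.rhs = rhs; }
  Expr promoteAggs(List<FnCallExpr> promoted) {
    lhs = lhs.promoteAggs(promoted);
    rhs = rhs.promoteAggs(promoted);
    return this;
  }
  public String toString() { return lhs + " " + op + " " + rhs; }
}
```

After the rewrite, `1 + COUNT(x)` becomes `1 + __agg_0` with `COUNT(x)` on the promoted list, which is exactly the shape the aggregation stage already knows how to execute.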
Remote environment stuff:
-- Design question: think about whether operations like ExecEnv.setFlowName() should be
synchronous or async. They're currently the latter -- would users expect
the former?
-- Icing: Operations like \w <streamName> should call a method that resolves a name to
a flowId, and then calls watch() on the flowId.
Joins:
I don't know that the various calls to streamSymbol.getName() actually return
the correct name here -- I think we want the user's alias to make its way
into the various source types.
SelectStmt should support WINDOW clauses at the end.
^-- need to eval them to WindowSpecs and store these bindings for use in joinedsrc
evaluation to hashjoinnodes.
^-- actually, I think this already works (it does for GROUP BY). Add a unit test
to ensure that they work with joins, too.
.. what happens if we use the same stream alias in two different subtrees of
a compound select stmt?
SELECT ... FROM
((SELECT x FROM f JOIN g) AS s1)
JOIN
((SELECT x FROM h AS f JOIN g) AS s2)
Specifically w.r.t. concerns over the lists returned by getSourceNames().. does
this stop at the SELECT level, or does it recursively gather?
I think we're ok for SELECT/JOIN... but I don't know how we will do for predicate
push-down later..
Expressions:
- Refine user_sel rule (or TypeChecker) so that it cannot accept identifier names that
end in "_". (We need these for internal field identifiers in avro.)
Technical debt:
- Write more unit tests as outlined at the bottom of TestSelect.
- Need more tests in TestTypeChecker; see TODO in comments.
- LocalEnv / LocalEnvThread is full of SelectableQueue<Object> because we put
both EventWrappers and ControlOps into the same queues from different threads.
We then typecase on what we get back. It would be nice if we could create some
common interface with a dispatch() / delegate()-and-handle() model to clean up
the big typecase in the run() method...
- Select.join() will consult its underlying Selectables in a stable order,
which means we might starve later Selectables in the list. We need to
perturb this to ensure fairness.
- The various dag operators are all type-parameterized by what sort of nodes
they expect to exist in the dag. They're invoked by calling theDag.reverseBfs(operator),
etc. This is backwards. Each operator is intended to be run in a specific fashion
(bfs, dfs, etc). We should have several operator subclasses; the base specifies
an abstract exec(theDag) method; the subclasses BfsOperator, DfsOperator, etc. then
invoke the right dag.reverseBfs(this), dag.dfs(this), etc.
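The operator refactoring above could be sketched like this; Dag, DagOperator, and ListDag are simplified stand-ins for the real DAG types, and ListDag exists only to make the traversal orders concrete:

```java
import java.util.List;

/** A dag exposes traversal methods; it no longer chooses which one an operator needs. */
interface Dag<N> {
  void bfs(DagOperator<N> op);
  void reverseBfs(DagOperator<N> op);
}

abstract class DagOperator<N> {
  /** Called once per node, in the order chosen by the subclass's exec(). */
  abstract void process(N node);
  /** Runs this operator over the dag in the traversal order it requires. */
  abstract void exec(Dag<N> dag);
}

/** Subclasses bind an operator to its intended traversal, so callers just exec(). */
abstract class BfsOperator<N> extends DagOperator<N> {
  void exec(Dag<N> dag) { dag.bfs(this); }
}

abstract class ReverseBfsOperator<N> extends DagOperator<N> {
  void exec(Dag<N> dag) { dag.reverseBfs(this); }
}

/** Minimal list-backed Dag for illustration: bfs visits in order, reverseBfs in reverse. */
class ListDag<N> implements Dag<N> {
  private final List<N> nodes;
  ListDag(List<N> nodes) { this.nodes = nodes; }
  public void bfs(DagOperator<N> op) {
    for (N n : nodes) op.process(n);
  }
  public void reverseBfs(DagOperator<N> op) {
    for (int i = nodes.size() - 1; i >= 0; i--) op.process(nodes.get(i));
  }
}
```

Call sites shrink from `theDag.reverseBfs(operator)` to `operator.exec(theDag)`, and it becomes impossible to run an operator under the wrong traversal.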
SQL Features (in rough priority order):
- LIMIT (within a window?)
- [windowed] ORDER BY -- how does this work for overlapping windows?
Bug in CREATE STREAM AS SELECT and Flume node management:
- We currently assume the Flume world is fairly static. If we set up a stream to
read from node 'x', we will create a new local logical node named 'x-receiver'.
If 'x' disappears, we do not delete 'x-receiver'. But if 'x' later reappears,
we never re-connect it to 'x-receiver'. The changes needed to handle this would
be in ForeignNodeConn / EmbeddedFlumeConfig.
** Consider a watchdog thread associated with each receiver. If the upstream
disappears, close the receiver and all associated flows.
** Even better: a watchdog that waits for the upstream node to disappear. If it
then reappears, we re-configure it to again connect down to our x-receiver
in addition to whatever new downstream source it has.
- CSAS creates a local output node for the stream. If the stream gets renamed
using \dname, or dropped (via DROP STREAM), any dependent running flows will
terminate.
** We should not allow dropping a flow associated with CSAS as long as any flows
require that flow to exist upstream.
** More broadly: We should not allow DROP STREAM if any running flows depend on
that stream.
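The per-receiver watchdog idea above might be sketched as follows. The liveness check and teardown action are injected stand-ins; in practice ForeignNodeConn / EmbeddedFlumeConfig would supply the real implementations that consult the Flume master:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a watchdog thread associated with one receiver node. */
class ReceiverWatchdog implements Runnable {
  private final BooleanSupplier upstreamAlive;      // e.g., asks the Flume master whether 'x' exists
  private final Runnable closeReceiverAndFlows;     // tears down 'x-receiver' and dependent flows
  private final long pollMillis;
  private volatile boolean stopped = false;

  ReceiverWatchdog(BooleanSupplier upstreamAlive, Runnable closeReceiverAndFlows,
      long pollMillis) {
    this.upstreamAlive = upstreamAlive;
    this.closeReceiverAndFlows = closeReceiverAndFlows;
    this.pollMillis = pollMillis;
  }

  /** Signals the watchdog to exit on its next wakeup. */
  public void stop() { stopped = true; }

  public void run() {
    while (!stopped) {
      if (!upstreamAlive.getAsBoolean()) {
        // Upstream node is gone: close the receiver and all associated flows.
        closeReceiverAndFlows.run();
        return;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException ie) {
        return;
      }
    }
  }
}
```

The "even better" variant would keep the thread alive after teardown and, on reappearance of the upstream node, reconfigure it to connect back down to our x-receiver.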
Bugs:
- Quitting is very slow due to the Flume shutdown. Can we improve this?
... it also emits a scary-looking error message that we should suppress for
hygiene's sake.
- Windowed JOIN may cause server crash under undefined conditions.
Features:
- Need ability to parse strings into timestamps.
- Windowing should operate on 'previous n rows' too.
Associate a rowid stamp with every event on input.
Start each stream's rowids at 0 for each query; a join then operates like a "zipper".
- Need ability to run remote physical plan on a set of configured nodes.
- EventParser names/implementations should be accessed through the BuiltInSymbolTable.
- Need a MapFunc API to allow 1-to-many transformations, and LATERAL VIEW syntax.
- Should be able to CREATE STREAM AS FILE and then specify the file format. Right now
we can't practically use avro data in files, because we parse the file itself as
if it is \n-terminated text records.
- Replace jline with libjcsi or a Java readline wrapper
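The rowid stamping for row-based windows (above) could look like this minimal sketch; the names are hypothetical, and a real implementation would keep one counter per stream per query:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: stamp each input event with a per-stream rowid so that
 *  'previous n rows' windows (and zipper-style joins) have a row ordinate. */
class RowIdStamper {
  private final AtomicLong nextRowId = new AtomicLong(0); // starts at 0 for each query

  /** Returns the rowid to stamp on the next event arriving on this stream. */
  long stamp() {
    return nextRowId.getAndIncrement();
  }

  /** True if rowid falls in the window of n rows ending at 'current', inclusive. */
  static boolean inRowWindow(long rowid, long current, long n) {
    return rowid > current - n && rowid <= current;
  }
}
```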
Optimizations:
- ProjectionNode instances with identical input and output schemata should be removed.
- ProjectionNode immediately after NamedSource should be fused.
-- longer term: projection (and filtering) should be pushed up into previous
FlowElements, when we use DAG tiling.
- AvroEventWrapper could be improved by deserializing only the fields necessary
for a given combined set of FE's in advance, knowing the input, output, and also
internal-use schemas. (We may take in <int, string> and emit <int, string> but
only query the int component; we could project onto that as we deserialize, and
then save ourselves the trouble on the string.)
- Expr.eval() should go by the wayside; what we really want is each Expr contributing
a set of three-addr codes to a set of basic blocks that corresponds to the set of
expressions the user wants emitted; then we can perform further optimization on
this like common subexpression elimination, etc. We need to define an RTL that can
handle all this. Then if our opcodes are things like AddIntInt, AddFloatFloat, etc.,
we can dispense with the stored Type instances inside Exprs being required at
run time.
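A toy version of that typed-opcode idea might look like the following; the opcode set, instruction layout, and class names are all hypothetical. The point is that the opcode (e.g. ADD_INT_INT) bakes in the operand types, so no Type objects are consulted while executing a basic block:

```java
/** Hypothetical sketch: expressions lowered to three-address code over virtual registers. */
class ThreeAddrVM {
  enum Op { LOAD_CONST, ADD_INT_INT, MUL_INT_INT }

  static class Instr {
    final Op op;
    final int dest;         // destination register
    final int src1, src2;   // source registers (unused for LOAD_CONST)
    final long constVal;    // immediate operand for LOAD_CONST
    Instr(Op op, int dest, int src1, int src2, long constVal) {
      this.op = op; this.dest = dest; this.src1 = src1; this.src2 = src2;
      this.constVal = constVal;
    }
  }

  /** Executes one straight-line basic block and returns the register file. */
  static long[] run(Instr[] block, int numRegs) {
    long[] regs = new long[numRegs];
    for (Instr i : block) {
      switch (i.op) {
        case LOAD_CONST:  regs[i.dest] = i.constVal; break;
        case ADD_INT_INT: regs[i.dest] = regs[i.src1] + regs[i.src2]; break;
        case MUL_INT_INT: regs[i.dest] = regs[i.src1] * regs[i.src2]; break;
      }
    }
    return regs;
  }
}
```

An expression like (1 + 2) * 3 lowers to five instructions over registers r0..r4; once the blocks exist in this form, passes like common subexpression elimination operate on instructions, not on Expr trees.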