-
Notifications
You must be signed in to change notification settings - Fork 322
/
Copy pathmem_catalog.h
777 lines (710 loc) · 25.9 KB
/
mem_catalog.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HYBRIDSE_INCLUDE_VM_MEM_CATALOG_H_
#define HYBRIDSE_INCLUDE_VM_MEM_CATALOG_H_
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "base/fe_slice.h"
#include "codec/list_iterator_codec.h"
#include "glog/logging.h"
#include "vm/catalog.h"
namespace hybridse {
namespace vm {
using hybridse::codec::Row;
using hybridse::codec::RowIterator;
using hybridse::codec::WindowIterator;
struct AscKeyComparor {
bool operator()(std::pair<std::string, Row> i,
std::pair<std::string, Row> j) {
return i.first < j.first;
}
};
struct AscComparor {
bool operator()(std::pair<uint64_t, Row> i, std::pair<uint64_t, Row> j) {
return i.first < j.first;
}
};
struct DescComparor {
bool operator()(std::pair<uint64_t, Row> i, std::pair<uint64_t, Row> j) {
return i.first > j.first;
}
};
// Double-ended sequence of (uint64_t key, Row) pairs.
typedef std::deque<std::pair<uint64_t, Row>> MemTimeTable;
// Plain sequence of rows without keys.
typedef std::vector<Row> MemTable;
// Maps a string key to its MemTimeTable. The std::greater comparator makes
// the map iterate its keys in descending order.
typedef std::map<std::string, MemTimeTable, std::greater<std::string>>
MemSegmentMap;
// RowIterator over a MemTimeTable, i.e. a deque of (uint64_t key, Row) pairs.
// Only the declaration lives in this header; the member functions are
// defined elsewhere.
class MemTimeTableIterator : public RowIterator {
public:
// Builds an iterator over `table`; `schema` is stored alongside it.
MemTimeTableIterator(const MemTimeTable* table, const vm::Schema* schema);
// Same as above with an additional (start, end) pair.
// NOTE(review): presumably these are element positions bounding the
// iteration range -- confirm against the definition.
MemTimeTableIterator(const MemTimeTable* table, const vm::Schema* schema,
int32_t start, int32_t end);
~MemTimeTableIterator();
// Positioning and traversal interface.
void Seek(const uint64_t& ts);
void SeekToFirst();
const uint64_t& GetKey() const;
void Next();
bool Valid() const;
const Row& GetValue() override;
bool IsSeekable() const override;
private:
// Raw pointers to the iterated table and its schema; ownership is not
// visible from this header.
const MemTimeTable* table_;
const Schema* schema_;
// Fixed iteration bounds (const members) and the current position.
const MemTimeTable::const_iterator start_iter_;
const MemTimeTable::const_iterator end_iter_;
MemTimeTable::const_iterator iter_;
};
// RowIterator over a MemTable, i.e. a std::vector<Row>.
// Only the declaration lives in this header; the member functions are
// defined elsewhere.
class MemTableIterator : public RowIterator {
public:
// Builds an iterator over `table`; `schema` is stored alongside it.
MemTableIterator(const MemTable* table, const vm::Schema* schema);
// Same as above with an additional (start, end) pair.
// NOTE(review): presumably these are element positions bounding the
// iteration range -- confirm against the definition.
MemTableIterator(const MemTable* table, const vm::Schema* schema,
int32_t start, int32_t end);
~MemTableIterator();
// Positioning and traversal interface.
void Seek(const uint64_t& ts);
void SeekToFirst();
const uint64_t& GetKey() const;
const Row& GetValue();
void Next();
bool Valid() const;
bool IsSeekable() const override;
private:
// Raw pointers to the iterated table and its schema; ownership is not
// visible from this header.
const MemTable* table_;
const Schema* schema_;
// Fixed iteration bounds (const members) and the current position.
const MemTable::const_iterator start_iter_;
const MemTable::const_iterator end_iter_;
MemTable::const_iterator iter_;
// Backing storage for the reference returned by GetKey(); MemTable itself
// stores no keys. NOTE(review): how it is maintained is defined elsewhere.
uint64_t key_;
};
// WindowIterator over a MemSegmentMap (string key -> MemTimeTable).
// Only the declaration lives in this header; the member functions are
// defined elsewhere.
class MemWindowIterator : public WindowIterator {
public:
// Builds an iterator over `partitions`; `schema` is stored alongside it.
MemWindowIterator(const MemSegmentMap* partitions, const Schema* schema);
~MemWindowIterator();
// Positioning and traversal over the map's keys.
void Seek(const std::string& key);
void SeekToFirst();
void Next();
bool Valid();
// Row iterator for the current entry, as an owning pointer or a raw pointer.
// NOTE(review): the raw pointer is presumably owned by the caller -- confirm.
std::unique_ptr<RowIterator> GetValue();
RowIterator* GetRawValue();
// Key of the current entry, returned by value as a Row.
const Row GetKey();
private:
// Raw pointers to the iterated map and the schema; ownership is not visible
// from this header.
const MemSegmentMap* partitions_;
const Schema* schema_;
// Current position followed by the fixed iteration bounds. The declaration
// order (iter_ first) determines member initialisation order.
MemSegmentMap::const_iterator iter_;
const MemSegmentMap::const_iterator start_iter_;
const MemSegmentMap::const_iterator end_iter_;
};
/**
 * RowHandler that wraps a single Row kept by value.
 * Table name and database are always empty strings; the schema pointer is
 * optional and is nullptr when the one-argument constructor is used.
 */
class MemRowHandler : public RowHandler {
 public:
    /// Wraps `row` without any schema.
    explicit MemRowHandler(const Row row)
        : RowHandler(),
          table_name_(""),
          db_(""),
          schema_(nullptr),
          row_(row) {}

    /// Wraps `row` and remembers `schema` (stored as a raw pointer).
    MemRowHandler(const Row row, const vm::Schema* schema)
        : RowHandler(),
          table_name_(""),
          db_(""),
          schema_(schema),
          row_(row) {}

    ~MemRowHandler() {}

    const Schema* GetSchema() override { return schema_; }

    const std::string& GetName() override { return table_name_; }

    const std::string& GetDatabase() override { return db_; }

    /// Returns a reference to the stored row.
    const Row& GetValue() override { return row_; }

    const std::string GetHandlerTypeName() override {
        return "MemRowHandler";
    }

 private:
    std::string table_name_;
    std::string db_;
    const Schema* schema_;
    Row row_;
};
/**
 * TableHandler whose rows live in an in-memory std::vector<Row> (MemTable).
 * Constructors, iterator factories, AddRow, Reverse, Resize and SetRow are
 * only declared here; their definitions live elsewhere.
 */
class MemTableHandler : public TableHandler {
 public:
    MemTableHandler();
    explicit MemTableHandler(const Schema* schema);
    MemTableHandler(const std::string& table_name, const std::string& db,
                    const Schema* schema);
    ~MemTableHandler() override;

    // Plain accessors for the stored metadata.
    const Types& GetTypes() override { return types_; }
    inline const Schema* GetSchema() { return schema_; }
    inline const std::string& GetName() { return table_name_; }
    inline const IndexHint& GetIndex() { return index_hint_; }
    inline const std::string& GetDatabase() { return db_; }

    std::unique_ptr<RowIterator> GetIterator() override;
    RowIterator* GetRawIterator() override;
    std::unique_ptr<WindowIterator> GetWindowIterator(
        const std::string& idx_name);

    void AddRow(const Row& row);
    void Reverse();

    /// Number of rows currently stored.
    virtual const uint64_t GetCount() { return table_.size(); }

    /// Row at `pos`, or a default-constructed Row when `pos` is out of range.
    virtual Row At(uint64_t pos) {
        if (pos >= table_.size()) {
            return Row();
        }
        return table_[pos];
    }

    const OrderType GetOrderType() const { return order_type_; }
    void SetOrderType(const OrderType order_type) { order_type_ = order_type; }

    const std::string GetHandlerTypeName() override {
        return "MemTableHandler";
    }

 protected:
    void Resize(const size_t size);
    bool SetRow(const size_t idx, const Row& row);

    const std::string table_name_;
    const std::string db_;
    const Schema* schema_;
    Types types_;
    IndexHint index_hint_;
    MemTable table_;
    OrderType order_type_;
};
/**
 * TableHandler whose rows live in an in-memory deque of (uint64_t key, Row)
 * pairs (MemTimeTable). Constructors, iterator factories and the
 * add/pop/sort/reverse operations are only declared here; their definitions
 * live elsewhere.
 */
class MemTimeTableHandler : public TableHandler {
 public:
    MemTimeTableHandler();
    explicit MemTimeTableHandler(const Schema* schema);
    MemTimeTableHandler(const std::string& table_name, const std::string& db,
                        const Schema* schema);
    const Types& GetTypes() override;
    ~MemTimeTableHandler() override;

    // Plain accessors for the stored metadata.
    inline const Schema* GetSchema() { return schema_; }
    inline const std::string& GetName() { return table_name_; }
    inline const IndexHint& GetIndex() { return index_hint_; }

    std::unique_ptr<RowIterator> GetIterator();
    RowIterator* GetRawIterator();

    inline const std::string& GetDatabase() { return db_; }

    std::unique_ptr<WindowIterator> GetWindowIterator(
        const std::string& idx_name);

    void AddRow(const uint64_t key, const Row& v);
    void AddFrontRow(const uint64_t key, const Row& v);
    void PopBackRow();
    void PopFrontRow();

    /// First (key, Row) pair of the deque. No emptiness check is performed.
    virtual const std::pair<uint64_t, Row>& GetFrontRow() {
        return table_.front();
    }

    /// Last (key, Row) pair of the deque. No emptiness check is performed.
    virtual const std::pair<uint64_t, Row>& GetBackRow() {
        return table_.back();
    }

    void Sort(const bool is_asc);
    void Reverse();

    /// Number of (key, Row) pairs currently stored.
    virtual const uint64_t GetCount() { return table_.size(); }

    /// Row at `pos`, or a default-constructed Row when `pos` is out of range.
    virtual Row At(uint64_t pos) {
        if (pos >= table_.size()) {
            return Row();
        }
        return table_[pos].second;
    }

    void SetOrderType(const OrderType order_type) { order_type_ = order_type; }
    const OrderType GetOrderType() const { return order_type_; }

    const std::string GetHandlerTypeName() override {
        return "MemTimeTableHandler";
    }

 protected:
    const std::string table_name_;
    const std::string db_;
    const Schema* schema_;
    Types types_;
    IndexHint index_hint_;
    MemTimeTable table_;
    OrderType order_type_;
};
class Window : public MemTimeTableHandler {
public:
enum WindowFrameType {
kFrameRows,
kFrameRowsRange,
kFrameRowsMergeRowsRange
};
Window()
: MemTimeTableHandler(),
exclude_current_time_(false),
instance_not_in_window_(false) {}
virtual ~Window() {}
std::unique_ptr<RowIterator> GetIterator() override {
std::unique_ptr<vm::MemTimeTableIterator> it(
new vm::MemTimeTableIterator(&table_, schema_));
return std::move(it);
}
RowIterator* GetRawIterator() {
return new vm::MemTimeTableIterator(&table_, schema_);
}
virtual bool BufferData(uint64_t key, const Row& row) = 0;
virtual void PopBackData() { PopBackRow(); }
virtual void PopFrontData() = 0;
virtual const uint64_t GetCount() { return table_.size(); }
virtual Row At(uint64_t pos) {
if (pos >= table_.size()) {
return Row();
} else {
return table_[pos].second;
}
}
const std::string GetHandlerTypeName() override { return "Window"; }
const bool instance_not_in_window() const {
return instance_not_in_window_;
}
void set_instance_not_in_window(const bool flag) {
instance_not_in_window_ = flag;
}
const bool exclude_current_time() const { return exclude_current_time_; }
void set_exclude_current_time(const bool flag) {
exclude_current_time_ = flag;
}
protected:
bool exclude_current_time_;
bool instance_not_in_window_;
};
/**
 * Value type describing a window frame: its frame type, the start/end
 * offsets, the start/end row counts and a maximum size. All members are
 * public. end_row_ is always initialised to 0 by the constructors here.
 */
class WindowRange {
 public:
    /// Rows frame with every numeric field set to zero.
    WindowRange()
        : frame_type_(Window::kFrameRows),
          start_offset_(0),
          end_offset_(0),
          start_row_(0),
          end_row_(0),
          max_size_(0) {}

    /// Fully specified frame; `rows_preceding` is stored in start_row_.
    WindowRange(Window::WindowFrameType frame_type, int64_t start_offset,
                int64_t end_offset, uint64_t rows_preceding,
                uint64_t max_size)
        : frame_type_(frame_type),
          start_offset_(start_offset),
          end_offset_(end_offset),
          start_row_(rows_preceding),
          end_row_(0),
          max_size_(max_size) {}

    virtual ~WindowRange() {}

    /// kFrameRows frame: zero offsets, zero max size.
    static WindowRange CreateRowsWindow(uint64_t rows_preceding) {
        return WindowRange(Window::kFrameRows, 0, 0, rows_preceding, 0);
    }

    /// kFrameRowsRange frame: the given offsets, zero rows_preceding.
    static WindowRange CreateRowsRangeWindow(int64_t start_offset,
                                             int64_t end_offset,
                                             uint64_t max_size = 0) {
        return WindowRange(Window::kFrameRowsRange, start_offset, end_offset,
                           0, max_size);
    }

    /// kFrameRowsMergeRowsRange frame: start offset plus rows_preceding,
    /// end offset fixed to zero.
    static WindowRange CreateRowsMergeRowsRangeWindow(
        int64_t start_offset, uint64_t rows_preceding,
        uint64_t max_size = 0) {
        return WindowRange(Window::kFrameRowsMergeRowsRange, start_offset, 0,
                           rows_preceding, max_size);
    }

    enum WindowPositionStatus {
        kInWindow,
        kExceedWindow,
        kBeforeWindow,
    };

    /**
     * Combines three caller-computed flags into a position status according
     * to the frame type:
     *  - kFrameRows: only `out_of_rows` matters.
     *  - kFrameRowsMergeRowsRange: in-window unless `out_of_rows`; then
     *    `before_window` takes precedence over `exceed_window`.
     *  - kFrameRowsRange: `exceed_window` takes precedence over
     *    `before_window`.
     * Any other frame type yields kExceedWindow.
     */
    inline const WindowPositionStatus GetWindowPositionStatus(
        bool out_of_rows, bool before_window, bool exceed_window) const {
        if (Window::WindowFrameType::kFrameRows == frame_type_) {
            return out_of_rows ? kExceedWindow : kInWindow;
        }
        if (Window::WindowFrameType::kFrameRowsMergeRowsRange ==
            frame_type_) {
            if (!out_of_rows) {
                return kInWindow;
            }
            if (before_window) {
                return kBeforeWindow;
            }
            return exceed_window ? kExceedWindow : kInWindow;
        }
        if (Window::WindowFrameType::kFrameRowsRange == frame_type_) {
            if (exceed_window) {
                return kExceedWindow;
            }
            return before_window ? kBeforeWindow : kInWindow;
        }
        return kExceedWindow;
    }

    Window::WindowFrameType frame_type_;
    int64_t start_offset_;
    int64_t end_offset_;
    uint64_t start_row_;
    uint64_t end_row_;
    uint64_t max_size_;
};
/**
 * Window with two buffers:
 *
 * |start_ts.............end_ts| current_ts|
 * |.............history window| current history buffer|
 *
 * table_ (inherited) holds the effective history window, while
 * current_history_buffer_ holds rows that have been buffered but not yet
 * moved into the effective window.
 */
class HistoryWindow : public Window {
public:
explicit HistoryWindow(const WindowRange& window_range)
: Window(), window_range_(window_range), current_history_buffer_() {}
~HistoryWindow() {}
// Drops one entry from the front: from current_history_buffer_ while it is
// non-empty, otherwise from the effective window via PopFrontRow().
virtual void PopFrontData() {
if (current_history_buffer_.empty()) {
PopFrontRow();
} else {
current_history_buffer_.pop_front();
}
}
// Drops the front row of the effective window, if there is one.
virtual void PopEffectiveData() {
if (!table_.empty()) {
PopFrontRow();
}
}
// Buffers (key, row). Returns false, and buffers nothing, when the window
// is non-empty and its front key is greater than `key`; otherwise returns
// the result of the selected buffering helper.
bool BufferData(uint64_t key, const Row& row) {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
auto cur_size = table_.size();
if (cur_size < window_range_.start_row_) {
// Fewer rows than start_row_ are buffered: the current row goes into
// the window.
// key + start_offset_ is evaluated in unsigned arithmetic and then
// converted to int64_t; a negative result clamps start_ts to 0.
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
if (0 == window_range_.end_offset_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
} else if (0 == window_range_.end_offset_) {
// end_offset_ is 0: the current row goes into the window.
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
// Non-zero end_offset_: the current row is staged in
// current_history_buffer_ first. end_ts is key + end_offset_, clamped
// to 0 when negative.
int64_t sub = (key + window_range_.end_offset_);
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentHistoryBuffer(key, row, end_ts);
}
}
protected:
// Stages (key, row) at the front of current_history_buffer_, then moves
// entries from the back of that buffer into the effective window for as
// long as their key is <= end_ts. Always returns true.
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row,
uint64_t end_ts) {
current_history_buffer_.emplace_front(std::make_pair(key, row));
// start_ts is derived from the key being buffered now and is used for
// every entry flushed below.
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
while (!current_history_buffer_.empty()) {
auto& back = current_history_buffer_.back();
if (back.first > end_ts) {
break;
}
// `back` must be consumed before pop_back() invalidates the reference.
BufferEffectiveWindow(back.first, back.second, start_ts);
current_history_buffer_.pop_back();
}
return true;
}
// Adds (key, row) to the effective window via AddFrontRow(), then trims
// from the back: first down to max_size_ (when max_size_ > 0), then
// according to the frame type and start_ts. Always returns true.
bool BufferEffectiveWindow(uint64_t key, const Row& row,
uint64_t start_ts) {
AddFrontRow(key, row);
auto cur_size = table_.size();
while (window_range_.max_size_ > 0 &&
cur_size > window_range_.max_size_) {
PopBackRow();
--cur_size;
}
// Slide the window from the back. For the rows and rows-merge-range frame
// types, stop once at most start_row_ + 1 rows remain. For the rows frame
// type, back rows are popped until that bound is reached; for the other
// frame types a back row is popped only while its key is below start_ts.
while (cur_size > 0) {
const auto& pair = GetBackRow();
if ((kFrameRows == window_range_.frame_type_ ||
kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
cur_size <= window_range_.start_row_ + 1) {
break;
}
if (kFrameRows == window_range_.frame_type_ ||
pair.first < start_ts) {
PopBackRow();
--cur_size;
} else {
break;
}
}
return true;
}
// Buffers the current row. Without exclude_current_time_ this is just
// BufferEffectiveWindow(). With it, the front row of the effective window
// is dropped, the row is staged with end_ts = key - 1 so that only staged
// entries with smaller keys are flushed, and then the row is added to the
// effective window.
// NOTE(review): `key - 1` is unsigned; for key == 0 it wraps to the maximum
// uint64_t value, so the `back.first > end_ts` check never breaks and the
// row just staged is flushed as well -- confirm key 0 cannot reach here.
bool BufferCurrentTimeBuffer(uint64_t key, const Row& row,
uint64_t start_ts) {
if (!exclude_current_time_) {
return BufferEffectiveWindow(key, row, start_ts);
} else {
PopEffectiveData();
BufferCurrentHistoryBuffer(key, row, key - 1);
return BufferEffectiveWindow(key, row, start_ts);
}
}
// Frame description copied at construction.
WindowRange window_range_;
// Rows buffered but not yet moved into the effective window (table_).
MemTimeTable current_history_buffer_;
};
/**
 * History window whose end offset is fixed at 0 (the current row):
 *
 * |start_ts....................................current_ts|
 * |................................current history window|
 *
 * The current history window is the effective window. The original author
 * notes that there is no current_history_buffer_ for this window type.
 */
class CurrentHistoryWindow : public HistoryWindow {
 public:
    explicit CurrentHistoryWindow(const WindowRange& window_range)
        : HistoryWindow(window_range) {}

    /// Frame with the given start offset, zero end offset and zero
    /// rows_preceding.
    explicit CurrentHistoryWindow(const Window::WindowFrameType window_frame,
                                  uint64_t start_offset, uint64_t max_size)
        : HistoryWindow(
              WindowRange(window_frame, start_offset, 0, 0, max_size)) {}

    /// Frame with the given start offset and rows_preceding, zero end
    /// offset.
    explicit CurrentHistoryWindow(const Window::WindowFrameType window_frame,
                                  uint64_t start_offset, uint64_t start_rows,
                                  uint64_t max_size)
        : HistoryWindow(WindowRange(window_frame, start_offset, 0,
                                    start_rows, max_size)) {}

    ~CurrentHistoryWindow() {}

    /// Always pops from the effective window.
    virtual void PopFrontData() { PopFrontRow(); }

    /**
     * Buffers (key, row). Returns false when the window is non-empty and its
     * front key is greater than `key`. Otherwise start_ts is
     * key + start_offset_ (clamped to 0 when negative) and the row goes
     * through BufferCurrentTimeBuffer when exclude_current_time_ is set,
     * or BufferEffectiveWindow when it is not.
     */
    bool BufferData(uint64_t key, const Row& row) {
        const bool older_than_front =
            !table_.empty() && GetFrontRow().first > key;
        if (older_than_front) {
            DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
            return false;
        }
        int64_t shifted = (key + window_range_.start_offset_);
        uint64_t start_ts = 0u;
        if (shifted >= 0) {
            start_ts = static_cast<uint64_t>(shifted);
        }
        return exclude_current_time_
                   ? BufferCurrentTimeBuffer(key, row, start_ts)
                   : BufferEffectiveWindow(key, row, start_ts);
    }
};
// Two-level map of shared MemTimeTableHandler pointers. MemCatalog indexes
// it as tables_[db][table_name], i.e. database name first, then table name.
typedef std::map<std::string,
std::map<std::string, std::shared_ptr<MemTimeTableHandler>>>
MemTables;
// Map of shared type::Database pointers; MemCatalog indexes it by database
// name.
typedef std::map<std::string, std::shared_ptr<type::Database>> Databases;
/**
 * Table-shaped view of one key inside a PartitionHandler.
 * Schema, name, database, types, index and order type are forwarded to the
 * wrapped partition handler. Rows are reached by seeking the partition's
 * window iterator to the stored key.
 */
class MemSegmentHandler : public TableHandler {
 public:
    MemSegmentHandler(std::shared_ptr<PartitionHandler> partition_hander,
                      const std::string& key)
        : partition_hander_(partition_hander), key_(key) {}

    virtual ~MemSegmentHandler() {}

    inline const vm::Schema* GetSchema() {
        return partition_hander_->GetSchema();
    }

    inline const std::string& GetName() {
        return partition_hander_->GetName();
    }

    inline const std::string& GetDatabase() {
        return partition_hander_->GetDatabase();
    }

    inline const vm::Types& GetTypes() {
        return partition_hander_->GetTypes();
    }

    inline const vm::IndexHint& GetIndex() {
        return partition_hander_->GetIndex();
    }

    const OrderType GetOrderType() const {
        return partition_hander_->GetOrderType();
    }

    /// Row iterator for key_, or an empty pointer when the partition yields
    /// no window iterator or the seek does not land on a valid entry.
    std::unique_ptr<vm::RowIterator> GetIterator() {
        auto window_iter = partition_hander_->GetWindowIterator();
        if (!window_iter) {
            return std::unique_ptr<RowIterator>();
        }
        window_iter->Seek(key_);
        if (!window_iter->Valid()) {
            return std::unique_ptr<RowIterator>();
        }
        return window_iter->GetValue();
    }

    /// Raw-pointer variant of GetIterator(); nullptr in the same cases.
    RowIterator* GetRawIterator() override {
        auto window_iter = partition_hander_->GetWindowIterator();
        if (!window_iter) {
            return nullptr;
        }
        window_iter->Seek(key_);
        if (!window_iter->Valid()) {
            return nullptr;
        }
        return window_iter->GetRawValue();
    }

    /// Not supported for a segment: logs a warning and returns an empty
    /// pointer.
    std::unique_ptr<vm::WindowIterator> GetWindowIterator(
        const std::string& idx_name) {
        LOG(WARNING) << "SegmentHandler can't support window iterator";
        return std::unique_ptr<WindowIterator>();
    }

    /// Counts the rows by walking a fresh iterator from the first row;
    /// 0 when no iterator is available.
    virtual const uint64_t GetCount() {
        auto row_iter = GetIterator();
        uint64_t total = 0;
        if (row_iter) {
            for (row_iter->SeekToFirst(); row_iter->Valid();
                 row_iter->Next()) {
                ++total;
            }
        }
        return total;
    }

    /// Row at `pos`, found by stepping a fresh iterator `pos` times; a
    /// default-constructed Row when the segment has no such row.
    Row At(uint64_t pos) override {
        auto row_iter = GetIterator();
        if (!row_iter) {
            return Row();
        }
        row_iter->SeekToFirst();
        for (; pos > 0 && row_iter->Valid(); --pos) {
            row_iter->Next();
        }
        if (!row_iter->Valid()) {
            return Row();
        }
        return row_iter->GetValue();
    }

    const std::string GetHandlerTypeName() override {
        return "MemSegmentHandler";
    }

 private:
    std::shared_ptr<vm::PartitionHandler> partition_hander_;
    std::string key_;
};
class MemPartitionHandler
: public PartitionHandler,
public std::enable_shared_from_this<PartitionHandler> {
public:
MemPartitionHandler();
explicit MemPartitionHandler(const Schema* schema);
MemPartitionHandler(const std::string& table_name, const std::string& db,
const Schema* schema);
~MemPartitionHandler();
const Types& GetTypes() override;
const IndexHint& GetIndex() override;
const Schema* GetSchema() override;
const std::string& GetName() override;
const std::string& GetDatabase() override;
virtual std::unique_ptr<WindowIterator> GetWindowIterator();
bool AddRow(const std::string& key, uint64_t ts, const Row& row);
void Sort(const bool is_asc);
void Reverse();
void Print();
virtual const uint64_t GetCount() { return partitions_.size(); }
virtual std::shared_ptr<TableHandler> GetSegment(const std::string& key) {
return std::shared_ptr<MemSegmentHandler>(
new MemSegmentHandler(shared_from_this(), key));
}
void SetOrderType(const OrderType order_type) { order_type_ = order_type; }
const OrderType GetOrderType() const { return order_type_; }
const std::string GetHandlerTypeName() override {
return "MemPartitionHandler";
}
private:
std::string table_name_;
std::string db_;
const Schema* schema_;
MemSegmentMap partitions_;
Types types_;
IndexHint index_hint_;
OrderType order_type_;
};
/**
 * Lazily materialised row-wise concatenation of two tables.
 * Nothing is computed at construction. The first call to At(),
 * GetIterator(), GetRawIterator() or GetCount() while status_ is still
 * "running" triggers SyncValue(), which fills the inherited MemTimeTable
 * and replaces status_ with its result.
 */
class ConcatTableHandler : public MemTimeTableHandler {
 public:
    ConcatTableHandler(std::shared_ptr<TableHandler> left, size_t left_slices,
                       std::shared_ptr<TableHandler> right,
                       size_t right_slices)
        : MemTimeTableHandler(),
          status_(base::Status::Running()),
          left_(left),
          left_slices_(left_slices),
          right_(right),
          right_slices_(right_slices) {}

    ~ConcatTableHandler() {}

    Row At(uint64_t pos) override {
        if (status_.isRunning()) {
            status_ = SyncValue();
        }
        return MemTimeTableHandler::At(pos);
    }

    std::unique_ptr<RowIterator> GetIterator() {
        if (status_.isRunning()) {
            status_ = SyncValue();
        }
        return MemTimeTableHandler::GetIterator();
    }

    RowIterator* GetRawIterator() {
        if (status_.isRunning()) {
            status_ = SyncValue();
        }
        return MemTimeTableHandler::GetRawIterator();
    }

    virtual const uint64_t GetCount() {
        if (status_.isRunning()) {
            status_ = SyncValue();
        }
        return MemTimeTableHandler::GetCount();
    }

 private:
    /**
     * Walks the left table from its first row and appends one combined row
     * per left row, keyed by the left row's key. The right iterator is
     * advanced in step while it stays valid; once it is missing or
     * exhausted, an empty Row is used as the right part. Returns OK in every
     * case, including when the left table or its iterator is missing.
     */
    base::Status SyncValue() {
        DLOG(INFO) << "Sync... concat left table and right table";
        std::unique_ptr<RowIterator> left_rows;
        if (left_) {
            left_rows = left_->GetIterator();
        }
        if (!left_rows) {
            return base::Status::OK();
        }

        std::unique_ptr<RowIterator> right_rows;
        if (right_) {
            right_rows = right_->GetIterator();
        }

        for (left_rows->SeekToFirst(); left_rows->Valid();
             left_rows->Next()) {
            if (right_rows && right_rows->Valid()) {
                AddRow(left_rows->GetKey(),
                       Row(left_slices_, left_rows->GetValue(), right_slices_,
                           right_rows->GetValue()));
                right_rows->Next();
            } else {
                AddRow(left_rows->GetKey(),
                       Row(left_slices_, left_rows->GetValue(), right_slices_,
                           Row()));
            }
        }
        return base::Status::OK();
    }

    base::Status status_;
    std::shared_ptr<TableHandler> left_;
    size_t left_slices_;
    std::shared_ptr<TableHandler> right_;
    size_t right_slices_;
};
/**
 * Catalog backed by in-memory maps of databases and table handlers.
 * The constructor, destructor and Init() are defined elsewhere.
 *
 * Lookups use find() rather than operator[]: operator[] would insert an
 * empty entry for every unknown name, so a read-only query would grow and
 * mutate the catalog. Callers still receive an empty shared_ptr on a miss.
 */
class MemCatalog : public Catalog {
 public:
    MemCatalog();
    ~MemCatalog();
    bool Init();

    /// Database registered under `db`, or an empty pointer when unknown.
    std::shared_ptr<type::Database> GetDatabase(const std::string& db) {
        auto db_it = dbs_.find(db);
        if (db_it == dbs_.end()) {
            return std::shared_ptr<type::Database>();
        }
        return db_it->second;
    }

    /// Table handler registered under (`db`, `table_name`), or an empty
    /// pointer when either the database or the table is unknown.
    std::shared_ptr<TableHandler> GetTable(const std::string& db,
                                           const std::string& table_name) {
        auto db_it = tables_.find(db);
        if (db_it == tables_.end()) {
            return std::shared_ptr<TableHandler>();
        }
        auto table_it = db_it->second.find(table_name);
        if (table_it == db_it->second.end()) {
            return std::shared_ptr<TableHandler>();
        }
        return table_it->second;
    }

    bool IndexSupport() override { return true; }

 private:
    MemTables tables_;
    Databases dbs_;
};
/**
* Result table handler specified for request union:
* (1) The first row is fixed to be the request row
* (2) O(1) time construction
*/
class RequestUnionTableHandler : public TableHandler {
public:
RequestUnionTableHandler(uint64_t request_ts, const Row& request_row,
const std::shared_ptr<TableHandler>& window)
: request_ts_(request_ts), request_row_(request_row), window_(window) {}
~RequestUnionTableHandler() {}
std::unique_ptr<RowIterator> GetIterator() override {
return std::unique_ptr<RowIterator>(GetRawIterator());
}
RowIterator* GetRawIterator() override;
const Types& GetTypes() override { return window_->GetTypes(); }
const IndexHint& GetIndex() override { return window_->GetIndex(); }
std::unique_ptr<WindowIterator> GetWindowIterator(const std::string&) {
return nullptr;
}
const OrderType GetOrderType() const { return window_->GetOrderType(); }
const Schema* GetSchema() override { return window_->GetSchema(); }
const std::string& GetName() override { return window_->GetName(); }
const std::string& GetDatabase() override { return window_->GetDatabase(); }
private:
uint64_t request_ts_;
const Row request_row_;
std::shared_ptr<TableHandler> window_;
};
// Row iterator interfaces for LLVM.
// All handles are passed as opaque int8_t* values; the definitions live
// elsewhere, so the notes below are inferred from the names only.
// NOTE(review): presumably `input` is a table/list handle and `iter` is
// caller-provided storage for the iterator -- confirm against the definition.
void GetRowIter(int8_t* input, int8_t* iter);
// Traversal: test for a further row, then advance.
bool RowIterHasNext(int8_t* iter);
void RowIterNext(int8_t* iter);
// Slice `idx` of the iterator's current row: data pointer and size.
int8_t* RowIterGetCurSlice(int8_t* iter, size_t idx);
size_t RowIterGetCurSliceSize(int8_t* iter, size_t idx);
// NOTE(review): presumably releases what GetRowIter set up -- confirm.
void RowIterDelete(int8_t* iter);
// Slice `idx` of a row given by handle: data pointer and size.
int8_t* RowGetSlice(int8_t* row_ptr, size_t idx);
size_t RowGetSliceSize(int8_t* row_ptr, size_t idx);
} // namespace vm
} // namespace hybridse
#endif // HYBRIDSE_INCLUDE_VM_MEM_CATALOG_H_