-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathcls_tabular.cc
2433 lines (2083 loc) · 88.7 KB
/
cls_tabular.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2018 The Regents of the University of California
* All Rights Reserved
*
* This library can redistribute it and/or modify under the terms
* of the GNU Lesser General Public License Version 2.1 as published
* by the Free Software Foundation.
*
*/
#include "cls_tabular.h"
#include "cls_tabular_utils.h"
#include "cls_tabular_processing.h"
#include <errno.h>
#include <time.h>
#include <cstring>
#include <sstream>
#include <string>
#include <boost/lexical_cast.hpp>
#include "re2/re2.h"
#include "include/types.h"
#include "objclass/objclass.h"
// Object class version and name declarations for the "tabular" class.
// NOTE(review): CLS_VER / CLS_NAME are presumably the registration macros
// from objclass/objclass.h -- confirm.
CLS_VER(1,0)
CLS_NAME(tabular)
// Handle for the class itself.
cls_handle_t h_class;
// One method handle per operation of this class.
// NOTE(review): these are presumably bound to the functions of the same
// name during class initialization, which is outside this view -- confirm.
cls_method_handle_t h_exec_query_op;
cls_method_handle_t h_example_query_op;
cls_method_handle_t h_test_query_op;
cls_method_handle_t h_hep_query_op;
cls_method_handle_t h_wasm_query_op;
cls_method_handle_t h_exec_runstats_op;
cls_method_handle_t h_build_index;
cls_method_handle_t h_exec_build_sky_index_op;
cls_method_handle_t h_transform_db_op;
// Handles for the lock-object / table-group operations.
cls_method_handle_t h_freelockobj_query_op;
cls_method_handle_t h_inittable_group_obj_query_op;
cls_method_handle_t h_getlockobj_query_op;
cls_method_handle_t h_acquirelockobj_query_op;
cls_method_handle_t h_createlockobj_query_op;
// Write `msg` to the object class log with a "skyhook: " prefix.
// When is_err is set the message goes through CLS_ERR, otherwise it is
// logged through CLS_LOG at the requested log_level.
void cls_log_message(std::string msg, bool is_err = false, int log_level = 20) {
    const char* text = msg.c_str();
    if (!is_err) {
        CLS_LOG(log_level,"skyhook: %s", text);
        return;
    }
    CLS_ERR("skyhook: %s", text);
}
// Read the given clock and return its value as a single nanosecond count.
// The clock_gettime() call is asserted to succeed.
static inline uint64_t __getns(clockid_t clock)
{
    struct timespec now;
    int rc = clock_gettime(clock, &now);
    assert(rc == 0);
    const uint64_t secs_as_ns = static_cast<uint64_t>(now.tv_sec) * 1000000000ULL;
    return secs_as_ns + now.tv_nsec;
}
// Current CLOCK_MONOTONIC reading in nanoseconds.
static inline uint64_t getns()
{
    const uint64_t monotonic_ns = __getns(CLOCK_MONOTONIC);
    return monotonic_ns;
}
// Copy bytes out of `buffer` into a string for regex matching.
// Copying stops at the first NUL byte, or after buffer_size bytes when the
// buffer contains no NUL.
static std::string string_ncopy(const char* buffer, std::size_t buffer_size) {
    std::size_t used = 0;
    while (used < buffer_size && buffer[used] != '\0') {
        ++used;
    }
    return std::string(buffer, used);
}
// Read the "fb_seq_num" xattr into fb_seq_num.
// When the xattr is absent (-ENOENT / -ENODATA) fb_seq_num is set to
// Tables::DATASTRUCT_SEQ_NUM_MIN; note the xattr itself is not written here.
// Returns 0 on success, the negative error from cls_cxx_getxattr, or
// -EINVAL when the stored value cannot be decoded.
static
int get_fb_seq_num(cls_method_context_t hctx, unsigned int& fb_seq_num) {
    bufferlist xattr_bl;
    const int rc = cls_cxx_getxattr(hctx, "fb_seq_num", &xattr_bl);

    // xattr not present: fall back to the minimum value
    if (rc == -ENOENT || rc == -ENODATA) {
        fb_seq_num = Tables::DATASTRUCT_SEQ_NUM_MIN;
        return 0;
    }

    // any other failure is passed back to the caller
    if (rc < 0) {
        return rc;
    }

    try {
        bufferlist::iterator pos = xattr_bl.begin();
        ::decode(fb_seq_num, pos);
    } catch (const buffer::error &err) {
        CLS_ERR("ERROR: cls_tabular:get_fb_seq_num: decoding fb_seq_num");
        return -EINVAL;
    }
    return 0;
}
// Encode fb_seq_num and store it in the "fb_seq_num" xattr.
// Returns 0 on success or the negative error from cls_cxx_setxattr.
static
int set_fb_seq_num(cls_method_context_t hctx, unsigned int fb_seq_num) {
    bufferlist encoded;
    ::encode(fb_seq_num, encoded);
    const int rc = cls_cxx_setxattr(hctx, "fb_seq_num", &encoded);
    return (rc < 0) ? rc : 0;
}
/*
 * Build a skyhook index, insert to omap.
 * Index types are
 * 1. fb_index: points (physically within the object) to the fb
 *    <string fb_num, struct idx_fb_entry>
 *    where fb_num is a sequence number of flatbufs within an obj
 *
 * 2. rec_index: points (logically within the fb) to the relevant row
 *    <string rec-val, struct idx_rec_entry>
 *    where rec-val is the col data value(s) or RID
 *
 * Input:  `in` holds an encoded idx_op (index type, schema, batch size, ...).
 * Output: `out` is not written by this method.
 * Returns 0 on success, -EINVAL when the idx_op or one of the wrapped
 * bufferlists cannot be decoded, otherwise the negative error returned by
 * the failing read / omap / xattr call.
 */
static
int exec_build_sky_index_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
    // iterate over all fbs within an obj and create 2 indexes:
    // 1. for each fb, create idx_fb_entry (physical fb offset)
    // 2. for each row of an fb, create idx_rec_entry (logical row offset)

    // a ceph property encoding the len of each bl in front of the bl,
    // seems to be an int32 currently.
    const int ceph_bl_encoding_len = sizeof(int32_t);

    // fb_seq_num is stored in xattrs and used as a stable counter of the
    // current number of fbs in the object.
    unsigned int fb_seq_num = Tables::DATASTRUCT_SEQ_NUM_MIN;
    int ret = get_fb_seq_num(hctx, fb_seq_num);
    if (ret < 0) {
        CLS_ERR("ERROR: exec_build_sky_index_op: fb_seq_num entry from xattr %d", ret);
        return ret;
    }

    std::string key_fb_prefix;
    std::string key_data_prefix;
    std::string key_data;
    std::string key;
    std::map<std::string, bufferlist> fbs_index;
    std::map<std::string, bufferlist> recs_index;
    std::map<std::string, bufferlist> txt_index;

    // extract the index op instructions from the input bl
    idx_op op;
    try {
        bufferlist::iterator in_it = in->begin();
        ::decode(op, in_it);
    } catch (const buffer::error &err) {
        CLS_ERR("ERROR: exec_build_sky_index_op decoding idx_op");
        return -EINVAL;
    }
    Tables::schema_vec idx_schema = Tables::schemaFromString(op.idx_schema_str);

    // obj contains one bl that itself wraps a seq of encoded bls of skyhook fb
    bufferlist wrapped_bls;
    ret = cls_cxx_read(hctx, 0, 0, &wrapped_bls);
    if (ret < 0) {
        CLS_ERR("ERROR: exec_build_sky_index_op: reading obj. %d", ret);
        return ret;
    }

    // decode and process each wrapped bl (each bl contains 1 flatbuf)
    uint64_t off = 0;
    ceph::bufferlist::iterator it = wrapped_bls.begin();
    uint64_t obj_len = it.get_remaining();
    while (it.get_remaining() > 0) {
        off = obj_len - it.get_remaining();
        ceph::bufferlist bl;
        try {
            ::decode(bl, it);  // unpack the next bl
        } catch (ceph::buffer::error&) {
            // Previously only an assert guarded this path: with asserts
            // enabled a corrupt object aborted the process, and with them
            // compiled out the loop carried on with an empty bl. Fail the
            // op instead, like every other decode error in this file.
            CLS_ERR("ERROR: exec_build_sky_index_op: decoding wrapped bl at off=%llu",
                    static_cast<unsigned long long>(off));
            return -EINVAL;
        }

        const char* fb = bl.c_str();   // get fb as contiguous bytes
        int fb_len = bl.length();
        Tables::sky_root root = Tables::getSkyRoot(fb, fb_len);

        // DATA LOCATION INDEX (PHYSICAL data reference):

        // IDX_FB get the key prefix and key_data (fb sequence num)
        ++fb_seq_num;
        key_fb_prefix = buildKeyPrefix(Tables::SIT_IDX_FB, root.db_schema_name,
                                       root.table_name);
        const std::string str_seq_num = Tables::u64tostr(fb_seq_num); // key data

        // create string of len min chars per type, int32 here: keep the
        // trailing 10 chars. Guard against a shorter string so substr()
        // cannot throw std::out_of_range on a negative start position.
        const std::size_t seq_len = str_seq_num.length();
        const std::size_t seq_pos = (seq_len > 10) ? (seq_len - 10) : 0;
        key_data = str_seq_num.substr(seq_pos);

        // IDX_FB create the entry struct, encode into bufferlist
        bufferlist fb_bl;
        struct idx_fb_entry fb_ent(off, fb_len + ceph_bl_encoding_len);
        ::encode(fb_ent, fb_bl);
        key = key_fb_prefix + key_data;
        fbs_index[key] = fb_bl;

        // DATA CONTENT INDEXES (LOGICAL data reference):

        // Build the key prefixes for each index type (IDX_RID/IDX_REC/IDX_TXT)
        if (op.idx_type == Tables::SIT_IDX_RID) {
            std::vector<std::string> index_cols;
            index_cols.push_back(Tables::RID_INDEX);
            key_data_prefix = buildKeyPrefix(Tables::SIT_IDX_RID,
                                             root.db_schema_name,
                                             root.table_name,
                                             index_cols);
        }
        if (op.idx_type == Tables::SIT_IDX_REC or
            op.idx_type == Tables::SIT_IDX_TXT) {
            std::vector<std::string> keycols;
            for (auto col = idx_schema.begin(); col != idx_schema.end(); ++col) {
                keycols.push_back(col->name);
            }
            key_data_prefix = Tables::buildKeyPrefix(op.idx_type,
                                                     root.db_schema_name,
                                                     root.table_name,
                                                     keycols);
        }

        // IDX_REC/IDX_RID/IDX_TXT: create the key data for each row
        for (uint32_t i = 0; i < root.nrows; i++) {

            Tables::sky_rec rec = Tables::getSkyRec(static_cast<Tables::row_offs>(root.data_vec)->Get(i));

            switch (op.idx_type) {

                case Tables::SIT_IDX_RID: {

                    // key_data is just the RID val
                    key_data = Tables::u64tostr(rec.RID);

                    // create the entry, encode into bufferlist, update map
                    bufferlist rec_bl;
                    struct idx_rec_entry rec_ent(fb_seq_num, i, rec.RID);
                    ::encode(rec_ent, rec_bl);
                    key = key_data_prefix + key_data;
                    recs_index[key] = rec_bl;
                    break;
                }
                case Tables::SIT_IDX_REC: {

                    // key data is built up from the relevant col vals
                    key_data.clear();
                    auto row = rec.data.AsVector();
                    for (unsigned c = 0; c < idx_schema.size(); c++) {
                        if (c > 0) key_data += Tables::IDX_KEY_DELIM_INNER;
                        key_data += Tables::buildKeyData(
                                            idx_schema[c].type,
                                            row[idx_schema[c].idx].AsUInt64());
                    }

                    // to enforce uniqueness, append RID to key data
                    if (!op.idx_unique) {
                        key_data += (Tables::IDX_KEY_DELIM_OUTER +
                                     Tables::IDX_KEY_DELIM_UNIQUE +
                                     Tables::IDX_KEY_DELIM_INNER +
                                     std::to_string(rec.RID));
                    }

                    // create the entry, encode into bufferlist, update map
                    bufferlist rec_bl;
                    struct idx_rec_entry rec_ent(fb_seq_num, i, rec.RID);
                    ::encode(rec_ent, rec_bl);
                    key = key_data_prefix + key_data;
                    recs_index[key] = rec_bl;
                    break;
                }
                case Tables::SIT_IDX_TXT: {

                    // add each word in the row to a words vector, store as
                    // lower case and preserve word sequence order.
                    std::vector<std::pair<std::string, int>> words;
                    std::string text_delims;
                    if (!op.idx_text_delims.empty())
                        text_delims = op.idx_text_delims;
                    else
                        text_delims = " \t\r\f\v\n"; // whitespace chars
                    auto row = rec.data.AsVector();
                    for (unsigned c = 0; c < idx_schema.size(); c++) {
                        std::string line = \
                            row[idx_schema[c].idx].AsString().str();
                        boost::trim(line);
                        if (line.empty())
                            continue;
                        std::vector<std::string> elems;
                        boost::split(elems, line, boost::is_any_of(text_delims),
                                     boost::token_compress_on);
                        for (uint32_t w = 0; w < elems.size(); w++) {
                            std::string word = \
                                boost::algorithm::to_lower_copy(elems[w]);
                            boost::trim(word);

                            // skip stopwords?
                            if (op.idx_ignore_stopwords and
                                Tables::IDX_STOPWORDS.count(word) > 0) {
                                continue;
                            }
                            // Store the normalized (lower-cased, trimmed)
                            // word. The original token was stored before,
                            // so keys kept their mixed case despite the
                            // normalization done just above.
                            // NOTE(review): text indexes built before this
                            // change hold mixed-case keys and presumably
                            // need a rebuild -- confirm with lookup side.
                            words.push_back(std::make_pair(word, w));
                        }
                    }

                    // now create a key and val (an entry struct) for each
                    // word extracted from line
                    for (auto wit = words.begin(); wit != words.end(); ++wit) {
                        key_data.clear();
                        std::string word = wit->first;
                        key_data += word;

                        // add the RID for uniqueness,
                        // in case of repeated words within all rows
                        key_data += (Tables::IDX_KEY_DELIM_OUTER +
                                     Tables::IDX_KEY_DELIM_UNIQUE +
                                     Tables::IDX_KEY_DELIM_INNER +
                                     std::to_string(rec.RID));

                        // add the word pos for uniqueness,
                        // in case of repeated words within same row
                        int word_pos = wit->second;
                        key_data += (Tables::IDX_KEY_DELIM_INNER +
                                     std::to_string(word_pos));

                        // create the entry, encode into bufferlist, update map
                        bufferlist txt_bl;
                        struct idx_txt_entry txt_ent(fb_seq_num, i,
                                                     rec.RID, word_pos);
                        ::encode(txt_ent, txt_bl);
                        key = key_data_prefix + key_data;
                        txt_index[key] = txt_bl;
                        /*CLS_LOG(20,"kv=%s",
                                (key+";"+txt_ent.toString()).c_str());*/
                    }
                    break;
                }
                default: {
                    // NOTE(review): an unknown type is only logged (once per
                    // row) and the fb index is still built; key_data_prefix
                    // stays empty, so the marker below is written under an
                    // empty key. Left unchanged pending confirmation.
                    CLS_ERR("exec_build_sky_index_op: %s", (
                            "Index type unknown. type=" +
                            std::to_string(op.idx_type)).c_str());
                }
            }

            // IDX_REC/IDX_RID batch insert to omap (minimize IOs)
            if (recs_index.size() > op.idx_batch_size) {
                ret = cls_cxx_map_set_vals(hctx, &recs_index);
                if (ret < 0) {
                    CLS_ERR("exec_build_sky_index_op: error setting recs index entries %d", ret);
                    return ret;
                }
                recs_index.clear();
            }

            // IDX_TXT batch insert to omap (minimize IOs)
            if (txt_index.size() > op.idx_batch_size) {
                ret = cls_cxx_map_set_vals(hctx, &txt_index);
                if (ret < 0) {
                    CLS_ERR("exec_build_sky_index_op: error setting txt index entries %d", ret);
                    return ret;
                }
                txt_index.clear();
            }
        }  // end foreach row

        // IDX_FB batch insert to omap (minimize IOs)
        if (fbs_index.size() > op.idx_batch_size) {
            ret = cls_cxx_map_set_vals(hctx, &fbs_index);
            if (ret < 0) {
                CLS_ERR("exec_build_sky_index_op: error setting fbs index entries %d", ret);
                return ret;
            }
            fbs_index.clear();
        }
    }  // end while decode wrapped_bls

    // IDX_TXT insert remaining entries to omap
    if (txt_index.size() > 0) {
        ret = cls_cxx_map_set_vals(hctx, &txt_index);
        if (ret < 0) {
            CLS_ERR("exec_build_sky_index_op: error setting txt index entries %d", ret);
            return ret;
        }
    }

    // IDX_REC/IDX_RID insert remaining entries to omap
    if (recs_index.size() > 0) {
        ret = cls_cxx_map_set_vals(hctx, &recs_index);
        if (ret < 0) {
            CLS_ERR("exec_build_sky_index_op: error setting recs index entries %d", ret);
            return ret;
        }
    }

    // IDX_FB insert remaining entries to omap
    if (fbs_index.size() > 0) {
        ret = cls_cxx_map_set_vals(hctx, &fbs_index);
        if (ret < 0) {
            CLS_ERR("exec_build_sky_index_op: error setting fbs index entries %d", ret);
            return ret;
        }
    }

    // Update counter and Insert fb_seq_num to xattr
    ret = set_fb_seq_num(hctx, fb_seq_num);
    if (ret < 0) {
        CLS_ERR("exec_build_sky_index_op: error setting fb_seq_num entry to xattr %d", ret);
        return ret;
    }

    // LASTLY insert a marker key to indicate this index exists,
    // here we are using the key prefix with no data vals
    // TODO: make this a valid entry (not empty_bl), but with empty vals.
    bufferlist empty_bl;
    empty_bl.append("");
    std::map<std::string, bufferlist> index_exists_marker;
    index_exists_marker[key_data_prefix] = empty_bl;
    ret = cls_cxx_map_set_vals(hctx, &index_exists_marker);
    if (ret < 0) {
        CLS_ERR("exec_build_sky_index_op: error setting index_exists_marker %d", ret);
        return ret;
    }
    return 0;
}
/*
 * Build an index from the primary key (orderkey,linenum), insert to omap.
 * Index contains <k=primarykey, v=offset of row within BL>
 *
 * Input:  `in` holds an encoded uint32_t batch size; omap writes are flushed
 *         whenever the pending map grows beyond it.
 * Output: `out` is not written by this method.
 * Returns 0 on success, -EINVAL when the batch size cannot be decoded or a
 * key repeats within the pending batch, otherwise the negative error from
 * the failing read / omap call.
 */
static
int build_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
    uint32_t batch_size = 0;
    try {
        bufferlist::iterator it = in->begin();
        ::decode(batch_size, it);
    } catch (const buffer::error &err) {
        CLS_ERR("ERROR: decoding batch_size");
        return -EINVAL;
    }

    bufferlist bl;
    int ret = cls_cxx_read(hctx, 0, 0, &bl);
    if (ret < 0) {
        CLS_ERR("ERROR: reading obj %d", ret);
        return ret;
    }

    // fixed-width row layout: key fields sit at byte offsets 0 and 12
    const size_t row_size = 141;
    const char *rows = bl.c_str();
    const size_t num_rows = bl.length() / row_size;
    const size_t order_key_field_offset = 0;
    const size_t line_number_field_offset = 12;

    std::map<string, bufferlist> index;

    // read all rows and extract the key fields
    for (size_t rid = 0; rid < num_rows; rid++) {
        const char *row = rows + rid * row_size;

        // Rows are 141 bytes apart, so the int fields are generally not
        // int-aligned; copy them out with memcpy instead of dereferencing
        // a casted pointer (misaligned access is undefined behaviour).
        int order_key_val = 0;
        std::memcpy(&order_key_val, row + order_key_field_offset,
                    sizeof(order_key_val));
        int line_number_val = 0;
        std::memcpy(&line_number_val, row + line_number_field_offset,
                    sizeof(line_number_val));

        // key: order key in the high 32 bits, line number in the low 32
        uint64_t key = ((uint64_t)order_key_val) << 32;
        key |= (uint32_t)line_number_val;
        const std::string strkey = Tables::u64tostr(key);

        // val
        bufferlist row_offset_bl;
        const size_t row_offset = rid * row_size;
        ::encode(row_offset, row_offset_bl);

        // NOTE: this only detects duplicates inside the pending batch,
        // since `index` is cleared after every flush.
        if (index.count(strkey) != 0)
            return -EINVAL;

        index[strkey] = row_offset_bl;

        if (index.size() > batch_size) {
            ret = cls_cxx_map_set_vals(hctx, &index);
            if (ret < 0) {
                CLS_ERR("error setting index entries %d", ret);
                return ret;
            }
            index.clear();
        }
    }

    // flush whatever is left after the last full batch
    if (!index.empty()) {
        ret = cls_cxx_map_set_vals(hctx, &index);
        if (ret < 0) {
            CLS_ERR("error setting index entries %d", ret);
            return ret;
        }
    }
    return 0;
}
// busy loop work: accumulator is volatile so every add is a real access
volatile uint64_t __tabular_x;

// Spin for `cost` iterations, adding each iteration number to __tabular_x.
static void add_extra_row_cost(uint64_t cost)
{
    uint64_t step = 0;
    while (step < cost) {
        __tabular_x += step;
        ++step;
    }
}
// Decode one index record entry from `bl`, look up the omap entry of the
// flatbuf it refers to, and record the entry's row number in idx_reads
// (keyed by fb_num). A missing flatbuf key is logged and ignored.
// key_data_prefix is accepted but not used here.
// Returns 0 on success (including the missing-key case), -EINVAL on a
// decode failure, or the negative error from cls_cxx_map_get_val.
static
int
update_idx_reads(
    cls_method_context_t hctx,
    std::map<int, struct Tables::read_info>& idx_reads,
    bufferlist bl,
    std::string key_fb_prefix,
    std::string key_data_prefix) {

    struct idx_rec_entry rec_ent;
    try {
        bufferlist::iterator rec_it = bl.begin();
        ::decode(rec_ent, rec_it);
    } catch (const buffer::error &err) {
        CLS_ERR("ERROR: decoding query idx_rec_ent");
        return -EINVAL;
    }

    // the single row number carried by this record
    std::vector<unsigned int> row_nums;
    row_nums.push_back(rec_ent.row_num);

    // key of the flatbuf entry this record points at
    std::string fb_key = key_fb_prefix;
    fb_key += Tables::buildKeyData(Tables::SDT_INT32, rec_ent.fb_num);

    bufferlist fb_entry_bl;
    const int rc = cls_cxx_map_get_val(hctx, fb_key, &fb_entry_bl);
    if (rc == -ENOENT) {
        CLS_LOG(20,"WARN: NO FB key ENTRY FOUND!! ret=%d", rc);
        return 0;
    }
    if (rc < 0) {
        CLS_ERR("cant read map val index for idx_fb_key, %d", rc);
        return rc;
    }

    struct idx_fb_entry fb_ent;
    try {
        bufferlist::iterator fb_it = fb_entry_bl.begin();
        ::decode(fb_ent, fb_it);
    } catch (const buffer::error &err) {
        CLS_ERR("ERROR: decoding query idx_fb_ent");
        return -EINVAL;
    }

    // reads are indexed by fb_num: extend the existing read_info for this
    // flatbuf if there is one, otherwise create it
    auto existing = idx_reads.find(rec_ent.fb_num);
    if (existing == idx_reads.end()) {
        idx_reads[rec_ent.fb_num] = \
            Tables::read_info(rec_ent.fb_num,
                              fb_ent.off,
                              fb_ent.len,
                              row_nums);
    }
    else {
        existing->second.rnums.insert(existing->second.rnums.end(),
                                      row_nums.begin(),
                                      row_nums.end());
    }
    return 0;
}
/*
 * Walk the flatbuf (IDX_FB) index in omap: for every sequence number from
 * DATASTRUCT_SEQ_NUM_MIN up to the value stored in the fb_seq_num xattr,
 * look up the entry under key_fb_prefix and, when present, add a read_info
 * with that flatbuf's off/len and an empty row list to `reads`.
 * Returns 0 on success, -EINVAL on a decode failure, or the negative error
 * from the failing xattr / omap call.
 */
static
int
read_fbs_index(
    cls_method_context_t hctx,
    std::string key_fb_prefix,
    std::map<int, struct Tables::read_info>& reads)
{
    using namespace Tables;

    // get the actual max fb seq number
    unsigned int seq_max = Tables::DATASTRUCT_SEQ_NUM_MIN;
    const int xattr_rc = get_fb_seq_num(hctx, seq_max);
    if (xattr_rc < 0) {
        CLS_ERR("error getting fb_seq_num entry from xattr %d", xattr_rc);
        return xattr_rc;
    }

    // fb seq nums grow monotonically, so try to read each key
    unsigned int seq = Tables::DATASTRUCT_SEQ_NUM_MIN;
    for (; seq <= seq_max; ++seq) {

        // create key for this seq num.
        std::string key = key_fb_prefix;
        key += Tables::buildKeyData(Tables::SDT_INT32, seq);

        bufferlist entry_bl;
        const int rc = cls_cxx_map_get_val(hctx, key, &entry_bl);

        // a seq_num may not be present due to fb deleted/compaction;
        // a missing key is not an error, just move on
        if (rc == -ENOENT) {
            continue;
        }
        if (rc < 0) {
            CLS_ERR("Cannot read omap entry for key=%s", key.c_str());
            return rc;
        }

        // key found so decode the entry and set the read vals.
        struct idx_fb_entry fb_ent;
        try {
            bufferlist::iterator pos = entry_bl.begin();
            ::decode(fb_ent, pos);
        } catch (const buffer::error &err) {
            CLS_ERR("ERROR: decoding idx_fb_ent for key=%s", key.c_str());
            return -EINVAL;
        }
        reads[seq] = Tables::read_info(seq, fb_ent.off, fb_ent.len, {});
    }
    return 0;
}
/*
    Check for index existence, always used before trying to perform index
    reads. The omap is probed for the base key (prefix only), whose presence
    marks the index as existing. The value stored under that key carries no
    record data, so only the lookup result is examined.
    Returns true when the key is found; false when it is missing or when the
    lookup fails (the failure is logged).
*/
static
bool
sky_index_exists (cls_method_context_t hctx, std::string key_prefix)
{
    bufferlist marker_bl;
    const int rc = cls_cxx_map_get_val(hctx, key_prefix, &marker_bl);

    // no entry for this key: assume index does not exist.
    if (rc == -ENOENT) {
        return false;
    }
    if (rc < 0) {
        CLS_ERR("Cannot read idx_rec entry for key, errorcode=%d", rc);
        return false;
    }
    return true;
}
/*
    Decide to use index or not.
    Intended to check statistics and index predicates: with a sufficiently
    selective predicate set the index is used (return true), otherwise too
    many index entries would match and a table scan is preferred (return
    false). The selectivity of every predicate is currently a fixed
    placeholder value, so the answer is true unless the omap lookup of the
    index key fails with an error other than -ENOENT.
*/
static
bool
use_sky_index(
    cls_method_context_t hctx,
    std::string index_prefix,
    Tables::predicate_vec index_preds)
{
    bufferlist marker_bl;
    const int rc = cls_cxx_map_get_val(hctx, index_prefix, &marker_bl);
    if (rc < 0 && rc != -ENOENT) {
        CLS_ERR("Cannot read idx_rec entry for key, errorcode=%d", rc);
        return false;
    }

    // we assume to use by default, since the planner requested it.
    bool use_index = true;

    // no entry for this index key: nothing to evaluate
    if (rc == -ENOENT) {
        return use_index;
    }

    // TODO: this should be based on a cost model, not a fixed value.
    const float SELECTIVITY_HIGH_VAL = 0.10;

    // for each pred, check the bl idx_stats struct and decide selectivity
    for (size_t n = 0; n < index_preds.size(); ++n) {
        // TODO: compute this from the predicate value and stats struct
        float expected_selectivity = 0.10;
        const bool selective = (expected_selectivity <= SELECTIVITY_HIGH_VAL);
        use_index = use_index && selective;
    }
    return use_index;
}
/*
* Lookup matching records in omap, based on the index specified and the
* index predicates. Set the idx_reads info vector with the corresponding
* flatbuf off/len and row numbers for each matching record.
*
* Steps, as visible in this function:
*  1. build a base key = key_data_prefix + key data derived from the
*     predicate values (integer column types only).
*  2. if every predicate includes equality, remember the base key for a
*     point lookup at the end.
*  3. for range predicates (gt/geq/lt/leq), scan omap in batches of
*     idx_batch_size starting after a chosen key, feeding each accepted
*     entry to update_idx_reads().
*  4. perform the point lookup(s) remembered in step 2.
*
* Returns 0 on success, -EINVAL on a decode failure, or the negative error
* returned by the failing omap call / update_idx_reads().
* NOTE(review): index_type is not referenced in the body -- confirm whether
* it is still needed.
*/
static
int
read_sky_index(
cls_method_context_t hctx,
Tables::predicate_vec index_preds,
std::string key_fb_prefix,
std::string key_data_prefix,
int index_type,
int idx_batch_size,
std::map<int, struct Tables::read_info>& idx_reads) {
using namespace Tables;
// ret tracks the point lookups at the end, ret2 the range scan and the
// update_idx_reads() calls.
int ret = 0, ret2 = 0;
std::vector<std::string> keys; // to contain all keys found after lookups
// for each fb_seq_num, a corresponding read_info struct to
// indicate the relevant rows within a given fb.
// fb_seq_num is used as key, so that subsequent reads will always be from
// a higher byte offset, if that matters.
// build up the key data portion from the idx pred vals.
// assumes all indexes here are integers, we extract the predicate vals
// as ints and use our uint to padded string method to build the keys
std::string key_data;
for (unsigned i = 0; i < index_preds.size(); i++) {
uint64_t val = 0;
switch (index_preds[i]->colType()) {
case SDT_INT8:
case SDT_INT16:
case SDT_INT32:
case SDT_INT64: { // TODO: support signed ints in index ranges
int64_t v = 0;
extract_typedpred_val(index_preds[i], v);
val = static_cast<uint64_t>(v); // force to unsigned for now.
break;
}
case SDT_UINT8:
case SDT_UINT16:
case SDT_UINT32:
case SDT_UINT64: {
extract_typedpred_val(index_preds[i], val);
break;
}
default:
// any other column type is treated as a programming error.
// NOTE(review): if assert is compiled out, execution continues
// with val == 0 for this predicate -- confirm this is acceptable.
assert (BuildSkyIndexUnsupportedColType==0);
}
if (i > 0) // add delim for multicol index vals
key_data += IDX_KEY_DELIM_INNER;
key_data += buildKeyData(index_preds[i]->colType(), val);
}
// base key: the index prefix followed by the predicate-derived key data
std::string key = key_data_prefix + key_data;
// Add base key when all index predicates include equality
if (check_predicate_ops_all_include_equality(index_preds)) {
keys.push_back(key);
}
// Find the starting key for range query keys:
// 1. Greater than predicates we start after the base key,
// 2. Less than predicates we start after the prefix of the base key.
std::string start_after = "";
if (check_predicate_ops(index_preds, SOT_geq) or
check_predicate_ops(index_preds, SOT_gt)) {
start_after = key;
}
else if (check_predicate_ops(index_preds, SOT_lt) or
check_predicate_ops(index_preds, SOT_leq)) {
start_after = key_data_prefix;
}
// Get keys in batches at a time and print row number/ offset detail
// Equality query does not loop TODO: this is not true for non-unique idx!
// An empty start_after means no range predicate was found above, so the
// scan loop below is skipped entirely.
bool stop = false;
if (start_after.empty()) stop = true;
// Retrieve keys for range queries in batches of "max_to_get"
// until no more keys
// NOTE(review): `more` is filled in by cls_cxx_map_get_vals but is never
// read in this function; termination relies on an empty result, -ENOENT,
// or `stop` being set.
bool more = true;
int max_to_get = idx_batch_size;
// NOTE(review): key_val_map is not cleared here between iterations --
// presumably cls_cxx_map_get_vals resets it; confirm.
std::map<std::string, bufferlist> key_val_map;
while(!stop) {
ret2 = cls_cxx_map_get_vals(hctx, start_after, string(),
max_to_get, &key_val_map, &more);
if (ret2 < 0 && ret2 != -ENOENT) {
CLS_ERR("cant read map val index rec for idx_rec key %d", ret2);
return ret2;
}
// If no more entries found break out of the loop
if (ret2 == -ENOENT || key_val_map.size() == 0) {
break;
}
if (ret2 >= 0) {
try {
for (auto it = key_val_map.cbegin();
it != key_val_map.cend(); it++) {
const std::string& key1 = it->first;
bufferlist record_bl_entry = it->second;
// Break if keyprefix in fetched key does not match that
// passed by user, means we have gone too far, possibly
// into keys for another index.
// NOTE(review): find() matches the prefix anywhere inside key1,
// not only at position 0, so this is looser than a strict
// starts-with test.
if (key1.find(key_data_prefix) == std::string::npos) {
stop = true;
break;
}
// If this is the last key, update start_after value
// so that the next batch resumes after it.
if (std::next(it, 1) == key_val_map.cend()) {
start_after = key1;
}
// break if key matches or exceeds key passed by the user
// prevents going into the next index (next key prefix)
if (check_predicate_ops(index_preds, SOT_lt) or
check_predicate_ops(index_preds, SOT_leq)) {
//~ if(key_val_map.find(key) != key_val_map.end()) {
//~ stop = true;
//~ break;
//~ }
if (key1 == key) { /// TODO: is this extra check needed?
stop = true;
break;
}
else if (key1 > key) { // Special handling for leq
// for leq, an entry past the base key is still accepted when
// compare_keys(key, key1) holds.
// NOTE(review): compare_keys is defined elsewhere; presumably it
// treats key1 as "equal" to key when key1 only adds a uniqueness
// suffix -- confirm.
if (check_predicate_ops(index_preds, SOT_leq) and
compare_keys(key, key1)) {
stop = false;
}
else {
stop = true;
break;
}
}
}
// Skip entries that compare equal to the base key when the
// predicates contain a strict greater-than (SOT_gt).
if (check_predicate_ops(index_preds, SOT_gt) and
compare_keys(key, key1)) {
continue;
}
// Set the idx_reads info vector with the corresponding
// flatbuf off/len and row numbers for each matching record
ret2 = update_idx_reads(hctx, idx_reads, record_bl_entry,
key_fb_prefix, key_data_prefix);
if(ret2 < 0)
return ret2;
}
} catch (const buffer::error &err) {
CLS_ERR("ERROR: decoding query idx_rec_ent");
return -EINVAL;
}
}
}
// lookup key in omap to get the row offset
// (point lookups for the base key collected above; a missing key is not
// an error and simply contributes no reads)
if (!keys.empty()) {
for (unsigned i = 0; i < keys.size(); i++) {
bufferlist record_bl_entry;
ret = cls_cxx_map_get_val(hctx, keys[i], &record_bl_entry);
if (ret < 0 && ret != -ENOENT) {
CLS_ERR("cant read map val index rec for idx_rec key %d", ret);
return ret;
}
if (ret >= 0) {
ret2 = update_idx_reads(hctx,
idx_reads,
record_bl_entry,
key_fb_prefix,
key_data_prefix);
if (ret2 < 0)
return ret2;
} else {
// no rec found for key
}
}
}
return 0;
}
/*
* Primary method to process queries
*/
static
int exec_query_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int ret = 0;
// accounting.
uint64_t read_start = 0;
uint64_t read_ns = 0;
uint64_t eval_start = 0;
uint64_t eval_ns = 0;
// result set to be returned to client.
bufferlist result_bl;
// contains the serialized user request.
query_op op;
// extract the query op to get the query request params
try {
bufferlist::iterator it = in->begin();
::decode(op, it);
} catch (const buffer::error &err) {
CLS_ERR("ERROR: exec_query_op: decoding query op failed");
return -EINVAL;
}
// remove newlines for cls logging purpose
std::string msg = op.toString();
std::replace(msg.begin(), msg.end(), '\n', ' ');
if (op.debug) {
CLS_LOG(20, "exec_query_op decoded successfully");
CLS_LOG(20, "exec_query_op op.toString()=%s", op.toString().c_str());
}
using namespace Tables;
// hold result of index lookups or read all flatbufs
bool index1_exists = false;
bool index2_exists = false;
bool use_index1 = false;
bool use_index2 = false;
std::map<int, struct read_info> reads;
std::map<int, struct read_info> idx1_reads;
std::map<int, struct read_info> idx2_reads;
// data_schema is the table's current schema
// TODO: redundant, this is also stored in the fb, extract from fb?
schema_vec data_schema = schemaFromString(op.data_schema);
// query_schema is the query schema
schema_vec query_schema = schemaFromString(op.query_schema);
// predicates to be applied, if any
predicate_vec query_preds = predsFromString(data_schema,
op.query_preds);
/* INDEXING LOOKUPS */
//
// required for index plan or scan plan if index plan not chosen.
predicate_vec index_preds;
predicate_vec index2_preds;
std::string key_fb_prefix = buildKeyPrefix(SIT_IDX_FB,
op.db_schema_name,
op.table_name);
// lookup correct flatbuf and potentially set specific row nums
// to be processed next in processFb()
if (op.index_read) {