-
Notifications
You must be signed in to change notification settings - Fork 331
SPL practice:multi index calculation in real time
Deposit details table (‘deposit’ for short):
Field name | Field type | Field meaning | Sample data |
---|---|---|---|
dt | Date | Date | 2023-11-23 |
curr | String | Currency type | 1: RMB, 0: others |
cust_no | String | Customer number | 2889412 |
dept | Number | Department number | 1-500 |
code11 | Number | Code 11 | 1, 0 |
code12 | Number | Code 12 | 1, 0 |
amt | Number | Amount | 32499.90 |
… |
This table contains a dozen fields, and stores 20 million records per day, totaling 600 million records for 30 days.
Customer table (‘cust’ for short):
Field name | Field type | Field meaning | Sample data |
---|---|---|---|
dt | Date | Date | 2023-11-23 |
cust_no | String | Customer number | 2889412 |
cust_sub | Number | Customer type | 0: farmer, 1: non-farmer |
… |
This table contains 3 million customers, and stores full records per day, totaling 90 million records for 30 days.
The two tables are associated on the date field ‘dt’ and the customer number field ‘cust_no’.
Daily index refers to the index counted by day, and the filter condition must contain a date. After the ‘deposit’ and the ‘cust’ are associated, various daily indexes can be combined based on different filter conditions. For example, based on the date November 22, 2023:
1. count the indexes related to the farmers (RMB deposit) of each department such as total amount, average amount, total number of transactions. Another filter condition is that code 11 (code11) is 1. Coding in SQL is roughly as follows:
select d.dept,sum(amt),avg(amt),count(1)…
from deposit d inner join cust c
on d.cust_no=d.cust_no and d.dt=c.dt
where d.dt=to_date('20231122','yyyymmdd')
and d.curr=1 and c.cust_sub=1 and d.code11=1
group by d.dept
2. group by code 11, and then count the indexes related to the farmers (RMB deposit) such as the number of farmers after deduplication, total amount, average amount. Another condition is that the code 12 (code12) is 0.
select d.code11,count(distinct c.cust_no),sum(d.amt),avg(d.amt)…
from deposit d inner join cust c
on d.cust_no=c.cust_no and d.dt=c.dt
where d.dt=to_date('20231122','yyyymmdd')
and d.curr=1 and c.cust_sub=1 and d.code12=0
group by code11
3. group by code 12, and then count the indexes related to the farmers (RMB deposit) such as total amount, total number of transactions.
select d.ocde12, sum(d.amt),count(d.amt),…
from deposit d inner join cust c
on d.cust_no=d.cust_no and d.dt=c.dt
where d.dt=to_date('20231122','yyyymmdd')
and d.curr=1 and c.cust_sub=1
group by code12
There are a wide variety of indexes, and hundreds of indexes need to displayed simultaneously on each page of the application. Moreover, these pages also need to meet the query needs of thousands of people. Even though the cache scheme is adopted to avoid pages with the same content from being double-counted, there will still be dozens of different page tasks to be concurrently executed during the morning peak, which is equivalent to the fact that thousands of indexes need to be calculated at the same time. Although one result set can calculate about 10-20 indexes, the calculation amount of a few thousand indexes is still very large.
Current situation: using the relational database (RDB) to calculate index in real time cannot meet the performance requirement, so user had to choose a small number of indexes to calculate in advance and then store them in database for query. Therefore, the support effect of RDB on such business is poor.
Expectation: implement the real-time calculation of index and meet the needs of concurrent queries.
The real-time calculation of multiple indexes involves tremendous detailed data, and the number of indexes to be calculated simultaneously is also very large. For such calculations, it is difficult for the existing databases to achieve a second-level response.
Moreover, using the pre-calculation method also faces serious problem: the total number of indexes is too large and there are too many results to be calculated in advance, making it impossible to store them all in server. So, the survey on the requirement has to be done in advance to calculate and save a small number of indexes for query.
Deduplication is essentially the grouping operation and requires traversing the raw data. If the result after deduplication is small, we can store the deduplicated result set in memory, and search the result set through the deduplication field when traversing the raw data so as to determine whether to discard or add. If the result set exceeds the capacity of memory, the HASH partitioning method is used in general, that is, divide the raw data into several partitions and buffer them to external storage, and then deduplicate the in-memory result sets respectively. However, this will cause extra external storage read and write actions, leading to poor performance.
By observing the characteristics of multi-index calculation, we found that the field to be deduplicated and counted is relatively certain field (customer number). In this case, we can use the ordered storage mechanism of SPL to sort the data by date and customer number and store the ordered data. In this way, the customer numbers within the same day are definitely in order.
Once the customer numbers are determined to be in order, deduplication becomes simple. We just need to save the customer number that is different from the previous one one-by-one during the traversal process. Even if the memory cannot hold them, there is no need to perform partitioning. Moreover, when deduplicating and counting, we just need to increment by 1 when the customer number differs without even storing the result set. This is the fast and ordered deduplication mechanism of SPL.
Using SPL’s ordered storage mechanism can also effectively solve the performance problem of association between large tables (the ‘deposit’ and the ‘cust’).
Having sorted both the ‘deposit’ and the ‘cust’ by the association fields (date, customer number), the two tables can be regarded as in a primary-sub relationship: the ‘cust’ is the primary table; the ‘deposit’ is the sub table. We can utilize the ordered merge mechanism of SPL to merge the data of two tables in order, which has a calculation complexity of M+N.
RDB uses HASH method to implement the association between two large tables, which has a calculation complexity of SUM(Ni*Mi). By comparison, the performance of SPL’s ordered merge mechanism is much better.
Multi-index calculation is mainly to traverse the large ‘deposit’ on external storage. If the calculation of each set of indexes is accompanied by traversing the large ‘deposit’ once, then the IO of hard disk will be a bottleneck, which will make it impossible to achieve a second-level response when calculating multiple indexes. The channel functionality of SPL implements the multipurpose traversal mechanism and can calculate multiple indexes through one traversal, thereby effectively improving the performance of multi-index calculation.
SQL, which is commonly used in relational databases and big data platforms, does not provide the multipurpose traversal syntax and cannot work out such operations, so it has to traverse multiple times or place hope on the automatic optimization function of database engine. However, actual tests have proven that even the Oracle database with better optimization engine will still need to traverse the data table many times for relatively simple multiple grouping calculations, so it obviously does not implement the multipurpose traversal mechanism.
The ‘deposit’ and the ‘cust’ will generate new data every day, including new deposit details and full customer data of a new day.
According to the above analysis that both the ‘deposit’ and the ‘cust’ are in order by date and customer number, we just need to sort the new data of each day by customer number and append them to the end of the existing data.
A | B | C | |
---|---|---|---|
1 | =file("deposit.txt") | =file("cust.txt") | |
2 | >movefile(A1),movefile(B1) | ||
3 | =periods(date(2023,11,1),date(2023,11,30)) | ||
4 | for A3 | =to(3000000).new(A4:dt,~:cust_no,rand(2):cust_sub) | |
5 | =B1.export@at(B4) | ||
6 | for 100 | =to(200000).new(A4:dt,rand(2):curr,rand(3000000)+1:cust_no,rand(500)+1:dept,rand(2):code11,rand(2):code12,rand()*10000:amt) | |
7 | =A1.export@at(C6) |
This code generates two text files, one is deposit.txt containing 600 million detailed records of November 2023; the other is cust.txt containing 90 million customer records (full records of every day in November 2023). The two files serve as the raw data exported from database or other data sources.
In practice, the deposit details are generally stored in the order in which the data is generated, and hence deposit.txt is in order by date.
A | B | |
---|---|---|
1 | =file("deposit.txt").cursor@t(dt,cust_no,curr,dept,code11,code12,amt).sortx(dt,cust_no) | |
2 | =file("cust.txt").cursor@t(dt,cust_no,cust_sub).sortx(dt,cust_no) | |
3 | =file("deposit.ctx").create@y(#dt,#cust_no,curr,dept,code11,code12,amt) | |
4 | =file("cust.ctx").create@y(#dt,#cust_no,cust_sub) | |
5 | =A3.append@i(A1) | =A4.append@i(A2) |
A1, A2: sort the data of both the ‘deposit’ and ‘cust’ by date and customer number;
A3, A4: create two composite tables, define fields, and indicate they are sorted by dt and cust_no;
A5, B5: store the data of the ‘deposit’ and ‘cust’ into the composite tables.
A | B | |
---|---|---|
1 | =file("deposit.ctx").open() | =file("cust.ctx").open() |
2 | =B1.cursor(dt,cust_no;dt==date(2023,11,22) && cust_sub==1) | |
3 | =A1.cursor(dt,cust_no,code11,code12,dept,amt;dt==date(2023,11,22) && curr==1) | |
4 | =joinx(A2:c,dt,cust_no;A3:d,dt,cust_no) | |
5 | cursor A4 | =A5.select(d.code11==1) |
6 | =B5.groups(d.dept;sum(d.amt),avg(d.amt),count(1)) | |
7 | cursor | =A7.select(d.code12==0) |
8 | =B7.groups(d.code11;icount@o(d.cust_no),sum(d.amt),avg(d.amt)) | |
9 | cursor | =A9.groups(d.code12;sum(d.amt),count(1)) |
10 | return A5,A7,A9 |
A1-A3: create cursor on the two composite tables. The cursor created here is not multi-threaded due to the need for concurrent queries;
A4: since both the composite tables are sorted by date and customer number fields, we can perform ordered merge through the two fields to implement association;
A5, A7, A9: create channels on A4 cursor respectively, and define different index calculations. The filter condition, grouping field and aggregation function vary for different indexes;
A10: once the definition of channels ends, SPL will automatically begin to retrieve data in batches from A4 cursor and sent them to the three channels for calculation. As soon as all data are retrieved, the calculations are completed, and the results are written into A5, A7, and A9. Now the multipurpose traversal is implemented;
It should be specially noted that when the date is the same, both the ‘deposit’ and ‘cust’ are in order by customer number. Therefore, the deduplication and count function icount in B8 can be appended with the @o option, hereby implementing ordered and fast deduplication.
When using channel, the indexes with basically the same filter conditions should be put into different channels of the same cursor. Only in this way can the role of channel in performing multipurpose traversal of the same batch of data be played.
For example, the filter conditions in the example above all contain: cust_sub==1, dt==date(2023,11,22), and curr==1, we can put them into the filter condition of cursor.
If adding another index whose filter conditions are cust_sub==0, dt==date(2023,11,23), and curr==0, then the data to be traversed are completely different from those traversed in the example above, and hence it is not suitable to calculate in one traversal. Instead, we can put these conditions into other cursor and channel that have basically the same conditions as this index.
When performing a single task involving the calculation of 200 indexes and a 10-concurrency task involving the calculation of 2000 indexes on one server (64-core CPU, 256G RAM), all tasks can be finished within 3 seconds.
The challenges facing the multi-index calculation include: large amounts of detailed data, many indexes to be calculated simultaneously, and the inability of traditional technologies to meet the second-level response requirement. Moreover, the use of pre-calculation method will face the problem of extremely large numbers of index combinations, so users had to calculate a small number of indexes for query.
Using SPL technologies like multipurpose traversal, fast and ordered deduplication, and ordered merge and association of large tables can significantly speed up multi-index calculation, turn pre-calculation into real-time calculation, and ensure second-level response.
In this practice, the way the ‘cust’ stores full data per day is generally called snapshot.
However, since the data of the ‘cust’ usually change little per day, the use of snapshot will generate a lot of redundant data. In this case, we can employ the time key of SPL to store ‘cust’ to further improve performance. For details, visit SPL time key.
In addition, for multi-index calculation, we can also use the column-wise computing of SPL Enterprise Edition, along with the pjoin function, which can also greatly improve performance. For details, visit New association calculation methods of SPL.
SPL Resource: SPL Official Website | SPL Blog | Download esProc SPL | SPL Source Code