
Inconsistent Row Counts from Upsert Tables #7849

Closed
mapshen opened this issue Dec 1, 2021 · 16 comments

Comments

@mapshen

mapshen commented Dec 1, 2021

Summary

We recently found that querying Pinot upsert-enabled tables can return different numbers of rows at different times. The row count returned can be higher than, equal to, or lower than the true count, whereas we expect it to always match the true count.

Setup

Versions

Pinot Version: 0.8.0
Trino Version: 362

Kafka Topic

Topic topic1 has a single partition, and the publisher sends over 1,000 messages per second.

Table Config

The table has about 200 columns: 100 metric fields and 100 dimension fields. Column A serves as the primary key, with ~20,000 unique values. The segment flush threshold is set to 10,000 rows, which means a new segment is produced roughly every 10 seconds at the ~1,000 messages per second publish rate.

{
   "tableName":"table1",
   "schema":{
      "metricFieldSpecs":[
         {
            "name":"B",
            "dataType":"DOUBLE"
         }
      ],
      "dimensionFieldSpecs":[
         {
            "name":"A",
            "dataType":"STRING"
         }
      ],
      "dateTimeFieldSpecs":[
         {
            "name":"EPOCH",
            "dataType":"INT",
            "format":"1:SECONDS:EPOCH",
            "granularity":"1:SECONDS"
         }
      ],
      "primaryKeyColumns":[
         "A"
      ],
      "schemaName":"schema1"
   },
   "realtime":{
      "tableName":"table1",
      "tableType":"REALTIME",
      "segmentsConfig":{
         "schemaName":"schema1",
         "timeColumnName":"EPOCH",
         "replicasPerPartition":"1",
         "retentionTimeUnit":"DAYS",
         "retentionTimeValue":"4",
         "segmentPushType":"APPEND",
         "completionConfig":{
            "completionMode":"DOWNLOAD"
         }
      },
      "tableIndexConfig":{
         "invertedIndexColumns":[
            "A"
         ],
         "loadMode":"MMAP",
         "nullHandlingEnabled":false,
         "streamConfigs":{
            "realtime.segment.flush.threshold.rows":"10000",
            "realtime.segment.flush.threshold.time":"96h",
            "streamType":"kafka",
            "stream.kafka.consumer.type":"lowLevel",
            "stream.kafka.topic.name":"topic1",
            "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list":"kafka:9092",
            "stream.kafka.consumer.prop.auto.offset.reset":"largest"
         }
      },
      "tenants":{
         
      },
      "metadata":{
         
      },
      "routing":{
         "instanceSelectorType":"strictReplicaGroup"
      },
      "upsertConfig":{
         "mode":"FULL"
      }
   }
}

Issues

Case 1: one or two rows are missing

Steps to reproduce

  1. Bump up the segment flush threshold to a large enough number, like 1,000,000, to make sure there is no flush.
  2. Run select count(*) from table1 in PQL repeatedly and you will periodically see numbers like 19,998 or 19,997 where we expect 20,000.
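
To narrow down where the fluctuation comes from, the count can also be broken down by Pinot's built-in $segmentName virtual column; a minimal sketch in Pinot SQL syntax (re-running it shows which segment gains or loses rows between runs; the same breakdown is useful for case 2 below):

-- Per-segment row counts for the upsert table
SELECT $segmentName, COUNT(*) AS cnt
FROM table1
GROUP BY $segmentName
ORDER BY COUNT(*) DESC
LIMIT 100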

Root cause

Updating a record's location in an upsert table is not atomic; it is done in two separate steps, a remove followed by an add. #7844 and #7860 by @Jackie-Jiang are trying to address it.

Case 2: hundreds or thousands of rows are missing

Steps to reproduce

  1. Set the segment flush threshold to a small enough number, like 1,000, so that segment flushes happen frequently.
  2. Run select count(*) from table1 in PQL repeatedly; when a segment is flushed, you will see numbers like 16,703 or 18,234 while we expect 20,000.

Root cause

Unclear. Possibly because upserts across segments are not atomic?

If this is truly due to the two-step update and there is no way to achieve atomicity, I would argue we should do the add before the remove, since it is easier to de-duplicate than to identify and backfill the missing row(s).

Case 3: duplicates are returned via Trino

Steps to reproduce

  1. Continue with the setup in Case 2.
  2. Run query1 select count(*) from table1 where from_unixtime(EPOCH) > current_timestamp - interval '15' minute via Trino repeatedly and you will see numbers like 20,023 when you expect to see about 18,000, i.e. over 2,000 duplicates returned.
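
One way to confirm the difference between the two query shapes is to compare their Trino plans; a sketch, assuming the table is exposed as pinot.default.table1 (catalog and schema names are illustrative):

-- query1: the predicate wraps EPOCH in from_unixtime(), which blocks pushdown to the Pinot connector
EXPLAIN
SELECT count(*) FROM pinot.default.table1
WHERE from_unixtime(EPOCH) > current_timestamp - interval '15' minute;

-- query2: the predicate compares EPOCH directly and can be pushed down
EXPLAIN
SELECT count(*) FROM pinot.default.table1
WHERE EPOCH > to_unixtime(current_timestamp - interval '15' minute);

If the plan for query1 shows Trino applying the filter on top of a full table scan, while query2 shows the filter absorbed into the Pinot connector, that matches the pushdown difference described under Root cause below.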

Root cause

We happened to notice that an equivalent Trino query2 select count(*) from table1 where EPOCH > to_unixtime(current_timestamp - interval '15' minute) doesn't yield duplicates. The difference is that query2 utilizes the pushdown support while query1 doesn't. We suspect that when query1 is executed, it examines segments one by one with no locking in place. For instance, it may first pull all the valid records from segment1, after which those records' locations are updated to segment2. When it then comes to segment2, it retrieves all the valid records there as well. In the end, it returns a union of the records from both segment1 and segment2, which contains duplicates.
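
Until that is addressed, one hedged workaround for this particular count is to count distinct primary keys instead of rows, since upsert mode FULL keeps at most one valid row per key (assuming A is the only primary-key column, per the schema above; catalog and schema names are again illustrative). This collapses the duplicates but does not help with the missing-row cases:

SELECT count(DISTINCT A)
FROM pinot.default.table1
WHERE EPOCH > to_unixtime(current_timestamp - interval '15' minute);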

CC: @mayankshriv @Jackie-Jiang @yupeng9 @elonazoulay

mapshen changed the title from "Inconsistent Row Counts" to "Inconsistent Row Counts from Upsert Tables" on Dec 1, 2021
@yupeng9
Contributor

yupeng9 commented Dec 3, 2021

Thanks a lot for the detailed report and investigation!

Regarding case 3, I think it's possible if the Trino execution takes a long time and the segment fetches happen some time apart. There's no snapshot concept, so multiple calls to the Pinot table cannot guarantee consistency.

@mapshen
Author

mapshen commented Dec 3, 2021

@yupeng9 As discussed in Slack, after patching #7844 and #7860, the off-by-one rate in case 1 dropped from 50% to 2% in a test that ran select count(1) from <table> 3,000 times before and after the patch. However, the bad news is that it's not down to zero yet. Any idea what else we should look at?

Regarding case 3, Trino perhaps needs to perform deduplication when querying multiple validDocsIndex.

@yupeng9
Contributor

yupeng9 commented Dec 3, 2021

One idea for debugging is to print the virtual columns $docId and $segmentName of the duplicated records. This will give some insight into the internal state, along with some logs about the ingestion activity at that time.
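
For reference, a minimal sketch of such a debugging query in Pinot SQL syntax (the key literal is a placeholder; whether duplicates surface directly in Pinot depends on where they materialize, since case 3 only reproduces through Trino):

-- Keys that currently resolve to more than one row
SELECT A, COUNT(*) AS cnt
FROM table1
GROUP BY A
ORDER BY COUNT(*) DESC
LIMIT 10;

-- For a suspicious key, show which segment and docId each copy lives in
SELECT A, $docId, $segmentName, $hostName
FROM table1
WHERE A = '<duplicated-key>'
LIMIT 10;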

@mapshen
Author

mapshen commented Dec 3, 2021

@yupeng9 The duplicated records are for case 3. Any idea on how to debug case 1 further?

Also, what logs should we look for? There are tons of logs with debugging turned on.

@colemast

colemast commented Mar 7, 2022

This is a very interesting thread and great analysis. I can see issue 1 is being addressed, and issue 3 seems like a limitation of the Trino query pattern of hitting segments. Has anybody investigated issue 2 and its possible causes?

@tuor713
Contributor

tuor713 commented Mar 8, 2022

The issue of rows missing during segment flush seems to come from the fact that there is a period where there are two consuming segments (the old one being rolled into an immutable segment, and the new one). In that period queries only hit the first consuming segment, which means any records that have been updated and gone into the new segment are invisible.

I have seen success running with a branch that changes RealtimeSegmentSelector (tuor713@c2bc4f5) - but that may well not be the right or best fix :)

@Jackie-Jiang
Contributor

@tuor713 Within a single streaming partition, there will be at most one consuming segment at any time. The small inconsistency is caused by the consuming segment replacing docs from the completed segments while the validDocIds from the segments are not read at the same time, e.g.:

  1. The query engine reads validDocIds from the completed segment (creating a copy)
  2. The consuming segment invalidates one doc from the completed segment (not visible to the query engine because a copy/snapshot has already been made), and marks the doc as valid in its own validDocIds
  3. The query engine reads validDocIds from the consuming segment
  4. The same doc is double-counted

In order to solve this problem, we need a global sync: take a snapshot of all queried segments while blocking the ingestion (as shown in the fix above). The solution works, and we can avoid creating the extra IndexSegment snapshot objects by snapshotting the validDocIds within the FilterPlanNode, but it can cause starvation between queries and ingestion. For high-QPS use cases, queries can block each other, as well as the ingestion.

We can make it configurable for use cases that require 100% consistency, but 100% consistency is usually not necessary for analytical purposes. Essentially, it is a trade-off between consistency and performance.

@tuor713
Contributor

tuor713 commented Mar 10, 2022

Hm, that is not what I saw (at least on a mid-December snapshot of 0.10): on the broker side, during query routing, there were multiple consuming segments. I presume that is because the new segment gets created in the ideal state at the same time the old one gets moved to ONLINE, and the change in the external view happens in parallel.

Agreed that the doc replacement causes issues too (I have tried a patch where the upsert hash map tracks both the old consuming validDocIds and the new online validDocIds and replaces in both, which appears to work). However, that is an issue of duplication. The loss of records, with the count dropping below the expected population, has in my tests come from the temporary existence of two consuming segments (as seen by the broker), only the first of which is queried.

Also agreed that after fixing these two issues there are still potential inconsistencies between consuming and online segments due to the lack of a global lock; fortunately, at least in the tests I have tried, this is an order of magnitude smaller than the aforementioned issues.

@Jackie-Jiang
Contributor

@tuor713
I see the routing issue now. A single server can have at most one consuming segment per partition, but multiple servers serving the same segment might not finish sealing it at the same time. On the broker side, that can show up as multiple consuming segments.
The routing fix in your commit can solve the inconsistency, but it might also create a hotspot server, because all the queries will be routed to the servers that finish sealing the segment first. To address this, one solution would be to also enhance the instance selector so that it queries the consuming segment on a server only when that server is picked to serve the last completed segment.

@mayankshriv
Contributor

@Jackie-Jiang Is this issue still valid, or is it already covered by #8392 and #7958? If the latter, could we close it?

@Jackie-Jiang
Contributor

@mayankshriv The routing issue is not solved yet, so let's keep the issue open

@mapshen
Author

mapshen commented Apr 12, 2022

> @tuor713 I see the routing issue now. A single server can have at most one consuming segment per partition, but multiple servers serving the same segment might not finish sealing it at the same time. On the broker side, that can show up as multiple consuming segments. The routing fix in your commit can solve the inconsistency, but it might also create a hotspot server, because all the queries will be routed to the servers that finish sealing the segment first. To address this, one solution would be to also enhance the instance selector so that it queries the consuming segment on a server only when that server is picked to serve the last completed segment.

There is more to the current design of segment replicas. As I mentioned in #7850, not only is it problematic during a segment flush, it also leads to split query views when one replica goes down.

@TT1103

TT1103 commented Oct 5, 2022

Hi, just wondering if there are any updates on case 2? For our use case, it would be ideal if rows did not go missing while a segment is being flushed.

@elonazoulay
Contributor

Hi, it looks like a recent Trino PR (#16511) fixed query1. Can you verify that a recent version of Trino returns correct results?

@mapshen
Author

mapshen commented May 10, 2023

> Hi, it looks like a recent Trino PR (#16511) fixed query1. Can you verify that a recent version of Trino returns correct results?

Thanks for the follow-up. I have already left that company, so I won't be able to verify this myself. I can try reaching out to my ex-colleagues to see if they can do it.

@Jackie-Jiang
Contributor

The routing issue should be fixed with #11978
