Add a Reader/Writer Interface for Streaming #25
Conversation
  */
  override def awaitNextBatch(): Unit = {
    while (!batchRun) {
      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
Why use wait(100) instead of wait()?
That's a really good question. When I had it as wait(), it was hanging non-deterministically. I think it's okay to spin occasionally?
That sounds like a sign of corner cases we are missing.
From the docs, "interrupts and spurious wakeups are possible, and this method should always be used in a loop". So my guess is that we sometimes wake up spuriously. There is then a race to check the done condition / finish the batch (which is why it would hang with very low probability (3/1000)). So this actually does seem like the best solution. We don't spin a ton wastefully. In most cases we wake up immediately. In very rare cases we sleep 100ms too long.
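To make the trade-off concrete, here is a minimal, self-contained sketch of the pattern being discussed. The batchRun flag and awaitBatchLock names come from the snippet above; the markBatchDone signalling side is an assumption about how the batch thread might notify waiters, not this PR's actual code.

```scala
object AwaitBatchSketch {
  private val awaitBatchLock = new Object
  @volatile private var batchRun = false

  // Waiter: re-checks the condition after every wakeup, so a spurious wakeup
  // or a notify that fires before we start waiting costs at most ~100ms.
  def awaitNextBatch(): Unit = {
    while (!batchRun) {
      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
    }
  }

  // Signaller (hypothetical): set the flag, then wake anyone blocked in wait().
  def markBatchDone(): Unit = {
    batchRun = true
    awaitBatchLock.synchronized { awaitBatchLock.notifyAll() }
  }
}
```

With the timeout, wait(100) acts as a safety net: even if the notify races ahead of the waiter entering wait(), the loop re-checks batchRun within 100ms instead of blocking forever, which matches the rare-hang behavior described above.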
   * to guarantee that a new batch has been processed.
   */
  @DeveloperApi
  def awaitNextBatch(): Unit
awaitBatchCompletion? awaitNextBatch doesn't signify whether to wait for the next batch to start or to end.
changed
   *
   * @since 2.0.0
   */
  def format(source: String): DataStreamReader = {
Writing reader.format("kafka") is quite weird, and will be weird for most non-filesystem streaming sources. Rather, I propose having an alias called source, which works nicely for both batch and streaming: source("text"), source("parquet"), source("kafka") all make sense.
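For concreteness, a rough sketch of what such an alias could look like. The simplified DataStreamReader below is an illustrative stand-in, not the actual class in this PR:

```scala
class DataStreamReader {
  private var sourceName: String = _

  def format(source: String): DataStreamReader = {
    this.sourceName = source
    this
  }

  // Proposed alias: same behavior as format(), but the name reads naturally
  // for non-file sources, e.g. reader.source("kafka").
  def source(name: String): DataStreamReader = format(name)
}
```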
/cc @rxin
I think this depends on what other methods are available on the reader/writer interfaces.
Ah, never mind -- I misunderstood it. Your proposal makes sense.
Add a Reader/Writer Interface for Streaming

In this PR I add a new interface for opening new streams (as DataFrames) and starting a new streaming query. These are modeled after the DataFrame reader/writer interface. Sources and Sinks are created by a StreamSourceProvider or StreamSinkProvider, which are similar to a RelationProvider in the Data Source API (and in fact a single class can be all of the above if desired). I include a throwaway implementation of a text file source/sink for demonstration / testing.
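To illustrate the provider pattern described above, here is a hedged, self-contained sketch. The trait and method signatures are simplified assumptions for illustration, not the exact interfaces added in this PR:

```scala
// Minimal stand-ins for the stream source/sink abstractions.
trait Source { def describe: String }
trait Sink { def describe: String }

// Analogous to RelationProvider in the Data Source API: providers construct
// sources and sinks from a map of options.
trait StreamSourceProvider {
  def createSource(parameters: Map[String, String]): Source
}

trait StreamSinkProvider {
  def createSink(parameters: Map[String, String]): Sink
}

// A single class can implement both, like the throwaway text file
// source/sink used here for demonstration / testing.
class TextStreamProvider extends StreamSourceProvider with StreamSinkProvider {
  override def createSource(parameters: Map[String, String]): Source =
    new Source { val describe = s"text source at ${parameters("path")}" }
  override def createSink(parameters: Map[String, String]): Sink =
    new Sink { val describe = s"text sink at ${parameters("path")}" }
}
```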
TODO: