
Proposal: Implement windowing semantics #99

Open
burdiyan opened this issue Mar 5, 2018 · 24 comments
@burdiyan (Contributor) commented Mar 5, 2018

It would be great if Goka could natively provide primitives for windowed processing. It's a pretty complex topic, but it seems genuinely needed in many stream processing workloads.

Kafka Streams handles this by providing timestamp extractors to identify the actual event time of a message, and by modifying the record keys stored in the state store, augmenting them with the window bucket.

Another approach that may help users implement their own windowing would be for goka.Context to allow getting and setting arbitrary values in storage. I don't really like it conceptually, but it gives more freedom and is less complicated to implement.

I'm sure you have thought about windowing in general. Do you have a clear idea of how it could be built into Goka?

@db7 (Collaborator) commented Mar 5, 2018

This would be awesome! We don't have use cases for it at the moment, but I'm happy to discuss it.

So goka.Context is an interface anyway. Whenever writing some value X, one could wrap that with some window bucket. The timestamp extractor could be either part of the codec or passed via an option. One thing to consider is that automatically augmenting the state would make the table unreadable for views that are not aware of the augmentation.

Would you like to give it a try? :-)

@burdiyan (Contributor, Author) commented Mar 6, 2018

I'd like to give it a try, but first I'd like to have the design completely clear.

So goka.Context is an interface anyway. Whenever writing some value X, one could wrap that with some window bucket.

The thing is that there seems to be no way to swap the context implementation; it's hardcoded in the processor here: https://sourcegraph.com/github.com/lovoo/goka/-/blob/processor.go#L643:9

What do you see as the best way for someone to create a custom context? Pass it as an option? Maybe something like a context builder?

The timestamp extractor could be either part of the codec or passed via an option.

Both are valid solutions, but I think I like the option better.

One thing to consider is that automatically augmenting the state would make the table unreadable for views that are not aware of the augmentation.

That's true. In Kafka Streams nobody is supposed to consume the state store changelog topics; they exist only for restoring the state store. But Goka explicitly allows other components to consume the group table's changelog topic, which I think is also totally valid.

@db7 (Collaborator) commented Mar 6, 2018

Nice!

I think it should be easy to wrap the callbacks in goka.Input(topic, codec, callback). One idea would be to create a subpackage, say timebucket (imported as tb below); then:

g := goka.DefineGroup(group,
	tb.Input(topic, codec, callback), // callback will get wrapped
	tb.Persist(otherCodec),           // otherCodec will get wrapped
)

And the wrapper would do something like this:

func Input(topic goka.Stream, codec goka.Codec, cb goka.ProcessCallback) goka.Edge {
	return goka.Input(topic, codec, func(ctx goka.Context, m interface{}) {
		tbContext := convertContext(ctx) // wrap the goka.Context into a timebucket context
		cb(tbContext, m)
	})
}

A similar idea would apply for Persist. What do you think?

@db7 (Collaborator) commented Mar 6, 2018

I guess that was unclear. I'll write a longer example later today.

@db7 (Collaborator) commented Mar 6, 2018

OK, so the idea was to create a package, e.g., timebucket. In this package we wrap the edges of the GroupGraph and redefine the ProcessCallback to use a more specialized callback. (Disclaimer: I haven't thought about the window functions, so this is very high level and I don't know what extra functions the context should have.)

The table values could be defined like this:

// bucket for the messages of one stream
type bucket struct {
	messages []interface{}
}

// wrapper around the user value. This is the actual value stored in the group table.
type bucketValue struct {
	// buckets for the streams that are buffered
	buckets map[goka.Stream]bucket
	// actual value of the user
	value interface{}
}

// codec of codecs: keeps codecs of buffered streams and table
type bucketCodec struct {
	stream map[goka.Stream]goka.Codec
	value  goka.Codec
}

func (bc *bucketCodec) Encode(value interface{}) (data []byte, err error) {
	// serialize a bucketValue using the codecs in bc
	return nil, nil
}

func (bc *bucketCodec) Decode(data []byte) (value interface{}, err error) {
	// deserialize a bucketValue using the codecs in bc
	return new(bucketValue), nil
}

The context would be a specialized context containing whatever functions are needed to get messages from the buckets. To process input streams with time buckets, the user would implement callbacks with this signature: func(ctx timebucket.Context, m interface{})
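
For concreteness, a minimal sketch of what that interface might contain (the method set here is an assumption; the discussion leaves it open):

// Hypothetical timebucket.Context: embeds goka.Context and adds
// accessors for the buffered time buckets.
type Context interface {
	goka.Context

	// Messages returns the messages buffered for the given stream
	// in the current key's bucket.
	Messages(stream goka.Stream) []interface{}

	// Window returns the boundaries of the current time bucket.
	Window() (start, end time.Time)
}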

timebucket could provide three functions to create the group graph: Input, Persist, and DefineGroup.

Input() creates an inputStream, which implements the goka.Edge interface but also stores, for example, a retrieve function that extracts a timestamp from each event.

// goka.Edge for buffered streams
func Input(topic goka.Stream, codec goka.Codec, cb goka.ProcessCallback, retrieve RetrieveTimeFunc) goka.Edge {
	return &inputStream{topic, codec, cb, retrieve}
}

type inputStream struct {
	topic    goka.Stream
	codec    goka.Codec
	cb       goka.ProcessCallback
	retrieve RetrieveTimeFunc
}

func (i *inputStream) String() string    { return "..." }
func (i *inputStream) Topic() string     { return string(i.topic) }
func (i *inputStream) Codec() goka.Codec { return i.codec }

// Functions with this signature can retrieve a timestamp from a message
type RetrieveTimeFunc func(message interface{}) (time.Time, error)

The persistency edge would be something like this:

// goka.Edge for wrapped value
func Persist(codec goka.Codec, windowLength time.Duration) goka.Edge {
	return &table{codec: codec, windowLength: windowLength}
}

type table struct {
	topic        goka.Stream
	codec        goka.Codec
	windowLength time.Duration
}

When defining a group graph, we can wrap the callbacks and codecs and provide a normal goka.GroupGraph to the goka.Processor. If timebucket.DefineGroup only takes goka.Edge objects as arguments, it also allows passing in normal goka.Input, goka.Table, etc., which is nice. Internally we can do type switches to find the inputStream and table types defined above.

// timebucket.DefineGroup
func DefineGroup(group goka.Group, edges ...goka.Edge) *goka.GroupGraph {
	var (
		t = findPersist(edges)      // persistency edge
		i = findTimeBucketed(edges) // timebucketed inputStreams
		o = removeFrom(edges, t, i) // remaining edges
		e []goka.Edge               // result
	)
	// 1. replace codec of persistency edge
	c := &bucketCodec{
		stream: make(map[goka.Stream]goka.Codec),
		value:  t.Codec(),
	}
	for _, edge := range i {
		c.stream[goka.Stream(edge.Topic())] = edge.Codec()
	}
	e = append(e, goka.Persist(c))

	// 2. wrap callbacks in timebucketed inputStreams
	for _, in := range i {
		in := in // capture the loop variable for the closure
		// callback with the timebucket context
		cb := func(ctx goka.Context, m interface{}) {
			// create timebucket Context
			tbctx := newContext(ctx)

			// save message in bucket if necessary
			ts, _ := in.retrieve(m)
			if decide(ts, t.windowLength) {
				tbctx.bucket.Add(m)
			}

			// call user callback func(ctx timebucket.Context, m interface{})
			in.cb(tbctx, m)

			// if messages are in the buffer and SetValue was not called, call it now
			tbctx.persist()
		}
		e = append(e, goka.Input(goka.Stream(in.Topic()), in.Codec(), cb))
	}

	// 3. put everything together
	return goka.DefineGroup(group, append(e, o...)...)
}

I just don't know what timebucket.Context should look like, since I don't know much about time joins.

There are definitely other ways to inject this into Goka. Let me know if you have other ideas.

@burdiyan (Contributor, Author) commented Mar 9, 2018

Just read the thread, and from a high level it seems pretty reasonable, although conceptually there's something I don't like about duplicating core functions (goka.Stream, etc.) specifically for windowing.

I'll play a little more with Kafka Streams to see how it works internally, because so far I have only read the code without actually playing with it.

On the other hand, an idea comes to mind that should be taken into account: a TTL for the records. Windows should have an expiry time (maybe optional) after which they go away. It may be something similar to Kafka's compaction mechanism.

Kafka Streams uses RocksDB as its state store, and RocksDB has TTL built in, although I don't know whether they use it for windowing at all.

@db7 (Collaborator) commented Mar 9, 2018

We wouldn't be duplicating core functions. The functions I suggested should be seen as "builders" that wrap the original code with additional functionality. But other solutions are also possible.

And yes, it would be great to learn from your experience with Kafka Streams.

Just the TTL for records sounds complex in the context of Goka. Of course we can expire entries in the buckets when events arrive for the respective keys, but expiring entries in buckets that receive no messages will be harder.

@burdiyan (Contributor, Author) commented Mar 9, 2018

I could see it being something along these lines (from a very high-level perspective):

  1. We could have a new goka.Edge that creates a group table: the same thing as goka.Persist, but something like goka.Windowed. Only one of the two would be allowed in a group.

  2. goka.Windowed would accept the codec, the window size, and the window retention period (TTL). The retention period can be longer than the window size to allow for late-arriving data.

  3. We could start using the message's Timestamp field to define time buckets.

  4. The current implementation of goka.Context would have to understand when the group table is a windowed one, and would wrap .Value and .SetValue, modifying the record's key to some common pattern that identifies the key within a bucket. For example, Kafka Streams uses the following: [<original-key>@<bucket-start-ms>/<bucket-end-ms>]. (A sketch of this key scheme follows the lists below.)

  5. This way, the user would obtain the corresponding aggregate for each processed message; the actual aggregate is defined by the user.

  6. There should be a cleanup process that purges expired windows once per TTL duration. It can be a best-effort operation, so buckets that have already expired may remain in the state store for some time.

  7. Windows are based on epoch time (floor(current-timestamp-nanos / window-size-nanos) × window-size-nanos would be a bucket's start time).

Things to take into account:

  1. Consumers of a windowed group table's topic should be aware of the modified key pattern.

  2. Incoming records that correspond to already expired buckets should be discarded (or maybe the user should decide what to do with them).
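
As a quick sketch of the key scheme from points 4 and 7 (the helper names are assumptions, not Goka API):

// bucketStart floors a timestamp to its window boundary (point 7).
func bucketStart(ts time.Time, windowSize time.Duration) int64 {
	return ts.UnixNano() / int64(windowSize) * int64(windowSize)
}

// windowedKey augments the original key with the bucket boundaries,
// following the Kafka Streams pattern from point 4.
func windowedKey(key string, ts time.Time, windowSize time.Duration) string {
	startMs := bucketStart(ts, windowSize) / int64(time.Millisecond)
	endMs := startMs + windowSize.Milliseconds()
	return fmt.Sprintf("[%s@%d/%d]", key, startMs, endMs)
}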

A possible user experience could be something like this:

goka.DefineGroup("count-clicks-5m",
    goka.Input("clicks", new(codec.String), func(ctx goka.Context, message interface{}) {
        // This would grab the value from the state store
        // that corresponds to the time bucket of the incoming message
        // based on it's Timestamp parameter.
        
        var count int
        if v := ctx.Value(); v != nil {
            count = v.(int)
        }

        count++

        ctx.SetValue(count)
    }),
    goka.Windowed(new(codec.Int), 5 * time.Minute, 24 * time.Hour),
)

This is very similar to how Kafka Streams works (without taking into account different window types and more advanced features).

What do you think?

@db7 (Collaborator) commented Mar 13, 2018

That sounds really nice! It is simpler than my proposal above and should also solve your issue with joining two streams, right? Do you have a suggestion for how to implement the cleanup process?

Sorry for the late response.

@burdiyan (Contributor, Author) commented Mar 14, 2018

@db7 In Kafka Streams, time is advanced only when new messages arrive.

So basically it could be something like this when we process a new message:

if windowExist(message):
  addToWindow(message)
else:
  createNewWindow(message)
  purgeExpiredWindows(key)

@db7 (Collaborator) commented Mar 14, 2018

@burdiyan That sounds easy to implement. As far as I can see, all these changes can be implemented on top of the existing edges. We just have to be careful about which edges can be combined. If Window is used, we should not use Persist, but we should also not use Join or Lookup, because those are not timed. Perhaps we should not allow Loopback either? So Window will restrict quite a lot what the graph can look like.

We were planning at some point to come up with an operators subpackage, and I think windowing would fit there perfectly. The operators subpackage would be a library of predefined design patterns. It would provide GroupGraph builders with a predetermined structure. For example, if you want to map the values of one topic into another topic, currently you'd write something like this:

goka.DefineGroup(group,
	goka.Input(inTopic, inCodec, func(ctx goka.Context, m interface{}) {
		result := mapFunction(m)
		ctx.Emit(outTopic, ctx.Key(), result)
	}),
	goka.Output(outTopic, outCodec),
)

Instead of doing that by hand, the operators library would provide a GroupGraph builder like this:

operators.Map(inTopic, inCodec, outTopic, outCodec, 
    func(m interface{}) interface{} { return mapFunction(m) })

The same thing can be done for ReduceByKey, Filter, and other simple operators.
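
For instance, a Filter builder in the same style might look like this sketch (the operators package and this particular signature are hypothetical):

// Hypothetical operators.Filter: builds a group graph that forwards
// messages to outTopic only when the predicate holds.
func Filter(group goka.Group, inTopic, outTopic goka.Stream, codec goka.Codec,
	keep func(m interface{}) bool) *goka.GroupGraph {
	return goka.DefineGroup(group,
		goka.Input(inTopic, codec, func(ctx goka.Context, m interface{}) {
			if keep(m) {
				ctx.Emit(outTopic, ctx.Key(), m)
			}
		}),
		goka.Output(outTopic, codec),
	)
}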

A Window operator could look like this:

operators.Window("count-clicks-5m", new(codec.Int), 5*time.Minute, 24*time.Hour,
	operators.WindowInput("add-clicks", new(codec.String),
		func(value interface{}, message interface{}) interface{} {
			var count int
			if v := value; v != nil {
				count = v.(int)
			}
			count++
			return count
		}),
	operators.WindowInput("sub-clicks", new(codec.String),
		func(value interface{}, message interface{}) interface{} {
			var count int
			if v := value; v != nil {
				count = v.(int)
			}
			count--
			return count
		}),
)

The interface{} types could be replaced by some specialized context or other types we find adequate.

The advantage of building such patterns on top of the edge primitives is that we have type safety when combining the edges, and we can restrict or extend the callback interface, tailoring it to the use case. I would prefer that to pushing Window down to the same level as the edges. What do you think?

Also, in the operators package we could use a more fluent definition for such group graphs if desired, e.g.:

operators.Window(group, codec, time, time).
  AddInput(topic, codec, function).
  AddInput(...).
  Build()

@andrewmunro commented

@db7 Is there any update on this? I'd love to add time-based windowing to my application but am unsure how to implement it myself :(

@burdiyan (Contributor, Author) commented Apr 3, 2018

@andrewmunro Currently the easiest approach I can recommend is to use a group table and keep a record there that manages all the windowing.

For example you could have something like:

type WindowedRecord struct {
	// Windows maps a window's start time (epoch) to its aggregate.
	// The field is exported so that codecs can serialize it.
	Windows map[int64]MyAggregatedValue
}

Then, for each incoming record from your input topic, you would retrieve the WindowedRecord from the group table (or initialize one if it's not there), identify the window based on the epoch time of the incoming record, and update the corresponding key of the underlying map. Then you would store the record back in the group table.
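
A rough sketch of that callback (MyAggregatedValue, its Update method, and the 15-minute window size are assumptions; Timestamp, Value, and SetValue are goka.Context methods):

func process(ctx goka.Context, msg interface{}) {
	rec, _ := ctx.Value().(*WindowedRecord)
	if rec == nil {
		rec = &WindowedRecord{Windows: map[int64]MyAggregatedValue{}}
	}

	// identify the window bucket from the message's epoch time
	const windowSize = int64(15 * 60) // e.g. 15-minute windows, in seconds
	bucket := ctx.Timestamp().Unix() / windowSize * windowSize

	// update the aggregate stored under that window
	agg := rec.Windows[bucket]
	agg.Update(msg) // user-defined aggregation (assumed)
	rec.Windows[bucket] = agg

	ctx.SetValue(rec)
}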

Let me know if the above makes sense to you; I'm not sure I'm explaining myself clearly enough.

IMO, something like this is currently the only way to implement windowing semantics without modifying Goka itself.

BTW, @andrewmunro, could you describe in a bit more detail the windowing use case you have in your application?

@andrewmunro commented Apr 4, 2018

@burdiyan That kind of makes sense. So here's an example.

Say I have an incoming stream of transactions from customers. I want to aggregate the amount of money a customer has spent in different time windows, such as this week, this month, this year (maybe too much data to store).

So my input model would look like this:

type Transaction struct {
	CustomerID string
	Amount     float64
}

And my group model looks like this:

type Customer struct {
	ID string
	AmountTransacted float64
}

And finally, my aggregator looks like this:

g := goka.DefineGroup(
	group,
	goka.Input(topic, new(transaction.Codec), func(ctx goka.Context, msg interface{}) {
		t := msg.(*transaction.Transaction)
		c := ctx.Value().(*customer.Customer)
		c.AmountTransacted += t.Amount
		ctx.SetValue(c)
	}),
	goka.Persist(new(customer.Codec)),
)

Essentially what I want is to add an AmountTransactedThisMonth field to my Customer struct.

type Customer struct {
	ID string
	AmountTransacted float64
	AmountTransactedThisMonth float64
}

@db7 (Collaborator) commented Apr 4, 2018

@andrewmunro You'll also need to keep track of which month you are currently aggregating, using a timestamp, and then reset the monthly aggregation once the month is over. Something like this:

func(ctx goka.Context, msg interface{}) {
	t := msg.(*transaction.Transaction)
	c, _ := ctx.Value().(*customer.Customer)
	if c == nil {
		c = new(customer.Customer) // first transaction for this key
	}
	c.AmountTransacted += t.Amount
	// reset the monthly amount if the month is over (this assumes
	// Timestamp fields on Transaction and Customer)
	if t.Timestamp.Month() != c.Timestamp.Month() {
		c.Timestamp = t.Timestamp
		c.AmountTransactedThisMonth = 0
	}
	c.AmountTransactedThisMonth += t.Amount
	ctx.SetValue(c)
})

Also note that you don't need to keep the customer ID in your state since it will be the same as the key.

@andrewmunro commented

@db7 I thought about this, but won’t any views accessing a customer’s state potentially be incorrect until that customer makes another transaction?

@db7 (Collaborator) commented Apr 4, 2018

Do you mean when a month is over and the customer object has not yet been updated? You can check what the current month is when a service reads the data from the view. What we usually do is add methods to the type stored in the group table. Something like this:

func (c *Customer) GetAmountThisMonth() float64 {
    if c.Timestamp.Month() != time.Now().Month() {
        return 0.0
    }
    return c.AmountTransactedThisMonth
}

@keisar commented Mar 18, 2021

Hi @db7,
We've been looking into using Goka. For our use case we need windowing based on specific times, for example every 15 minutes at fixed intervals, and we need to make sure each window is emitted at a fixed time even if no message was received.
I was thinking of implementing a ticker to check the time and emit the window when needed, but after digging a little I noticed that if no message was received (e.g., if the consumer was restarted), the context won't have the message key, so there is no way to find the correct value (since we save the window in the context).
Any suggestions on how we can implement this kind of functionality? I'd be happy to contribute it, but I'm struggling to see a reasonable solution :/ Any help would be much appreciated.

@db7 (Collaborator) commented Mar 18, 2021

Hi @keisar, I haven't been working on this for a while. I guess @frairon will be able to help you better.

If I understand your problem, every 15 minutes you need to create a message for every key of a table. The only idea I have at the moment is the following: in your main (not in the ProcessCallback), you create a ticker that iterates over the table every 15 minutes and sends a message with an emitter. AFAIR, you can iterate directly on top of the processor, but it may be better to create another program with a view of the table and iterate over the view.
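
A minimal sketch of that view + ticker + emitter setup (the group, topic, and codec names are placeholders; error handling is omitted):

func emitWindows(brokers []string) {
	// view over the processor's group table
	view, _ := goka.NewView(brokers, goka.GroupTable("my-group"), new(windowCodec))
	go view.Run(context.Background())

	emitter, _ := goka.NewEmitter(brokers, "window-output", new(windowCodec))
	defer emitter.Finish()

	// every 15 minutes, send the current value of every key downstream
	for range time.Tick(15 * time.Minute) {
		it, _ := view.Iterator()
		for it.Next() {
			val, _ := it.Value()
			_ = emitter.EmitSync(it.Key(), val)
		}
		it.Release()
	}
}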

@keisar commented Mar 19, 2021

Thank you @db7,
I'll look into this; your approach sounds like a good one. My only concern is how to make sure a window is not sent multiple times: if I have multiple instances running the emitter, I'm guessing each emitter will see all the windows, so I'll need a way of partitioning the data between the instances. Or maybe Goka covers this and we don't need to take care of it?

@frairon (Contributor) commented Mar 21, 2021

Hi @keisar,
yep, Diogo is basically right: there is no built-in windowing functionality in Goka. If you want to emit a value for every key, create a view and iterate over it. But this gives you more of a snapshot semantics, because all windows are emitted at the same time. It also probably won't scale very well, since it always iterates over all keys even when only a few entries need to be sent.
Also, if you had multiple instances of that view-based window emitter, every instance would emit every window.
I'm not sure I've really understood the use case. Could you give an example of the input/output, how it's aggregated, and what data should eventually end up in the window? Maybe there are other solutions too.

@keisar commented Mar 22, 2021

Hi @frairon,
Thanks for the reply. You confirmed some of my concerns; I was worried about how this would scale…
Here's an example; hopefully it clarifies the use case a little:
We have two topics, one for appointments and one for appointment cancellations (the key for both is the customer ID). We want to window the incoming events per customer for 15 minutes, remove cancelled appointments, and compute some counters per window.
It is important that each window is sent on time: for example, a window can span 11:13–11:28, and it must be emitted at 11:28 even if no message arrived at that moment.

@keisar commented Mar 24, 2021

[diagram: window tagger → window collector → window notifier → window transmitter pipeline]

Hi @frairon,
I'm attaching a diagram showing what we are thinking of doing to achieve the windowing, both for feedback and to help others who want to achieve the same (assuming you don't find any issues we missed :) ).
Short legend for the diagram:

  1. Window tagger - adds a window field to each incoming message
  2. Window collector - collects all messages into a GroupTable under the appropriate window
  3. Window notifier - produces messages to a topic whenever a window should be emitted
  4. Window transmitter - when a window needs to be transmitted (based on a notification from the window notifier), it grabs the window, sends it, and clears it (we will have some logic here to deduplicate multiple notifications for the same window)

Hopefully with this approach the transmitter can still scale, and the notifier doesn't do much (every 15 minutes it sends small messages for each window), so it won't be an issue scale-wise.
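
One possible shape for the notifier (a sketch; the topic name and the codec.Int64 payload are assumptions, and the deduplication mentioned in point 4 still happens in the transmitter):

// Every 15 minutes, emit one notification carrying the boundary of the
// window that just closed; the transmitter reacts to these messages.
func runNotifier(brokers []string) {
	emitter, _ := goka.NewEmitter(brokers, "window-notifications", new(codec.Int64))
	defer emitter.Finish()

	for now := range time.Tick(15 * time.Minute) {
		boundary := now.Truncate(15 * time.Minute)
		_ = emitter.EmitSync(boundary.Format(time.RFC3339), boundary.Unix())
	}
}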

@chrisfjones (Contributor) commented

@keisar We've implemented similar windowing semantics in a Goka processor by relying on a ticker and Processor.VisitAll. I've added more detail on our use case here: #392 (comment)
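
For reference, a sketch of that pattern (this assumes the Visitor edge and Processor.VisitAll available in newer Goka versions; proc, topic, and the codecs are placeholders):

g := goka.DefineGroup(group,
	goka.Input(topic, new(eventCodec), processMessage),
	// the visitor callback is invoked once per key when VisitAll runs
	goka.Visitor("flush-windows", func(ctx goka.Context, meta interface{}) {
		// read ctx.Value(), emit any closed windows, reset state via ctx.SetValue
	}),
	goka.Persist(new(stateCodec)),
)

// once the processor (proc) built from g is running, trigger the visit periodically:
go func() {
	for range time.Tick(15 * time.Minute) {
		_ = proc.VisitAll(context.Background(), "flush-windows", nil)
	}
}()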
