
View: expose view and connection state #248

Closed
frairon opened this issue Apr 15, 2020 · 15 comments
Comments

@frairon
Contributor

frairon commented Apr 15, 2020

@mattburman in #239

Additionally, it would be nice to expose the view state: possibly ViewStateIdle, ViewStateCatchUp, or ViewStateRunning? Ideally I want to be able to differentiate between being behind and a connection that can't currently be established.

I have also found that the view.Recovered method returns true while it is struggling to connect to Kafka. Is this intended? I need some way of determining whether the app is connected to Kafka for metrics. It looks like partitionStats.Status also returns 4 (PartitionRunning) when no connection to Kafka can be established. Stalled returns false too.

@frairon
Contributor Author

frairon commented Apr 15, 2020

I agree, we can easily enhance the view state to provide more details about the current state including what we have already, and adding a Reconnect/Connecting state that will be entered whenever the view is reconnecting.

A state for "lagging behind" is harder to achieve, because we don't have a criterion to decide at which point you're lagging. Currently it works like this: before the partition recovers, it retrieves the high watermark, and as soon as it catches up to that high watermark it is considered "recovered", even if it lags again later. Technically it is always lagging, because by the time it reaches the hwm, the actual value has already been incremented anyway.

So the question is: what counts as "being behind"? If it's a fixed value, then the stats should be used to detect that, as you described.
If it's related to the connection, we could set the view state to "recovering" after reconnecting, until it reaches the hwm again.
So instead of doing the state transitions like this:

Idle->
  Connecting->
    Recovering->
      Running->
  Connecting->
    Running->
  Connecting->
    Running
...

we could try to do:

Idle->
  Connecting->
    Recovering->
      Running->
  Connecting->
    Recovering->
      Running->
  Connecting->
    Recovering->
      Running
...

So we would add a Recovering step before Running in case of a reconnection. Would that seem more logical?
The stalled flag is really mainly meant for statistics in the monitor. It's a simple timer that marks the partition as stalled when there's no incoming message for some time. We shouldn't use that for state tracking.
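
Just to make the idea concrete, here's a rough sketch of what such a state enum could look like on the Go side (the names are placeholders for illustration, not a final API):

type ViewState int

const (
	ViewStateIdle       ViewState = iota // view created, not started yet
	ViewStateConnecting                  // (re)establishing the Kafka connection
	ViewStateRecovering                  // catching up to the high watermark
	ViewStateRunning                     // caught up and serving updates
)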

@mattburman

My main priority is definitely the Connecting state. A single boolean would let me fail a healthcheck and/or set a gauge metric for alerting and graphing. If I'm successfully consuming all partitions, I don't care so much whether I'm behind (Recovering vs Running) for my use case (just a low-volume compacted topic at the moment).

That said, I can see a case for being able to detect lagging behind. For the initial Connecting -> Recovering -> Running transition, we can assume you are Running once you reach the initial hwm. But after that, when to transition Running -> Recovering can't really be assumed; it depends on the use case and what is normal for a given topic.

If I have the offset lag and a Connecting state, I can already determine Recovering for myself without Goka telling me: I can set my own thresholds for the maximum allowable offset lag and/or the time spent over that threshold. Maybe that's not obvious to the average user, and providing a function like (offsetLag int, timeSinceConnected float64) (reconnected bool) would be a more explicit way to let Goka handle it?

Either way, a Recovering state is generally something I would alert on externally using exposed consumer offset lag, e.g. page me if offset lag > 100 for 5 minutes. The app can't necessarily do much to recover faster by itself, so maybe Running -> Recovering is not a useful state transition anyway.
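
To make the threshold idea concrete, a hypothetical helper along the lines of the signature above could look like this (names and thresholds are invented here, types adapted to time.Duration; needs the "time" import):

// lagAcceptable sketches the threshold check: given the current offset lag
// and the time since the view last (re)connected, decide whether we still
// treat the view as healthy/Running. Thresholds are examples only.
func lagAcceptable(offsetLag int64, timeSinceConnected time.Duration) bool {
	const maxLag = 100                  // maximum allowable offset lag
	const gracePeriod = 5 * time.Minute // how long lag may exceed maxLag after a reconnect
	if offsetLag <= maxLag {
		return true
	}
	// lag is above the threshold; tolerate it only shortly after reconnecting
	return timeSinceConnected < gracePeriod
}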

@frairon
Contributor Author

frairon commented Apr 16, 2020

Awesome, agreed. So let's add a Connecting state that is entered when connecting/reconnecting. As you said, the rest can be achieved with the stats for now.
When that's done we can think about adding the convenience function to detect connection loss/lagging.

@mattburman

mattburman commented Apr 22, 2020

Hey @frairon, I would love to see this implemented, but I think I would struggle to add it myself. Is this likely to be implemented soon?

I'm looking forward to the retries together with the connected-status access. Once they're in a release I'm fairly happy with this new version, and we can integrate it without any regressions in our metrics and availability. I don't need a Recovering status, just Connecting.
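
For context, this is roughly the kind of healthcheck I'd wire up once a connected status is exposed (a sketch only; isConnected is a placeholder for whatever Goka ends up providing, and it needs the "net/http" import):

// healthHandler fails the healthcheck whenever the view reports that it is
// not currently connected to Kafka.
func healthHandler(isConnected func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !isConnected() {
			http.Error(w, "view disconnected from kafka", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}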

@frairon
Contributor Author

frairon commented Apr 22, 2020

Sorry, our goka engagement comes in bursts :)
I'll try to do it this week, it's probably not a big issue to add.

@mattburman

No worries, I understand that :)

Thank you for all of your engagement so far, it's really encouraging for our team to see :)

@frairon
Contributor Author

frairon commented Apr 26, 2020

First (because I had already started on it earlier): I added more docs on the behavior change of view.Run when using restartable/autoreconnect.
I also added a different option, because the name "restartable" doesn't really make sense anymore.
Take a look at #252 if you want. I'll do the state thing next.

frairon added a commit that referenced this issue May 7, 2020
@frairon
Contributor Author

frairon commented May 7, 2020

@mattburman there's progress, but it turned out to be trickier than I thought. I tried to solve it by observing the partition states and merging them into a view state. Kind of reactive, maybe we should use rxgo at some point :).
Anyway, it would be super helpful if you could check out the branch (248-connection-state), run the example 6-reconnecting-view I created for that use case, and let me know whether it suits your needs.

Also, I realized that an auto-reconnecting view fails when the Kafka cluster is not available at startup. This might be an issue, and I'll have a look at whether I can solve it somehow. Would that be critical for your use case? In the old style (backoff around view.Run), I guess a view would even tolerate a missing Kafka at startup and just serve the local cache. Didn't think of that before.

@mattburman

Thank you @frairon!

run the example 6-reconnecting-view I created for that use case and let me know if that would suit your needs.

I've tested it out, and view.CurrentState() and view.ObserveStateChanges() don't seem to do a Running -> Connecting state transition.
When I take down the kafka cluster with make stop, view.CurrentState() still returns 4. There are also no logs from the state observer: there are error logs for the partitions, but none from the state observer goroutine.
Here's the goroutine I added to poll the view:

	go func(v *goka.View) {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for range ticker.C {
			fmt.Println(v.CurrentState())
		}
	}(view)
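
And for reference, this is roughly how the observer goroutine I ran alongside the poller looks; it assumes the observer on the branch exposes its updates as a channel (called C() here) and a Stop() method, which I haven't verified against the branch:

	go func(v *goka.View) {
		obs := v.ObserveStateChanges()
		defer obs.Stop()
		for state := range obs.C() {
			log.Printf("view state changed: %v", state)
		}
	}(view)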

Also I realized, that an autoreconnecting view fails when the kafka cluster is not available at startup

I just tested this out, and it looks like NewView construction blocks until it's connected? I don't want NewView to block. We are currently pinned to v0.9.0-beta3, where NewView construction does not block; it's view.Run that blocks until there is an error (can't initially connect). We still need to be able to exit the app if we haven't been able to initialise a connection on startup.

@frairon
Contributor Author

frairon commented Jun 15, 2020

Hi @mattburman, sorry for the long delay.
So the state change should be fixed now; I had simply updated the partition's reconnecting state too late. I guess the unit tests I'm going to write later would have caught that :)

But let's solve the NewView issue. When I start the example without a running Kafka cluster, it does not block but fails with "kafka: client has run out of available brokers to talk to". Is that call really blocking for you? Could you give me a code snippet to reproduce that?

This fail-fast behavior when calling the constructor is the same as in the old version.
If we really want that call to succeed and only have Run(ctx) block/retry, I think we'll have to pass the number of partitions to the Restartable option. Otherwise the view doesn't know how many partitions to load from disk; usually it gets this information from the TopicManager, but that fails due to the missing Kafka.
Alternatively, it could scan the storage directory and open all caches it finds. If it doesn't find anything, the view never ran, at which point it makes sense to fail because there wouldn't be any data to serve anyway.
What do you think?
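
To illustrate the directory-scan idea, a rough sketch (the "<table>.<partition>" file layout is an assumption for this example, not taken from the actual storage code; needs the "path/filepath" import):

// countLocalPartitions counts how many local partition caches exist for a
// table under the storage path, so the view could be opened from disk
// without asking Kafka for the partition count first.
func countLocalPartitions(storagePath, table string) (int, error) {
	matches, err := filepath.Glob(filepath.Join(storagePath, table+".*"))
	if err != nil {
		return 0, err
	}
	return len(matches), nil
}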

@mattburman

mattburman commented Jun 16, 2020

Hey @frairon :) Thanks for all your work so far

So for the state change, that should be fixed now

I have tested this manually and works well :)


Is that call really blocking for you?

Tested it again. I didn't realise this before, but it does return after 2 minutes. Is this configurable? I'd need apps to exit within seconds if it can't connect. This is just running 6-reconnecting-view/main.go (with some log.Println before/after construction):

2020/06/16 16:31:04 constructing goka.View...
2020/06/16 16:33:05 unblocked
2020/06/16 16:33:05 Cannot create view: Error creating sarama consumer for brokers [localhost:9092]: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

These are the logs from an example in our wrapper library pinned to v0.9.0-beta3, when there is no Kafka dependency running (I also tried it with no ZooKeeper):

1-simplest    | 2020/06/16 15:52:34 constructing a goka view v0.9.0-beta3...
1-simplest    | 2020/06/16 15:52:35 unblocked
1-simplest    | 2020/06/16 15:52:35 panicing
1-simplest    | panic: Error creating sarama consumer for brokers [kafka:9092]: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Unfortunately our wrapper is coupled to internal tooling/libraries at the moment so not so easy to share.
The main differences between our example and goka's latest example are:

  • using v0.9.0-beta3
  • we set a sarama logger on sarama.Logger
  • we pass in goka.WithViewHasher(kafkautil.MurmurHasher)
  • I'm running the example in docker-compose too

I've also replicated NewView returning quickly in the goka examples. I made the following changes to the v0.9.0-beta3 2-clicks example:

diff --git a/examples/2-clicks/main.go b/examples/2-clicks/main.go
index acf2e82..3d09e80 100644
--- a/examples/2-clicks/main.go
+++ b/examples/2-clicks/main.go
@@ -4,6 +4,7 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "log"
        "net/http"
        "time"

@@ -103,10 +104,13 @@ func runProcessor() {
 }

 func runView() {
+       log.Println("constructing goka view in 2-clicks v0.9.0-beta3")
        view, err := goka.NewView(brokers,
                goka.GroupTable(group),
                new(userCodec),
+               goka.WithViewRestartable(),
        )
+       log.Println("unblocked")
        if err != nil {
                panic(err)
        }
@@ -124,7 +128,7 @@ func runView() {
 }

 func main() {
-       go runEmitter()
-       go runProcessor()
+       //go runEmitter()
+       //go runProcessor()
        runView()
 }

i.e. I commented out the emitter and processor and added some logs, yielding the following output:

1  ➦ 1eaa22f ●  ~/go/src/github.com/lovoo/goka/examples$ go run 2-clicks/main.go
2020/06/16 17:04:06 constructing goka view in 2-clicks v0.9.0-beta3
2020/06/16 17:04:07 unblocked
panic: Error creating sarama consumer for brokers [127.0.0.1:9092]: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

@frairon
Contributor Author

frairon commented Jul 6, 2020

Hi @mattburman, sorry again (I guess it's time to stop apologizing for being late).

Glad the status updates are working now, so let's consider that part done.

We did a little brainstorming on how to make the View tolerate a startup without a running cluster. Since that seemed too confusing and inconsistent to integrate, we decided it's best to leave it as it is. So let's concentrate on making the view fail faster.

If I'm understanding correctly, the delay only occurs when the internal library is used, because above you mentioned that the plain examples run fast.

Is it possible that the library modifies sarama.Config and increases some timeouts that are lower by default in the example configuration?

Maybe the network behaves differently by just blocking the request if you have no kafka cluster running?

Another idea: the sarama implementation first tries to get the metadata by querying all brokers. If a broker cannot be connected to, it is removed from the list of available brokers. On other errors, however, the client just retries with another random broker. If some deadlines/timeouts are sufficiently high, that could also take a while, because it keeps trying the nonexistent brokers over and over again. You could get some insight by activating sarama's logger, which is disabled by default.
E.g. add this before starting goka:

sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)

Let me know if that helps.

@mattburman

@frairon My bad, I wasn't clear at all.

I've narrowed it down much further now. I'm running this on a Mac:
2-simplest: NewView returns quickly.
6-reconnecting-view: NewView returns after 2 minutes.

I modified the broker in 6-reconnecting-view to 127.0.0.1 rather than localhost, and now it returns quickly too (0.0.0.0 also returns quickly).

I've also re-tested my example in our wrapper (which connects to a broker in the docker network at host kafka). It doesn't take a long time to return. I've since lost the code I was running before, and I can't remember how long it was blocking for originally in #248 (comment).

I've also worked out that I was confused about the behaviour of view.Run and NewView. I assumed it wouldn't initiate a connection until view.Run, but I was wrong: NewView does block until it's connected or a network error is returned, though this is usually fast. For some reason it takes 2 minutes with brokers resolving to localhost (on my Mac at least).

I think my concerns are resolved, to be honest. Sorry for the confusion. I'm happy with this version; I should be able to set metrics using this view-state functionality once it's merged and a new version is pinned.

@frairon
Contributor Author

frairon commented Jul 8, 2020

Awesome, glad we figured it out :)
No worries about the confusion. Actually, that got me thinking that it really is confusing to connect the view before calling Run. The processor doesn't do that, so I would like to change it in the view too.

So the new behavior would be: NewView only initializes the structure, and view.Run then connects (or keeps reconnecting using the backoff if the cluster is down). The view won't be usable until the cluster comes online, so functions like Has, Get and Iterate will return an error until it has connected at least once. Do you think that would be an issue for you?
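
From the caller's perspective, the proposed split would roughly look like this (just a sketch of the intended behavior, reusing the brokers/group/userCodec names from the 2-clicks example above; not the final API):

view, err := goka.NewView(brokers, goka.GroupTable(group), new(userCodec))
if err != nil {
	log.Fatalf("creating view: %v", err) // only configuration/structural errors here, no connection yet
}

go func() {
	// Run connects and keeps reconnecting with backoff until the context is cancelled.
	if err := view.Run(context.Background()); err != nil {
		log.Printf("view terminated: %v", err)
	}
}()

// Until the view has connected (and recovered) at least once,
// lookups return an error instead of stale or empty data.
if val, err := view.Get("some-user"); err != nil {
	log.Printf("view not ready yet: %v", err)
} else {
	log.Printf("value: %v", val)
}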

@frairon
Contributor Author

frairon commented Jul 8, 2020

I'll close this issue and create another one where we can discuss the details if we want to change that. We'll try to release goka to get rid of the huge PR and can fix such things later...
Many thanks @mattburman for your input!

@frairon frairon closed this as completed Jul 8, 2020
frairon added a commit that referenced this issue Jul 9, 2020
* Co-authored-by: frairon <[email protected]>
Co-authored-by: R053NR07 <[email protected]>
* bugfix in shutdown/rebalance: correctly closing joins
* run update/request/response stats in own goroutine
* fix rebalancing by adding a copartitioning rebalance strategy
* updated readme for configuration, added changelog
* Open Storage in PartitionTable when performing Setup
* return trackOutput if stats are nil
* view.get fixed for uninitialized view
added lots of godoc
fixed many linter errors
added Open call when creating storage
* context stats tracking: use queueing mechanism to avoid race conditions
* Add simpleBackoff and add backoff options for processor and view
* added strings to streams helper
* #249 view example
* issue #249: migration guide, #241 panic documentation of context
* #248 exposing partition table's connection state to view
* Migration: describe offsetbug
* partition_table: implement autoreconnect in recovery mode
* bugfix goroutine-leak in statsloop in partition table
* #248: refactored state merger, bugfix race condition when shutting down the signal/observers, created example
* bugfix partition state notification
* remove change in example, updated changelog
* fix readme example and add readme as example
* restore 1-simplest fix, remove readme-example
Co-authored-by: Jan Bickel <[email protected]>