Event stream MVP #9013
Conversation
Things I am tracking that need to be resolved:
I'll finish up on Monday. Looking really great so far!
nomad/event_endpoint.go (outdated)
// ACL check
// TODO(drew) ACL checks need to be per topic
// All Events        Management
// System Events     Management
// Node Events       NamespaceCapabilityReadEvents
// Job/Alloc Events  NamespaceCapabilityReadEvents
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
	handleJsonResultError(err, nil, encoder)
	return
} else if aclObj != nil && !aclObj.IsManagement() {
If you don't want to refine this before merging to master just make sure to file an issue to track the future work.
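For the tracking issue, here is a rough sketch of what the per-topic checks from the TODO could look like. Everything below is an assumption for illustration: the topic strings come from the PR's event types, AllowNsOp mirrors existing Nomad ACL helpers, and the read-events capability is taken from the TODO comment rather than merged code.

package nomad

import "github.com/hashicorp/nomad/acl"

// Assumed capability name from the TODO above; not an existing constant.
const NamespaceCapabilityReadEvents = "read-events"

// allowedToStreamTopic is a hypothetical per-topic ACL check, not the PR's code.
func allowedToStreamTopic(aclObj *acl.ACL, namespace, topic string) bool {
	if aclObj == nil || aclObj.IsManagement() {
		// ACLs disabled, or a management token: every topic is allowed.
		return true
	}
	switch topic {
	case "Node", "Job", "Allocation":
		// Node and Job/Alloc events would need a read-events capability.
		return aclObj.AllowNsOp(namespace, NamespaceCapabilityReadEvents)
	default:
		// "All" and system events would remain management-only.
		return false
	}
}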
nomad/event_endpoint.go (outdated)
	return
}
select {
case <-errCh:
Why are we receiving on the errCh here?

I know this is the pattern used in Monitor, but is checking the error returned by conn.Read above even necessary? Seems like in streaming APIs where we never expect to read anything, conn.Read returning anything at all is an indication we should shut things down?
I don't think we need to receive on errCh here; removing. I think we want to try to read from the conn just to make sure the connection hasn't closed?
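For context, a minimal sketch of that pattern using only the standard library: the single Read exists purely to notice the peer going away, since a streaming endpoint never expects inbound data.

package stream

import (
	"context"
	"io"
)

// watchConn blocks reading from the connection solely to detect closure.
// Any result from Read, bytes or error, means the client is done and the
// stream should be torn down via cancel.
func watchConn(conn io.Reader, cancel context.CancelFunc) {
	go func() {
		buf := make([]byte, 1)
		_, _ = conn.Read(buf) // blocks until the peer writes or closes
		cancel()
	}()
}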
// }
if n.config.EnableEventPublisher {
	if err := rehydratePublisherFromState(n.state); err != nil {
		n.logger.Error("Error re-hydrating event publisher during restore", "error", err)
no error return?
Event durability seemed like a best effort; failing the entire snapshot restore process because we couldn't rehydrate seemed aggressive, but maybe it's worth doing? Maybe this needs a TODO for when it comes out of beta? Not really sure.
Soft failing on event restoration seems like the right tradeoff to me. My only fear is that it may be very difficult to know that durability works at all! I'm not sure our E2E suite currently supports restarting servers as part of a test, and users may never notice this log line and be unaware there's some functionality missing. A unit test should be sufficient to ensure durability is working. Unit testing and even manual testing the soft fail case seems very awkward since you would have to manipulate the snapshot very carefully.
Still haven't finished. Sorry. event_buffer is tricky stuff.
nomad/event_endpoint.go (outdated)
// TODO(drew) handle streams without ACLS
reqToken := args.AuthToken
if reqToken == "" {
	// generate a random request token
	reqToken = uuid.Generate()
Not sure I understand what's not being handled here. What does the request token represent? It can't be a unique consumer identifier because a single auth token could open multiple streams.

If TODOs represent missing core functionality, please file an issue for tracking before merging.
I thought I replied here, but I added a comment to the file. If an auth token is passed, we want to add it to the subscription request so the event publisher can track ACL events and force-close subscriptions if a token expires.

If ACLs are disabled, we give it a random UUID to satisfy the subscription map, but it will never be closed due to token expiry.
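To make that concrete, a small sketch of the shape being described; the type and field names are illustrative stand-ins, not the PR's exact ones.

package stream

import "github.com/hashicorp/nomad/helper/uuid"

// SubscribeRequest is an illustrative stand-in for the PR's subscription
// request. Token lets the event publisher associate subscriptions with ACL
// tokens so it can force-close them when a token expires.
type SubscribeRequest struct {
	Token  string              // caller's auth token, or a random UUID when ACLs are off
	Topics map[string][]string // topic -> keys the caller wants to follow
}

func newSubscribeRequest(authToken string, topics map[string][]string) *SubscribeRequest {
	token := authToken
	if token == "" {
		// ACLs disabled: a random UUID keys the subscription map but can
		// never match an expiring token, so it is never force-closed.
		token = uuid.Generate()
	}
	return &SubscribeRequest{Token: token, Topics: topics}
}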
nomad/state/apply_plan_events.go (outdated)
)

func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) (structs.Events, error) {
	var events []structs.Event
Might be nice to avoid reallocations from appending to the slice in this critical path. Suggested change:

- var events []structs.Event
+ events := make([]stream.Event, 0, len(changes))
len(changes) won't always end up being equal to the number of events that are created (there are no new events for changes to the index table).
nomad/stream/event_buffer.go (outdated)
// Check if the reader is too slow and the event buffer has discarded the event
select {
case <-i.link.droppedCh:
	return nil, fmt.Errorf("event dropped from buffer")
Why isn't this checked in the select above?
If droppedCh is ready, linkCh will likely be as well (the slow reader is on event 1, the new head is event 3, so event 1's linkCh to event 2 will be ready). Since both will be ready, the select will choose randomly, so we need to check afterwards. I'll add a comment.
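For illustration, the double-check pattern being described, with hypothetical stand-in types (the real event_buffer fields differ):

package stream

import (
	"context"
	"fmt"
)

// bufferItem is a hypothetical stand-in for the PR's buffer node.
type bufferItem struct {
	droppedCh chan struct{} // closed when the buffer discards this event
	nextCh    chan struct{} // closed when the next event is linked in
	next      *bufferItem
}

// Next waits for the following event. select picks randomly among ready
// cases, so droppedCh must be re-checked after waking on nextCh: for a
// slow reader both channels can become ready at the same time.
func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-i.droppedCh:
		return nil, fmt.Errorf("event dropped from buffer")
	case <-i.nextCh:
	}
	select {
	case <-i.droppedCh:
		// Both were ready; the event was discarded while we raced.
		return nil, fmt.Errorf("event dropped from buffer")
	default:
	}
	return i.next, nil
}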
nomad/stream/event_buffer.go (outdated)
b.tail.Store(item)

// Increment the buffer size
size := atomic.AddInt64(b.size, 1)
I think this is only ever used by externally synchronized methods and therefore doesn't need synchronization? Not a big deal to keep just to avoid races in the future.
* only enable publisher based on config
* add default prune tick
* back out state abandon changes on fsm close
* Node Register/Deregister event sourcing: example upsert node with context, fill in writetxnwithctx, ctx passing to handle event type creation, wip test node deregistration event, drop Node from registration event
* node batch deregistration
This commit adds a /v1/events/stream endpoint to stream events from. The stream framer has been updated to include a SendFull method which does not fragment the data between multiple frames. This essentially treats the stream framer as an envelope to adhere to the stream framer interface in the UI. If the `encode` query parameter is omitted, events will be streamed as newline-delimited JSON.
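As a rough illustration of the newline-delimited JSON behavior (the handler shape and channel type are assumptions, not the PR's code): each event becomes one JSON object per line, and json.Encoder conveniently appends the trailing newline itself.

package main

import (
	"encoding/json"
	"net/http"
)

// streamNDJSON writes one JSON object per line and flushes after each so
// clients see events as soon as they happen.
func streamNDJSON(w http.ResponseWriter, events <-chan interface{}) {
	w.Header().Set("Content-Type", "application/x-ndjson")
	flusher, _ := w.(http.Flusher)
	enc := json.NewEncoder(w) // Encode appends the newline delimiter
	for ev := range events {
		if err := enc.Encode(ev); err != nil {
			return // client likely disconnected
		}
		if flusher != nil {
			flusher.Flush()
		}
	}
}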
* Node Drain events and Node Events (#8980)
* Deployment status updates: handle deployment status updates (paused, failed, resume), deployment alloc health, generate events from apply plan result
* txn err check, slim down deployment event
* one ndjson line per index
* consolidate down to node event + type
* fix UpdateDeploymentAllocHealth test invocations
* fix test
* generic eval update event, first pass at alloc client update events
* api/event client
* … table for durability
* address pr feedback
* … are removed from the event buffer. Wire up event buffer size config, use pointers for structs.Events instead of copying.
* properly wire up durable event count
* move newline responsibility: moves newline creation from NDJson to the http handler; json stream only encodes and sends now
* ignore snapshot restore if broker is disabled
* enable dev mode to access event stream without acl
* use mapping instead of switch
* use pointers for config sizes, remove unused ttl, simplify closed conn logic
This PR adds event streaming to Nomad.
Event Types
Currently there are event types for the major Nomad objects: JobEvent, AllocEvent, EvalEvent, DeploymentEvent, and NodeEvent. Each of these events contains a fully hydrated object for the event.
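As an approximation of the shape (field names are sketched from the PR's description rather than copied from the code):

package structs

// Event is an approximate sketch; Payload carries the fully hydrated
// object, e.g. the Job for a JobEvent.
type Event struct {
	Topic   string      // "Job", "Allocation", "Evaluation", "Deployment", "Node"
	Type    string      // e.g. "JobRegistered"
	Key     string      // identifier of the object, such as the job ID
	Index   uint64      // raft index at which the event was created
	Payload interface{} // the fully hydrated object
}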
Event Creation
Events are sourced through FSM requests. During the request, a transaction's changes are tracked and used to create events.
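A simplified sketch of that flow with illustrative names (the PR's actual change-tracking types differ): each tracked table change is mapped to an event at the transaction's raft index, and tables like the index table simply produce no events.

package state

// Illustrative stand-ins for the PR's change-tracking types.
type Change struct {
	Table string
	After interface{} // the hydrated object after the write
}

type Changes struct {
	Index   uint64
	Changes []Change
}

type Event struct {
	Topic, Type string
	Index       uint64
	Payload     interface{}
}

// eventsFromChanges sketches how tracked changes become events: rows from
// event-producing tables are wrapped with a topic, type, and index, while
// other tables produce nothing.
func eventsFromChanges(changes Changes) []Event {
	var events []Event
	for _, c := range changes.Changes {
		switch c.Table {
		case "nodes":
			events = append(events, Event{Topic: "Node", Type: "NodeRegistration", Index: changes.Index, Payload: c.After})
		case "jobs":
			events = append(events, Event{Topic: "Job", Type: "JobRegistered", Index: changes.Index, Payload: c.After})
		}
	}
	return events
}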
/v1/event/stream
A new /v1/event/stream API is added to consume events from. The output is streamed as newline-delimited JSON (ndjson.org). An example request to stream all deployment events is sketched below. More API documentation will follow in a separate PR.
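A hedged example of consuming the stream from Go; the topic query parameter is an assumption based on the per-topic discussion above, so check the upcoming API docs for the final form.

package main

import (
	"bufio"
	"fmt"
	"net/http"
)

func main() {
	// Assumed query parameter for filtering to deployment events.
	resp, err := http.Get("http://127.0.0.1:4646/v1/event/stream?topic=Deployment")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Each line of the response body is one JSON object (ndjson).
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}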
Durability
Currently events are held in memory for 1 hour, and the number of events to store is hardcoded to 250. Eventually, a configurable number of messages to hold in memory and a configurable number to store in go-memdb for durability will be exposed.