Skip to content

Commit

Permalink
update models to use the events updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanfaerman committed Jan 31, 2024
1 parent e115a1c commit 73801a1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 49 deletions.
71 changes: 54 additions & 17 deletions internal/models/event.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package models

import (
"bytes"
"context"
"encoding/gob"
"time"

"github.com/ryanfaerman/netctl/internal/dao"
"github.com/ryanfaerman/netctl/internal/events"
)

type Event struct {
ID int64
Event any
At time.Time
Name string
StreamID string
ID int64
AccountID int64
Name string
Event any
}

// FindEventsForStreams returns a stream of events for the given streamIDs.
func FindEventsForStreams(ctx context.Context, streamIDs ...string) (EventStream, error) {
if len(streamIDs) == 0 {
return nil, nil
}

raws, err := global.dao.GetEventsForStreams(ctx, streamIDs)
if err != nil {
global.log.Error("unable to get events for streams", "error", err, "streams", streamIDs)
Expand All @@ -26,11 +32,9 @@ func FindEventsForStreams(ctx context.Context, streamIDs ...string) (EventStream
stream := make(EventStream, len(raws))

for i, raw := range raws {
decoder := gob.NewDecoder(bytes.NewReader(raw.EventData))
var p any
if err := decoder.Decode(&p); err != nil {
global.log.Error("unable to decode event", "error", err)
return stream, err
e, err := events.Decode(raw.EventType, raw.EventData)
if err != nil {
return EventStream{}, err
}

stream[i] = Event{
Expand All @@ -39,19 +43,53 @@ func FindEventsForStreams(ctx context.Context, streamIDs ...string) (EventStream
StreamID: raw.StreamID,
AccountID: raw.AccountID,
Name: raw.EventType,
Event: p,
Event: e,
}
}

return stream, nil
}

// FindEventsForCallsign returns a stream of events for the given callsign and event type.
func FindEventsForCallsign(eventType string, callsign string) (EventStream, error) {
l := global.log.With("callsign", callsign, "event_type", eventType)
raws, err := global.dao.GetEventsForCallsign(context.Background(), dao.GetEventsForCallsignParams{
EventType: eventType,
Callsign: []byte(callsign),
})
if err != nil {
l.Error("unable to get events for callsign")
return nil, err
}

stream := make(EventStream, len(raws))

for i, raw := range raws {
e, err := events.Decode(raw.EventType, raw.EventData)
if err != nil {
return EventStream{}, err
}

stream[i] = Event{
ID: raw.ID,
At: raw.Created,
StreamID: raw.StreamID,
AccountID: raw.AccountID,
Name: raw.EventType,
Event: e,
}
}

return stream, nil
}

type RecoveredEvent struct {
Event Event
RegisteredFn string
Event Event
ID int64
}

// FindRecoverableEvents returns a stream of events that have been registered for recovery.
func FindRecoverableEvents(ctx context.Context) ([]RecoveredEvent, error) {
raws, err := global.dao.GetRecoverableEvents(ctx)
if err != nil {
Expand All @@ -61,10 +99,8 @@ func FindRecoverableEvents(ctx context.Context) ([]RecoveredEvent, error) {
stream := make([]RecoveredEvent, len(raws))

for i, raw := range raws {
decoder := gob.NewDecoder(bytes.NewReader(raw.EventData))
var p any
if err := decoder.Decode(&p); err != nil {
global.log.Error("unable to decode event", "error", err)
e, err := events.Decode(raw.EventType, raw.EventData)
if err != nil {
return stream, err
}

Expand All @@ -77,13 +113,14 @@ func FindRecoverableEvents(ctx context.Context) ([]RecoveredEvent, error) {
StreamID: raw.StreamID,
AccountID: raw.AccountID,
Name: raw.EventType,
Event: p,
Event: e,
},
}
}
return stream, nil
}

// An EventStream is a stream of events.
type EventStream []Event

func (es EventStream) FilterForStream(streamID string) EventStream {
Expand Down
47 changes: 15 additions & 32 deletions internal/models/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

type Net struct {
Name string `validate:"required"`
Sessions map[string]*NetSession
SessionIDs []string
ID int64
StreamID string
Name string `validate:"required" json:"name"`
StreamID string `json:"stream_id"`
Sessions map[string]*NetSession `json:"sessions"`
SessionIDs []string `json:"session_ids"`
ID int64 `json:"id"`
}

func NewNet(name string) *Net {
Expand Down Expand Up @@ -128,24 +128,6 @@ func FindNetBySessionID(ctx context.Context, sessionID string) (*Net, error) {
return m, nil
}

// func (m *Net) AddSession(ctx context.Context) (*NetSession, error) {
// streamID := ulid.Make().String()
// session := &NetSession{
// ID: streamID,
// }
//
// _, err := global.dao.CreateNetSessionAndReturnId(ctx, dao.CreateNetSessionAndReturnIdParams{
// NetID: m.ID,
// StreamID: streamID,
// })
// if err != nil {
// return nil, err
// }
//
// m.Sessions[streamID] = session
// return session, nil
// }

func (n *Net) Events(ctx context.Context, onlyStreams ...string) (EventStream, error) {
if len(onlyStreams) == 0 {
streamIDs := make([]string, 0, len(n.Sessions))
Expand All @@ -170,11 +152,12 @@ func (m *Net) Replay(ctx context.Context, onlyStreams ...string) error {
return nil
}

// replay the given event stream, vivifying the model
func (m *Net) replay(stream EventStream) {
for _, event := range stream {
eventMachine:
switch e := event.Event.(type) {
case events.NetSessionScheduled:
case *events.NetSessionScheduled:
// if any periods exist, ignore
// otherwise, create a new one in the future
session := m.Sessions[event.StreamID]
Expand All @@ -186,7 +169,7 @@ func (m *Net) replay(stream EventStream) {
Scheduled: true,
})

case events.NetSessionOpened:
case *events.NetSessionOpened:
session := m.Sessions[event.StreamID]

// if no periods exist, create a new one
Expand Down Expand Up @@ -214,7 +197,7 @@ func (m *Net) replay(stream EventStream) {
break eventMachine
}

case events.NetSessionClosed:
case *events.NetSessionClosed:
// if no periods exist, ignore
// if the last period is open, close it
// if the last period is closed, ignore
Expand All @@ -226,12 +209,12 @@ func (m *Net) replay(stream EventStream) {
session.Periods[len(session.Periods)-1].ClosedAt = event.At
}

case events.NetCheckinHeard:
case *events.NetCheckinHeard:
session := m.Sessions[event.StreamID]
// if the checkin is not in the session, add it
// if the checkin is in the session, reset it
for i, checkin := range session.Checkins {
if checkin.ID == e.ID || strings.ToUpper(checkin.Callsign.AsHeard) == strings.ToUpper(e.Callsign) {
if checkin.ID == e.ID || strings.EqualFold(checkin.Callsign.AsHeard, e.Callsign) {
session.Checkins[i].Acked = false
// session.Checkins[i].Verified = false
// session.Checkins[i].Valid = nil
Expand All @@ -248,7 +231,7 @@ func (m *Net) replay(stream EventStream) {
At: event.At,
})

case events.NetCheckinVerified:
case *events.NetCheckinVerified:
// set the verified flag to true
// if the verification has no errors, set the valid flag to nil
// if the verification has an error, set the valid flag to the error
Expand Down Expand Up @@ -282,7 +265,7 @@ func (m *Net) replay(stream EventStream) {
break eventMachine
}
}
case events.NetCheckinAcked:
case *events.NetCheckinAcked:
// set the acked flag to true
session := m.Sessions[event.StreamID]
for i, checkin := range session.Checkins {
Expand All @@ -291,7 +274,7 @@ func (m *Net) replay(stream EventStream) {
break eventMachine
}
}
case events.NetCheckinCorrected:
case *events.NetCheckinCorrected:
// find the checkin and update it
// mark as invalidated
session := m.Sessions[event.StreamID]
Expand All @@ -311,7 +294,7 @@ func (m *Net) replay(stream EventStream) {
break eventMachine
}
}
case events.NetCheckinRevoked:
case *events.NetCheckinRevoked:
// find the checkin and remove it
session := m.Sessions[event.StreamID]
for i, checkin := range session.Checkins {
Expand Down

0 comments on commit 73801a1

Please sign in to comment.