Skip to content

Commit

Permalink
add new API for accessing the aggregated events endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
spikelu2016 committed Mar 25, 2024
1 parent e70bf20 commit 3ced2a3
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 138 deletions.
20 changes: 20 additions & 0 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ func main() {
log.Sugar().Fatalf("error creating policies table: %v", err)
}

err = store.CreateEventsByDayTable()
if err != nil {
log.Sugar().Fatalf("error creating event aggregated by day table: %v", err)
}

err = store.CreateUniqueIndexForEventsTable()
if err != nil {
log.Sugar().Fatalf("error creating unique index for event aggregated by day table: %v", err)
}

err = store.CreateTimeStampIndexForEventsTable()
if err != nil {
log.Sugar().Fatalf("error creating time stamp index for event aggregated by day table: %v", err)
}

err = store.CreateKeyIdIndexForEventsTable()
if err != nil {
log.Sugar().Fatalf("error creating key id index for event aggregated by day table: %v", err)
}

memStore, err := memdb.NewMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
log.Sugar().Fatalf("cannot initialize memdb: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions internal/manager/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type eventStorage interface {
GetEvents(userId, customId string, keyIds []string, start, end int64) ([]*event.Event, error)
GetEventDataPoints(start, end, increment int64, tags, keyIds, customIds, userIds []string, filters []string) ([]*event.DataPoint, error)
GetLatencyPercentiles(start, end int64, tags, keyIds []string) ([]float64, error)
GetAggregatedEventByDayDataPoints(start, end int64, keyIds []string) ([]*event.DataPoint, error)
}

type ReportingManager struct {
Expand Down Expand Up @@ -56,6 +57,17 @@ func (rm *ReportingManager) GetEventReporting(e *event.ReportingRequest) (*event
}, nil
}

func (rm *ReportingManager) GetAggregatedEventByDayReporting(e *event.ReportingRequest) (*event.ReportingResponse, error) {
dataPoints, err := rm.es.GetAggregatedEventByDayDataPoints(e.Start, e.End, e.KeyIds)
if err != nil {
return nil, err
}

return &event.ReportingResponse{
DataPoints: dataPoints,
}, nil
}

func (rm *ReportingManager) GetKeyReporting(keyId string) (*key.KeyReporting, error) {
k, err := rm.ks.GetKey(keyId)
if err != nil {
Expand Down
101 changes: 101 additions & 0 deletions internal/server/web/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type KeyReportingManager interface {
GetKeyReporting(keyId string) (*key.KeyReporting, error)
GetEvents(userId, customId string, keyIds []string, start int64, end int64) ([]*event.Event, error)
GetEventReporting(e *event.ReportingRequest) (*event.ReportingResponse, error)
GetAggregatedEventByDayReporting(e *event.ReportingRequest) (*event.ReportingResponse, error)
}

type PoliciesManager interface {
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewAdminServer(log *zap.Logger, mode string, m KeyManager, krm KeyReporting

router.GET("/api/reporting/keys/:id", getGetKeyReportingHandler(krm, log, prod))
router.POST("/api/reporting/events", getGetEventMetricsHandler(krm, log, prod))
router.POST("/api/reporting/events-by-day", getGetEventMetricsByDayHandler(krm, log, prod))
router.GET("/api/events", getGetEventsHandler(krm, log, prod))

router.PUT("/api/provider-settings", getCreateProviderSettingHandler(psm, log, prod))
Expand Down Expand Up @@ -723,6 +725,18 @@ func validateEventReportingRequest(r *event.ReportingRequest) bool {
return true
}

func validateEventReportingByDayRequest(r *event.ReportingRequest) bool {
if r.Start == 0 || r.End == 0 {
return false
}

if r.Start >= r.End {
return false
}

return true
}

func getGetEventMetricsHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.HandlerFunc {
return func(c *gin.Context) {
stats.Incr("bricksllm.admin.get_get_event_metrics.requests", nil, 1)
Expand Down Expand Up @@ -810,6 +824,93 @@ func getGetEventMetricsHandler(m KeyReportingManager, log *zap.Logger, prod bool
}
}

func getGetEventMetricsByDayHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.HandlerFunc {
return func(c *gin.Context) {
stats.Incr("bricksllm.admin.get_get_event_metrics_by_day.requests", nil, 1)

start := time.Now()
defer func() {
dur := time.Since(start)
stats.Timing("bricksllm.admin.get_get_event_metrics_by_day.latency", dur, nil, 1)
}()

path := "/api/reporting/events-by-day"

if c == nil || c.Request == nil {
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/empty-context",
Title: "context is empty error",
Status: http.StatusInternalServerError,
Detail: "gin context is empty",
Instance: path,
})
return
}

cid := c.GetString(correlationId)
data, err := io.ReadAll(c.Request.Body)
if err != nil {
logError(log, "error when reading event by day reporting request body", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/request-body-read",
Title: "request body reader error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

request := &event.ReportingRequest{}
err = json.Unmarshal(data, request)
if err != nil {
logError(log, "error when unmarshalling event by day reporting request body", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/json-unmarshal",
Title: "json unmarshaller error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

if !validateEventReportingByDayRequest(request) {
stats.Incr("bricksllm.admin.get_get_event_metrics_by_day.request_not_valid", nil, 1)

err = fmt.Errorf("event reporting request %+v is not valid", request)
logError(log, "invalid reporting request", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/invalid-reporting-request",
Title: "invalid reporting request",
Status: http.StatusBadRequest,
Detail: err.Error(),
Instance: path,
})
return
}

reportingResponse, err := m.GetAggregatedEventByDayReporting(request)
if err != nil {
stats.Incr("bricksllm.admin.get_get_event_metrics_by_day.get_aggregated_event_by_day_reporting", nil, 1)

logError(log, "error when getting event by day reporting", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/event-reporting-manager",
Title: "event reporting error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

stats.Incr("bricksllm.admin.get_get_event_metrics_by_day.success", nil, 1)

c.JSON(http.StatusOK, reportingResponse)
}
}

func getGetEventsHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.HandlerFunc {
return func(c *gin.Context) {
stats.Incr("bricksllm.admin.get_get_events_handler.requests", nil, 1)
Expand Down
Loading

0 comments on commit 3ced2a3

Please sign in to comment.