This repository has been archived by the owner on May 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* extract background workers * extract handlers * avoid sleeping in tests * remove fast_http adapter * send len of jobs queue to DD
- Loading branch information
Showing
18 changed files
with
317 additions
and
307 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package actions | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scoiatael/archai/simplejson" | ||
) | ||
|
||
type BulkWriteJob struct { | ||
Schema []interface{} `json:"schema"` | ||
Objects []interface{} `json:"data"` | ||
Stream string `json:"stream"` | ||
} | ||
|
||
func (wj BulkWriteJob) MarshalJSON() ([]byte, error) { | ||
return json.Marshal(map[string]interface{}{"schema": wj.Schema, | ||
"data": wj.Objects, | ||
"stream": wj.Stream, | ||
}) | ||
} | ||
|
||
func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) { | ||
object_with_schema := make(simplejson.Object) | ||
object, conv := obj.([]interface{}) | ||
if !conv { | ||
return object_with_schema, fmt.Errorf("Failed to convert obj to array") | ||
} | ||
for j, name := range schema { | ||
name, conv := name.(string) | ||
if !conv { | ||
return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j) | ||
} | ||
if len(object) <= j { | ||
return object_with_schema, fmt.Errorf("%d: Not enough values", j) | ||
} | ||
object_with_schema[name] = object[j] | ||
} | ||
return object_with_schema, nil | ||
} | ||
|
||
func (wj BulkWriteJob) Run(c Context) error { | ||
c.Telemetry().Incr("bulk_write.aggregate.attempt", []string{"stream:" + wj.Stream}) | ||
for i, obj := range wj.Objects { | ||
object, err := makeObjectWithSchema(obj, wj.Schema) | ||
if err != nil { | ||
return errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i)) | ||
} | ||
payload, err := json.Marshal(object) | ||
if err != nil { | ||
return errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i)) | ||
} | ||
if err := persistEvent(wj.Stream, payload, "http; bulk_write_job", c); err != nil { | ||
return errors.Wrap(err, "HTTP server bulk_writing events") | ||
} | ||
c.Telemetry().Incr("write", []string{"stream:" + wj.Stream}) | ||
} | ||
c.Telemetry().Incr("bulk_write.aggregate.write", []string{"stream:" + wj.Stream}) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package handlers | ||
|
||
import ( | ||
"github.com/scoiatael/archai/telemetry" | ||
"github.com/scoiatael/archai/types" | ||
) | ||
|
||
type Context interface { | ||
ReadEvents(string, string, int) ([]types.Event, error) | ||
ListStreams() ([]string, error) | ||
Telemetry() telemetry.Datadog | ||
} | ||
|
||
type Handler struct { | ||
Context | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package handlers | ||
|
||
import ( | ||
"github.com/pkg/errors" | ||
"github.com/scoiatael/archai/http" | ||
"github.com/scoiatael/archai/simplejson" | ||
"github.com/scoiatael/archai/types" | ||
) | ||
|
||
func serializeEvents(events []types.Event) (simplejson.Object, error) { | ||
root := make(simplejson.Object) | ||
results := make([]simplejson.Object, len(events)) | ||
cursor := make(simplejson.Object) | ||
for i, ev := range events { | ||
payload, err := simplejson.Read(ev.Blob) | ||
if err != nil { | ||
return root, errors.Wrap(err, "HTTP server marshalling response with read events") | ||
} | ||
results[i] = payload | ||
cursor["next"] = ev.ID | ||
} | ||
root["results"] = results | ||
root["cursor"] = cursor | ||
return root, nil | ||
} | ||
|
||
func (gs Handler) GetStream(ctx http.GetContext) { | ||
stream := ctx.GetSegment("id") | ||
events, err := gs.Context.ReadEvents( | ||
stream, | ||
ctx.StringParam("cursor"), | ||
ctx.IntParam("amount", 10), | ||
) | ||
if err != nil { | ||
ctx.ServerErr(errors.Wrap(err, "GetStream Handle ReadEvents")) | ||
return | ||
} | ||
json, err := serializeEvents(events) | ||
if err != nil { | ||
ctx.ServerErr(err) | ||
} | ||
gs.Context.Telemetry().Incr("read", []string{"stream:" + stream}) | ||
ctx.SendJson(json) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package handlers | ||
|
||
import ( | ||
"github.com/pkg/errors" | ||
"github.com/scoiatael/archai/http" | ||
"github.com/scoiatael/archai/simplejson" | ||
) | ||
|
||
func (gs Handler) GetStreams(ctx http.GetContext) { | ||
streams, err := gs.Context.ListStreams() | ||
if err != nil { | ||
ctx.ServerErr(errors.Wrap(err, "GetStreams Handle .ListStreams")) | ||
return | ||
} | ||
view := make(simplejson.Object) | ||
view["streams"] = streams | ||
ctx.SendJson(view) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.