Skip to content

Commit

Permalink
Merge branch 'develop' into nasdf/issue/3160
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf authored Oct 24, 2024
2 parents 5ad82e0 + c1fcde0 commit e3388e2
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 75 deletions.
6 changes: 3 additions & 3 deletions cli/p2p_replicator_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ Example:
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
rep := client.Replicator{
Info: info,
Schemas: collections,
rep := client.ReplicatorParams{
Info: info,
Collections: collections,
}
return p2p.DeleteReplicator(cmd.Context(), rep)
},
Expand Down
6 changes: 3 additions & 3 deletions cli/p2p_replicator_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ Example:
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
rep := client.Replicator{
Info: info,
Schemas: collections,
rep := client.ReplicatorParams{
Info: info,
Collections: collections,
}
return p2p.SetReplicator(cmd.Context(), rep)
},
Expand Down
24 changes: 12 additions & 12 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions client/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type P2P interface {

// SetReplicator adds a replicator to the persisted list or adds
// schemas if the replicator already exists.
SetReplicator(ctx context.Context, rep Replicator) error
SetReplicator(ctx context.Context, rep ReplicatorParams) error
// DeleteReplicator deletes a replicator from the persisted list
// or specific schemas if they are specified.
DeleteReplicator(ctx context.Context, rep Replicator) error
DeleteReplicator(ctx context.Context, rep ReplicatorParams) error
// GetAllReplicators returns the full list of replicators with their
// subscribed schemas.
GetAllReplicators(ctx context.Context) ([]Replicator, error)
Expand Down
8 changes: 8 additions & 0 deletions client/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

// ReplicatorParams contains the replicator fields that can be modified by the user.
type ReplicatorParams struct {
// Info is the address of the peer to replicate to.
Info peer.AddrInfo
// Collections is the list of collection names to replicate.
Collections []string
}

// Replicator is a peer that a set of local collections are replicated to.
type Replicator struct {
Info peer.AddrInfo
Expand Down
27 changes: 25 additions & 2 deletions docs/website/references/http/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,29 @@
},
"type": "object"
},
"replicator_params": {
"properties": {
"Collections": {
"items": {
"type": "string"
},
"type": "array"
},
"Info": {
"properties": {
"Addrs": {
"items": {},
"type": "array"
},
"ID": {
"type": "string"
}
},
"type": "object"
}
},
"type": "object"
},
"schema": {
"properties": {
"Fields": {
Expand Down Expand Up @@ -1803,7 +1826,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/replicator"
"$ref": "#/components/schemas/replicator_params"
}
}
},
Expand Down Expand Up @@ -1859,7 +1882,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/replicator"
"$ref": "#/components/schemas/replicator_params"
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions http/client_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) PeerInfo() peer.AddrInfo {
return res
}

func (c *Client) SetReplicator(ctx context.Context, rep client.Replicator) error {
func (c *Client) SetReplicator(ctx context.Context, rep client.ReplicatorParams) error {
methodURL := c.http.baseURL.JoinPath("p2p", "replicators")

body, err := json.Marshal(rep)
Expand All @@ -50,7 +50,7 @@ func (c *Client) SetReplicator(ctx context.Context, rep client.Replicator) error
return err
}

func (c *Client) DeleteReplicator(ctx context.Context, rep client.Replicator) error {
func (c *Client) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) error {
methodURL := c.http.baseURL.JoinPath("p2p", "replicators")

body, err := json.Marshal(rep)
Expand Down
9 changes: 6 additions & 3 deletions http/handler_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *p2pHandler) SetReplicator(rw http.ResponseWriter, req *http.Request) {
return
}

var rep client.Replicator
var rep client.ReplicatorParams
if err := requestJSON(req, &rep); err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
Expand All @@ -56,7 +56,7 @@ func (s *p2pHandler) DeleteReplicator(rw http.ResponseWriter, req *http.Request)
return
}

var rep client.Replicator
var rep client.ReplicatorParams
if err := requestJSON(req, &rep); err != nil {
responseJSON(rw, http.StatusBadRequest, errorResponse{err})
return
Expand Down Expand Up @@ -152,6 +152,9 @@ func (h *p2pHandler) bindRoutes(router *Router) {
replicatorSchema := &openapi3.SchemaRef{
Ref: "#/components/schemas/replicator",
}
replicatorParamsSchema := &openapi3.SchemaRef{
Ref: "#/components/schemas/replicator_params",
}

peerInfoResponse := openapi3.NewResponse().
WithDescription("Peer network info").
Expand All @@ -178,7 +181,7 @@ func (h *p2pHandler) bindRoutes(router *Router) {

replicatorRequest := openapi3.NewRequestBody().
WithRequired(true).
WithContent(openapi3.NewContentWithJSONSchemaRef(replicatorSchema))
WithContent(openapi3.NewContentWithJSONSchemaRef(replicatorParamsSchema))

setReplicator := openapi3.NewOperation()
setReplicator.Description = "Add peer replicators"
Expand Down
1 change: 1 addition & 0 deletions http/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var openApiSchemas = map[string]any{
"update_result": &client.UpdateResult{},
"lens_config": &client.LensConfig{},
"replicator": &client.Replicator{},
"replicator_params": &client.ReplicatorParams{},
"ccip_request": &CCIPRequest{},
"ccip_response": &CCIPResponse{},
"patch_schema_request": &patchSchemaRequest{},
Expand Down
14 changes: 7 additions & 7 deletions internal/db/p2p_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
retryTimeout = 10 * time.Second
)

func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error {
func (db *db) SetReplicator(ctx context.Context, rep client.ReplicatorParams) error {
txn, err := db.NewTxn(ctx, false)
if err != nil {
return err
Expand Down Expand Up @@ -85,9 +85,9 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error {

var collections []client.Collection
switch {
case len(rep.Schemas) > 0:
case len(rep.Collections) > 0:
// if specific collections are chosen get them by name
for _, name := range rep.Schemas {
for _, name := range rep.Collections {
col, err := db.GetCollectionByName(ctx, name)
if err != nil {
return NewErrReplicatorCollections(err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func (db *db) getDocsHeads(
return updateChan
}

func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error {
func (db *db) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) error {
txn, err := db.NewTxn(ctx, false)
if err != nil {
return err
Expand Down Expand Up @@ -247,9 +247,9 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error
}

var collections []client.Collection
if len(rep.Schemas) > 0 {
if len(rep.Collections) > 0 {
// if specific collections are chosen get them by name
for _, name := range rep.Schemas {
for _, name := range rep.Collections {
col, err := db.GetCollectionByName(ctx, name)
if err != nil {
return NewErrReplicatorCollections(err)
Expand Down Expand Up @@ -277,7 +277,7 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.Replicator) error

// Persist the replicator to the store, deleting it if no schemas remain
key := core.NewReplicatorKey(rep.Info.ID.String())
if len(rep.Schemas) == 0 {
if len(rep.Collections) == 0 {
err := txn.Peerstore().Delete(ctx, key.ToDS())
if err != nil {
return err
Expand Down
Loading

0 comments on commit e3388e2

Please sign in to comment.