-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[RBR-1591] expose private model schema to be used by shared services (#…
…121) * [RBR-1591] expose private model schema to be used by shared services * add test for API Fetcher * use a result
- Loading branch information
Jeff Haynie
authored
Sep 24, 2024
1 parent
8ae0ebc
commit 88bba80
Showing
3 changed files
with
246 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
package schema | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"sync" | ||
"time" | ||
) | ||
|
||
const defaultExpireAfter = 30 * time.Minute | ||
|
||
type Cache struct { | ||
TTL int `json:"ttl"` | ||
} | ||
|
||
type Changefeed struct { | ||
AppendFieldsToSubject []string `json:"appendFieldsToSubject"` | ||
PartitionKeys []string `json:"partitionKeys"` | ||
} | ||
|
||
type Model struct { | ||
Cache *Cache `json:"cache"` | ||
Changefeed *Changefeed `json:"changefeed"` | ||
ModelVersion string `json:"modelVersion"` | ||
Public bool `json:"public"` | ||
} | ||
|
||
type Result struct { | ||
Success bool `json:"success"` | ||
Model *Model `json:"data"` | ||
} | ||
|
||
type Fetcher interface { | ||
FetchTable(ctx context.Context, table string) (io.ReadCloser, error) | ||
} | ||
|
||
type ModelRegistry interface { | ||
Get(ctx context.Context, table string) (*Model, error) | ||
} | ||
|
||
type APIFetcher struct { | ||
URL string | ||
APIKey string | ||
} | ||
|
||
var _ Fetcher = (*APIFetcher)(nil) | ||
|
||
func (f *APIFetcher) FetchTable(ctx context.Context, table string) (io.ReadCloser, error) { | ||
u, err := url.Parse(f.URL) | ||
if err != nil { | ||
return nil, err | ||
} | ||
u.Path = path.Join("v3/schema/private/schema", table) | ||
u.RawQuery = url.Values{"apikey": {f.APIKey}}.Encode() | ||
|
||
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
resp, err := http.DefaultClient.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return nil, fmt.Errorf("failed to fetch model: %s", resp.Status) | ||
} | ||
|
||
return resp.Body, nil | ||
} | ||
|
||
// NewAPIFetcher creates a new API fetcher using the provided URL and API key. | ||
func NewAPIFetcher(url, apiKey string) *APIFetcher { | ||
return &APIFetcher{URL: url, APIKey: apiKey} | ||
} | ||
|
||
type modelCache struct { | ||
model *Model | ||
fetched time.Time | ||
} | ||
|
||
type modelRegistry struct { | ||
models map[string]*modelCache | ||
lock sync.Mutex | ||
fetcher Fetcher | ||
expireAfter time.Duration | ||
} | ||
|
||
var _ ModelRegistry = (*modelRegistry)(nil) | ||
|
||
func (r *modelRegistry) Get(ctx context.Context, table string) (*Model, error) { | ||
r.lock.Lock() | ||
defer r.lock.Unlock() | ||
|
||
if cache, ok := r.models[table]; ok { | ||
if time.Since(cache.fetched) < r.expireAfter { | ||
return cache.model, nil | ||
} | ||
} | ||
|
||
rc, err := r.fetcher.FetchTable(ctx, table) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer rc.Close() | ||
|
||
var res Result | ||
if err := json.NewDecoder(rc).Decode(&res); err != nil { | ||
return nil, err | ||
} | ||
|
||
if !res.Success { | ||
return nil, fmt.Errorf("failed to fetch model: %s", table) | ||
} | ||
|
||
item := &modelCache{ | ||
model: res.Model, | ||
fetched: time.Now(), | ||
} | ||
r.models[table] = item | ||
|
||
return item.model, nil | ||
} | ||
|
||
// NewModelRegistry creates a new model registry using the provided fetcher. | ||
func NewModelRegistry(fetcher Fetcher, opts ...WithOption) ModelRegistry { | ||
opt := &ModelRegistryOption{ | ||
ExpireAfter: defaultExpireAfter, | ||
} | ||
|
||
for _, fn := range opts { | ||
fn(opt) | ||
} | ||
|
||
return &modelRegistry{ | ||
models: make(map[string]*modelCache), | ||
fetcher: fetcher, | ||
expireAfter: opt.ExpireAfter, | ||
} | ||
} | ||
|
||
type ModelRegistryOption struct { | ||
ExpireAfter time.Duration | ||
} | ||
|
||
type WithOption func(opt *ModelRegistryOption) | ||
|
||
// WithExpireAfter sets the expire after duration for an item in the model registry. | ||
func WithExpireAfter(expireAfter time.Duration) WithOption { | ||
return func(opt *ModelRegistryOption) { | ||
opt.ExpireAfter = expireAfter | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package schema | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
cstr "github.com/shopmonkeyus/go-common/string" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
type testFetcher struct { | ||
reader io.ReadCloser | ||
err error | ||
called bool | ||
} | ||
|
||
func (t *testFetcher) FetchTable(ctx context.Context, table string) (io.ReadCloser, error) { | ||
t.called = true | ||
return t.reader, t.err | ||
} | ||
|
||
func TestNewModelRegistry(t *testing.T) { | ||
var m Model | ||
m.Public = true | ||
m.ModelVersion = "1234" | ||
fetcher := &testFetcher{io.NopCloser(bytes.NewReader([]byte(cstr.JSONStringify(Result{Success: true, Model: &m})))), nil, false} | ||
r := NewModelRegistry(fetcher) | ||
model, err := r.Get(context.Background(), "table") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.True(t, fetcher.called) | ||
if model.Public != m.Public || model.ModelVersion != m.ModelVersion { | ||
t.Fatalf("expected model to be %v, got %v", m, model) | ||
} | ||
fetcher.called = false | ||
model, err = r.Get(context.Background(), "table") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.False(t, fetcher.called) | ||
if model.Public != m.Public || model.ModelVersion != m.ModelVersion { | ||
t.Fatalf("expected model to be %v, got %v", m, model) | ||
} | ||
} | ||
|
||
func TestNewModelRegistryWithAPIFetcher(t *testing.T) { | ||
var m Model | ||
m.Public = true | ||
m.ModelVersion = "1234" | ||
var called bool | ||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
called = true | ||
assert.Equal(t, "/v3/schema/private/schema/table", r.URL.Path) | ||
if r.URL.Query().Get("apikey") != "test" { | ||
w.WriteHeader(http.StatusUnauthorized) | ||
return | ||
} | ||
w.Header().Set("Content-Type", "application/json") | ||
fmt.Fprintln(w, cstr.JSONStringify(Result{Success: true, Model: &m})) | ||
})) | ||
defer ts.Close() | ||
fetcher := NewAPIFetcher(ts.URL, "test") | ||
r := NewModelRegistry(fetcher) | ||
model, err := r.Get(context.Background(), "table") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.True(t, called) | ||
if model.Public != m.Public || model.ModelVersion != m.ModelVersion { | ||
t.Fatalf("expected model to be %v, got %v", m, model) | ||
} | ||
called = false | ||
model, err = r.Get(context.Background(), "table") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.False(t, called) | ||
if model.Public != m.Public || model.ModelVersion != m.ModelVersion { | ||
t.Fatalf("expected model to be %v, got %v", m, model) | ||
} | ||
} |