Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor the dynamic json configs for api_keys and external_backends #2055

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cli

import (
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -65,6 +64,7 @@ func (r *RunCMD) Run(ctx *Context) error {
config.WithAudioDir(r.AudioPath),
config.WithUploadDir(r.UploadPath),
config.WithConfigsDir(r.ConfigPath),
config.WithDynamicConfigDir(r.LocalaiConfigDir),
config.WithF16(r.F16),
config.WithStringGalleries(r.Galleries),
config.WithModelLibraryURL(r.RemoteLibrary),
Expand Down Expand Up @@ -134,17 +134,6 @@ func (r *RunCMD) Run(ctx *Context) error {
return fmt.Errorf("failed basic startup tasks with error %s", err.Error())
}

// Watch the configuration directory
// If the directory does not exist, we don't watch it
if _, err := os.Stat(r.LocalaiConfigDir); err == nil {
closeConfigWatcherFn, err := startup.WatchConfigDirectory(r.LocalaiConfigDir, options)
defer closeConfigWatcherFn()

if err != nil {
return fmt.Errorf("failed while watching configuration directory %s", r.LocalaiConfigDir)
}
}

appHTTP, err := http.App(cl, ml, options)
if err != nil {
log.Error().Err(err).Msg("error during HTTP App construction")
Expand Down
7 changes: 7 additions & 0 deletions core/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ApplicationConfig struct {
AudioDir string
UploadDir string
ConfigsDir string
DynamicConfigsDir string
CORS bool
PreloadJSONModels string
PreloadModelsFromPath string
Expand Down Expand Up @@ -264,6 +265,12 @@ func WithConfigsDir(configsDir string) AppOption {
}
}

func WithDynamicConfigDir(dynamicConfigsDir string) AppOption {
return func(o *ApplicationConfig) {
o.DynamicConfigsDir = dynamicConfigsDir
}
}

func WithApiKeys(apiKeys []string) AppOption {
return func(o *ApplicationConfig) {
o.ApiKeys = apiKeys
Expand Down
156 changes: 105 additions & 51 deletions core/startup/config_file_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,89 +12,143 @@ import (
"github.com/rs/zerolog/log"
)

type WatchConfigDirectoryCloser func() error

func ReadApiKeysJson(configDir string, appConfig *config.ApplicationConfig) error {
fileContent, err := os.ReadFile(path.Join(configDir, "api_keys.json"))
if err == nil {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err == nil {
appConfig.ApiKeys = append(appConfig.ApiKeys, fileKeys...)
return nil
}
return err
}
return err
type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error

type configFileHandler struct {
handlers map[string]fileHandler

watcher *fsnotify.Watcher

configDir string
appConfig *config.ApplicationConfig
}

func ReadExternalBackendsJson(configDir string, appConfig *config.ApplicationConfig) error {
fileContent, err := os.ReadFile(path.Join(configDir, "external_backends.json"))
if err != nil {
return err
// TODO: This should be a singleton eventually so other parts of the code can register config file handlers,
// then we can export it to other packages
func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler {
c := configFileHandler{
handlers: make(map[string]fileHandler),
configDir: appConfig.DynamicConfigsDir,
appConfig: appConfig,
}
// Parse JSON content from the file
var fileBackends map[string]string
err = json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
c.Register("api_keys.json", readApiKeysJson(*appConfig), true)
c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true)
return c
}

func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error {
_, ok := c.handlers[filename]
if ok {
return fmt.Errorf("handler already registered for file %s", filename)
}
err = mergo.Merge(&appConfig.ExternalGRPCBackends, fileBackends)
if err != nil {
return err
c.handlers[filename] = handler
if runNow {
c.callHandler(path.Join(c.appConfig.DynamicConfigsDir, filename), handler)
}
return nil
}

var CONFIG_FILE_UPDATES = map[string]func(configDir string, appConfig *config.ApplicationConfig) error{
"api_keys.json": ReadApiKeysJson,
"external_backends.json": ReadExternalBackendsJson,
}
func (c *configFileHandler) callHandler(filename string, handler fileHandler) {
fileContent, err := os.ReadFile(filename)
if err != nil && !os.IsNotExist(err) {
log.Error().Err(err).Str("filename", filename).Msg("could not read file")
}

func WatchConfigDirectory(configDir string, appConfig *config.ApplicationConfig) (WatchConfigDirectoryCloser, error) {
if len(configDir) == 0 {
return nil, fmt.Errorf("configDir blank")
if err = handler(fileContent, c.appConfig); err != nil {
log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options")
}
}

func (c *configFileHandler) Watch() error {
configWatcher, err := fsnotify.NewWatcher()
c.watcher = configWatcher
if err != nil {
log.Fatal().Msgf("Unable to create a watcher for the LocalAI Configuration Directory: %+v", err)
}
ret := func() error {
configWatcher.Close()
return nil
log.Fatal().Err(err).Str("configdir", c.configDir).Msg("wnable to create a watcher for configuration directory")
}

// Start listening for events.
go func() {
for {
select {
case event, ok := <-configWatcher.Events:
case event, ok := <-c.watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Write) {
for targetName, watchFn := range CONFIG_FILE_UPDATES {
if event.Name == targetName {
err := watchFn(configDir, appConfig)
log.Warn().Msgf("WatchConfigDirectory goroutine for %s: failed to update options: %+v", targetName, err)
}
if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) {
handler, ok := c.handlers[path.Base(event.Name)]
if !ok {
continue
}

c.callHandler(event.Name, handler)
}
case _, ok := <-configWatcher.Errors:
case err, ok := <-c.watcher.Errors:
log.Error().Err(err).Msg("config watcher error received")
if !ok {
return
}
log.Error().Err(err).Msg("error encountered while watching config directory")
}
}
}()

// Add a path.
err = configWatcher.Add(configDir)
err = c.watcher.Add(c.appConfig.DynamicConfigsDir)
if err != nil {
return ret, fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err)
return fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err)
}

return ret, nil
return nil
}

// TODO: When we institute graceful shutdown, this should be called
func (c *configFileHandler) Stop() {
c.watcher.Close()
}

func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing api_keys.json")

if len(fileContent) > 0 {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err != nil {
return err
}

appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...)
} else {
appConfig.ApiKeys = startupAppConfig.ApiKeys
}
log.Debug().Msg("api keys loaded from api_keys.json")
return nil
}

return handler
}

func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing external_backends.json")

if len(fileContent) > 0 {
// Parse JSON content from the file
var fileBackends map[string]string
err := json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
}
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends)
if err != nil {
return err
}
} else {
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
}
log.Debug().Msg("external backends loaded from external_backends.json")
return nil
}
return handler
}
5 changes: 5 additions & 0 deletions core/startup/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func Startup(opts ...config.AppOption) (*config.BackendConfigLoader, *model.Mode
}()
}

// Watch the configuration directory
// If the directory does not exist, we don't watch it
configHandler := newConfigFileHandler(options)
configHandler.Watch()

log.Info().Msg("core/startup process completed!")
return cl, ml, options, nil
}