feat: track internally started models by ID (#3693)

* chore(refactor): track internally started models by ID

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Just extend options, no need to copy

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Improve debugging for rerankers failures

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Simplify model loading with rerankers

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Be more consistent when generating model options

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Uncommitted code

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Make deleteProcess more idiomatic

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Adapt CLI for sound generation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fixup threads definition

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Handle corner case where c.Seed is nil

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Consistently use ModelOptions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Adapt new code to refactoring

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Dave <dave@gray101.com>
This commit is contained in:
Ettore Di Giacinto 2024-10-02 08:55:58 +02:00 committed by GitHub
parent db704199dc
commit 0965c6cd68
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 169 additions and 185 deletions

View file

@ -268,10 +268,10 @@ func selectGRPCProcess(backend, assetDir string, f16 bool) string {
// starts the grpcModelProcess for the backend, and returns a grpc client
// It also loads the model
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*Model, error) {
return func(modelName, modelFile string) (*Model, error) {
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
return func(modelID, modelName, modelFile string) (*Model, error) {
log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelName, modelFile, backend, *o)
log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelID, modelFile, backend, *o)
var client *Model
@ -304,7 +304,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
}
// Make sure the process is executable
process, err := ml.startProcess(uri, o.model, serverAddress)
process, err := ml.startProcess(uri, modelID, serverAddress)
if err != nil {
log.Error().Err(err).Str("path", uri).Msg("failed to launch ")
return nil, err
@ -312,11 +312,11 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
log.Debug().Msgf("GRPC Service Started")
client = NewModel(modelName, serverAddress, process)
client = NewModel(modelID, serverAddress, process)
} else {
log.Debug().Msg("external backend is uri")
// address
client = NewModel(modelName, uri, nil)
client = NewModel(modelID, uri, nil)
}
} else {
grpcProcess := backendPath(o.assetDir, backend)
@ -347,14 +347,14 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
args, grpcProcess = library.LoadLDSO(o.assetDir, args, grpcProcess)
// Make sure the process is executable in any circumstance
process, err := ml.startProcess(grpcProcess, o.model, serverAddress, args...)
process, err := ml.startProcess(grpcProcess, modelID, serverAddress, args...)
if err != nil {
return nil, err
}
log.Debug().Msgf("GRPC Service Started")
client = NewModel(modelName, serverAddress, process)
client = NewModel(modelID, serverAddress, process)
}
log.Debug().Msgf("Wait for the service to start up")
@ -407,11 +407,7 @@ func (ml *ModelLoader) ListAvailableBackends(assetdir string) ([]string, error)
func (ml *ModelLoader) BackendLoader(opts ...Option) (client grpc.Backend, err error) {
o := NewOptions(opts...)
if o.model != "" {
log.Info().Msgf("Loading model '%s' with backend %s", o.model, o.backendString)
} else {
log.Info().Msgf("Loading model with backend %s", o.backendString)
}
log.Info().Msgf("Loading model '%s' with backend %s", o.modelID, o.backendString)
backend := strings.ToLower(o.backendString)
if realBackend, exists := Aliases[backend]; exists {
@ -420,10 +416,10 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (client grpc.Backend, err e
}
if o.singleActiveBackend {
log.Debug().Msgf("Stopping all backends except '%s'", o.model)
err := ml.StopGRPC(allExcept(o.model))
log.Debug().Msgf("Stopping all backends except '%s'", o.modelID)
err := ml.StopGRPC(allExcept(o.modelID))
if err != nil {
log.Error().Err(err).Str("keptModel", o.model).Msg("error while shutting down all backends except for the keptModel")
log.Error().Err(err).Str("keptModel", o.modelID).Msg("error while shutting down all backends except for the keptModel")
}
}
@ -437,7 +433,7 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (client grpc.Backend, err e
backendToConsume = backend
}
model, err := ml.LoadModel(o.model, ml.grpcModel(backendToConsume, o))
model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backendToConsume, o))
if err != nil {
return nil, err
}
@ -450,18 +446,18 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (grpc.Backend, error) {
// Return earlier if we have a model already loaded
// (avoid looping through all the backends)
if m := ml.CheckIsLoaded(o.model); m != nil {
log.Debug().Msgf("Model '%s' already loaded", o.model)
if m := ml.CheckIsLoaded(o.modelID); m != nil {
log.Debug().Msgf("Model '%s' already loaded", o.modelID)
return m.GRPC(o.parallelRequests, ml.wd), nil
}
// If we can have only one backend active, kill all the others (except external backends)
if o.singleActiveBackend {
log.Debug().Msgf("Stopping all backends except '%s'", o.model)
err := ml.StopGRPC(allExcept(o.model))
log.Debug().Msgf("Stopping all backends except '%s'", o.modelID)
err := ml.StopGRPC(allExcept(o.modelID))
if err != nil {
log.Error().Err(err).Str("keptModel", o.model).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
log.Error().Err(err).Str("keptModel", o.modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
}
}
@ -480,23 +476,13 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (grpc.Backend, error) {
log.Debug().Msgf("Loading from the following backends (in order): %+v", autoLoadBackends)
if o.model != "" {
log.Info().Msgf("Trying to load the model '%s' with the backend '%s'", o.model, autoLoadBackends)
}
log.Info().Msgf("Trying to load the model '%s' with the backend '%s'", o.modelID, autoLoadBackends)
for _, key := range autoLoadBackends {
log.Info().Msgf("[%s] Attempting to load", key)
options := []Option{
options := append(opts, []Option{
WithBackendString(key),
WithModel(o.model),
WithLoadGRPCLoadModelOpts(o.gRPCOptions),
WithThreads(o.threads),
WithAssetDir(o.assetDir),
}
for k, v := range o.externalBackends {
options = append(options, WithExternalBackend(k, v))
}
}...)
model, modelerr := ml.BackendLoader(options...)
if modelerr == nil && model != nil {

View file

@ -114,9 +114,9 @@ func (ml *ModelLoader) ListModels() []Model {
return models
}
func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*Model, error)) (*Model, error) {
func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) {
// Check if we already have a loaded model
if model := ml.CheckIsLoaded(modelName); model != nil {
if model := ml.CheckIsLoaded(modelID); model != nil {
return model, nil
}
@ -126,7 +126,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
ml.mu.Lock()
defer ml.mu.Unlock()
model, err := loader(modelName, modelFile)
model, err := loader(modelID, modelName, modelFile)
if err != nil {
return nil, err
}
@ -135,7 +135,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
return nil, fmt.Errorf("loader didn't return a model")
}
ml.models[modelName] = model
ml.models[modelID] = model
return model, nil
}

View file

@ -65,22 +65,22 @@ var _ = Describe("ModelLoader", func() {
It("should load a model and keep it in memory", func() {
mockModel = model.NewModel("foo", "test.model", nil)
mockLoader := func(modelName, modelFile string) (*model.Model, error) {
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
return mockModel, nil
}
model, err := modelLoader.LoadModel("test.model", mockLoader)
model, err := modelLoader.LoadModel("foo", "test.model", mockLoader)
Expect(err).To(BeNil())
Expect(model).To(Equal(mockModel))
Expect(modelLoader.CheckIsLoaded("test.model")).To(Equal(mockModel))
Expect(modelLoader.CheckIsLoaded("foo")).To(Equal(mockModel))
})
It("should return an error if loading the model fails", func() {
mockLoader := func(modelName, modelFile string) (*model.Model, error) {
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
return nil, errors.New("failed to load model")
}
model, err := modelLoader.LoadModel("test.model", mockLoader)
model, err := modelLoader.LoadModel("foo", "test.model", mockLoader)
Expect(err).To(HaveOccurred())
Expect(model).To(BeNil())
})
@ -88,18 +88,16 @@ var _ = Describe("ModelLoader", func() {
Context("ShutdownModel", func() {
It("should shutdown a loaded model", func() {
mockModel = model.NewModel("foo", "test.model", nil)
mockLoader := func(modelName, modelFile string) (*model.Model, error) {
return mockModel, nil
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
return model.NewModel("foo", "test.model", nil), nil
}
_, err := modelLoader.LoadModel("test.model", mockLoader)
_, err := modelLoader.LoadModel("foo", "test.model", mockLoader)
Expect(err).To(BeNil())
err = modelLoader.ShutdownModel("test.model")
err = modelLoader.ShutdownModel("foo")
Expect(err).To(BeNil())
Expect(modelLoader.CheckIsLoaded("test.model")).To(BeNil())
Expect(modelLoader.CheckIsLoaded("foo")).To(BeNil())
})
})
})

View file

@ -9,7 +9,7 @@ import (
type Options struct {
backendString string
model string
threads uint32
modelID string
assetDir string
context context.Context
@ -68,12 +68,6 @@ func WithLoadGRPCLoadModelOpts(opts *pb.ModelOptions) Option {
}
}
func WithThreads(threads uint32) Option {
return func(o *Options) {
o.threads = threads
}
}
func WithAssetDir(assetDir string) Option {
return func(o *Options) {
o.assetDir = assetDir
@ -92,6 +86,12 @@ func WithSingleActiveBackend() Option {
}
}
func WithModelID(id string) Option {
return func(o *Options) {
o.modelID = id
}
}
func NewOptions(opts ...Option) *Options {
o := &Options{
gRPCOptions: &pb.ModelOptions{},

View file

@ -16,16 +16,26 @@ import (
)
func (ml *ModelLoader) deleteProcess(s string) error {
if m, exists := ml.models[s]; exists {
process := m.Process()
if process != nil {
if err := process.Stop(); err != nil {
log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s)
}
}
defer delete(ml.models, s)
m, exists := ml.models[s]
if !exists {
// Nothing to do
return nil
}
delete(ml.models, s)
return nil
process := m.Process()
if process == nil {
// Nothing to do as there is no process
return nil
}
err := process.Stop()
if err != nil {
log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s)
}
return err
}
func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {