diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 2af3fd00..b62f86ef 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -178,13 +178,22 @@ jobs:
         uses: actions/checkout@v4
         with:
           submodules: true
+      - name: Dependencies
+        run: |
+          # Install protoc
+          curl -L -s https://github.com/protocolbuffers/protobuf/releases/download/v26.1/protoc-26.1-linux-x86_64.zip -o protoc.zip && \
+          unzip -j -d /usr/local/bin protoc.zip bin/protoc && \
+          rm protoc.zip
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2
+          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af
+          PATH="$PATH:$HOME/go/bin" make protogen-go
       - name: Build images
         run: |
           docker build --build-arg FFMPEG=true --build-arg IMAGE_TYPE=extras --build-arg EXTRA_BACKENDS=rerankers --build-arg MAKEFLAGS="--jobs=5 --output-sync=target" -t local-ai:tests -f Dockerfile .
           BASE_IMAGE=local-ai:tests DOCKER_AIO_IMAGE=local-ai-aio:test make docker-aio
       - name: Test
         run: |
-          LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \
+          PATH="$PATH:$HOME/go/bin" LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \
           make run-e2e-aio
       - name: Setup tmate session if tests fail
         if: ${{ failure() }}
diff --git a/Makefile b/Makefile
index 121b8e50..4efee986 100644
--- a/Makefile
+++ b/Makefile
@@ -468,7 +468,7 @@ run-e2e-image:
 	ls -liah $(abspath ./tests/e2e-fixtures)
 	docker run -p 5390:8080 -e MODELS_PATH=/models -e THREADS=1 -e DEBUG=true -d --rm -v $(TEST_DIR):/models --gpus all --name e2e-tests-$(RANDOM) localai-tests

-run-e2e-aio:
+run-e2e-aio: protogen-go
	@echo 'Running e2e AIO tests'
	$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts 5 -v -r ./tests/e2e-aio
diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go
index 80dd10b4..d0f47373 100644
--- a/pkg/model/initializers.go
+++ b/pkg/model/initializers.go
@@ -304,18 +304,19 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 				return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
 			}
 			// Make sure the process is executable
-			if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
+			process, err := ml.startProcess(uri, o.model, serverAddress)
+			if err != nil {
 				log.Error().Err(err).Str("path", uri).Msg("failed to launch ")
 				return nil, err
 			}

 			log.Debug().Msgf("GRPC Service Started")

-			client = NewModel(modelName, serverAddress)
+			client = NewModel(modelName, serverAddress, process)
 		} else {
 			log.Debug().Msg("external backend is uri")
 			// address
-			client = NewModel(modelName, uri)
+			client = NewModel(modelName, uri, nil)
 		}
 	} else {
 		grpcProcess := backendPath(o.assetDir, backend)
@@ -346,13 +347,14 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 		args, grpcProcess = library.LoadLDSO(o.assetDir, args, grpcProcess)

 		// Make sure the process is executable in any circumstance
-		if err := ml.startProcess(grpcProcess, o.model, serverAddress, args...); err != nil {
+		process, err := ml.startProcess(grpcProcess, o.model, serverAddress, args...)
+		if err != nil {
 			return nil, err
 		}

 		log.Debug().Msgf("GRPC Service Started")

-		client = NewModel(modelName, serverAddress)
+		client = NewModel(modelName, serverAddress, process)
 	}

 	log.Debug().Msgf("Wait for the service to start up")
@@ -374,6 +376,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 	if !ready {
 		log.Debug().Msgf("GRPC Service NOT ready")
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("grpc service not ready")
 	}

@@ -385,9 +388,11 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
 	res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
 	if err != nil {
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("could not load model: %w", err)
 	}
 	if !res.Success {
+		ml.deleteProcess(o.model)
 		return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
 	}
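The pattern above is the heart of the refactor: startProcess no longer records the handle in a loader-private map but hands it back to the caller, which threads it into the Model it constructs, and every failure after a successful spawn now calls ml.deleteProcess so the child is not orphaned. A condensed sketch of the new control flow (distilled from the hunks above, not a verbatim excerpt):

    process, err := ml.startProcess(grpcProcess, o.model, serverAddress, args...)
    if err != nil {
        return nil, err // nothing spawned yet, so nothing to clean up
    }
    client = NewModel(modelName, serverAddress, process)

    if !ready { // health check timed out
        ml.deleteProcess(o.model) // tear down the spawned backend before bailing out
        return nil, fmt.Errorf("grpc service not ready")
    }

External-URI backends pass nil for the process, which is why the nil checks in pkg/model/process.go below matter.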
diff --git a/pkg/model/loader.go b/pkg/model/loader.go
index 4f1ec841..68ac1a31 100644
--- a/pkg/model/loader.go
+++ b/pkg/model/loader.go
@@ -13,7 +13,6 @@ import (

 	"github.com/mudler/LocalAI/pkg/utils"

-	process "github.com/mudler/go-processmanager"
 	"github.com/rs/zerolog/log"
 )

@@ -21,20 +20,18 @@ import (
 // TODO: Split ModelLoader and TemplateLoader? Just to keep things more organized. Left together to share a mutex until I look into that. Would split if we seperate directories for .bin/.yaml and .tmpl
 type ModelLoader struct {
-	ModelPath     string
-	mu            sync.Mutex
-	models        map[string]*Model
-	grpcProcesses map[string]*process.Process
-	templates     *templates.TemplateCache
-	wd            *WatchDog
+	ModelPath string
+	mu        sync.Mutex
+	models    map[string]*Model
+	templates *templates.TemplateCache
+	wd        *WatchDog
 }

 func NewModelLoader(modelPath string) *ModelLoader {
 	nml := &ModelLoader{
-		ModelPath:     modelPath,
-		models:        make(map[string]*Model),
-		templates:     templates.NewTemplateCache(modelPath),
-		grpcProcesses: make(map[string]*process.Process),
+		ModelPath: modelPath,
+		models:    make(map[string]*Model),
+		templates: templates.NewTemplateCache(modelPath),
 	}

 	return nml
@@ -127,6 +124,8 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
 	modelFile := filepath.Join(ml.ModelPath, modelName)
 	log.Debug().Msgf("Loading model in memory from file: %s", modelFile)

+	ml.mu.Lock()
+	defer ml.mu.Unlock()
 	model, err := loader(modelName, modelFile)
 	if err != nil {
 		return nil, err
@@ -136,8 +135,6 @@
 		return nil, fmt.Errorf("loader didn't return a model")
 	}

-	ml.mu.Lock()
-	defer ml.mu.Unlock()
 	ml.models[modelName] = model

 	return model, nil
@@ -146,14 +143,13 @@
 func (ml *ModelLoader) ShutdownModel(modelName string) error {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
-
-	_, ok := ml.models[modelName]
+	model, ok := ml.models[modelName]
 	if !ok {
 		return fmt.Errorf("model %s not found", modelName)
 	}

 	retries := 1
-	for ml.models[modelName].GRPC(false, ml.wd).IsBusy() {
+	for model.GRPC(false, ml.wd).IsBusy() {
 		log.Debug().Msgf("%s busy. Waiting.", modelName)
 		dur := time.Duration(retries*2) * time.Second
 		if dur > retryTimeout {
@@ -185,8 +181,8 @@ func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
 		if !alive {
 			log.Warn().Msgf("GRPC Model not responding: %s", err.Error())
 			log.Warn().Msgf("Deleting the process in order to recreate it")
-			process, exists := ml.grpcProcesses[s]
-			if !exists {
+			process := m.Process()
+			if process == nil {
 				log.Error().Msgf("Process not found for '%s' and the model is not responding anymore !", s)
 				return m
 			}
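The subtle change in this file is lock placement: LoadModel now acquires ml.mu before running the loader callback rather than only around the map write, so a second concurrent load of the same model blocks until the first completes instead of racing it, and ShutdownModel reuses the looked-up model instead of re-indexing the map on every retry. A simplified sketch of the resulting LoadModel (validation and logging elided):

    func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*Model, error)) (*Model, error) {
        ml.mu.Lock() // held across the (potentially slow) loader call
        defer ml.mu.Unlock()

        model, err := loader(modelName, filepath.Join(ml.ModelPath, modelName))
        if err != nil {
            return nil, err
        }
        ml.models[modelName] = model
        return model, nil
    }

The trade-off is that unrelated models can no longer load in parallel through this path; the benefit is that the models map can never hold a partially constructed entry.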
Waiting.", modelName) dur := time.Duration(retries*2) * time.Second if dur > retryTimeout { @@ -185,8 +181,8 @@ func (ml *ModelLoader) CheckIsLoaded(s string) *Model { if !alive { log.Warn().Msgf("GRPC Model not responding: %s", err.Error()) log.Warn().Msgf("Deleting the process in order to recreate it") - process, exists := ml.grpcProcesses[s] - if !exists { + process := m.Process() + if process == nil { log.Error().Msgf("Process not found for '%s' and the model is not responding anymore !", s) return m } diff --git a/pkg/model/loader_test.go b/pkg/model/loader_test.go index c16a6e50..d0ad4e0c 100644 --- a/pkg/model/loader_test.go +++ b/pkg/model/loader_test.go @@ -63,7 +63,7 @@ var _ = Describe("ModelLoader", func() { Context("LoadModel", func() { It("should load a model and keep it in memory", func() { - mockModel = model.NewModel("foo", "test.model") + mockModel = model.NewModel("foo", "test.model", nil) mockLoader := func(modelName, modelFile string) (*model.Model, error) { return mockModel, nil @@ -88,7 +88,7 @@ var _ = Describe("ModelLoader", func() { Context("ShutdownModel", func() { It("should shutdown a loaded model", func() { - mockModel = model.NewModel("foo", "test.model") + mockModel = model.NewModel("foo", "test.model", nil) mockLoader := func(modelName, modelFile string) (*model.Model, error) { return mockModel, nil diff --git a/pkg/model/model.go b/pkg/model/model.go index 6cb81d10..6e4fd316 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -1,20 +1,32 @@ package model -import grpc "github.com/mudler/LocalAI/pkg/grpc" +import ( + "sync" + + grpc "github.com/mudler/LocalAI/pkg/grpc" + process "github.com/mudler/go-processmanager" +) type Model struct { ID string `json:"id"` address string client grpc.Backend + process *process.Process + sync.Mutex } -func NewModel(ID, address string) *Model { +func NewModel(ID, address string, process *process.Process) *Model { return &Model{ ID: ID, address: address, + process: process, } } +func (m *Model) Process() *process.Process { + return m.process +} + func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend { if m.client != nil { return m.client @@ -25,6 +37,8 @@ func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend { enableWD = true } + m.Lock() + defer m.Unlock() m.client = grpc.NewClient(m.address, parallel, wd, enableWD) return m.client } diff --git a/pkg/model/process.go b/pkg/model/process.go index bcd1fccb..48631d79 100644 --- a/pkg/model/process.go +++ b/pkg/model/process.go @@ -16,20 +16,22 @@ import ( ) func (ml *ModelLoader) deleteProcess(s string) error { - if _, exists := ml.grpcProcesses[s]; exists { - if err := ml.grpcProcesses[s].Stop(); err != nil { - log.Error().Err(err).Msgf("(deleteProcess) error while deleting grpc process %s", s) + if m, exists := ml.models[s]; exists { + process := m.Process() + if process != nil { + if err := process.Stop(); err != nil { + log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s) + } } } - delete(ml.grpcProcesses, s) delete(ml.models, s) return nil } func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error { var err error = nil - for k, p := range ml.grpcProcesses { - if filter(k, p) { + for k, m := range ml.models { + if filter(k, m.Process()) { e := ml.ShutdownModel(k) err = errors.Join(err, e) } @@ -44,17 +46,20 @@ func (ml *ModelLoader) StopAllGRPC() error { func (ml *ModelLoader) GetGRPCPID(id string) (int, error) { ml.mu.Lock() defer ml.mu.Unlock() - p, exists := ml.grpcProcesses[id] + p, exists := ml.models[id] if 
diff --git a/pkg/model/process.go b/pkg/model/process.go
index bcd1fccb..48631d79 100644
--- a/pkg/model/process.go
+++ b/pkg/model/process.go
@@ -16,20 +16,22 @@ import (
 )

 func (ml *ModelLoader) deleteProcess(s string) error {
-	if _, exists := ml.grpcProcesses[s]; exists {
-		if err := ml.grpcProcesses[s].Stop(); err != nil {
-			log.Error().Err(err).Msgf("(deleteProcess) error while deleting grpc process %s", s)
+	if m, exists := ml.models[s]; exists {
+		process := m.Process()
+		if process != nil {
+			if err := process.Stop(); err != nil {
+				log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s)
+			}
 		}
 	}
-	delete(ml.grpcProcesses, s)
 	delete(ml.models, s)
 	return nil
 }

 func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {
 	var err error = nil
-	for k, p := range ml.grpcProcesses {
-		if filter(k, p) {
+	for k, m := range ml.models {
+		if filter(k, m.Process()) {
 			e := ml.ShutdownModel(k)
 			err = errors.Join(err, e)
 		}
@@ -44,17 +46,20 @@
 func (ml *ModelLoader) GetGRPCPID(id string) (int, error) {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
-	p, exists := ml.grpcProcesses[id]
+	p, exists := ml.models[id]
 	if !exists {
 		return -1, fmt.Errorf("no grpc backend found for %s", id)
 	}
-	return strconv.Atoi(p.PID)
+	if p.Process() == nil {
+		return -1, fmt.Errorf("no grpc backend found for %s", id)
+	}
+	return strconv.Atoi(p.Process().PID)
 }

-func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, args ...string) error {
+func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, args ...string) (*process.Process, error) {
 	// Make sure the process is executable
 	if err := os.Chmod(grpcProcess, 0700); err != nil {
-		return err
+		return nil, err
 	}

 	log.Debug().Msgf("Loading GRPC Process: %s", grpcProcess)
@@ -63,7 +68,7 @@
 	workDir, err := filepath.Abs(filepath.Dir(grpcProcess))
 	if err != nil {
-		return err
+		return nil, err
 	}

 	grpcControlProcess := process.New(
@@ -79,10 +84,8 @@
 		ml.wd.AddAddressModelMap(serverAddress, id)
 	}

-	ml.grpcProcesses[id] = grpcControlProcess
-
 	if err := grpcControlProcess.Run(); err != nil {
-		return err
+		return grpcControlProcess, err
 	}

 	log.Debug().Msgf("GRPC Service state dir: %s", grpcControlProcess.StateDir())
@@ -116,5 +119,5 @@
 		}
 	}()

-	return nil
+	return grpcControlProcess, nil
 }
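With the grpcProcesses map gone, the Model is the single source of truth for process state: deleteProcess, StopGRPC, and GetGRPCPID all derive the handle from ml.models via Process(). A sketch of a filtered shutdown under that API (assuming GRPCProcessFilter keeps the func(id string, p *process.Process) bool shape implied by the filter(k, m.Process()) call above):

    // Illustrative: stop every tracked backend except one kept warm.
    keep := "favourite-model"
    err := ml.StopGRPC(func(id string, p *process.Process) bool {
        return id != keep
    })

Filters must now tolerate a nil *process.Process, since models backed by an external URI never had a child process to begin with.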