feat: allow to run parallel requests (#1290)

* feat: allow to run parallel requests

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixup

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Author: Ettore Di Giacinto, 2023-11-16 08:20:05 +01:00 (committed by GitHub)
parent 66a558ff41
commit fdd95d1d86
9 changed files with 91 additions and 44 deletions

.env (3 changed lines)

@@ -70,3 +70,6 @@ MODELS_PATH=/models
 ### Define the number of parallel LLAMA.cpp workers (Defaults to 1)
 # LLAMACPP_PARALLEL=1
+### Enable to run parallel requests
+# PARALLEL_REQUESTS=true
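
For llama.cpp this switch is typically paired with the existing LLAMACPP_PARALLEL setting, so the backend has enough worker slots to actually serve requests concurrently. A minimal .env sketch (illustrative values, not defaults):

LLAMACPP_PARALLEL=4
PARALLEL_REQUESTS=true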

@@ -16,6 +16,10 @@ func modelOpts(c config.Config, o *options.Option, opts []model.Option) []model.
        opts = append(opts, model.WithSingleActiveBackend())
    }
+   if o.ParallelBackendRequests {
+       opts = append(opts, model.EnableParallelRequests)
+   }
    if c.GRPC.Attempts != 0 {
        opts = append(opts, model.WithGRPCAttempts(c.GRPC.Attempts))
    }
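
This is where the new application option crosses into the model loader: when ParallelBackendRequests is set, every backend load performed for an API request also carries model.EnableParallelRequests, the loader-level option introduced further down in pkg/model.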

@@ -125,11 +125,11 @@ func BackendMonitorEndpoint(bm BackendMonitor) func(c *fiber.Ctx) error {
    client := bm.options.Loader.CheckIsLoaded(backendId)
-   if client == nil {
+   if client == "" {
        return fmt.Errorf("backend %s is not currently loaded", backendId)
    }
-   status, rpcErr := client.Status(context.TODO())
+   status, rpcErr := client.GRPC().Status(context.TODO())
    if rpcErr != nil {
        log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
        val, slbErr := bm.SampleLocalBackendProcess(backendId)
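
This endpoint only changes because CheckIsLoaded now returns a ModelAddress (a plain address string, see the loader changes below) instead of a *grpc.Client: callers compare against the empty string and build a client on demand via GRPC().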

@@ -5,9 +5,9 @@ import (
    "embed"
    "encoding/json"
+   "github.com/go-skynet/LocalAI/metrics"
    "github.com/go-skynet/LocalAI/pkg/gallery"
    model "github.com/go-skynet/LocalAI/pkg/model"
-   "github.com/go-skynet/LocalAI/metrics"
    "github.com/rs/zerolog/log"
 )

@@ -37,6 +37,7 @@ type Option struct {
    AutoloadGalleries bool
    SingleBackend     bool
+   ParallelBackendRequests bool
 }
 type AppOption func(*Option)

@@ -66,6 +67,10 @@ var EnableSingleBackend = func(o *Option) {
    o.SingleBackend = true
 }
+var EnableParallelBackendRequests = func(o *Option) {
+   o.ParallelBackendRequests = true
+}
 var EnableGalleriesAutoload = func(o *Option) {
    o.AutoloadGalleries = true
 }

main.go (11 changed lines)

@@ -16,9 +16,9 @@ import (
    config "github.com/go-skynet/LocalAI/api/config"
    "github.com/go-skynet/LocalAI/api/options"
    "github.com/go-skynet/LocalAI/internal"
+   "github.com/go-skynet/LocalAI/metrics"
    "github.com/go-skynet/LocalAI/pkg/gallery"
    model "github.com/go-skynet/LocalAI/pkg/model"
-   "github.com/go-skynet/LocalAI/metrics"
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
    progressbar "github.com/schollz/progressbar/v3"

@@ -63,6 +63,11 @@ func main() {
        EnvVars: []string{"SINGLE_ACTIVE_BACKEND"},
        Usage:   "Allow only one backend to be running.",
    },
+   &cli.BoolFlag{
+       Name:    "parallel-requests",
+       EnvVars: []string{"PARALLEL_REQUESTS"},
+       Usage:   "Enable backends to handle multiple requests in parallel. This is for backends that supports multiple requests in parallel, like llama.cpp or vllm",
+   },
    &cli.BoolFlag{
        Name:    "cors",
        EnvVars: []string{"CORS"},

@@ -193,7 +198,9 @@ For a list of compatible model, check out: https://localai.io/model-compatibilit
        options.WithUploadLimitMB(ctx.Int("upload-limit")),
        options.WithApiKeys(ctx.StringSlice("api-keys")),
    }
+   if ctx.Bool("parallel-requests") {
+       opts = append(opts, options.EnableParallelBackendRequests)
+   }
    if ctx.Bool("single-active-backend") {
        opts = append(opts, options.EnableSingleBackend)
    }
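
In practice the feature can be turned on either with the new --parallel-requests flag or by exporting PARALLEL_REQUESTS=true (or setting it in .env as shown above); both paths append options.EnableParallelBackendRequests to the application options.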

@@ -61,11 +61,11 @@ var AutoLoadBackends []string = []string{
 // starts the grpcModelProcess for the backend, and returns a grpc client
 // It also loads the model
-func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
-   return func(modelName, modelFile string) (*grpc.Client, error) {
+func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (ModelAddress, error) {
+   return func(modelName, modelFile string) (ModelAddress, error) {
        log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelName, modelFile, backend, *o)
-       var client *grpc.Client
+       var client ModelAddress
        getFreeAddress := func() (string, error) {
            port, err := freeport.GetFreePort()

@@ -82,46 +82,46 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
            if _, err := os.Stat(uri); err == nil {
                serverAddress, err := getFreeAddress()
                if err != nil {
-                   return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+                   return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
                }
                // Make sure the process is executable
                if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
-                   return nil, err
+                   return "", err
                }
                log.Debug().Msgf("GRPC Service Started")
-               client = grpc.NewClient(serverAddress)
+               client = ModelAddress(serverAddress)
            } else {
                // address
-               client = grpc.NewClient(uri)
+               client = ModelAddress(uri)
            }
        } else {
            grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
            // Check if the file exists
            if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
-               return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
+               return "", fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
            }
            serverAddress, err := getFreeAddress()
            if err != nil {
-               return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+               return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
            }
            // Make sure the process is executable
            if err := ml.startProcess(grpcProcess, o.model, serverAddress); err != nil {
-               return nil, err
+               return "", err
            }
            log.Debug().Msgf("GRPC Service Started")
-           client = grpc.NewClient(serverAddress)
+           client = ModelAddress(serverAddress)
        }
        // Wait for the service to start up
        ready := false
        for i := 0; i < o.grpcAttempts; i++ {
-           if client.HealthCheck(context.Background()) {
+           if client.GRPC().HealthCheck(context.Background()) {
                log.Debug().Msgf("GRPC Service Ready")
                ready = true
                break

@@ -131,7 +131,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
        if !ready {
            log.Debug().Msgf("GRPC Service NOT ready")
-           return nil, fmt.Errorf("grpc service not ready")
+           return "", fmt.Errorf("grpc service not ready")
        }
        options := *o.gRPCOptions

@@ -140,19 +140,30 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
        log.Debug().Msgf("GRPC: Loading model with options: %+v", options)
-       res, err := client.LoadModel(o.context, &options)
+       res, err := client.GRPC().LoadModel(o.context, &options)
        if err != nil {
-           return nil, fmt.Errorf("could not load model: %w", err)
+           return "", fmt.Errorf("could not load model: %w", err)
        }
        if !res.Success {
-           return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
+           return "", fmt.Errorf("could not load model (no success): %s", res.Message)
        }
        return client, nil
    }
 }

-func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err error) {
+func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
+   if parallel {
+       return addr.GRPC(), nil
+   }
+   if _, ok := ml.grpcClients[string(addr)]; !ok {
+       ml.grpcClients[string(addr)] = addr.GRPC()
+   }
+   return ml.grpcClients[string(addr)], nil
+}
+
+func (ml *ModelLoader) BackendLoader(opts ...Option) (client *grpc.Client, err error) {
    o := NewOptions(opts...)
    log.Debug().Msgf("Loading model %s from %s", o.backendString, o.model)

@@ -166,22 +177,25 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err er
        ml.mu.Unlock()
    }
-   // if an external backend is provided, use it
-   _, externalBackendExists := o.externalBackends[backend]
-   if externalBackendExists {
-       return ml.LoadModel(o.model, ml.grpcModel(backend, o))
-   }
+   var backendToConsume string
    switch backend {
    case Gpt4AllLlamaBackend, Gpt4AllMptBackend, Gpt4AllJBackend, Gpt4All:
        o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "gpt4all")
-       return ml.LoadModel(o.model, ml.grpcModel(Gpt4All, o))
+       backendToConsume = Gpt4All
    case PiperBackend:
        o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "espeak-ng-data")
-       return ml.LoadModel(o.model, ml.grpcModel(PiperBackend, o))
+       backendToConsume = PiperBackend
    default:
-       return ml.LoadModel(o.model, ml.grpcModel(backend, o))
+       backendToConsume = backend
    }
+   addr, err := ml.LoadModel(o.model, ml.grpcModel(backendToConsume, o))
+   if err != nil {
+       return nil, err
+   }
+   return ml.resolveAddress(addr, o.parallelRequests)
 }

 func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {

@@ -190,10 +204,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
    ml.mu.Lock()
    // Return earlier if we have a model already loaded
    // (avoid looping through all the backends)
-   if m := ml.CheckIsLoaded(o.model); m != nil {
+   if m := ml.CheckIsLoaded(o.model); m != "" {
        log.Debug().Msgf("Model '%s' already loaded", o.model)
        ml.mu.Unlock()
-       return m, nil
+       return ml.resolveAddress(m, o.parallelRequests)
    }
    // If we can have only one backend active, kill all the others (except external backends)
    if o.singleActiveBackend {
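
Taken together, BackendLoader now resolves a client from the backend's address: with parallel requests enabled every caller gets its own gRPC client, otherwise one cached client per address is reused as before. A minimal sketch of a concurrent caller under the new option (illustrative only: the backend string, the model file name, and the WithBackendString/WithModel option constructors are assumptions taken from the surrounding codebase, not part of this diff; NewModelLoader, BackendLoader and EnableParallelRequests are the names shown above):

package main

import (
	"fmt"
	"sync"

	model "github.com/go-skynet/LocalAI/pkg/model"
)

func main() {
	loader := model.NewModelLoader("/models") // constructor from pkg/model/loader.go

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// EnableParallelRequests (added in this commit) makes resolveAddress hand
			// back a fresh gRPC client per call; without it, all callers share one
			// cached client per backend address.
			client, err := loader.BackendLoader(
				model.WithBackendString("llama-cpp"), // assumed backend name
				model.WithModel("ggml-model.gguf"),   // assumed model file
				model.EnableParallelRequests,
			)
			if err != nil {
				fmt.Printf("request %d: load failed: %v\n", n, err)
				return
			}
			_ = client // each goroutine can now issue its own Predict/Embeddings calls
		}(i)
	}
	wg.Wait()
}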

@@ -59,15 +59,23 @@ type ModelLoader struct {
    ModelPath string
    mu        sync.Mutex
    // TODO: this needs generics
-   models        map[string]*grpc.Client
+   grpcClients   map[string]*grpc.Client
+   models        map[string]ModelAddress
    grpcProcesses map[string]*process.Process
    templates     map[TemplateType]map[string]*template.Template
 }

+type ModelAddress string
+
+func (m ModelAddress) GRPC() *grpc.Client {
+   return grpc.NewClient(string(m))
+}
+
 func NewModelLoader(modelPath string) *ModelLoader {
    nml := &ModelLoader{
        ModelPath:     modelPath,
-       models:        make(map[string]*grpc.Client),
+       grpcClients:   make(map[string]*grpc.Client),
+       models:        make(map[string]ModelAddress),
        templates:     make(map[TemplateType]map[string]*template.Template),
        grpcProcesses: make(map[string]*process.Process),
    }

@@ -98,12 +106,12 @@ func (ml *ModelLoader) ListModels() ([]string, error) {
    return models, nil
 }

-func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*grpc.Client, error)) (*grpc.Client, error) {
+func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
    ml.mu.Lock()
    defer ml.mu.Unlock()
    // Check if we already have a loaded model
-   if model := ml.CheckIsLoaded(modelName); model != nil {
+   if model := ml.CheckIsLoaded(modelName); model != "" {
        return model, nil
    }

@@ -113,7 +121,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
    model, err := loader(modelName, modelFile)
    if err != nil {
-       return nil, err
+       return "", err
    }
    // TODO: Add a helper method to iterate all prompt templates associated with a config if and only if it's YAML?

@@ -138,24 +146,24 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
    return ml.deleteProcess(modelName)
 }

-func (ml *ModelLoader) CheckIsLoaded(s string) *grpc.Client {
+func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
    if m, ok := ml.models[s]; ok {
        log.Debug().Msgf("Model already loaded in memory: %s", s)
-       if !m.HealthCheck(context.Background()) {
+       if !m.GRPC().HealthCheck(context.Background()) {
            log.Debug().Msgf("GRPC Model not responding: %s", s)
            if !ml.grpcProcesses[s].IsAlive() {
                log.Debug().Msgf("GRPC Process is not responding: %s", s)
                // stop and delete the process, this forces to re-load the model and re-create again the service
                ml.deleteProcess(s)
-               return nil
+               return ""
            }
        }
        return m
    }
-   return nil
+   return ""
 }

 func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) {
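
The design choice is concentrated in this file: models now records where a backend listens (a ModelAddress) instead of holding a client, grpcClients is a lazily populated cache used by the non-parallel path, and grpcProcesses keeps tracking the spawned backend processes as before, so shutdown and health-check handling is unchanged.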

@@ -20,10 +20,15 @@ type Options struct {
    grpcAttempts        int
    grpcAttemptsDelay   int
    singleActiveBackend bool
+   parallelRequests    bool
 }

 type Option func(*Options)

+var EnableParallelRequests = func(o *Options) {
+   o.parallelRequests = true
+}
+
 func WithExternalBackend(name string, uri string) Option {
    return func(o *Options) {
        if o.externalBackends == nil {

@@ -17,7 +17,7 @@ import (
 func (ml *ModelLoader) StopAllExcept(s string) {
    ml.StopGRPC(func(id string, p *process.Process) bool {
        if id != s {
-           for ml.models[id].IsBusy() {
+           for ml.models[id].GRPC().IsBusy() {
                log.Debug().Msgf("%s busy. Waiting.", id)
                time.Sleep(2 * time.Second)
            }
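
StopAllExcept keeps the same logic; it now simply builds a short-lived client from the stored address to poll IsBusy before a backend is stopped.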