feat: add external grpc and model autoloading

This commit is contained in:
Ettore Di Giacinto 2023-07-20 22:10:12 +02:00
parent 1d2ae46ddc
commit 94916749c5
15 changed files with 429 additions and 192 deletions

View file

@ -19,8 +19,6 @@ import (
process "github.com/mudler/go-processmanager"
)
const tokenizerSuffix = ".tokenizer.json"
const (
LlamaBackend = "llama"
BloomzBackend = "bloomz"
@ -45,7 +43,6 @@ const (
StableDiffusionBackend = "stablediffusion"
PiperBackend = "piper"
LCHuggingFaceBackend = "langchain-huggingface"
//GGLLMFalconBackend = "falcon"
)
var AutoLoadBackends []string = []string{
@ -75,75 +72,116 @@ func (ml *ModelLoader) StopGRPC() {
}
}
func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string) error {
// Make sure the process is executable
if err := os.Chmod(grpcProcess, 0755); err != nil {
return err
}
log.Debug().Msgf("Loading GRPC Process", grpcProcess)
log.Debug().Msgf("GRPC Service for %s will be running at: '%s'", id, serverAddress)
grpcControlProcess := process.New(
process.WithTemporaryStateDir(),
process.WithName(grpcProcess),
process.WithArgs("--addr", serverAddress))
ml.grpcProcesses[id] = grpcControlProcess
if err := grpcControlProcess.Run(); err != nil {
return err
}
log.Debug().Msgf("GRPC Service state dir: %s", grpcControlProcess.StateDir())
// clean up process
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
grpcControlProcess.Stop()
}()
go func() {
t, err := tail.TailFile(grpcControlProcess.StderrPath(), tail.Config{Follow: true})
if err != nil {
log.Debug().Msgf("Could not tail stderr")
}
for line := range t.Lines {
log.Debug().Msgf("GRPC(%s): stderr %s", strings.Join([]string{id, serverAddress}, "-"), line.Text)
}
}()
go func() {
t, err := tail.TailFile(grpcControlProcess.StdoutPath(), tail.Config{Follow: true})
if err != nil {
log.Debug().Msgf("Could not tail stdout")
}
for line := range t.Lines {
log.Debug().Msgf("GRPC(%s): stdout %s", strings.Join([]string{id, serverAddress}, "-"), line.Text)
}
}()
return nil
}
// starts the grpcModelProcess for the backend, and returns a grpc client
// It also loads the model
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string) (*grpc.Client, error) {
return func(s string) (*grpc.Client, error) {
log.Debug().Msgf("Loading GRPC Model", backend, *o)
grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
var client *grpc.Client
// Check if the file exists
if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
}
// Make sure the process is executable
if err := os.Chmod(grpcProcess, 0755); err != nil {
return nil, err
}
log.Debug().Msgf("Loading GRPC Process", grpcProcess)
port, err := freeport.GetFreePort()
if err != nil {
return nil, err
}
serverAddress := fmt.Sprintf("localhost:%d", port)
log.Debug().Msgf("GRPC Service for '%s' (%s) will be running at: '%s'", backend, o.modelFile, serverAddress)
grpcControlProcess := process.New(
process.WithTemporaryStateDir(),
process.WithName(grpcProcess),
process.WithArgs("--addr", serverAddress))
ml.grpcProcesses[o.modelFile] = grpcControlProcess
if err := grpcControlProcess.Run(); err != nil {
return nil, err
}
// clean up process
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
grpcControlProcess.Stop()
}()
go func() {
t, err := tail.TailFile(grpcControlProcess.StderrPath(), tail.Config{Follow: true})
getFreeAddress := func() (string, error) {
port, err := freeport.GetFreePort()
if err != nil {
log.Debug().Msgf("Could not tail stderr")
return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
}
for line := range t.Lines {
log.Debug().Msgf("GRPC(%s): stderr %s", strings.Join([]string{backend, o.modelFile, serverAddress}, "-"), line.Text)
return fmt.Sprintf("127.0.0.1:%d", port), nil
}
// Check if the backend is provided as external
if uri, ok := o.externalBackends[backend]; ok {
log.Debug().Msgf("Loading external backend: %s", uri)
// check if uri is a file or a address
if _, err := os.Stat(uri); err == nil {
serverAddress, err := getFreeAddress()
if err != nil {
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
}
// Make sure the process is executable
if err := ml.startProcess(uri, o.modelFile, serverAddress); err != nil {
return nil, err
}
log.Debug().Msgf("GRPC Service Started")
client = grpc.NewClient(serverAddress)
} else {
// address
client = grpc.NewClient(uri)
}
}()
go func() {
t, err := tail.TailFile(grpcControlProcess.StdoutPath(), tail.Config{Follow: true})
} else {
grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
// Check if the file exists
if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
}
serverAddress, err := getFreeAddress()
if err != nil {
log.Debug().Msgf("Could not tail stdout")
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
}
for line := range t.Lines {
log.Debug().Msgf("GRPC(%s): stderr %s", strings.Join([]string{backend, o.modelFile, serverAddress}, "-"), line.Text)
// Make sure the process is executable
if err := ml.startProcess(grpcProcess, o.modelFile, serverAddress); err != nil {
return nil, err
}
}()
log.Debug().Msgf("GRPC Service Started")
log.Debug().Msgf("GRPC Service Started")
client := grpc.NewClient(serverAddress)
client = grpc.NewClient(serverAddress)
}
// Wait for the service to start up
ready := false
@ -158,11 +196,6 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string) (*grpc
if !ready {
log.Debug().Msgf("GRPC Service NOT ready")
log.Debug().Msgf("Alive: ", grpcControlProcess.IsAlive())
log.Debug().Msgf(fmt.Sprintf("GRPC Service Exitcode:"))
log.Debug().Msgf(grpcControlProcess.ExitCode())
return nil, fmt.Errorf("grpc service not ready")
}
@ -189,6 +222,13 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err er
log.Debug().Msgf("Loading model %s from %s", o.backendString, o.modelFile)
backend := strings.ToLower(o.backendString)
// if an external backend is provided, use it
_, externalBackendExists := o.externalBackends[backend]
if externalBackendExists {
return ml.LoadModel(o.modelFile, ml.grpcModel(backend, o))
}
switch backend {
case LlamaBackend, LlamaGrammarBackend, GPTJBackend, DollyBackend,
MPTBackend, Gpt2Backend, FalconBackend,
@ -209,8 +249,6 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err er
func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
o := NewOptions(opts...)
log.Debug().Msgf("Loading model '%s' greedly", o.modelFile)
// Is this really needed? BackendLoader already does this
ml.mu.Lock()
if m := ml.checkIsLoaded(o.modelFile); m != nil {
@ -221,16 +259,29 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
ml.mu.Unlock()
var err error
for _, b := range AutoLoadBackends {
log.Debug().Msgf("[%s] Attempting to load", b)
// autoload also external backends
allBackendsToAutoLoad := []string{}
allBackendsToAutoLoad = append(allBackendsToAutoLoad, AutoLoadBackends...)
for _, b := range o.externalBackends {
allBackendsToAutoLoad = append(allBackendsToAutoLoad, b)
}
log.Debug().Msgf("Loading model '%s' greedly from all the available backends: %s", o.modelFile, strings.Join(allBackendsToAutoLoad, ", "))
model, modelerr := ml.BackendLoader(
for _, b := range allBackendsToAutoLoad {
log.Debug().Msgf("[%s] Attempting to load", b)
options := []Option{
WithBackendString(b),
WithModelFile(o.modelFile),
WithLoadGRPCLLMModelOpts(o.gRPCOptions),
WithThreads(o.threads),
WithAssetDir(o.assetDir),
)
}
for k, v := range o.externalBackends {
options = append(options, WithExternalBackend(k, v))
}
model, modelerr := ml.BackendLoader(options...)
if modelerr == nil && model != nil {
log.Debug().Msgf("[%s] Loads OK", b)
return model, nil