mirror of
https://github.com/mudler/LocalAI.git
synced 2025-05-27 22:15:00 +00:00
Revert "[Refactor]: Core/API Split" (#1550)
Revert "[Refactor]: Core/API Split (#1506)"
This reverts commit ab7b4d5ee9
.
This commit is contained in:
parent
ab7b4d5ee9
commit
db926896bd
77 changed files with 3132 additions and 3456 deletions
|
@ -1,138 +0,0 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/go-skynet/LocalAI/pkg/grpc/proto"
|
||||
"github.com/go-skynet/LocalAI/pkg/model"
|
||||
"github.com/go-skynet/LocalAI/pkg/schema"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
gopsutil "github.com/shirou/gopsutil/v3/process"
|
||||
)
|
||||
|
||||
type BackendMonitor struct {
|
||||
configLoader *ConfigLoader
|
||||
modelLoader *model.ModelLoader
|
||||
options *schema.StartupOptions // Taking options in case we need to inspect ExternalGRPCBackends, though that's out of scope for now, hence the name.
|
||||
}
|
||||
|
||||
func NewBackendMonitor(configLoader *ConfigLoader, modelLoader *model.ModelLoader, options *schema.StartupOptions) *BackendMonitor {
|
||||
return &BackendMonitor{
|
||||
configLoader: configLoader,
|
||||
modelLoader: modelLoader,
|
||||
options: options,
|
||||
}
|
||||
}
|
||||
|
||||
func (bm *BackendMonitor) SampleLocalBackendProcess(model string) (*schema.BackendMonitorResponse, error) {
|
||||
config, exists := bm.configLoader.GetConfig(model)
|
||||
var backend string
|
||||
if exists {
|
||||
backend = config.Model
|
||||
} else {
|
||||
// Last ditch effort: use it raw, see if a backend happens to match.
|
||||
backend = model
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(backend, ".bin") {
|
||||
backend = fmt.Sprintf("%s.bin", backend)
|
||||
}
|
||||
|
||||
pid, err := bm.modelLoader.GetGRPCPID(backend)
|
||||
|
||||
if err != nil {
|
||||
log.Error().Msgf("model %s : failed to find pid %+v", model, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Name is slightly frightening but this does _not_ create a new process, rather it looks up an existing process by PID.
|
||||
backendProcess, err := gopsutil.NewProcess(int32(pid))
|
||||
|
||||
if err != nil {
|
||||
log.Error().Msgf("model %s [PID %d] : error getting process info %+v", model, pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
memInfo, err := backendProcess.MemoryInfo()
|
||||
|
||||
if err != nil {
|
||||
log.Error().Msgf("model %s [PID %d] : error getting memory info %+v", model, pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
memPercent, err := backendProcess.MemoryPercent()
|
||||
if err != nil {
|
||||
log.Error().Msgf("model %s [PID %d] : error getting memory percent %+v", model, pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cpuPercent, err := backendProcess.CPUPercent()
|
||||
if err != nil {
|
||||
log.Error().Msgf("model %s [PID %d] : error getting cpu percent %+v", model, pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &schema.BackendMonitorResponse{
|
||||
MemoryInfo: memInfo,
|
||||
MemoryPercent: memPercent,
|
||||
CPUPercent: cpuPercent,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (bm BackendMonitor) getModelLoaderIDFromModelName(modelName string) (string, error) {
|
||||
config, exists := bm.configLoader.GetConfig(modelName)
|
||||
var backendId string
|
||||
if exists {
|
||||
backendId = config.Model
|
||||
} else {
|
||||
// Last ditch effort: use it raw, see if a backend happens to match.
|
||||
backendId = modelName
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(backendId, ".bin") {
|
||||
backendId = fmt.Sprintf("%s.bin", backendId)
|
||||
}
|
||||
|
||||
return backendId, nil
|
||||
}
|
||||
|
||||
func (bm BackendMonitor) CheckAndSample(modelName string) (*proto.StatusResponse, error) {
|
||||
backendId, err := bm.getModelLoaderIDFromModelName(modelName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
modelAddr := bm.modelLoader.CheckIsLoaded(backendId)
|
||||
if modelAddr == "" {
|
||||
return nil, fmt.Errorf("backend %s is not currently loaded", backendId)
|
||||
}
|
||||
|
||||
status, rpcErr := modelAddr.GRPC(false, nil).Status(context.TODO())
|
||||
if rpcErr != nil {
|
||||
log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
|
||||
val, slbErr := bm.SampleLocalBackendProcess(backendId)
|
||||
if slbErr != nil {
|
||||
return nil, fmt.Errorf("backend %s experienced an error retrieving status info via rpc: %s, then failed local node process sample: %s", backendId, rpcErr.Error(), slbErr.Error())
|
||||
}
|
||||
return &proto.StatusResponse{
|
||||
State: proto.StatusResponse_ERROR,
|
||||
Memory: &proto.MemoryUsageData{
|
||||
Total: val.MemoryInfo.VMS,
|
||||
Breakdown: map[string]uint64{
|
||||
"gopsutil-RSS": val.MemoryInfo.RSS,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (bm BackendMonitor) ShutdownModel(modelName string) error {
|
||||
backendId, err := bm.getModelLoaderIDFromModelName(modelName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bm.modelLoader.ShutdownModel(backendId)
|
||||
}
|
|
@ -1,157 +0,0 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-skynet/LocalAI/pkg/schema"
|
||||
"github.com/go-skynet/LocalAI/pkg/utils"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type ConfigLoader struct {
|
||||
configs map[string]schema.Config
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewConfigLoader() *ConfigLoader {
|
||||
return &ConfigLoader{
|
||||
configs: make(map[string]schema.Config),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: check this is correct post-merge
|
||||
func (cm *ConfigLoader) LoadConfig(file string) error {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
c, err := schema.ReadSingleConfigFile(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read config file: %w", err)
|
||||
}
|
||||
|
||||
cm.configs[c.Name] = *c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *ConfigLoader) GetConfig(m string) (schema.Config, bool) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
v, exists := cm.configs[m]
|
||||
return v, exists
|
||||
}
|
||||
|
||||
func (cm *ConfigLoader) GetAllConfigs() []schema.Config {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
var res []schema.Config
|
||||
for _, v := range cm.configs {
|
||||
res = append(res, v)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (cm *ConfigLoader) ListConfigs() []string {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
var res []string
|
||||
for k := range cm.configs {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (cm *ConfigLoader) LoadConfigs(path string) error {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
entries, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
files := make([]fs.FileInfo, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
files = append(files, info)
|
||||
}
|
||||
for _, file := range files {
|
||||
// Skip templates, YAML and .keep files
|
||||
if !strings.Contains(file.Name(), ".yaml") && !strings.Contains(file.Name(), ".yml") {
|
||||
continue
|
||||
}
|
||||
c, err := schema.ReadSingleConfigFile(filepath.Join(path, file.Name()))
|
||||
if err == nil {
|
||||
cm.configs[c.Name] = *c
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Preload prepare models if they are not local but url or huggingface repositories
|
||||
func (cm *ConfigLoader) Preload(modelPath string) error {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
status := func(fileName, current, total string, percent float64) {
|
||||
utils.DisplayDownloadFunction(fileName, current, total, percent)
|
||||
}
|
||||
|
||||
log.Info().Msgf("Preloading models from %s", modelPath)
|
||||
|
||||
for _, config := range cm.configs {
|
||||
|
||||
// Download files and verify their SHA
|
||||
for _, file := range config.DownloadFiles {
|
||||
log.Debug().Msgf("Checking %q exists and matches SHA", file.Filename)
|
||||
|
||||
if err := utils.VerifyPath(file.Filename, modelPath); err != nil {
|
||||
return err
|
||||
}
|
||||
// Create file path
|
||||
filePath := filepath.Join(modelPath, file.Filename)
|
||||
|
||||
if err := utils.DownloadFile(file.URI, filePath, file.SHA256, status); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
modelURL := config.PredictionOptions.Model
|
||||
modelURL = utils.ConvertURL(modelURL)
|
||||
|
||||
if utils.LooksLikeURL(modelURL) {
|
||||
// md5 of model name
|
||||
md5Name := utils.MD5(modelURL)
|
||||
|
||||
// check if file exists
|
||||
if _, err := os.Stat(filepath.Join(modelPath, md5Name)); errors.Is(err, os.ErrNotExist) {
|
||||
err := utils.DownloadFile(modelURL, filepath.Join(modelPath, md5Name), "", status)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cl *ConfigLoader) LoadConfigFile(file string) error {
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
c, err := schema.ReadConfigFile(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot load config file: %w", err)
|
||||
}
|
||||
|
||||
for _, cc := range c {
|
||||
cl.configs[cc.Name] = *cc
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -1,160 +0,0 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-skynet/LocalAI/pkg/gallery"
|
||||
"github.com/go-skynet/LocalAI/pkg/utils"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type GalleryApplier struct {
|
||||
modelPath string
|
||||
sync.Mutex
|
||||
C chan gallery.GalleryOp
|
||||
statuses map[string]*gallery.GalleryOpStatus
|
||||
}
|
||||
|
||||
func NewGalleryApplier(modelPath string) *GalleryApplier {
|
||||
return &GalleryApplier{
|
||||
modelPath: modelPath,
|
||||
C: make(chan gallery.GalleryOp),
|
||||
statuses: make(map[string]*gallery.GalleryOpStatus),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GalleryApplier) UpdateStatus(s string, op *gallery.GalleryOpStatus) {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
g.statuses[s] = op
|
||||
}
|
||||
|
||||
func (g *GalleryApplier) GetStatus(s string) *gallery.GalleryOpStatus {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
return g.statuses[s]
|
||||
}
|
||||
|
||||
func (g *GalleryApplier) GetAllStatus() map[string]*gallery.GalleryOpStatus {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
return g.statuses
|
||||
}
|
||||
|
||||
func (g *GalleryApplier) Start(c context.Context, cm *ConfigLoader) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return
|
||||
case op := <-g.C:
|
||||
utils.ResetDownloadTimers()
|
||||
|
||||
g.UpdateStatus(op.Id, &gallery.GalleryOpStatus{Message: "processing", Progress: 0})
|
||||
|
||||
// updates the status with an error
|
||||
updateError := func(e error) {
|
||||
g.UpdateStatus(op.Id, &gallery.GalleryOpStatus{Error: e, Processed: true, Message: "error: " + e.Error()})
|
||||
}
|
||||
|
||||
// displayDownload displays the download progress
|
||||
progressCallback := func(fileName string, current string, total string, percentage float64) {
|
||||
g.UpdateStatus(op.Id, &gallery.GalleryOpStatus{Message: "processing", FileName: fileName, Progress: percentage, TotalFileSize: total, DownloadedFileSize: current})
|
||||
utils.DisplayDownloadFunction(fileName, current, total, percentage)
|
||||
}
|
||||
|
||||
var err error
|
||||
// if the request contains a gallery name, we apply the gallery from the gallery list
|
||||
if op.GalleryName != "" {
|
||||
if strings.Contains(op.GalleryName, "@") {
|
||||
err = gallery.InstallModelFromGallery(op.Galleries, op.GalleryName, g.modelPath, op.Req, progressCallback)
|
||||
} else {
|
||||
err = gallery.InstallModelFromGalleryByName(op.Galleries, op.GalleryName, g.modelPath, op.Req, progressCallback)
|
||||
}
|
||||
} else {
|
||||
err = PrepareModel(g.modelPath, op.Req, cm, progressCallback)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
updateError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Reload models
|
||||
err = cm.LoadConfigs(g.modelPath)
|
||||
if err != nil {
|
||||
updateError(err)
|
||||
continue
|
||||
}
|
||||
|
||||
g.UpdateStatus(op.Id, &gallery.GalleryOpStatus{Processed: true, Message: "completed", Progress: 100})
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
type galleryModel struct {
|
||||
gallery.GalleryModel `yaml:",inline"` // https://github.com/go-yaml/yaml/issues/63
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
func PrepareModel(modelPath string, req gallery.GalleryModel, cm *ConfigLoader, downloadStatus func(string, string, string, float64)) error {
|
||||
|
||||
config, err := gallery.GetInstallableModelFromURL(req.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
config.Files = append(config.Files, req.AdditionalFiles...)
|
||||
|
||||
return gallery.InstallModel(modelPath, req.Name, &config, req.Overrides, downloadStatus)
|
||||
}
|
||||
|
||||
func processRequests(modelPath, s string, cm *ConfigLoader, galleries []gallery.Gallery, requests []galleryModel) error {
|
||||
var err error
|
||||
for _, r := range requests {
|
||||
utils.ResetDownloadTimers()
|
||||
if r.ID == "" {
|
||||
err = PrepareModel(modelPath, r.GalleryModel, cm, utils.DisplayDownloadFunction)
|
||||
} else {
|
||||
if strings.Contains(r.ID, "@") {
|
||||
err = gallery.InstallModelFromGallery(
|
||||
galleries, r.ID, modelPath, r.GalleryModel, utils.DisplayDownloadFunction)
|
||||
} else {
|
||||
err = gallery.InstallModelFromGalleryByName(
|
||||
galleries, r.ID, modelPath, r.GalleryModel, utils.DisplayDownloadFunction)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func ApplyGalleryFromFile(modelPath, s string, cm *ConfigLoader, galleries []gallery.Gallery) error {
|
||||
dat, err := os.ReadFile(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var requests []galleryModel
|
||||
|
||||
if err := yaml.Unmarshal(dat, &requests); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return processRequests(modelPath, s, cm, galleries, requests)
|
||||
}
|
||||
|
||||
func ApplyGalleryFromString(modelPath, s string, cm *ConfigLoader, galleries []gallery.Gallery) error {
|
||||
var requests []galleryModel
|
||||
err := json.Unmarshal([]byte(s), &requests)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return processRequests(modelPath, s, cm, galleries, requests)
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"github.com/go-skynet/LocalAI/pkg/schema"
|
||||
"go.opentelemetry.io/otel/exporters/prometheus"
|
||||
api "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
)
|
||||
|
||||
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
|
||||
// If it does not return an error, make sure to call shutdown for proper cleanup.
|
||||
func SetupMetrics() (*schema.LocalAIMetrics, error) {
|
||||
exporter, err := prometheus.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
provider := metric.NewMeterProvider(metric.WithReader(exporter))
|
||||
meter := provider.Meter("github.com/go-skynet/LocalAI")
|
||||
|
||||
apiTimeMetric, err := meter.Float64Histogram("api_call", api.WithDescription("api calls"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &schema.LocalAIMetrics{
|
||||
Meter: meter,
|
||||
ApiTimeMetric: apiTimeMetric,
|
||||
}, nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue