Merge branch 'master' into default_miro

Commit 3eb1c1c689 by Dave, 2024-08-20 19:10:51 -04:00, committed by GitHub (GPG key ID: B5690EEEBB952194)
197 changed files with 4171 additions and 1305 deletions


@ -15,4 +15,5 @@ var CLI struct {
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
Util UtilCMD `cmd:"" help:"Utility commands"`
Explorer ExplorerCMD `cmd:"" help:"Run p2p explorer"`
}

core/cli/explorer.go (new file, 49 lines)

@ -0,0 +1,49 @@
package cli
import (
"context"
"time"
cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http"
)
type ExplorerCMD struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"`
WithSync bool `env:"LOCALAI_WITH_SYNC,WITH_SYNC" default:"false" help:"Enable sync with the network" group:"api"`
OnlySync bool `env:"LOCALAI_ONLY_SYNC,ONLY_SYNC" default:"false" help:"Only sync with the network" group:"api"`
}
func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
db, err := explorer.NewDatabase(e.PoolDatabase)
if err != nil {
return err
}
dur, err := time.ParseDuration(e.ConnectionTimeout)
if err != nil {
return err
}
if e.WithSync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
go ds.Start(context.Background(), true)
}
if e.OnlySync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
ctx := context.Background()
return ds.Start(ctx, false)
}
appHTTP := http.Explorer(db)
return appHTTP.Listen(e.Address)
}


@ -8,14 +8,16 @@ import (
)
type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarily by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
}
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
return fs.Start(context.Background())
}
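
The new p2p.NetworkID helper appears throughout this change set. Its implementation is not part of this diff, but the explorer's discovery code below splits ledger keys on "_" and treats the prefix as the network ID, which suggests the helper simply namespaces a service ID (FederatedID, WorkerID) with the user-supplied network ID. A minimal sketch of that assumed behaviour:

```go
package p2p

// NetworkID namespaces a service ID (e.g. FederatedID or WorkerID) with the
// user-supplied network ID. This is a sketch of the assumed behaviour based on
// the "<networkID>_<serviceID>" keys parsed in core/explorer/discovery.go; the
// real implementation lives outside this diff.
func NetworkID(networkID, serviceID string) string {
	if networkID == "" {
		// No grouping requested: fall back to the bare service ID,
		// matching the previous calls that passed "" or FederatedID directly.
		return serviceID
	}
	return networkID + "_" + serviceID
}
```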


@ -83,7 +83,9 @@ func (mi *ModelsInstall) Run(ctx *cliContext.Context) error {
return err
}
if !downloader.LooksLikeOCI(modelName) {
modelURI := downloader.URI(modelName)
if !modelURI.LooksLikeOCI() {
model := gallery.FindModel(models, modelName, mi.ModelsPath)
if model == nil {
log.Error().Str("model", modelName).Msg("model not found")


@ -54,6 +54,7 @@ type RunCMD struct {
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarily by the user for grouping a set of instances" group:"p2p"`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
@ -63,6 +64,7 @@ type RunCMD struct {
EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"`
WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"`
DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT,DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints" group:"api"`
}
func (r *RunCMD) Run(ctx *cliContext.Context) error {
@ -94,6 +96,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
config.WithOpaqueErrors(r.OpaqueErrors),
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
config.WithP2PNetworkID(r.Peer2PeerNetworkID),
}
token := ""
@ -119,9 +122,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes("") {
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
@ -132,7 +135,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}); err != nil {
}, true); err != nil {
return err
}
}
@ -142,14 +145,13 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if err != nil {
return err
}
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
return err
}
node, err := p2p.NewNode(token)
fedCtx := context.Background()
node, err := p2p.ExposeService(fedCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID))
if err != nil {
return err
}
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {
if err := p2p.ServiceDiscoverer(fedCtx, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil {
return err
}
}
@ -161,6 +163,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
opts = append(opts, config.DisableWebUI)
}
if r.DisableGalleryEndpoint {
opts = append(opts, config.DisableGalleryEndpoint)
}
if idleWatchDog || busyWatchDog {
opts = append(opts, config.EnableWatchDog)
if idleWatchDog {


@ -86,8 +86,8 @@ func (hfscmd *HFScanCMD) Run(ctx *cliContext.Context) error {
var errs error = nil
for _, uri := range hfscmd.ToScan {
log.Info().Str("uri", uri).Msg("scanning specific uri")
scanResults, err := downloader.HuggingFaceScan(uri)
if err != nil && !errors.Is(err, downloader.ErrNonHuggingFaceFile) {
scanResults, err := downloader.HuggingFaceScan(downloader.URI(uri))
if err != nil && errors.Is(err, downloader.ErrUnsafeFilesFound) {
log.Error().Err(err).Strs("clamAV", scanResults.ClamAVInfectedFiles).Strs("pickles", scanResults.DangerousPickles).Msg("! WARNING ! A known-vulnerable model is included in this repo!")
errs = errors.Join(errs, err)
}


@ -21,7 +21,7 @@ func (r *LLamaCPP) Run(ctx *cliContext.Context) error {
err := assets.ExtractFiles(ctx.BackendAssets, r.BackendAssetsPath)
log.Debug().Msgf("Extracting backend assets files to %s", r.BackendAssetsPath)
if err != nil {
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly, like gpt4all)", err)
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly)", err)
}
if len(os.Args) < 4 {


@ -19,12 +19,13 @@ import (
)
type P2P struct {
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarily by the user for grouping a set of instances" group:"p2p"`
}
func (r *P2P) Run(ctx *cliContext.Context) error {
@ -32,7 +33,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
err := assets.ExtractFiles(ctx.BackendAssets, r.BackendAssetsPath)
log.Debug().Msgf("Extracting backend assets files to %s", r.BackendAssetsPath)
if err != nil {
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly, like gpt4all)", err)
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly)", err)
}
// Check if the token is set
@ -59,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}
err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
_, err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
@ -99,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}


@ -34,6 +34,7 @@ type ApplicationConfig struct {
EnforcePredownloadScans bool
OpaqueErrors bool
P2PToken string
P2PNetworkID string
ModelLibraryURL string
@ -56,6 +57,8 @@ type ApplicationConfig struct {
ModelsURL []string
WatchDogBusyTimeout, WatchDogIdleTimeout time.Duration
DisableGalleryEndpoint bool
}
type AppOption func(*ApplicationConfig)
@ -91,6 +94,12 @@ func WithCors(b bool) AppOption {
}
}
func WithP2PNetworkID(s string) AppOption {
return func(o *ApplicationConfig) {
o.P2PNetworkID = s
}
}
func WithCsrf(b bool) AppOption {
return func(o *ApplicationConfig) {
o.CSRF = b
@ -124,6 +133,10 @@ var EnableWatchDogIdleCheck = func(o *ApplicationConfig) {
o.WatchDogIdle = true
}
var DisableGalleryEndpoint = func(o *ApplicationConfig) {
o.DisableGalleryEndpoint = true
}
var EnableWatchDogBusyCheck = func(o *ApplicationConfig) {
o.WatchDog = true
o.WatchDogBusy = true


@ -8,7 +8,6 @@ import (
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/downloader"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/utils"
)
const (
@ -72,9 +71,9 @@ type BackendConfig struct {
}
type File struct {
Filename string `yaml:"filename" json:"filename"`
SHA256 string `yaml:"sha256" json:"sha256"`
URI string `yaml:"uri" json:"uri"`
Filename string `yaml:"filename" json:"filename"`
SHA256 string `yaml:"sha256" json:"sha256"`
URI downloader.URI `yaml:"uri" json:"uri"`
}
type VallE struct {
@ -213,28 +212,32 @@ func (c *BackendConfig) ShouldCallSpecificFunction() bool {
// MMProjFileName returns the filename of the MMProj file
// If the MMProj is a URL, it will return the filename derived from the URL
func (c *BackendConfig) MMProjFileName() string {
modelURL := downloader.ConvertURL(c.MMProj)
if downloader.LooksLikeURL(modelURL) {
return utils.MD5(modelURL)
uri := downloader.URI(c.MMProj)
if uri.LooksLikeURL() {
f, _ := uri.FilenameFromUrl()
return f
}
return c.MMProj
}
func (c *BackendConfig) IsMMProjURL() bool {
return downloader.LooksLikeURL(downloader.ConvertURL(c.MMProj))
uri := downloader.URI(c.MMProj)
return uri.LooksLikeURL()
}
func (c *BackendConfig) IsModelURL() bool {
return downloader.LooksLikeURL(downloader.ConvertURL(c.Model))
uri := downloader.URI(c.Model)
return uri.LooksLikeURL()
}
// ModelFileName returns the filename of the model
// If the model is a URL, it will return the filename derived from the URL
func (c *BackendConfig) ModelFileName() string {
modelURL := downloader.ConvertURL(c.Model)
if downloader.LooksLikeURL(modelURL) {
return utils.MD5(modelURL)
uri := downloader.URI(c.Model)
if uri.LooksLikeURL() {
f, _ := uri.FilenameFromUrl()
return f
}
return c.Model


@ -244,7 +244,7 @@ func (bcl *BackendConfigLoader) Preload(modelPath string) error {
// Create file path
filePath := filepath.Join(modelPath, file.Filename)
if err := downloader.DownloadFile(file.URI, filePath, file.SHA256, i, len(config.DownloadFiles), status); err != nil {
if err := file.URI.DownloadFile(filePath, file.SHA256, i, len(config.DownloadFiles), status); err != nil {
return err
}
}
@ -252,10 +252,10 @@ func (bcl *BackendConfigLoader) Preload(modelPath string) error {
// If the model is an URL, expand it, and download the file
if config.IsModelURL() {
modelFileName := config.ModelFileName()
modelURL := downloader.ConvertURL(config.Model)
uri := downloader.URI(config.Model)
// check if file exists
if _, err := os.Stat(filepath.Join(modelPath, modelFileName)); errors.Is(err, os.ErrNotExist) {
err := downloader.DownloadFile(modelURL, filepath.Join(modelPath, modelFileName), "", 0, 0, status)
err := uri.DownloadFile(filepath.Join(modelPath, modelFileName), "", 0, 0, status)
if err != nil {
return err
}
@ -269,10 +269,10 @@ func (bcl *BackendConfigLoader) Preload(modelPath string) error {
if config.IsMMProjURL() {
modelFileName := config.MMProjFileName()
modelURL := downloader.ConvertURL(config.MMProj)
uri := downloader.URI(config.MMProj)
// check if file exists
if _, err := os.Stat(filepath.Join(modelPath, modelFileName)); errors.Is(err, os.ErrNotExist) {
err := downloader.DownloadFile(modelURL, filepath.Join(modelPath, modelFileName), "", 0, 0, status)
err := uri.DownloadFile(filepath.Join(modelPath, modelFileName), "", 0, 0, status)
if err != nil {
return err
}


@ -26,15 +26,17 @@ const (
type settingsConfig struct {
StopWords []string
TemplateConfig TemplateConfig
RepeatPenalty float64
}
// default settings to adopt with a given model family
var defaultsSettings map[familyType]settingsConfig = map[familyType]settingsConfig{
Gemma: {
RepeatPenalty: 1.0,
StopWords: []string{"<|im_end|>", "<end_of_turn>", "<start_of_turn>"},
TemplateConfig: TemplateConfig{
Chat: "{{.Input }}\n<|start_of_turn|>model\n",
ChatMessage: "<|start_of_turn|>{{if eq .RoleName \"assistant\" }}model{{else}}{{ .RoleName }}{{end}}\n{{ if .Content -}}\n{{.Content -}}\n{{ end -}}<|end_of_turn|>",
Chat: "{{.Input }}\n<start_of_turn>model\n",
ChatMessage: "<start_of_turn>{{if eq .RoleName \"assistant\" }}model{{else}}{{ .RoleName }}{{end}}\n{{ if .Content -}}\n{{.Content -}}\n{{ end -}}<end_of_turn>",
Completion: "{{.Input}}",
},
},
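
For reference, the updated Gemma ChatMessage template (now using plain <start_of_turn>/<end_of_turn> delimiters instead of the <|...|> forms) renders as shown below. This is a standalone text/template illustration, not LocalAI's own templating pipeline, and chatMsg is a stand-in for the real message type:

```go
package main

import (
	"os"
	"text/template"
)

// chatMsg stands in for LocalAI's chat-message type; only the two fields
// referenced by the template are needed here.
type chatMsg struct {
	RoleName string
	Content  string
}

func main() {
	// The Gemma ChatMessage template exactly as set in this diff.
	const gemmaMsg = "<start_of_turn>{{if eq .RoleName \"assistant\" }}model{{else}}{{ .RoleName }}{{end}}\n{{ if .Content -}}\n{{.Content -}}\n{{ end -}}<end_of_turn>"
	t := template.Must(template.New("gemma").Parse(gemmaMsg))
	// Prints: <start_of_turn>user\nHello<end_of_turn>
	if err := t.Execute(os.Stdout, chatMsg{RoleName: "user", Content: "Hello"}); err != nil {
		panic(err)
	}
}
```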
@ -192,6 +194,9 @@ func guessDefaultsFromFile(cfg *BackendConfig, modelPath string) {
if len(cfg.StopWords) == 0 {
cfg.StopWords = settings.StopWords
}
if cfg.RepeatPenalty == 0.0 {
cfg.RepeatPenalty = settings.RepeatPenalty
}
} else {
log.Debug().Any("family", family).Msgf("guessDefaultsFromFile: no template found for family")
}
@ -219,7 +224,7 @@ func identifyFamily(f *gguf.GGUFFile) familyType {
commandR := arch == "command-r" && eosTokenID == 255001
qwen2 := arch == "qwen2"
phi3 := arch == "phi-3"
gemma := strings.HasPrefix(f.Model().Name, "gemma")
gemma := strings.HasPrefix(arch, "gemma") || strings.Contains(strings.ToLower(f.Model().Name), "gemma")
deepseek2 := arch == "deepseek2"
switch {


@ -37,7 +37,8 @@ func main() {
// download the assets
for _, asset := range assets {
if err := downloader.DownloadFile(asset.URL, filepath.Join(destPath, asset.FileName), asset.SHA, 1, 1, utils.DisplayDownloadFunction); err != nil {
uri := downloader.URI(asset.URL)
if err := uri.DownloadFile(filepath.Join(destPath, asset.FileName), asset.SHA, 1, 1, utils.DisplayDownloadFunction); err != nil {
panic(err)
}
}

core/explorer/database.go (new file, 125 lines)

@ -0,0 +1,125 @@
package explorer
// A simple JSON database for storing and retrieving p2p network tokens, each with a name and description.
import (
"encoding/json"
"os"
"sort"
"sync"
"github.com/gofrs/flock"
)
// Database is a simple JSON database for storing and retrieving p2p network tokens, each with a name and description.
type Database struct {
path string
data map[string]TokenData
flock *flock.Flock
sync.Mutex
}
// TokenData is a p2p network token with a name and description.
type TokenData struct {
Name string `json:"name"`
Description string `json:"description"`
Clusters []ClusterData
Failures int
}
type ClusterData struct {
Workers []string
Type string
NetworkID string
}
// NewDatabase creates a new Database with the given path.
func NewDatabase(path string) (*Database, error) {
fileLock := flock.New(path + ".lock")
db := &Database{
data: make(map[string]TokenData),
path: path,
flock: fileLock,
}
return db, db.load()
}
// Get retrieves a Token from the Database by its token.
func (db *Database) Get(token string) (TokenData, bool) {
db.flock.Lock() // we are making sure that the file is not being written to
defer db.flock.Unlock()
db.Lock() // we are making sure that it is safe if called by another instance in the same process
defer db.Unlock()
db.load()
t, ok := db.data[token]
return t, ok
}
// Set stores a Token in the Database by its token.
func (db *Database) Set(token string, t TokenData) error {
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
db.data[token] = t
return db.save()
}
// Delete removes a Token from the Database by its token.
func (db *Database) Delete(token string) error {
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
delete(db.data, token)
return db.save()
}
func (db *Database) TokenList() []string {
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
tokens := []string{}
for k := range db.data {
tokens = append(tokens, k)
}
sort.Slice(tokens, func(i, j int) bool {
// sort by token
return tokens[i] < tokens[j]
})
return tokens
}
// load reads the Database from disk.
func (db *Database) load() error {
if _, err := os.Stat(db.path); os.IsNotExist(err) {
return nil
}
// Read the file from disk
// Unmarshal the JSON into db.data
f, err := os.ReadFile(db.path)
if err != nil {
return err
}
return json.Unmarshal(f, &db.data)
}
// Save writes the Database to disk.
func (db *Database) save() error {
// Marshal db.data into JSON
// Write the JSON to the file
f, err := os.Create(db.path)
if err != nil {
return err
}
defer f.Close()
return json.NewEncoder(f).Encode(db.data)
}


@ -0,0 +1,92 @@
package explorer_test
import (
"os"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/explorer"
)
var _ = Describe("Database", func() {
var (
dbPath string
db *explorer.Database
err error
)
BeforeEach(func() {
// Create a temporary file path for the database
dbPath = "test_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())
})
AfterEach(func() {
// Clean up the temporary database file
os.Remove(dbPath)
})
Context("when managing tokens", func() {
It("should add and retrieve a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
err = db.Set(token, t)
Expect(err).To(BeNil())
retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))
})
It("should delete a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
err = db.Set(token, t)
Expect(err).To(BeNil())
err = db.Delete(token)
Expect(err).To(BeNil())
_, exists := db.Get(token)
Expect(exists).To(BeFalse())
})
It("should persist data to disk", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
err = db.Set(token, t)
Expect(err).To(BeNil())
// Recreate the database object to simulate reloading from disk
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())
retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))
// Check the token list
tokenList := db.TokenList()
Expect(tokenList).To(ContainElement(token))
})
})
Context("when loading an empty or non-existent file", func() {
It("should start with an empty database", func() {
dbPath = "empty_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())
_, exists := db.Get("nonexistent")
Expect(exists).To(BeFalse())
// Clean up
os.Remove(dbPath)
})
})
})

core/explorer/discovery.go (new file, 213 lines)

@ -0,0 +1,213 @@
package explorer
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/blockchain"
)
type DiscoveryServer struct {
sync.Mutex
database *Database
connectionTime time.Duration
errorThreshold int
}
// NewDiscoveryServer creates a new DiscoveryServer with the given Database.
// It keeps the db state in sync with the network state.
func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer {
if dur == 0 {
dur = 50 * time.Second
}
if failureThreshold == 0 {
failureThreshold = 3
}
return &DiscoveryServer{
database: db,
connectionTime: dur,
errorThreshold: failureThreshold,
}
}
type Network struct {
Clusters []ClusterData
}
func (s *DiscoveryServer) runBackground() {
if len(s.database.TokenList()) == 0 {
time.Sleep(5 * time.Second) // avoid busy loop
return
}
for _, token := range s.database.TokenList() {
c, cancel := context.WithTimeout(context.Background(), s.connectionTime)
defer cancel()
// Connect to the network
// Get the number of nodes
// save it in the current state (mutex)
// do not do in parallel
n, err := p2p.NewNode(token)
if err != nil {
log.Err(err).Msg("Failed to create node")
s.failedToken(token)
continue
}
err = n.Start(c)
if err != nil {
log.Err(err).Msg("Failed to start node")
s.failedToken(token)
continue
}
ledger, err := n.Ledger()
if err != nil {
log.Err(err).Msg("Failed to start ledger")
s.failedToken(token)
continue
}
networkData := make(chan ClusterData)
// get the network data - it takes the whole timeout
// as we might not be connected to the network yet,
// and a few attempts may have to be made before bailing out
go s.retrieveNetworkData(c, ledger, networkData)
hasWorkers := false
ledgerK := []ClusterData{}
for key := range networkData {
ledgerK = append(ledgerK, key)
if len(key.Workers) > 0 {
hasWorkers = true
}
}
log.Debug().Any("network", token).Msgf("Network has %d clusters", len(ledgerK))
if len(ledgerK) != 0 {
for _, k := range ledgerK {
log.Debug().Any("network", token).Msgf("Clusterdata %+v", k)
}
}
if hasWorkers {
s.Lock()
data, _ := s.database.Get(token)
(&data).Clusters = ledgerK
(&data).Failures = 0
s.database.Set(token, data)
s.Unlock()
} else {
s.failedToken(token)
}
}
s.deleteFailedConnections()
}
func (s *DiscoveryServer) failedToken(token string) {
s.Lock()
defer s.Unlock()
data, _ := s.database.Get(token)
(&data).Failures++
s.database.Set(token, data)
}
func (s *DiscoveryServer) deleteFailedConnections() {
s.Lock()
defer s.Unlock()
for _, t := range s.database.TokenList() {
data, _ := s.database.Get(t)
if data.Failures > s.errorThreshold {
log.Info().Any("token", t).Msg("Token has been removed from the database")
s.database.Delete(t)
}
}
}
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
clusters := map[string]ClusterData{}
defer func() {
for _, n := range clusters {
networkData <- n
}
close(networkData)
}()
for {
select {
case <-c.Done():
return
default:
time.Sleep(5 * time.Second)
data := ledger.LastBlock().Storage
LEDGER:
for d := range data {
toScanForWorkers := false
cd := ClusterData{}
isWorkerCluster := d == p2p.WorkerID || (strings.Contains(d, "_") && strings.Contains(d, p2p.WorkerID))
isFederatedCluster := d == p2p.FederatedID || (strings.Contains(d, "_") && strings.Contains(d, p2p.FederatedID))
switch {
case isWorkerCluster:
toScanForWorkers = true
cd.Type = "worker"
case isFederatedCluster:
toScanForWorkers = true
cd.Type = "federated"
}
if strings.Contains(d, "_") {
cd.NetworkID = strings.Split(d, "_")[0]
}
if !toScanForWorkers {
continue LEDGER
}
atLeastOneWorker := false
DATA:
for _, v := range data[d] {
nd := &p2p.NodeData{}
if err := v.Unmarshal(nd); err != nil {
continue DATA
}
if nd.IsOnline() {
atLeastOneWorker = true
(&cd).Workers = append(cd.Workers, nd.ID)
}
}
if atLeastOneWorker {
clusters[d] = cd
}
}
}
}
}
// Start the discovery server. This is meant to be run in a goroutine.
func (s *DiscoveryServer) Start(ctx context.Context, keepRunning bool) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled")
default:
// Collect data
s.runBackground()
if !keepRunning {
return nil
}
}
}
}
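
Tying the new pieces together: the discovery server either loops forever (keepRunning=true, the explorer CLI's --with-sync path) or performs a single pass over the stored tokens and returns (keepRunning=false, the --only-sync path). A minimal one-shot sync using only the constructors introduced in this diff:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/mudler/LocalAI/core/explorer"
)

func main() {
	// Open (or create) the pool database used by the explorer.
	db, err := explorer.NewDatabase("explorer.json")
	if err != nil {
		log.Fatal(err)
	}
	// 2m connection timeout and 3 allowed failures mirror the CLI defaults above.
	ds := explorer.NewDiscoveryServer(db, 2*time.Minute, 3)
	// keepRunning=false: walk every stored token once, refresh cluster data, then return.
	if err := ds.Start(context.Background(), false); err != nil {
		log.Fatal(err)
	}
}
```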


@ -0,0 +1,13 @@
package explorer_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestExplorer(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Explorer test suite")
}


@ -131,7 +131,8 @@ func AvailableGalleryModels(galleries []config.Gallery, basePath string) ([]*Gal
func findGalleryURLFromReferenceURL(url string, basePath string) (string, error) {
var refFile string
err := downloader.DownloadAndUnmarshal(url, basePath, func(url string, d []byte) error {
uri := downloader.URI(url)
err := uri.DownloadAndUnmarshal(basePath, func(url string, d []byte) error {
refFile = string(d)
if len(refFile) == 0 {
return fmt.Errorf("invalid reference file at url %s: %s", url, d)
@ -153,8 +154,9 @@ func getGalleryModels(gallery config.Gallery, basePath string) ([]*GalleryModel,
return models, err
}
}
uri := downloader.URI(gallery.URL)
err := downloader.DownloadAndUnmarshal(gallery.URL, basePath, func(url string, d []byte) error {
err := uri.DownloadAndUnmarshal(basePath, func(url string, d []byte) error {
return yaml.Unmarshal(d, &models)
})
if err != nil {
@ -204,35 +206,34 @@ func DeleteModelFromSystem(basePath string, name string, additionalFiles []strin
log.Error().Err(err).Msgf("failed to read gallery file %s", configFile)
}
var filesToRemove []string
// Remove additional files
if galleryconfig != nil {
for _, f := range galleryconfig.Files {
fullPath := filepath.Join(basePath, f.Filename)
log.Debug().Msgf("Removing file %s", fullPath)
if e := os.Remove(fullPath); e != nil {
err = errors.Join(err, fmt.Errorf("failed to remove file %s: %w", f.Filename, e))
}
filesToRemove = append(filesToRemove, fullPath)
}
}
for _, f := range additionalFiles {
fullPath := filepath.Join(filepath.Join(basePath, f))
log.Debug().Msgf("Removing additional file %s", fullPath)
if e := os.Remove(fullPath); e != nil {
filesToRemove = append(filesToRemove, fullPath)
}
filesToRemove = append(filesToRemove, configFile)
filesToRemove = append(filesToRemove, galleryFile)
// skip duplicates
filesToRemove = utils.Unique(filesToRemove)
// Removing files
for _, f := range filesToRemove {
if e := os.Remove(f); e != nil {
err = errors.Join(err, fmt.Errorf("failed to remove file %s: %w", f, e))
}
}
log.Debug().Msgf("Removing model config file %s", configFile)
// Delete the model config file
if e := os.Remove(configFile); e != nil {
err = errors.Join(err, fmt.Errorf("failed to remove file %s: %w", configFile, e))
}
// Delete gallery config file
os.Remove(galleryFile)
return err
}
@ -253,8 +254,8 @@ func SafetyScanGalleryModels(galleries []config.Gallery, basePath string) error
func SafetyScanGalleryModel(galleryModel *GalleryModel) error {
for _, file := range galleryModel.AdditionalFiles {
scanResults, err := downloader.HuggingFaceScan(file.URI)
if err != nil && !errors.Is(err, downloader.ErrNonHuggingFaceFile) {
scanResults, err := downloader.HuggingFaceScan(downloader.URI(file.URI))
if err != nil && errors.Is(err, downloader.ErrUnsafeFilesFound) {
log.Error().Str("model", galleryModel.Name).Strs("clamAV", scanResults.ClamAVInfectedFiles).Strs("pickles", scanResults.DangerousPickles).Msg("Contains unsafe file(s)!")
return err
}


@ -68,7 +68,8 @@ type PromptTemplate struct {
func GetGalleryConfigFromURL(url string, basePath string) (Config, error) {
var config Config
err := downloader.DownloadAndUnmarshal(url, basePath, func(url string, d []byte) error {
uri := downloader.URI(url)
err := uri.DownloadAndUnmarshal(basePath, func(url string, d []byte) error {
return yaml.Unmarshal(d, &config)
})
if err != nil {
@ -118,14 +119,14 @@ func InstallModel(basePath, nameOverride string, config *Config, configOverrides
filePath := filepath.Join(basePath, file.Filename)
if enforceScan {
scanResults, err := downloader.HuggingFaceScan(file.URI)
if err != nil && !errors.Is(err, downloader.ErrNonHuggingFaceFile) {
scanResults, err := downloader.HuggingFaceScan(downloader.URI(file.URI))
if err != nil && errors.Is(err, downloader.ErrUnsafeFilesFound) {
log.Error().Str("model", config.Name).Strs("clamAV", scanResults.ClamAVInfectedFiles).Strs("pickles", scanResults.DangerousPickles).Msg("Contains unsafe file(s)!")
return err
}
}
if err := downloader.DownloadFile(file.URI, filePath, file.SHA256, i, len(config.Files), downloadStatus); err != nil {
uri := downloader.URI(file.URI)
if err := uri.DownloadFile(filePath, file.SHA256, i, len(config.Files), downloadStatus); err != nil {
return err
}
}


@ -73,8 +73,9 @@ func getModelStatus(url string) (response map[string]interface{}) {
}
func getModels(url string) (response []gallery.GalleryModel) {
uri := downloader.URI(url)
// TODO: No tests currently seem to exercise file:// urls. Fix?
downloader.DownloadAndUnmarshal(url, "", func(url string, i []byte) error {
uri.DownloadAndUnmarshal("", func(url string, i []byte) error {
// Unmarshal YAML data into a struct
return json.Unmarshal(i, &response)
})
@ -562,32 +563,6 @@ var _ = Describe("API test", func() {
Expect(res["unit"]).To(Equal("celcius"), fmt.Sprint(res))
Expect(string(resp2.Choices[0].FinishReason)).To(Equal("function_call"), fmt.Sprint(resp2.Choices[0].FinishReason))
})
It("runs gpt4all", Label("gpt4all"), func() {
if runtime.GOOS != "linux" {
Skip("test supported only on linux")
}
response := postModelApplyRequest("http://127.0.0.1:9090/models/apply", modelApplyRequest{
URL: "github:go-skynet/model-gallery/gpt4all-j.yaml",
Name: "gpt4all-j",
})
Expect(response["uuid"]).ToNot(BeEmpty(), fmt.Sprint(response))
uuid := response["uuid"].(string)
Eventually(func() bool {
response := getModelStatus("http://127.0.0.1:9090/models/jobs/" + uuid)
return response["processed"].(bool)
}, "960s", "10s").Should(Equal(true))
resp, err := client.CreateChatCompletion(context.TODO(), openai.ChatCompletionRequest{Model: "gpt4all-j", Messages: []openai.ChatCompletionMessage{openai.ChatCompletionMessage{Role: "user", Content: "How are you?"}}})
Expect(err).ToNot(HaveOccurred())
Expect(len(resp.Choices)).To(Equal(1))
Expect(resp.Choices[0].Message.Content).To(ContainSubstring("well"))
})
})
})
@ -791,20 +766,6 @@ var _ = Describe("API test", func() {
Expect(resp.Choices[0].Message.Content).ToNot(BeEmpty())
})
It("can generate completions from model configs", func() {
resp, err := client.CreateCompletion(context.TODO(), openai.CompletionRequest{Model: "gpt4all", Prompt: testPrompt})
Expect(err).ToNot(HaveOccurred())
Expect(len(resp.Choices)).To(Equal(1))
Expect(resp.Choices[0].Text).ToNot(BeEmpty())
})
It("can generate chat completions from model configs", func() {
resp, err := client.CreateChatCompletion(context.TODO(), openai.ChatCompletionRequest{Model: "gpt4all-2", Messages: []openai.ChatCompletionMessage{openai.ChatCompletionMessage{Role: "user", Content: testPrompt}}})
Expect(err).ToNot(HaveOccurred())
Expect(len(resp.Choices)).To(Equal(1))
Expect(resp.Choices[0].Message.Content).ToNot(BeEmpty())
})
It("returns errors", func() {
_, err := client.CreateCompletion(context.TODO(), openai.CompletionRequest{Model: "foomodel", Prompt: testPrompt})
Expect(err).To(HaveOccurred())


@ -9,7 +9,6 @@ import (
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/LocalAI/core/services"
"github.com/mudler/LocalAI/pkg/xsync"
)
const (
@ -372,7 +371,12 @@ func dropBadChars(s string) string {
return strings.ReplaceAll(s, "@", "__")
}
func ListModels(models []*gallery.GalleryModel, processing *xsync.SyncedMap[string, string], galleryService *services.GalleryService) string {
type ProcessTracker interface {
Exists(string) bool
Get(string) string
}
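
ListModels now depends on this small ProcessTracker interface instead of a concrete *xsync.SyncedMap, keeping the HTML-building code decoupled from the UI routes' cache. The production implementation is the modelOpCache introduced later in this diff (core/http/routes/ui.go); for tests, something as small as the following would satisfy it (a sketch, not part of the change):

```go
package main

import "fmt"

// mapTracker is a minimal ProcessTracker for tests: a plain map keyed by
// gallery ID, holding the UUID of the job currently processing that model.
type mapTracker map[string]string

func (m mapTracker) Exists(key string) bool { _, ok := m[key]; return ok }
func (m mapTracker) Get(key string) string  { return m[key] }

func main() {
	tracker := mapTracker{"localai@my-model": "job-uuid-1"}
	fmt.Println(tracker.Exists("localai@my-model")) // true
	fmt.Println(tracker.Get("localai@my-model"))    // job-uuid-1
}
```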
func ListModels(models []*gallery.GalleryModel, processTracker ProcessTracker, galleryService *services.GalleryService) string {
modelsElements := []elem.Node{}
descriptionDiv := func(m *gallery.GalleryModel) elem.Node {
return elem.Div(
@ -396,7 +400,7 @@ func ListModels(models []*gallery.GalleryModel, processing *xsync.SyncedMap[stri
actionDiv := func(m *gallery.GalleryModel) elem.Node {
galleryID := fmt.Sprintf("%s@%s", m.Gallery.Name, m.Name)
currentlyProcessing := processing.Exists(galleryID)
currentlyProcessing := processTracker.Exists(galleryID)
jobID := ""
isDeletionOp := false
if currentlyProcessing {
@ -404,7 +408,7 @@ func ListModels(models []*gallery.GalleryModel, processing *xsync.SyncedMap[stri
if status != nil && status.Deletion {
isDeletionOp = true
}
jobID = processing.Get(galleryID)
jobID = processTracker.Get(galleryID)
// TODO:
// case not handled, if status == nil : "Waiting"
}
@ -497,8 +501,9 @@ func ListModels(models []*gallery.GalleryModel, processing *xsync.SyncedMap[stri
},
elem.Img(attrs.Props{
// "class": "rounded-t-lg object-fit object-center h-96",
"class": "rounded-t-lg max-h-48 max-w-96 object-cover mt-3",
"src": m.Icon,
"class": "rounded-t-lg max-h-48 max-w-96 object-cover mt-3",
"src": m.Icon,
"loading": "lazy",
}),
),
),


@ -0,0 +1,102 @@
package explorer
import (
"encoding/base64"
"sort"
"github.com/gofiber/fiber/v2"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/internal"
)
func Dashboard() func(*fiber.Ctx) error {
return func(c *fiber.Ctx) error {
summary := fiber.Map{
"Title": "LocalAI API - " + internal.PrintableVersion(),
"Version": internal.PrintableVersion(),
}
if string(c.Context().Request.Header.ContentType()) == "application/json" || len(c.Accepts("html")) == 0 {
// The client expects a JSON response
return c.Status(fiber.StatusOK).JSON(summary)
} else {
// Render index
return c.Render("views/explorer", summary)
}
}
}
type AddNetworkRequest struct {
Token string `json:"token"`
Name string `json:"name"`
Description string `json:"description"`
}
type Network struct {
explorer.TokenData
Token string `json:"token"`
}
func ShowNetworks(db *explorer.Database) func(*fiber.Ctx) error {
return func(c *fiber.Ctx) error {
results := []Network{}
for _, token := range db.TokenList() {
networkData, exists := db.Get(token) // get the token data
hasWorkers := false
for _, cluster := range networkData.Clusters {
if len(cluster.Workers) > 0 {
hasWorkers = true
break
}
}
if exists && hasWorkers {
results = append(results, Network{TokenData: networkData, Token: token})
}
}
// order by number of clusters
sort.Slice(results, func(i, j int) bool {
return len(results[i].Clusters) > len(results[j].Clusters)
})
return c.JSON(results)
}
}
func AddNetwork(db *explorer.Database) func(*fiber.Ctx) error {
return func(c *fiber.Ctx) error {
request := new(AddNetworkRequest)
if err := c.BodyParser(request); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Cannot parse JSON"})
}
if request.Token == "" {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Token is required"})
}
if request.Name == "" {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Name is required"})
}
if request.Description == "" {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Description is required"})
}
// TODO: check if token is valid, otherwise reject
// try to decode the token from base64
_, err := base64.StdEncoding.DecodeString(request.Token)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Invalid token"})
}
if _, exists := db.Get(request.Token); exists {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Token already exists"})
}
err = db.Set(request.Token, explorer.TokenData{Name: request.Name, Description: request.Description})
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Cannot add token"})
}
return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Token added"})
}
}
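
A quick way to exercise this endpoint against a running explorer instance: the JSON fields mirror AddNetworkRequest above, the route matches the "/network/add" registration in RegisterExplorerRoutes further down, and localhost:8080 is the explorer's default bind address. The token value is a placeholder.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	body, _ := json.Marshal(map[string]string{
		"token":       "<base64-encoded p2p token>",
		"name":        "my-network",
		"description": "shared LocalAI workers",
	})
	resp, err := http.Post("http://localhost:8080/network/add", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // 200 OK on success, 400 with an error message otherwise
}
```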


@ -11,12 +11,14 @@ import (
// @Summary Returns available P2P nodes
// @Success 200 {object} []schema.P2PNodesResponse "Response"
// @Router /api/p2p [get]
func ShowP2PNodes(c *fiber.Ctx) error {
func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error {
// Render index
return c.JSON(schema.P2PNodesResponse{
Nodes: p2p.GetAvailableNodes(""),
FederatedNodes: p2p.GetAvailableNodes(p2p.FederatedID),
})
return func(c *fiber.Ctx) error {
return c.JSON(schema.P2PNodesResponse{
Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID)),
FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)),
})
}
}
// ShowP2PToken returns the P2P token


@ -17,7 +17,10 @@ func WelcomeEndpoint(appConfig *config.ApplicationConfig,
backendConfigs := cl.GetAllBackendConfigs()
galleryConfigs := map[string]*gallery.Config{}
modelsWithBackendConfig := map[string]interface{}{}
for _, m := range backendConfigs {
modelsWithBackendConfig[m.Name] = nil
cfg, err := gallery.GetLocalModelConfiguration(ml.ModelPath, m.Name)
if err != nil {
@ -32,7 +35,7 @@ func WelcomeEndpoint(appConfig *config.ApplicationConfig,
modelsWithoutConfig := []string{}
for _, m := range models {
if _, ok := galleryConfigs[m]; !ok {
if _, ok := modelsWithBackendConfig[m]; !ok {
modelsWithoutConfig = append(modelsWithoutConfig, m)
}
}


@ -172,6 +172,14 @@ func ChatEndpoint(cl *config.BackendConfigLoader, ml *model.ModelLoader, startup
funcs := input.Functions
shouldUseFn := len(input.Functions) > 0 && config.ShouldUseFunctions()
strictMode := false
for _, f := range input.Functions {
if f.Strict {
strictMode = true
break
}
}
// Allow the user to set custom actions via config file
// to be "embedded" in each model
@ -187,10 +195,33 @@ func ChatEndpoint(cl *config.BackendConfigLoader, ml *model.ModelLoader, startup
if config.ResponseFormatMap != nil {
d := schema.ChatCompletionResponseFormat{}
dat, _ := json.Marshal(config.ResponseFormatMap)
_ = json.Unmarshal(dat, &d)
dat, err := json.Marshal(config.ResponseFormatMap)
if err != nil {
return err
}
err = json.Unmarshal(dat, &d)
if err != nil {
return err
}
if d.Type == "json_object" {
input.Grammar = functions.JSONBNF
} else if d.Type == "json_schema" {
d := schema.JsonSchemaRequest{}
dat, err := json.Marshal(config.ResponseFormatMap)
if err != nil {
return err
}
err = json.Unmarshal(dat, &d)
if err != nil {
return err
}
fs := &functions.JSONFunctionStructure{
AnyOf: []functions.Item{d.JsonSchema.Schema},
}
g, err := fs.Grammar(config.FunctionsConfig.GrammarOptions()...)
if err == nil {
input.Grammar = g
}
}
}
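
The chat endpoint now recognises a "json_schema" response format in addition to "json_object", building a grammar from the supplied schema. The field names below, other than "type", are an assumption based on schema.JsonSchemaRequest and the OpenAI-style convention it appears to model; treat this as an illustrative request body rather than a documented contract.

```go
package example

// Illustrative chat request using the new json_schema response format.
// "my-model" and the schema contents are example values.
const jsonSchemaChatRequest = `{
  "model": "my-model",
  "messages": [{"role": "user", "content": "Describe a user as JSON"}],
  "response_format": {
    "type": "json_schema",
    "json_schema": {
      "schema": {
        "type": "object",
        "properties": {
          "name": {"type": "string"},
          "age":  {"type": "integer"}
        },
        "required": ["name"]
      }
    }
  }
}`
```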
@ -201,7 +232,7 @@ func ChatEndpoint(cl *config.BackendConfigLoader, ml *model.ModelLoader, startup
}
switch {
case !config.FunctionsConfig.GrammarConfig.NoGrammar && shouldUseFn:
case (!config.FunctionsConfig.GrammarConfig.NoGrammar || strictMode) && shouldUseFn:
noActionGrammar := functions.Function{
Name: noActionName,
Description: noActionDescription,

core/http/explorer.go (new file, 46 lines)

@ -0,0 +1,46 @@
package http
import (
"net/http"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/favicon"
"github.com/gofiber/fiber/v2/middleware/filesystem"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http/routes"
)
func Explorer(db *explorer.Database) *fiber.App {
fiberCfg := fiber.Config{
Views: renderEngine(),
// The Fiber startup message is left enabled for the explorer
// (DisableStartupMessage is false); no OnListen hook is registered here.
DisableStartupMessage: false,
// Override default error handler
}
app := fiber.New(fiberCfg)
routes.RegisterExplorerRoutes(app, db)
httpFS := http.FS(embedDirStatic)
app.Use(favicon.New(favicon.Config{
URL: "/favicon.ico",
FileSystem: httpFS,
File: "static/favicon.ico",
}))
app.Use("/static", filesystem.New(filesystem.Config{
Root: httpFS,
PathPrefix: "static",
Browse: true,
}))
// Define a custom 404 handler
// Note: keep this at the bottom!
app.Use(notFoundHandler)
return app
}


@ -0,0 +1,13 @@
package routes
import (
"github.com/gofiber/fiber/v2"
coreExplorer "github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http/endpoints/explorer"
)
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database) {
app.Get("/", explorer.Dashboard())
app.Post("/network/add", explorer.AddNetwork(db))
app.Get("/networks", explorer.ShowNetworks(db))
}


@ -21,17 +21,18 @@ func RegisterLocalAIRoutes(app *fiber.App,
app.Get("/swagger/*", swagger.HandlerDefault) // default
// LocalAI API endpoints
if !appConfig.DisableGalleryEndpoint {
modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())
modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())
app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
}
app.Post("/tts", auth, localai.TTSEndpoint(cl, ml, appConfig))
@ -59,7 +60,7 @@ func RegisterLocalAIRoutes(app *fiber.App,
// p2p
if p2p.IsP2PEnabled() {
app.Get("/api/p2p", auth, localai.ShowP2PNodes)
app.Get("/api/p2p", auth, localai.ShowP2PNodes(appConfig))
app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig))
}


@ -21,6 +21,40 @@ import (
"github.com/google/uuid"
)
type modelOpCache struct {
status *xsync.SyncedMap[string, string]
}
func NewModelOpCache() *modelOpCache {
return &modelOpCache{
status: xsync.NewSyncedMap[string, string](),
}
}
func (m *modelOpCache) Set(key string, value string) {
m.status.Set(key, value)
}
func (m *modelOpCache) Get(key string) string {
return m.status.Get(key)
}
func (m *modelOpCache) DeleteUUID(uuid string) {
for _, k := range m.status.Keys() {
if m.status.Get(k) == uuid {
m.status.Delete(k)
}
}
}
func (m *modelOpCache) Map() map[string]string {
return m.status.Map()
}
func (m *modelOpCache) Exists(key string) bool {
return m.status.Exists(key)
}
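
modelOpCache is the concrete ProcessTracker passed to elements.ListModels below. Assuming the interface is exported from the same elements package whose ListModels these routes already call, a compile-time assertion placed alongside the type would make the relationship explicit (a suggestion, not part of the original change):

```go
// Compile-time check that modelOpCache satisfies the ProcessTracker interface
// consumed by elements.ListModels; adjust the import path if the interface
// lives elsewhere.
var _ elements.ProcessTracker = (*modelOpCache)(nil)
```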
func RegisterUIRoutes(app *fiber.App,
cl *config.BackendConfigLoader,
ml *model.ModelLoader,
@ -29,7 +63,7 @@ func RegisterUIRoutes(app *fiber.App,
auth func(*fiber.Ctx) error) {
// keeps the state of models that are being installed from the UI
var processingModels = xsync.NewSyncedMap[string, string]()
var processingModels = NewModelOpCache()
// modelStatus returns the current status of the models being processed (installation or deletion)
// it is called asynchronously from the UI
@ -62,6 +96,7 @@ func RegisterUIRoutes(app *fiber.App,
//"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID),
"IsP2PEnabled": p2p.IsP2PEnabled(),
"P2PToken": appConfig.P2PToken,
"NetworkID": appConfig.P2PNetworkID,
}
// Render index
@ -70,202 +105,202 @@ func RegisterUIRoutes(app *fiber.App,
/* show nodes live! */
app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes("")))
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID))))
})
app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.FederatedID)))
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
})
app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes("")))
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID))))
})
app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.FederatedID)))
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
})
}
// Show the Models page (all models)
app.Get("/browse", auth, func(c *fiber.Ctx) error {
term := c.Query("term")
if !appConfig.DisableGalleryEndpoint {
models, _ := gallery.AvailableGalleryModels(appConfig.Galleries, appConfig.ModelPath)
// Show the Models page (all models)
app.Get("/browse", auth, func(c *fiber.Ctx) error {
term := c.Query("term")
// Get all available tags
allTags := map[string]struct{}{}
tags := []string{}
for _, m := range models {
for _, t := range m.Tags {
allTags[t] = struct{}{}
models, _ := gallery.AvailableGalleryModels(appConfig.Galleries, appConfig.ModelPath)
// Get all available tags
allTags := map[string]struct{}{}
tags := []string{}
for _, m := range models {
for _, t := range m.Tags {
allTags[t] = struct{}{}
}
}
}
for t := range allTags {
tags = append(tags, t)
}
sort.Strings(tags)
if term != "" {
models = gallery.GalleryModels(models).Search(term)
}
// Get model statuses
processingModelsData, taskTypes := modelStatus()
summary := fiber.Map{
"Title": "LocalAI - Models",
"Version": internal.PrintableVersion(),
"Models": template.HTML(elements.ListModels(models, processingModels, galleryService)),
"Repositories": appConfig.Galleries,
"AllTags": tags,
"ProcessingModels": processingModelsData,
"AvailableModels": len(models),
"IsP2PEnabled": p2p.IsP2PEnabled(),
"TaskTypes": taskTypes,
// "ApplicationConfig": appConfig,
}
// Render index
return c.Render("views/models", summary)
})
// Show the models, filtered from the user input
// https://htmx.org/examples/active-search/
app.Post("/browse/search/models", auth, func(c *fiber.Ctx) error {
form := struct {
Search string `form:"search"`
}{}
if err := c.BodyParser(&form); err != nil {
return c.Status(fiber.StatusBadRequest).SendString(err.Error())
}
models, _ := gallery.AvailableGalleryModels(appConfig.Galleries, appConfig.ModelPath)
return c.SendString(elements.ListModels(gallery.GalleryModels(models).Search(form.Search), processingModels, galleryService))
})
/*
Install routes
*/
// This route is used when the "Install" button is pressed, we submit here a new job to the gallery service
// https://htmx.org/examples/progress-bar/
app.Post("/browse/install/model/:id", auth, func(c *fiber.Ctx) error {
galleryID := strings.Clone(c.Params("id")) // note: strings.Clone is required for multiple requests!
log.Debug().Msgf("UI job submitted to install : %+v\n", galleryID)
id, err := uuid.NewUUID()
if err != nil {
return err
}
uid := id.String()
processingModels.Set(galleryID, uid)
op := gallery.GalleryOp{
Id: uid,
GalleryModelName: galleryID,
Galleries: appConfig.Galleries,
}
go func() {
galleryService.C <- op
}()
return c.SendString(elements.StartProgressBar(uid, "0", "Installation"))
})
// This route is used when the "Install" button is pressed, we submit here a new job to the gallery service
// https://htmx.org/examples/progress-bar/
app.Post("/browse/delete/model/:id", auth, func(c *fiber.Ctx) error {
galleryID := strings.Clone(c.Params("id")) // note: strings.Clone is required for multiple requests!
log.Debug().Msgf("UI job submitted to delete : %+v\n", galleryID)
var galleryName = galleryID
if strings.Contains(galleryID, "@") {
// if the galleryID contains a @ it means that it's a model from a gallery
// but we want to delete it from the local models which does not need
// a repository ID
galleryName = strings.Split(galleryID, "@")[1]
}
id, err := uuid.NewUUID()
if err != nil {
return err
}
uid := id.String()
// Track the deletion job by galleryID and galleryName
// The GalleryID contains information about the repository,
// while the GalleryName is ONLY the name of the model
processingModels.Set(galleryName, uid)
processingModels.Set(galleryID, uid)
op := gallery.GalleryOp{
Id: uid,
Delete: true,
GalleryModelName: galleryName,
}
go func() {
galleryService.C <- op
cl.RemoveBackendConfig(galleryName)
}()
return c.SendString(elements.StartProgressBar(uid, "0", "Deletion"))
})
// Display the job current progress status
// If the job is done, we trigger the /browse/job/:uid route
// https://htmx.org/examples/progress-bar/
app.Get("/browse/job/progress/:uid", auth, func(c *fiber.Ctx) error {
jobUID := strings.Clone(c.Params("uid")) // note: strings.Clone is required for multiple requests!
status := galleryService.GetStatus(jobUID)
if status == nil {
//fmt.Errorf("could not find any status for ID")
return c.SendString(elements.ProgressBar("0"))
}
if status.Progress == 100 {
c.Set("HX-Trigger", "done") // this triggers /browse/job/:uid (which is when the job is done)
return c.SendString(elements.ProgressBar("100"))
}
if status.Error != nil {
return c.SendString(elements.ErrorProgress(status.Error.Error(), status.GalleryModelName))
}
return c.SendString(elements.ProgressBar(fmt.Sprint(status.Progress)))
})
// this route is hit when the job is done, and we display the
// final state (for now just displays "Installation completed")
app.Get("/browse/job/:uid", auth, func(c *fiber.Ctx) error {
jobUID := strings.Clone(c.Params("uid")) // note: strings.Clone is required for multiple requests!
status := galleryService.GetStatus(jobUID)
galleryID := ""
for _, k := range processingModels.Keys() {
if processingModels.Get(k) == jobUID {
galleryID = k
processingModels.Delete(k)
for t := range allTags {
tags = append(tags, t)
}
}
if galleryID == "" {
log.Debug().Msgf("no processing model found for job : %+v\n", jobUID)
}
sort.Strings(tags)
log.Debug().Msgf("JOB finished : %+v\n", status)
showDelete := true
displayText := "Installation completed"
if status.Deletion {
showDelete = false
displayText = "Deletion completed"
}
if term != "" {
models = gallery.GalleryModels(models).Search(term)
}
return c.SendString(elements.DoneProgress(galleryID, displayText, showDelete))
})
// Get model statuses
processingModelsData, taskTypes := modelStatus()
summary := fiber.Map{
"Title": "LocalAI - Models",
"Version": internal.PrintableVersion(),
"Models": template.HTML(elements.ListModels(models, processingModels, galleryService)),
"Repositories": appConfig.Galleries,
"AllTags": tags,
"ProcessingModels": processingModelsData,
"AvailableModels": len(models),
"IsP2PEnabled": p2p.IsP2PEnabled(),
"TaskTypes": taskTypes,
// "ApplicationConfig": appConfig,
}
// Render index
return c.Render("views/models", summary)
})
// Show the models, filtered from the user input
// https://htmx.org/examples/active-search/
app.Post("/browse/search/models", auth, func(c *fiber.Ctx) error {
form := struct {
Search string `form:"search"`
}{}
if err := c.BodyParser(&form); err != nil {
return c.Status(fiber.StatusBadRequest).SendString(err.Error())
}
models, _ := gallery.AvailableGalleryModels(appConfig.Galleries, appConfig.ModelPath)
return c.SendString(elements.ListModels(gallery.GalleryModels(models).Search(form.Search), processingModels, galleryService))
})
/*
Install routes
*/
// This route is used when the "Install" button is pressed, we submit here a new job to the gallery service
// https://htmx.org/examples/progress-bar/
app.Post("/browse/install/model/:id", auth, func(c *fiber.Ctx) error {
galleryID := strings.Clone(c.Params("id")) // note: strings.Clone is required for multiple requests!
log.Debug().Msgf("UI job submitted to install : %+v\n", galleryID)
id, err := uuid.NewUUID()
if err != nil {
return err
}
uid := id.String()
processingModels.Set(galleryID, uid)
op := gallery.GalleryOp{
Id: uid,
GalleryModelName: galleryID,
Galleries: appConfig.Galleries,
}
go func() {
galleryService.C <- op
}()
return c.SendString(elements.StartProgressBar(uid, "0", "Installation"))
})
// This route is used when the "Install" button is pressed, we submit here a new job to the gallery service
// https://htmx.org/examples/progress-bar/
app.Post("/browse/delete/model/:id", auth, func(c *fiber.Ctx) error {
galleryID := strings.Clone(c.Params("id")) // note: strings.Clone is required for multiple requests!
log.Debug().Msgf("UI job submitted to delete : %+v\n", galleryID)
var galleryName = galleryID
if strings.Contains(galleryID, "@") {
// if the galleryID contains a @ it means that it's a model from a gallery
// but we want to delete it from the local models which does not need
// a repository ID
galleryName = strings.Split(galleryID, "@")[1]
}
id, err := uuid.NewUUID()
if err != nil {
return err
}
uid := id.String()
// Track the deletion job by galleryID and galleryName
// The GalleryID contains information about the repository,
// while the GalleryName is ONLY the name of the model
processingModels.Set(galleryName, uid)
processingModels.Set(galleryID, uid)
op := gallery.GalleryOp{
Id: uid,
Delete: true,
GalleryModelName: galleryName,
}
go func() {
galleryService.C <- op
cl.RemoveBackendConfig(galleryName)
}()
return c.SendString(elements.StartProgressBar(uid, "0", "Deletion"))
})
// Display the job current progress status
// If the job is done, we trigger the /browse/job/:uid route
// https://htmx.org/examples/progress-bar/
app.Get("/browse/job/progress/:uid", auth, func(c *fiber.Ctx) error {
jobUID := strings.Clone(c.Params("uid")) // note: strings.Clone is required for multiple requests!
status := galleryService.GetStatus(jobUID)
if status == nil {
//fmt.Errorf("could not find any status for ID")
return c.SendString(elements.ProgressBar("0"))
}
if status.Progress == 100 {
c.Set("HX-Trigger", "done") // this triggers /browse/job/:uid (which is when the job is done)
return c.SendString(elements.ProgressBar("100"))
}
if status.Error != nil {
// TODO: instead of deleting the job, we should keep it in the cache and make it dismissable by the user
processingModels.DeleteUUID(jobUID)
return c.SendString(elements.ErrorProgress(status.Error.Error(), status.GalleryModelName))
}
return c.SendString(elements.ProgressBar(fmt.Sprint(status.Progress)))
})
// this route is hit when the job is done, and we display the
// final state (for now just displays "Installation completed")
app.Get("/browse/job/:uid", auth, func(c *fiber.Ctx) error {
jobUID := strings.Clone(c.Params("uid")) // note: strings.Clone is required for multiple requests!
status := galleryService.GetStatus(jobUID)
galleryID := ""
processingModels.DeleteUUID(jobUID)
if galleryID == "" {
log.Debug().Msgf("no processing model found for job : %+v\n", jobUID)
}
log.Debug().Msgf("JOB finished : %+v\n", status)
showDelete := true
displayText := "Installation completed"
if status.Deletion {
showDelete = false
displayText = "Deletion completed"
}
return c.SendString(elements.DoneProgress(galleryID, displayText, showDelete))
})
}
// Show the Chat page
app.Get("/chat/:model", auth, func(c *fiber.Ctx) error {

View file

@ -0,0 +1,144 @@
const canvas = document.getElementById('networkCanvas');
const ctx = canvas.getContext('2d');
let particles = [];
let isDragging = false;
let dragParticle = null;
const maxParticles = 100; // Maximum number of particles
const dragAreaRadius = 10; // Increased area for easier dragging
// Function to resize canvas based on aspect ratio
function resizeCanvas() {
canvas.width = window.innerWidth;
canvas.height = Math.min(window.innerHeight, 400); // Maintain a max height of 400px
}
// Adjust the canvas size when the window resizes
window.addEventListener('resize', resizeCanvas);
// Initialize canvas size
resizeCanvas();
class Particle {
constructor(x, y) {
this.x = x;
this.y = y;
this.radius = 4;
this.color = `rgba(0, 255, 204, 1)`;
this.speedX = (Math.random() - 0.5) * 2; // Random horizontal speed
this.speedY = (Math.random() - 0.5) * 2; // Random vertical speed
}
update() {
if (!isDragging || dragParticle !== this) {
this.x += this.speedX;
this.y += this.speedY;
// Bounce off the edges of the canvas
if (this.x < 0 || this.x > canvas.width) {
this.speedX *= -1;
}
if (this.y < 0 || this.y > canvas.height) {
this.speedY *= -1;
}
}
}
draw() {
ctx.beginPath();
ctx.arc(this.x, this.y, this.radius, 0, Math.PI * 2, false);
ctx.fillStyle = this.color;
ctx.fill();
}
isMouseOver(mouseX, mouseY) {
// Increase the draggable area by checking if the mouse is within a larger radius
return Math.hypot(mouseX - this.x, mouseY - this.y) < dragAreaRadius;
}
}
function connectParticles() {
for (let i = 0; i < particles.length; i++) {
for (let j = i + 1; j < particles.length; j++) {
const distance = Math.hypot(particles[i].x - particles[j].x, particles[i].y - particles[j].y);
if (distance < 150) {
ctx.beginPath();
ctx.moveTo(particles[i].x, particles[i].y);
ctx.lineTo(particles[j].x, particles[j].y);
ctx.strokeStyle = `rgba(0, 255, 204, ${1 - distance / 150})`;
ctx.stroke();
}
}
}
}
function initParticles(num) {
for (let i = 0; i < num; i++) {
particles.push(new Particle(Math.random() * canvas.width, Math.random() * canvas.height));
}
}
function animate() {
ctx.clearRect(0, 0, canvas.width, canvas.height);
particles.forEach(particle => {
particle.update();
particle.draw();
});
connectParticles();
requestAnimationFrame(animate);
}
// Handle mouse click to create a new particle
canvas.addEventListener('click', (e) => {
const rect = canvas.getBoundingClientRect();
const mouseX = e.clientX - rect.left;
const mouseY = e.clientY - rect.top;
const newParticle = new Particle(mouseX, mouseY);
particles.push(newParticle);
// Limit the number of particles to the maximum
if (particles.length > maxParticles) {
particles.shift(); // Remove the oldest particle
}
});
// Handle mouse down for dragging
canvas.addEventListener('mousedown', (e) => {
const rect = canvas.getBoundingClientRect();
const mouseX = e.clientX - rect.left;
const mouseY = e.clientY - rect.top;
for (let particle of particles) {
if (particle.isMouseOver(mouseX, mouseY)) {
isDragging = true;
dragParticle = particle;
break;
}
}
});
// Handle mouse move for dragging
canvas.addEventListener('mousemove', (e) => {
if (isDragging && dragParticle) {
const rect = canvas.getBoundingClientRect();
const mouseX = e.clientX - rect.left;
const mouseY = e.clientY - rect.top;
dragParticle.x = mouseX;
dragParticle.y = mouseY;
}
});
// Handle mouse up to stop dragging
canvas.addEventListener('mouseup', () => {
isDragging = false;
dragParticle = null;
});
// Initialize and start the animation
initParticles(maxParticles);
animate();

View file

@ -0,0 +1,380 @@
<!DOCTYPE html>
<html lang="en">
{{template "views/partials/head" .}}
<style>
body {
background-color: #1a202c;
color: #e2e8f0;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.token {
word-break: break-all;
}
.container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
position: relative;
}
.network-card {
background-color: #2d3748;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.3s ease, box-shadow 0.3s ease;
}
.network-card:hover {
transform: translateY(-5px);
box-shadow: 0 6px 10px rgba(0, 0, 0, 0.15);
}
.network-title {
font-size: 24px;
font-weight: bold;
margin-bottom: 10px;
color: #63b3ed;
}
.network-token {
font-size: 14px;
font-style: italic;
color: #cbd5e0;
margin-bottom: 10px;
word-break: break-word; /* Breaks words to prevent overflow */
overflow-wrap: break-word; /* Ensures long strings break */
white-space: pre-wrap; /* Preserves whitespace for breaking */
}
.cluster {
margin-top: 10px;
background-color: #4a5568;
padding: 10px;
border-radius: 6px;
transition: background-color 0.3s ease;
}
.cluster:hover {
background-color: #5a6b78;
}
.cluster-title {
font-size: 18px;
font-weight: bold;
color: #e2e8f0;
}
.form-container {
background-color: #2d3748;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.form-control {
margin-bottom: 15px;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
input[type="text"],
textarea {
width: 100%;
padding: 10px;
border-radius: 4px;
border: 1px solid #4a5568;
background-color: #3a4250;
color: #e2e8f0;
transition: border-color 0.3s ease, background-color 0.3s ease;
}
input[type="text"]:focus,
textarea:focus {
border-color: #63b3ed;
background-color: #4a5568;
}
button {
background-color: #3182ce;
color: #e2e8f0;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
transition: background-color 0.3s ease;
}
.error {
color: #e53e3e;
margin-top: 5px;
}
.success {
color: #38a169;
margin-top: 5px;
}
/* Spinner Styles */
.spinner {
display: inline-block;
width: 50px;
height: 50px;
border: 5px solid rgba(255, 255, 255, 0.2);
border-radius: 50%;
border-top-color: #3182ce;
animation: spin 1s linear infinite;
margin: 0 auto;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
/* Center the loading text and spinner */
.loading-container {
text-align: center;
padding: 50px;
}
.warning-box {
border-radius: 5px;
}
.warning-box i {
margin-right: 10px;
}
.token-box {
background-color: #4a5568;
padding: 10px;
border-radius: 4px;
margin-top: 10px;
position: relative;
cursor: pointer;
}
.token-box:hover {
background-color: #5a6b7e;
}
.token-text {
overflow-wrap: break-word;
font-family: monospace;
}
.copy-icon {
position: absolute;
top: 10px;
right: 10px;
color: #e2e8f0;
}
</style>
<body class="bg-gray-900 text-gray-200">
<div class="flex flex-col min-h-screen" x-data="networkClusters()" x-init="init()">
{{template "views/partials/navbar_explorer" .}}
<div class="animation-container">
<canvas id="networkCanvas"></canvas>
<div class="text-overlay">
<header class="text-center py-12">
<h1 class="text-5xl font-bold text-gray-100">
<i class="fa-solid fa-circle-nodes mr-2"></i> Network Clusters Explorer
</h1>
<p class="mt-4 text-lg">
View the clusters and workers available in each network.
<a href="https://localai.io/features/distribute/" target="_blank">
<i class="fas fa-circle-info pr-2"></i>
</a>
</p>
</header>
</div>
</div>
<div class="container mx-auto px-4 flex-grow">
<!-- Warning Box -->
<div class="warning-box bg-yellow-100 text-gray-800 mb-20 pt-5 pb-5 pr-5 pl-5 text-lg">
<i class="fa-solid fa-triangle-exclamation"></i><i class="fa-solid fa-flask"></i>
The explorer is a global, community-driven tool to share network tokens and view the clusters available around the globe.
Anyone can use a shared token to offload computation onto the available clusters, or to contribute resources of their own.
This is provided without any warranty. Use it at your own risk. We are not responsible for any potential harm or misuse. Sharing a token globally allows anyone on the internet to use your instances.
Although the community will address bugs, this is experimental software and may be insecure to deploy on your hardware unless you take all necessary precautions.
</div>
<div class="flow-root">
<!-- Toggle button for showing/hiding the form -->
<button class="bg-red-600 hover:bg-blue-600 float-right mb-2 flex items-center px-4 py-2 rounded" @click="toggleForm()">
<!-- Conditional icon display -->
<i :class="showForm ? 'fa-solid fa-times' : 'fa-solid fa-plus'" class="mr-2"></i>
<span x-text="showForm ? 'Close' : 'Add New Network'"></span>
</button>
</div>
<!-- Form for adding a new network -->
<div class="form-container" x-show="showForm" @click.outside="showForm = false">
<h2 class="text-3xl font-bold mb-4"><i class="fa-solid fa-plus"></i> Add New Network</h2>
<div class="form-control">
<label for="name">Network Name</label>
<input type="text" id="name" x-model="newNetwork.name" placeholder="Enter network name" />
</div>
<div class="form-control">
<label for="description">Description</label>
<textarea id="description" x-model="newNetwork.description" placeholder="Enter description"></textarea>
</div>
<div class="form-control">
<label for="token">Token</label>
<textarea id="token" x-model="newNetwork.token" placeholder="Enter token"></textarea>
</div>
<button @click="addNetwork"><i class="fa-solid fa-plus"></i> Add Network</button>
<template x-if="errorMessage">
<p class="error" x-text="errorMessage"></p>
</template>
<template x-if="successMessage">
<p class="success" x-text="successMessage"></p>
</template>
</div>
<!-- Loading Spinner -->
<template x-if="networks.length === 0 && !loadingComplete">
<div class="loading-container">
<div class="spinner"></div>
<p class="text-center mt-4">Loading networks...</p>
</div>
</template>
<template x-if="networks.length === 0 && loadingComplete">
<div class="loading-container">
<p class="text-center mt-4">No networks available with online workers</p>
</div>
</template>
<!-- Display Networks -->
<template x-for="network in networks" :key="network.name">
<div class="network-card">
<i class="fa-solid fa-circle-nodes mr-2"></i><span class="network-title font-bold mb-4 mt-1" x-text="network.name"></span>
<div class="token-box" @click="copyToken(network.token)">
<p class="text-lg font-bold mb-4 mt-1">
<i class="fa-solid fa-copy copy-icon"></i>
<i class="fa-solid fa-key mr-2"></i>Token (click to copy):
</p>
<span class="token-text" x-text="network.token"></span>
</div>
<div class="cluster">
<p class="text-lg font-bold mb-4 mt-1"><i class="fa-solid fa-book mr-2"></i> Description</p>
<p x-text="network.description"></p>
</div>
<h2 class="text-3xl font-bold mb-4 mt-4">Available Clusters in this network</h2>
<template x-for="cluster in network.Clusters" :key="cluster.NetworkID + cluster.Type">
<div class="cluster">
<div class="cluster-title"></div>
<span class="inline-block bg-orange-500 text-white py-1 px-3 rounded-full text-xs" x-text="'Cluster Type: ' + cluster.Type">
</span>
<span class="inline-block bg-orange-500 text-white py-1 px-3 rounded-full text-xs" x-show="cluster.NetworkID" x-text="'Network ID: ' + (cluster.NetworkID || 'N/A')">
</span>
<span class="inline-block bg-blue-500 text-white py-1 px-3 rounded-full text-xs" x-text="'Number of Workers: ' + cluster.Workers.length">
</span>
<!-- Give commands and instructions to join the network -->
<span class="inline-block token-box text-white py-1 px-3 text-xs" x-show="cluster.Type == 'federated'" >
<p class="text-lg font-bold mb-4 mt-1">
<i class="fa-solid fa-copy copy-icon float-right"></i>
Command to connect (click to copy):
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyToken($el.textContent)" >
docker run -d --restart=always -e ADDRESS=":80" -e LOCALAI_P2P_NETWORK_ID=<span class="token" x-text="cluster.NetworkID"></span> -e LOCALAI_P2P_LOGLEVEL=debug --name local-ai -e TOKEN="<span class="token" x-text="network.token"></span>" --net host -ti localai/localai:master-ffmpeg-core federated --debug
</code>
or via CLI:
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyToken($el.textContent)" >
ADDRESS=":80" LOCALAI_P2P_NETWORK_ID=<span class="token" x-text="cluster.NetworkID"></span> LOCALAI_P2P_LOGLEVEL=debug TOKEN="<span class="token" x-text="network.token"></span>" local-ai federated --debug
</code>
</span>
</div>
</template>
</div>
</template>
</div>
<script>
function networkClusters() {
return {
networks: [],
newNetwork: {
name: '',
description: '',
token: ''
},
errorMessage: '',
successMessage: '',
showForm: false, // Form visibility state
loadingComplete: false, // To track if loading is complete
toggleForm() {
this.showForm = !this.showForm;
console.log('Toggling form:', this.showForm);
},
fetchNetworks() {
console.log('Fetching networks...');
fetch('/networks')
.then(response => response.json())
.then(data => {
console.log('Data fetched successfully:', data);
this.networks = data;
this.loadingComplete = true; // Set loading complete
})
.catch(error => {
console.error('Error fetching networks:', error);
this.loadingComplete = true; // Ensure spinner is hidden if error occurs
});
},
addNetwork() {
this.errorMessage = '';
this.successMessage = '';
console.log('Adding new network:', this.newNetwork);
// Validate input
if (!this.newNetwork.name || !this.newNetwork.description || !this.newNetwork.token) {
this.errorMessage = 'All fields are required.';
return;
}
fetch('/network/add', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(this.newNetwork)
})
.then(response => {
if (!response.ok) {
return response.json().then(err => { throw err; });
}
return response.json();
})
.then(data => {
console.log('Network added successfully:', data);
this.successMessage = 'Network added successfully!';
this.fetchNetworks(); // Refresh the networks list
this.newNetwork = { name: '', description: '', token: '' }; // Clear form
})
.catch(error => {
console.error('Error adding network:', error);
this.errorMessage = 'Failed to add network. Please try again.';
if (error.error) {
this.errorMessage += " Error: " + error.error;
}
});
},
copyToken(token) {
navigator.clipboard.writeText(token)
.then(() => {
console.log('Text copied to clipboard:', token);
alert('Text copied to clipboard!');
})
.catch(err => {
console.error('Failed to copy token:', err);
});
},
init() {
console.log('Initializing Alpine component...');
this.fetchNetworks();
setInterval(() => {
this.fetchNetworks();
}, 5000); // Refresh every 5 seconds
}
}
}
</script>
<script src="/static/p2panimation.js"></script>
{{template "views/partials/footer" .}}
</div>
</body>
</html>
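For reference, the component above drives two endpoints: GET /networks (polled on load and every 5 seconds) and POST /network/add (a JSON body with name, description and token). A minimal Go sketch of the same calls — assuming an explorer running on the default :8080 address and treating the response shape loosely — might look like:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// newNetwork mirrors the JSON body the "Add New Network" form submits.
type newNetwork struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Token       string `json:"token"`
}

func main() {
	base := "http://localhost:8080" // assumption: explorer started with the default address

	// Register a network, as the form does via POST /network/add.
	body, _ := json.Marshal(newNetwork{
		Name:        "example-net",         // hypothetical values
		Description: "shared test cluster", // hypothetical
		Token:       "<network token>",
	})
	resp, err := http.Post(base+"/network/add", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// List networks, as the page does on load and every 5 seconds.
	resp, err = http.Get(base + "/networks")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The shape is kept loose here; the page reads name, description, token and Clusters.
	var networks []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&networks); err != nil {
		panic(err)
	}
	fmt.Printf("found %d network(s)\n", len(networks))
}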

View file

@ -1,21 +1,40 @@
<!DOCTYPE html>
<html lang="en">
{{template "views/partials/head" .}}
<body class="bg-gray-900 text-gray-200">
<div class="flex flex-col min-h-screen">
<div class="flex flex-col min-h-screen" x-data="{}">
{{template "views/partials/navbar" .}}
<div class="container mx-auto px-4 flex-grow">
<div class="workers mt-12 text-center">
<h2 class="text-3xl font-semibold text-gray-100 mb-8">
<i class="fa-solid fa-circle-nodes"></i> Distributed inference with P2P
<a href="https://localai.io/features/distribute/" target="_blank">
<i class="fas fa-circle-info pr-2"></i>
</a>
</h2>
<div class="animation-container">
<canvas id="networkCanvas"></canvas>
<div class="text-overlay">
<header class="text-center py-12">
<h1 class="text-5xl font-bold text-gray-100">
<i class="fa-solid fa-circle-nodes mr-2"></i> Distributed inference with P2P
</h1>
<p class="mt-4 text-lg">
Distribute computation by sharing and load-balancing instances, or by sharding model weights.
<a href="https://localai.io/features/distribute/" target="_blank">
<i class="fas fa-circle-info pr-2"></i>
</a>
</p>
</header>
</div>
</div>
<h5 class="mb-4 text-justify">LocalAI uses P2P technologies to enable distribution of work between peers. It is possible to share an instance with Federation and/or split the weights of a model across peers (only available with llama.cpp models). You can now share computational resources between your devices or your friends!</h5>
<div class="bg-gray-800 p-6 rounded-lg shadow-lg mb-12 text-left">
<p class="text-lg font-bold mb-4 mt-1">
Network token
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">{{.P2PToken}}</code><br>
The network token can be used either to share this instance or to join a federation or a worker network. Below you will find a few examples of how to start a new instance or a worker with the token; once they connect, the available workers and federated nodes are shown here.
</div>
<!-- Warning box if p2p token is empty and p2p is enabled -->
{{ if and .IsP2PEnabled (eq .P2PToken "") }}
<div class="bg-red-500 p-4 rounded-lg shadow-lg mb-12 text-left">
@ -40,7 +59,6 @@
<h3 class="text-2xl font-semibold text-gray-100 mb-6"><i class="fa-solid fa-book"></i> Start a federated instance</h3>
<!-- Tabs navigation -->
<ul class="mb-5 flex list-none flex-row flex-wrap ps-0" role="tablist" data-twe-nav-ref>
<li role="presentation" class="flex-auto text-center">
@ -52,22 +70,25 @@
</ul>
<!-- Tabs content -->
<div class="mb-6">
<div class="mb-6">
<div class="tabcontent hidden opacity-100 transition-opacity duration-150 ease-linear data-[twe-tab-active]:block p-4" id="tabs-federated-cli" role="tabpanel" aria-labelledby="tabs-federated-cli" data-twe-tab-active>
<p class="mb-2">To start a new instance to share:</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new instance to share:
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
# Start a new instance to share with --federated and a TOKEN<br>
export TOKEN="<span class="token">{{.P2PToken}}</span>"<br>
local-ai run --federated --p2p
</code>
<p class="mt-2">Note: If you don't have a token do not specify it and use the generated one that you can find in this page.</p>
<p class="mb-2">To start a new federated load balancer:</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new federated load balancer:
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
export TOKEN="<span class="token">{{.P2PToken}}</span>"<br>
local-ai federated
</code>
@ -77,13 +98,19 @@
<p class="mt-2">For all the options available, please refer to the <a href="https://localai.io/features/distribute/#starting-workers" target="_blank" class="text-yellow-300 hover:text-yellow-400">documentation</a>.</p>
</div>
<div class="tabcontent hidden opacity-0 transition-opacity duration-150 ease-linear data-[twe-tab-active]:block p-4" id="tabs-federated-docker" role="tabpanel" aria-labelledby="tabs-federated-docker">
<p class="mb-2">To start a new federated instance:</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new federated instance:
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
docker run -ti --net host -e TOKEN="<span class="token">{{.P2PToken}}</span>" --name local-ai -p 8080:8080 localai/localai:latest-cpu run --federated --p2p
</code>
<p class="mb-2">To start a new federated server (port to 9090):</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new federated server with Docker (exposed on port 9090):
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
docker run -ti --net host -e TOKEN="<span class="token">{{.P2PToken}}</span>" --name local-ai -p 9090:8080 localai/localai:latest-cpu federated
</code>
@ -119,8 +146,11 @@
<!-- Tabs content -->
<div class="mb-6">
<div class="tabcontent hidden opacity-100 transition-opacity duration-150 ease-linear data-[twe-tab-active]:block p-4" id="tabs-cli" role="tabpanel" aria-labelledby="tabs-cli" data-twe-tab-active>
<p class="mb-2">To start a new worker, run the following command:</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new worker, run the following command:
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
export TOKEN="<span class="token">{{.P2PToken}}</span>"<br>
local-ai worker p2p-llama-cpp-rpc
</code>
@ -128,8 +158,11 @@
<p class="mt-2">For all the options available, please refer to the <a href="https://localai.io/features/distribute/#starting-workers" target="_blank" class="text-yellow-300 hover:text-yellow-400">documentation</a>.</p>
</div>
<div class="tabcontent hidden opacity-0 transition-opacity duration-150 ease-linear data-[twe-tab-active]:block p-4" id="tabs-docker" role="tabpanel" aria-labelledby="tabs-docker">
<p class="mb-2">To start a new worker with docker, run the following command:</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words">
<p class="text-lg font-bold mb-4 mt-1">
To start a new worker with docker, run the following command:
<i class="fa-solid fa-copy copy-icon float-right"></i>
</p>
<code class="block bg-gray-700 text-yellow-300 p-4 rounded-lg break-words" @click="copyClipboard($el.textContent)">
docker run -ti --net host -e TOKEN="<span class="token">{{.P2PToken}}</span>" --name local-ai -p 8080:8080 localai/localai:latest-cpu worker p2p-llama-cpp-rpc
</code>
@ -144,7 +177,7 @@
{{template "views/partials/footer" .}}
</div>
<script src="/static/p2panimation.js"></script>
<style>
.token {
word-break: break-all;

View file

@ -6,6 +6,7 @@
rel="stylesheet"
href="/static/assets/highlightjs.css"
/>
<script defer src="/static/assets/anime.min.js"></script>
<script
defer
src="/static/assets/highlightjs.js"
@ -46,9 +47,79 @@
preflight: false,
},
};
function copyClipboard(token) {
navigator.clipboard.writeText(token)
.then(() => {
console.log('Text copied to clipboard:', token);
alert('Text copied to clipboard!');
})
.catch(err => {
console.error('Failed to copy token:', err);
});
}
</script>
<link href="/static/assets/fontawesome/css/fontawesome.css" rel="stylesheet" />
<link href="/static/assets/fontawesome/css/brands.css" rel="stylesheet" />
<link href="/static/assets/fontawesome/css/solid.css" rel="stylesheet" />
<script src="/static/assets/htmx.js" crossorigin="anonymous"></script>
<link href="/static/assets/fontawesome/css/fontawesome.css" rel="stylesheet" />
<link href="/static/assets/fontawesome/css/brands.css" rel="stylesheet" />
<link href="/static/assets/fontawesome/css/solid.css" rel="stylesheet" />
<script src="/static/assets/htmx.js" crossorigin="anonymous"></script>
<!-- P2P Animation START -->
<style>
.animation-container {
position: relative;
width: 100%;
height: 25vh;
display: flex;
justify-content: center;
align-items: center;
overflow: hidden;
}
canvas {
position: absolute;
top: 0;
left: 0;
}
.text-overlay {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
z-index: 1;
}
</style>
<!-- P2P Animation END -->
<!-- Flask and node animation -->
<style>
.fa-circle-nodes {
/* font-size: 100px; adjust the size as needed */
animation: rotateCircleNodes 8s linear infinite; /* Slow and fluid rotation */
display: inline-block;
}
@keyframes rotateCircleNodes {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* Animation for the warning box */
.fa-flask {
/* font-size: 100px; adjust the size as needed */
animation: shakeFlask 3s ease-in-out infinite; /* Smooth easing and longer duration for fluidity */
transform-origin: bottom center;
}
@keyframes shakeFlask {
0%, 10% { transform: rotate(0deg); } /* Start and end still */
20% { transform: rotate(-10deg); } /* Smooth transition to left */
30% { transform: rotate(10deg); } /* Smooth transition to right */
40% { transform: rotate(-8deg); } /* Smooth transition to left */
50% { transform: rotate(8deg); } /* Smooth transition to right */
60% { transform: rotate(-5deg); } /* Smooth transition to left */
70% { transform: rotate(5deg); } /* Smooth transition to right */
80% { transform: rotate(-2deg); } /* Smooth transition to left */
90% { transform: rotate(2deg); } /* Smooth transition to right */
100% { transform: rotate(0deg); } /* Return to center */
}
</style>
</head>

View file

@ -0,0 +1,39 @@
<nav class="bg-gray-800 shadow-lg">
<div class="container mx-auto px-4 py-4">
<div class="flex items-center justify-between">
<div class="flex items-center">
<!-- Logo Image: Replace 'logo_url_here' with your actual logo URL -->
<a href="/" class="text-white text-xl font-bold"><img src="https://github.com/go-skynet/LocalAI/assets/2420543/0966aa2a-166e-4f99-a3e5-6c915fc997dd" alt="LocalAI Logo" class="h-10 mr-3 border-2 border-gray-300 shadow rounded"></a>
<a href="/" class="text-white text-xl font-bold">LocalAI</a>
</div>
<!-- Menu button for small screens -->
<div class="lg:hidden">
<button id="menu-toggle" class="text-gray-400 hover:text-white focus:outline-none">
<i class="fas fa-bars fa-lg"></i>
</button>
</div>
<!-- Navigation links -->
<div class="hidden lg:flex lg:items-center lg:justify-end lg:flex-1 lg:w-0">
<a href="/" class="text-gray-400 hover:text-white px-3 py-2 rounded"><i class="fas fa-home pr-2"></i>Home</a>
<a href="https://localai.io" class="text-gray-400 hover:text-white px-3 py-2 rounded" target="_blank" ><i class="fas fa-book-reader pr-2"></i> Documentation</a>
<a href="https://models.localai.io/" class="text-gray-400 hover:text-white px-3 py-2 rounded"><i class="fas fa-brain pr-2"></i> Models</a>
</div>
</div>
<!-- Collapsible menu for small screens -->
<div class="hidden lg:hidden" id="mobile-menu">
<div class="pt-4 pb-3 border-t border-gray-700">
<a href="/" class="block text-gray-400 hover:text-white px-3 py-2 rounded mt-1"><i class="fas fa-home pr-2"></i>Home</a>
<a href="https://localai.io" class="block text-gray-400 hover:text-white px-3 py-2 rounded mt-1" target="_blank" ><i class="fas fa-book-reader pr-2"></i> Documentation</a>
<a href="https://models.localai.io/" class="text-gray-400 hover:text-white px-3 py-2 rounded"><i class="fas fa-brain pr-2"></i> Models</a>
</div>
</div>
</div>
</nav>
<script>
// JavaScript to toggle the mobile menu
document.getElementById('menu-toggle').addEventListener('click', function () {
var mobileMenu = document.getElementById('mobile-menu');
mobileMenu.classList.toggle('hidden');
});
</script>

View file

@ -1,24 +1,87 @@
package p2p
import (
"fmt"
"math/rand/v2"
"sync"
"github.com/rs/zerolog/log"
)
const FederatedID = "federated"
func NetworkID(networkID, serviceID string) string {
if networkID != "" {
return fmt.Sprintf("%s_%s", networkID, serviceID)
}
return serviceID
}
type FederatedServer struct {
sync.Mutex
listenAddr, service, p2ptoken string
requestTable map[string]int
loadBalanced bool
workerTarget string
}
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
}
}
func (fs *FederatedServer) RandomServer() string {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.ID)
} else {
delete(fs.requestTable, v.ID) // make sure it's not tracked
log.Info().Msgf("Node %s is offline", v.ID)
}
}
if len(tunnelAddresses) == 0 {
return ""
}
return tunnelAddresses[rand.IntN(len(tunnelAddresses))]
}
func (fs *FederatedServer) syncTableStatus() {
fs.Lock()
defer fs.Unlock()
currentTunnels := make(map[string]struct{})
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
fs.ensureRecordExist(v.ID)
currentTunnels[v.ID] = struct{}{}
}
}
// delete tunnels that don't exist anymore
for t := range fs.requestTable {
if _, ok := currentTunnels[t]; !ok {
delete(fs.requestTable, t)
}
}
}
func (fs *FederatedServer) SelectLeastUsedServer() string {
fs.syncTableStatus()
fs.Lock()
defer fs.Unlock()
log.Debug().Any("request_table", fs.requestTable).Msgf("SelectLeastUsedServer()")
// cycle over requestTable and find the entry with the lower number
// if there are multiple entries with the same number, select one randomly
// if there are no entries, return an empty string
@ -30,18 +93,26 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
minKey = k
}
}
log.Debug().Any("requests_served", min).Any("request_table", fs.requestTable).Msgf("Selected tunnel %s", minKey)
return minKey
}
func (fs *FederatedServer) RecordRequest(nodeID string) {
fs.Lock()
defer fs.Unlock()
// increment the counter for the nodeID in the requestTable
fs.requestTable[nodeID]++
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Recording request")
}
func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
func (fs *FederatedServer) ensureRecordExist(nodeID string) {
// if the nodeID is not in the requestTable, add it with a counter of 0
_, ok := fs.requestTable[nodeID]
if !ok {
fs.requestTable[nodeID] = 0
}
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Ensure record exists")
}
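The body of SelectLeastUsedServer is collapsed in the hunk above; going by its comments (return the key with the lowest request count, pick among ties arbitrarily, return "" when the table is empty), a minimal sketch of that selection — not the repository's exact code — is:

package main

import "fmt"

// leastUsed returns the key with the fewest recorded requests, or "" when the
// table is empty; with Go's randomized map iteration, ties resolve to an
// arbitrary candidate.
func leastUsed(requestTable map[string]int) string {
	lowestKey := ""
	lowest := -1
	for k, v := range requestTable {
		if lowest == -1 || v < lowest {
			lowest = v
			lowestKey = k
		}
	}
	return lowestKey
}

func main() {
	table := map[string]int{"node-a": 3, "node-b": 1, "node-c": 2} // hypothetical node IDs
	fmt.Println(leastUsed(table))                                  // prints "node-b"
}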

View file

@ -8,18 +8,12 @@ import (
"errors"
"fmt"
"net"
"time"
"math/rand/v2"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)
func (f *FederatedServer) Start(ctx context.Context) error {
n, err := NewNode(f.p2ptoken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
@ -31,7 +25,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {
if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
}, false); err != nil {
return err
}
@ -48,27 +42,12 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
return err
}
// ll.Info("Binding local port on", srcaddr)
go func() {
<-ctx.Done()
l.Close()
}()
ledger, _ := node.Ledger()
// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)
nodeAnnounce(ctx, node)
defer l.Close()
for {
@ -76,7 +55,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
log.Debug().Msgf("New connection from %s", l.Addr().String())
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
@ -86,55 +65,38 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
workerID := ""
if fs.workerTarget != "" {
workerID = fs.workerTarget
} else if fs.loadBalanced {
log.Debug().Msgf("Load balancing request")
workerID = fs.SelectLeastUsedServer()
if workerID == "" {
log.Debug().Msgf("Least used server not found, selecting random")
workerID = fs.RandomServer()
}
} else {
workerID = fs.RandomServer()
}
if len(tunnelAddresses) == 0 {
if workerID == "" {
log.Error().Msg("No available nodes yet")
return
}
tunnelAddr := ""
if fs.loadBalanced {
for _, t := range tunnelAddresses {
fs.EnsureRecordExist(t)
}
tunnelAddr = fs.SelectLeastUsedServer()
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
if tunnelAddr == "" {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
}
fs.RecordRequest(tunnelAddr)
} else {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
}
tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
log.Debug().Msgf("Selected node %s", workerID)
nodeData, exists := GetNode(fs.service, workerID)
if !exists {
log.Error().Msgf("Node %s not found", workerID)
return
}
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer
tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
if fs.loadBalanced {
fs.RecordRequest(workerID)
}
}()
}
}
}

View file

@ -5,12 +5,16 @@ import (
"time"
)
const defaultServicesID = "services_localai"
const (
defaultServicesID = "services"
WorkerID = "worker"
)
type NodeData struct {
Name string
ID string
TunnelAddress string
ServiceID string
LastSeen time.Time
}
@ -36,6 +40,19 @@ func GetAvailableNodes(serviceID string) []NodeData {
return availableNodes
}
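// GetNode looks up a single node by ID within the given service bucket,
// falling back to the default bucket when serviceID is empty; the boolean
// reports whether the node is currently registered.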
func GetNode(serviceID, nodeID string) (NodeData, bool) {
if serviceID == "" {
serviceID = defaultServicesID
}
mu.Lock()
defer mu.Unlock()
if _, ok := nodes[serviceID]; !ok {
return NodeData{}, false
}
nd, exists := nodes[serviceID][nodeID]
return nd, exists
}
func AddNode(serviceID string, node NodeData) {
if serviceID == "" {
serviceID = defaultServicesID

View file

@ -21,16 +21,40 @@ import (
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/services"
"github.com/mudler/edgevpn/pkg/types"
eutils "github.com/mudler/edgevpn/pkg/utils"
"github.com/phayes/freeport"
zlog "github.com/rs/zerolog/log"
"github.com/mudler/edgevpn/pkg/logger"
)
func generateNewConnectionData() *node.YAMLConnectionConfig {
maxMessSize := 20 << 20 // 20MB
keyLength := 43
return &node.YAMLConnectionConfig{
MaxMessageSize: maxMessSize,
RoomName: eutils.RandStringRunes(keyLength),
Rendezvous: eutils.RandStringRunes(keyLength),
MDNS: eutils.RandStringRunes(keyLength),
OTP: node.OTP{
DHT: node.OTPConfig{
Key: eutils.RandStringRunes(keyLength),
Interval: 120,
Length: keyLength,
},
Crypto: node.OTPConfig{
Key: eutils.RandStringRunes(keyLength),
Interval: 9000,
Length: keyLength,
},
},
}
}
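// GenerateToken builds a fresh connection configuration and returns it
// base64-encoded, ready to be shared with peers as a network token.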
func GenerateToken() string {
// Generates a new config and exit
newData := node.GenerateNewConnectionData(900)
return newData.Base64()
return generateNewConnectionData().Base64()
}
func IsP2PEnabled() bool {
@ -42,17 +66,7 @@ func nodeID(s string) string {
return fmt.Sprintf("%s-%s", hostname, s)
}
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
zlog.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)
func nodeAnnounce(ctx context.Context, node *node.Node) {
ledger, _ := node.Ledger()
// Announce ourselves so nodes accepts our connection
@ -60,19 +74,74 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)
}
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
ledger, _ := node.Ledger()
// Retrieve current ID for ip in the blockchain
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
service := &types.Service{}
existingValue.Unmarshal(service)
// If mismatch, update the blockchain
if !found {
zlog.Error().Msg("Service not found on blockchain")
conn.Close()
// ll.Debugf("service '%s' not found on blockchain", serviceID)
return
}
// Decode the Peer
d, err := peer.Decode(service.PeerID)
if err != nil {
zlog.Error().Msg("cannot decode peer")
conn.Close()
// ll.Debugf("could not decode peer '%s'", service.PeerID)
return
}
// Open a stream
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
if err != nil {
zlog.Error().Err(err).Msg("cannot open stream peer")
conn.Close()
// ll.Debugf("could not open stream '%s'", err.Error())
return
}
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, stream, conn)
go copyStream(closer, conn, stream)
<-closer
stream.Close()
conn.Close()
}
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
zlog.Error().Err(err).Msg("Error listening")
return err
}
go func() {
<-ctx.Done()
l.Close()
}()
nodeAnnounce(ctx, node)
defer l.Close()
for {
@ -90,47 +159,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
// Retrieve current ID for ip in the blockchain
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service)
service := &types.Service{}
existingValue.Unmarshal(service)
// If mismatch, update the blockchain
if !found {
zlog.Error().Msg("Service not found on blockchain")
conn.Close()
// ll.Debugf("service '%s' not found on blockchain", serviceID)
return
}
// Decode the Peer
d, err := peer.Decode(service.PeerID)
if err != nil {
zlog.Error().Msg("cannot decode peer")
conn.Close()
// ll.Debugf("could not decode peer '%s'", service.PeerID)
return
}
// Open a stream
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
if err != nil {
zlog.Error().Msg("cannot open stream peer")
conn.Close()
// ll.Debugf("could not open stream '%s'", err.Error())
return
}
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, stream, conn)
go copyStream(closer, conn, stream)
<-closer
stream.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
proxyP2PConnection(ctx, node, service, conn)
}()
}
}
@ -139,11 +168,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
if servicesID == "" {
servicesID = defaultServicesID
}
tunnels, err := discoveryTunnels(ctx, n, token, servicesID)
tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
if err != nil {
return err
}
@ -170,7 +199,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
return nil
}
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) {
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
tunnels := make(chan NodeData)
err := n.Start(ctx)
@ -181,7 +210,6 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}
// get new services, allocate and return to the channel
// TODO:
@ -198,17 +226,19 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
return
default:
time.Sleep(5 * time.Second)
zlog.Debug().Msg("Searching for workers")
data := ledger.LastBlock().Storage[servicesID]
zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")
for k, v := range data {
zlog.Info().Msgf("Found worker %s", k)
zlog.Debug().Msgf("New worker found in the ledger data '%s'", k)
nd := &NodeData{}
if err := v.Unmarshal(nd); err != nil {
zlog.Error().Msg("cannot unmarshal node data")
continue
}
ensureService(ctx, n, nd, k)
ensureService(ctx, n, nd, k, allocate)
muservice.Lock()
if _, ok := service[nd.Name]; ok {
tunnels <- service[nd.Name].NodeData
@ -230,28 +260,35 @@ type nodeServiceData struct {
var service = map[string]nodeServiceData{}
var muservice sync.Mutex
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
muservice.Lock()
defer muservice.Unlock()
nd.ServiceID = sserv
if ndService, found := service[nd.Name]; !found {
if !nd.IsOnline() {
// if node is offline and not present, do nothing
zlog.Debug().Msgf("Node %s is offline", nd.ID)
return
}
newCtxm, cancel := context.WithCancel(ctx)
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
fmt.Print(err)
if allocate {
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
}
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
service[nd.Name] = nodeServiceData{
NodeData: *nd,
CancelFunc: cancel,
}
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
} else {
// Check if the service is still alive
// if not cancel the context
@ -272,7 +309,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
}
// This is the P2P worker main
func ExposeService(ctx context.Context, host, port, token, servicesID string) error {
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
if servicesID == "" {
servicesID = defaultServicesID
}
@ -280,7 +317,7 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
nodeOpts, err := newNodeOpts(token)
if err != nil {
return err
return nil, err
}
// generate a random string for the name
name := utils.RandString(10)
@ -290,27 +327,23 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
n, err := node.New(nodeOpts...)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
return nil, fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(ctx)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
return n, fmt.Errorf("creating a new node: %w", err)
}
ledger, err := n.Ledger()
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
return n, fmt.Errorf("creating a new node: %w", err)
}
ledger.Announce(
ctx,
20*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey("services_localai", name)
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[name] = &NodeData{
Name: name,
@ -318,11 +351,10 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
ID: nodeID(name),
}
ledger.Add(servicesID, updatedMap)
// }
},
)
return err
return n, err
}
func NewNode(token string) (*node.Node, error) {
@ -345,19 +377,25 @@ func newNodeOpts(token string) ([]node.Option, error) {
// TODO: move this up, expose more config options when creating a node
noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
noLimits := os.Getenv("LOCALAI_P2P_DISABLE_LIMITS") == "true"
loglevel := "info"
noLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true"
loglevel := os.Getenv("LOCALAI_P2P_LOGLEVEL")
if loglevel == "" {
loglevel = "info"
}
libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL")
if libp2ploglevel == "" {
libp2ploglevel = "fatal"
}
c := config.Config{
Limit: config.ResourceLimit{
Enable: !noLimits,
Enable: noLimits,
MaxConns: 100,
},
NetworkToken: token,
LowProfile: false,
LogLevel: loglevel,
Libp2pLogLevel: "fatal",
Libp2pLogLevel: libp2ploglevel,
Ledger: config.Ledger{
SyncInterval: defaultInterval,
AnnounceInterval: defaultInterval,
@ -366,19 +404,19 @@ func newNodeOpts(token string) ([]node.Option, error) {
Service: true,
Map: true,
RateLimit: true,
RateLimitGlobal: 10,
RateLimitPeer: 10,
RateLimitGlobal: 100,
RateLimitPeer: 100,
RateLimitInterval: defaultInterval,
},
Discovery: config.Discovery{
DHT: noDHT,
DHT: !noDHT,
MDNS: true,
Interval: 30 * time.Second,
Interval: 10 * time.Second,
},
Connection: config.Connection{
HolePunch: true,
AutoRelay: true,
MaxConnections: 100,
MaxConnections: 1000,
},
}
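// Environment knobs read above (behaviour as wired in this function):
//   LOCALAI_P2P_DISABLE_DHT=true    -> turn off DHT discovery (mDNS stays on)
//   LOCALAI_P2P_ENABLE_LIMITS=true  -> enable libp2p resource limits
//   LOCALAI_P2P_LOGLEVEL            -> p2p node log level (defaults to "info")
//   LOCALAI_LIBP2P_LOGLEVEL         -> libp2p log level (defaults to "fatal")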

View file

@ -18,12 +18,12 @@ func (f *FederatedServer) Start(ctx context.Context) error {
return fmt.Errorf("not implemented")
}
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error {
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData), allocate bool) error {
return fmt.Errorf("not implemented")
}
func ExposeService(ctx context.Context, host, port, token, servicesID string) error {
return fmt.Errorf("not implemented")
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
return nil, fmt.Errorf("not implemented")
}
func IsP2PEnabled() bool {

View file

@ -139,6 +139,17 @@ type ChatCompletionResponseFormat struct {
Type ChatCompletionResponseFormatType `json:"type,omitempty"`
}
type JsonSchemaRequest struct {
Type string `json:"type"`
JsonSchema JsonSchema `json:"json_schema"`
}
type JsonSchema struct {
Name string `json:"name"`
Strict bool `json:"strict"`
Schema functions.Item `json:"schema"`
}
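// Wire shape implied by the tags above (schema contents are illustrative):
//   {"type": "json_schema", "json_schema": {"name": "person", "strict": true, "schema": { ... }}}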
type OpenAIRequest struct {
PredictionOptions

View file

@ -106,7 +106,7 @@ func Startup(opts ...config.AppOption) (*config.BackendConfigLoader, *model.Mode
err := assets.ExtractFiles(options.BackendAssets, options.AssetsDestination)
log.Debug().Msgf("Extracting backend assets files to %s", options.AssetsDestination)
if err != nil {
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly, like gpt4all)", err)
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly)", err)
}
}