mirror of
https://github.com/mudler/LocalAI.git
synced 2025-05-20 10:35:01 +00:00
feat(p2p): allow running multiple clusters in the same p2p network (#3128)
feat(p2p): allow running multiple clusters in the same network. Allow specifying a network ID via the CLI, which makes it possible to run multiple clusters that are logically separated within the same network (while using the same shared token). Note: this segregation is not "secure" by any means: anyone holding the network token can see the services available across the whole network. However, it provides a way to separate the inference endpoints. This allows, for instance, a node to be federated and also have a set of llama.cpp workers attached. Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
parent
2c8623dbb4
commit
36e185ba63
8 changed files with 50 additions and 26 deletions
|
@ -8,14 +8,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type FederatedCLI struct {
|
type FederatedCLI struct {
|
||||||
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
|
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
|
||||||
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
|
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
|
||||||
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
|
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
|
||||||
|
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
||||||
|
|
||||||
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced)
|
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)
|
||||||
|
|
||||||
return fs.Start(context.Background())
|
return fs.Start(context.Background())
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ type RunCMD struct {
|
||||||
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"`
|
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"`
|
||||||
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
|
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
|
||||||
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
|
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
|
||||||
|
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
|
||||||
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
|
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
|
||||||
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
|
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
|
||||||
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
|
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
|
||||||
|
@ -94,6 +95,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||||
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
|
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
|
||||||
config.WithOpaqueErrors(r.OpaqueErrors),
|
config.WithOpaqueErrors(r.OpaqueErrors),
|
||||||
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
|
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
|
||||||
|
config.WithP2PNetworkID(r.Peer2PeerNetworkID),
|
||||||
}
|
}
|
||||||
|
|
||||||
token := ""
|
token := ""
|
||||||
|
@ -119,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info().Msg("Starting P2P server discovery...")
|
log.Info().Msg("Starting P2P server discovery...")
|
||||||
if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) {
|
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
|
||||||
var tunnelAddresses []string
|
var tunnelAddresses []string
|
||||||
for _, v := range p2p.GetAvailableNodes("") {
|
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
|
||||||
if v.IsOnline() {
|
if v.IsOnline() {
|
||||||
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
|
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
|
||||||
} else {
|
} else {
|
||||||
|
@ -142,14 +144,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
|
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
node, err := p2p.NewNode(token)
|
node, err := p2p.NewNode(token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {
|
|
||||||
|
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type P2P struct {
|
type P2P struct {
|
||||||
WorkerFlags `embed:""`
|
WorkerFlags `embed:""`
|
||||||
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
|
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
|
||||||
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
|
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
|
||||||
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
|
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
|
||||||
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
|
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
|
||||||
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
|
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
|
||||||
|
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *P2P) Run(ctx *cliContext.Context) error {
|
func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||||
|
@ -59,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||||
p = r.RunnerPort
|
p = r.RunnerPort
|
||||||
}
|
}
|
||||||
|
|
||||||
err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
|
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -99,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
|
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ type ApplicationConfig struct {
|
||||||
EnforcePredownloadScans bool
|
EnforcePredownloadScans bool
|
||||||
OpaqueErrors bool
|
OpaqueErrors bool
|
||||||
P2PToken string
|
P2PToken string
|
||||||
|
P2PNetworkID string
|
||||||
|
|
||||||
ModelLibraryURL string
|
ModelLibraryURL string
|
||||||
|
|
||||||
|
@ -91,6 +92,12 @@ func WithCors(b bool) AppOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithP2PNetworkID(s string) AppOption {
|
||||||
|
return func(o *ApplicationConfig) {
|
||||||
|
o.P2PNetworkID = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithCsrf(b bool) AppOption {
|
func WithCsrf(b bool) AppOption {
|
||||||
return func(o *ApplicationConfig) {
|
return func(o *ApplicationConfig) {
|
||||||
o.CSRF = b
|
o.CSRF = b
|
||||||
|
|
|
@ -11,12 +11,14 @@ import (
|
||||||
// @Summary Returns available P2P nodes
|
// @Summary Returns available P2P nodes
|
||||||
// @Success 200 {object} []schema.P2PNodesResponse "Response"
|
// @Success 200 {object} []schema.P2PNodesResponse "Response"
|
||||||
// @Router /api/p2p [get]
|
// @Router /api/p2p [get]
|
||||||
func ShowP2PNodes(c *fiber.Ctx) error {
|
func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error {
|
||||||
// Render index
|
// Render index
|
||||||
return c.JSON(schema.P2PNodesResponse{
|
return func(c *fiber.Ctx) error {
|
||||||
Nodes: p2p.GetAvailableNodes(""),
|
return c.JSON(schema.P2PNodesResponse{
|
||||||
FederatedNodes: p2p.GetAvailableNodes(p2p.FederatedID),
|
Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")),
|
||||||
})
|
FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)),
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShowP2PToken returns the P2P token
|
// ShowP2PToken returns the P2P token
|
||||||
|
|
|
@ -59,7 +59,7 @@ func RegisterLocalAIRoutes(app *fiber.App,
|
||||||
|
|
||||||
// p2p
|
// p2p
|
||||||
if p2p.IsP2PEnabled() {
|
if p2p.IsP2PEnabled() {
|
||||||
app.Get("/api/p2p", auth, localai.ShowP2PNodes)
|
app.Get("/api/p2p", auth, localai.ShowP2PNodes(appConfig))
|
||||||
app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig))
|
app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,6 +96,7 @@ func RegisterUIRoutes(app *fiber.App,
|
||||||
//"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID),
|
//"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID),
|
||||||
"IsP2PEnabled": p2p.IsP2PEnabled(),
|
"IsP2PEnabled": p2p.IsP2PEnabled(),
|
||||||
"P2PToken": appConfig.P2PToken,
|
"P2PToken": appConfig.P2PToken,
|
||||||
|
"NetworkID": appConfig.P2PNetworkID,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render index
|
// Render index
|
||||||
|
@ -104,17 +105,17 @@ func RegisterUIRoutes(app *fiber.App,
|
||||||
|
|
||||||
/* show nodes live! */
|
/* show nodes live! */
|
||||||
app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error {
|
app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error {
|
||||||
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes("")))
|
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
|
||||||
})
|
})
|
||||||
app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error {
|
app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error {
|
||||||
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.FederatedID)))
|
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
|
||||||
})
|
})
|
||||||
|
|
||||||
app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error {
|
app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error {
|
||||||
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes("")))
|
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
|
||||||
})
|
})
|
||||||
app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error {
|
app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error {
|
||||||
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.FederatedID)))
|
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,16 @@
|
||||||
package p2p
|
package p2p
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
const FederatedID = "federated"
|
const FederatedID = "federated"
|
||||||
|
|
||||||
|
func NetworkID(networkID, serviceID string) string {
|
||||||
|
if networkID != "" {
|
||||||
|
return fmt.Sprintf("%s_%s", networkID, serviceID)
|
||||||
|
}
|
||||||
|
return serviceID
|
||||||
|
}
|
||||||
|
|
||||||
type FederatedServer struct {
|
type FederatedServer struct {
|
||||||
listenAddr, service, p2ptoken string
|
listenAddr, service, p2ptoken string
|
||||||
requestTable map[string]int
|
requestTable map[string]int
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue