This commit is contained in:
mintyleaf 2025-04-27 17:12:26 +03:00 committed by GitHub
commit 1b27ef53df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 517 additions and 288 deletions

View file

@ -8,12 +8,13 @@ import (
"strings"
"github.com/mudler/LocalAI/core/p2p"
p2pConfig "github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/node"
"github.com/rs/zerolog/log"
)
func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool) error {
func StartP2PStack(ctx context.Context, p2pCfg p2pConfig.Config, address, networkID string, federated bool) error {
var n *node.Node
// Here we are avoiding creating multiple nodes:
// - if the federated mode is enabled, we create a federated node and expose a service
@ -29,12 +30,12 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa
// Here a new node is created and started
// and a service is exposed by the node
node, err := p2p.ExposeService(ctx, "localhost", port, token, p2p.NetworkID(networkID, p2p.FederatedID))
node, err := p2p.ExposeService(ctx, p2pCfg, "localhost", port, p2p.NetworkID(networkID, p2p.FederatedID))
if err != nil {
return err
}
if err := p2p.ServiceDiscoverer(ctx, node, token, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
if err := p2p.ServiceDiscoverer(ctx, node, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
return err
}
@ -42,10 +43,10 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa
}
// If the p2p mode is enabled, we start the service discovery
if token != "" {
if p2pCfg.NetworkToken != "" {
// If a node wasn't created previously, create it
if n == nil {
node, err := p2p.NewNode(token)
node, err := p2p.NewNode(p2pCfg)
if err != nil {
return err
}
@ -58,7 +59,7 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa
// Attach a ServiceDiscoverer to the p2p node
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(ctx, n, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) {
if v.IsOnline() {

View file

@ -5,11 +5,14 @@ import (
"time"
cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http"
)
type ExplorerCMD struct {
cliP2P.P2PCommonFlags `embed:""`
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
@ -33,14 +36,14 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
if e.WithSync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
go ds.Start(context.Background(), true)
go ds.Start(context.Background(), e.P2PCommonFlags, true)
}
if e.OnlySync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
ctx := context.Background()
return ds.Start(ctx, false)
return ds.Start(ctx, e.P2PCommonFlags, false)
}
appHTTP := http.Explorer(db)

View file

@ -2,22 +2,43 @@ package cli
import (
"context"
"fmt"
cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/p2p"
"github.com/rs/zerolog/log"
)
type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
cliP2P.P2PCommonFlags `embed:""`
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
}
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
if f.Peer2PeerToken == "" {
log.Info().Msg("No token provided, generating one")
connectionData, err := p2p.GenerateNewConnectionData(
f.Peer2PeerDHTInterval, f.Peer2PeerOTPInterval,
f.Peer2PeerPrivkey, f.Peer2PeerUsePeerguard,
)
if err != nil {
log.Warn().Msgf("Error generating token: %s", err.Error())
}
f.Peer2PeerToken = connectionData.Base64()
log.Info().Msg("Generated Token:")
fmt.Println(f.Peer2PeerToken)
log.Info().Msg("To use the token, you can run the following command in another node or terminal:")
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", f.Peer2PeerToken)
}
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
return fs.Start(context.Background())
return fs.Start(context.Background(), f.P2PCommonFlags)
}

17
core/cli/p2p/p2p.go Normal file
View file

@ -0,0 +1,17 @@
package cli
type P2PCommonFlags struct {
Peer2PeerNoDHT bool `env:"LOCALAI_P2P_DISABLE_DHT,P2P_DISABLE_DHT" name:"p2p-disable-dht" help:"Disable DHT" group:"p2p"`
Peer2PeerLimit bool `env:"LOCALAI_P2P_ENABLE_LIMITS,P2P_ENABLE_LIMITS" name:"p2p-enable-limits" help:"Enable Limits" group:"p2p"`
Peer2PeerListenAddrs []string `env:"LOCALAI_P2P_LISTEN_MADDRS,P2P_LISTEN_MADDRS" name:"p2p-listen-maddrs" help:"A list of listen multiaddresses" group:"p2p"`
Peer2PeerBootAddrs []string `env:"LOCALAI_P2P_BOOTSTRAP_PEERS_MADDRS,P2P_BOOTSTRAP_PEERS_MADDRS" name:"p2p-bootstrap-peers-maddrs" help:"A list of bootstrap peers multiaddresses" group:"p2p"`
Peer2PeerDHTAnnounceAddrs []string `env:"LOCALAI_P2P_DHT_ANNOUNCE_MADDRS,P2P_DHT_ANNOUNCE_MADDRS" name:"p2p-dht-announce-maddrs" help:"A list of DHT announce maddrs" group:"p2p"`
Peer2PeerLibLoglevel string `env:"LOCALAI_P2P_LIB_LOGLEVEL,P2P_LIB_LOGLEVEL" name:"p2p-lib-loglevel" help:"libp2p specific loglevel" group:"p2p"`
Peer2PeerDHTInterval int `env:"LOCALAI_P2P_DHT_INTERVAL,P2P_DHT_INTERVAL" default:"360" name:"p2p-dht-interval" help:"Interval for DHT refresh (used during token generation)" group:"p2p"`
Peer2PeerOTPInterval int `env:"LOCALAI_P2P_OTP_INTERVAL,P2P_OTP_INTERVAL" default:"9000" name:"p2p-otp-interval" help:"Interval for OTP refresh (used during token generation)" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerPrivkey string `env:"LOCALAI_P2P_PRIVKEY,P2P_PRIVKEY" name:"p2pprivkey" help:"A base64 encoded protobuf serialized private key used for fixed ID (edgevpn can be used for generating one)" group:"p2p"`
Peer2PeerUsePeerguard bool `env:"LOCALAI_P2P_PEERGUARD,P2P_PEERGUARD" name:"p2ppeerguard" help:"Enable peerguarding through ecdsa authorization of nodes" group:"p2p"`
Peer2PeerAuthProvders string `env:"LOCALAI_P2P_PEERGATE_AUTH,P2P_PEERGATE_AUTH" name:"p2pauth" help:"JSON dict string with '{authProviderName: {providerOpt: value}}' structure, see edgevpn project" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
}

View file

@ -9,6 +9,7 @@ import (
"github.com/mudler/LocalAI/core/application"
cli_api "github.com/mudler/LocalAI/core/cli/api"
cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http"
"github.com/mudler/LocalAI/core/p2p"
@ -54,10 +55,7 @@ type RunCMD struct {
DisableMetricsEndpoint bool `env:"LOCALAI_DISABLE_METRICS_ENDPOINT,DISABLE_METRICS_ENDPOINT" default:"false" help:"Disable the /metrics endpoint" group:"api"`
HttpGetExemptedEndpoints []string `env:"LOCALAI_HTTP_GET_EXEMPTED_ENDPOINTS" default:"^/$,^/browse/?$,^/talk/?$,^/p2p/?$,^/chat/?$,^/text2image/?$,^/tts/?$,^/static/.*$,^/swagger.*$" help:"If LOCALAI_DISABLE_API_KEY_REQUIREMENT_FOR_HTTP_GET is overriden to true, this is the list of endpoints to exempt. Only adjust this in case of a security incident or as a result of a personal security posture review" group:"hardening"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
Peer2PeerDHTInterval int `env:"LOCALAI_P2P_DHT_INTERVAL,P2P_DHT_INTERVAL" default:"360" name:"p2p-dht-interval" help:"Interval for DHT refresh (used during token generation)" group:"p2p"`
Peer2PeerOTPInterval int `env:"LOCALAI_P2P_OTP_INTERVAL,P2P_OTP_INTERVAL" default:"9000" name:"p2p-otp-interval" help:"Interval for OTP refresh (used during token generation)" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
cliP2P.P2PCommonFlags `embed:""`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
@ -112,14 +110,24 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}
token := ""
p2pCfg := p2p.NewP2PConfig(r.P2PCommonFlags)
if r.Peer2Peer || r.Peer2PeerToken != "" {
log.Info().Msg("P2P mode enabled")
token = r.Peer2PeerToken
if token == "" {
// IF no token is provided, and p2p is enabled,
// we generate one and wait for the user to pick up the token (this is for interactive)
log.Info().Msg("No token provided, generating one")
token = p2p.GenerateToken(r.Peer2PeerDHTInterval, r.Peer2PeerOTPInterval)
connectionData, err := p2p.GenerateNewConnectionData(
r.Peer2PeerDHTInterval, r.Peer2PeerOTPInterval,
r.Peer2PeerPrivkey, r.Peer2PeerUsePeerguard,
)
if err != nil {
log.Warn().Msgf("Error generating token: %s", err.Error())
}
token = connectionData.Base64()
log.Info().Msg("Generated Token:")
fmt.Println(token)
@ -127,11 +135,13 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", token)
}
opts = append(opts, config.WithP2PToken(token))
p2pCfg.NetworkToken = token
}
backgroundCtx := context.Background()
if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated); err != nil {
if err := cli_api.StartP2PStack(backgroundCtx, p2pCfg, r.Address, r.Peer2PeerNetworkID, r.Federated); err != nil {
return err
}

View file

@ -12,6 +12,7 @@ import (
"time"
cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/LocalAI/pkg/assets"
"github.com/mudler/LocalAI/pkg/library"
@ -20,12 +21,13 @@ import (
)
type P2P struct {
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
WorkerFlags `embed:""`
cliP2P.P2PCommonFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
}
func (r *P2P) Run(ctx *cliContext.Context) error {
@ -41,6 +43,8 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
if r.Token == "" {
return fmt.Errorf("Token is required")
}
p2pCfg := p2p.NewP2PConfig(r.P2PCommonFlags)
p2pCfg.NetworkToken = r.Token
port, err := freeport.GetFreePort()
if err != nil {
@ -60,7 +64,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}
_, err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
_, err = p2p.ExposeService(context.Background(), p2pCfg, address, p, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
@ -103,7 +107,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()
_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
_, err = p2p.ExposeService(context.Background(), p2pCfg, address, fmt.Sprint(port), p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}