Rewrite P2P cmd flags handling; add peerguard/auth edgevpn support; WIP edgevpn ledger management http API

This commit is contained in:
mintyleaf 2025-03-29 06:45:00 +04:00
parent c965197d6f
commit aa7171dd5d
14 changed files with 306 additions and 229 deletions

View file

@ -5,32 +5,35 @@ package p2p
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"time"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/edgevpn/pkg/config"
p2pConfig "github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/services"
"github.com/mudler/edgevpn/pkg/types"
eutils "github.com/mudler/edgevpn/pkg/utils"
"github.com/multiformats/go-multiaddr"
"github.com/phayes/freeport"
zlog "github.com/rs/zerolog/log"
"github.com/mudler/edgevpn/pkg/logger"
)
func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectionConfig {
const DefaultInterval = 10 * time.Second
func GenerateNewConnectionData(DHTInterval, OTPInterval int, privkey string, peerguardMode bool) (*node.YAMLConnectionConfig, error) {
maxMessSize := 20 << 20 // 20MB
keyLength := 43
if DHTInterval == 0 {
@ -40,7 +43,7 @@ func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectio
OTPInterval = 9000
}
return &node.YAMLConnectionConfig{
connectionConfig := node.YAMLConnectionConfig{
MaxMessageSize: maxMessSize,
RoomName: eutils.RandStringRunes(keyLength),
Rendezvous: eutils.RandStringRunes(keyLength),
@ -58,11 +61,27 @@ func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectio
},
},
}
}
func GenerateToken(DHTInterval, OTPInterval int) string {
// Generates a new config and exit
return generateNewConnectionData(DHTInterval, OTPInterval).Base64()
if peerguardMode {
key, err := crypto.UnmarshalPrivateKey([]byte(privkey))
if err != nil {
return &connectionConfig, err
}
pid, err := peer.IDFromPublicKey(key.GetPublic())
if err != nil {
return &connectionConfig, err
}
connectionConfig.TrustedPeerIDS = []string{
pid.String(),
}
connectionConfig.ProtectedStoreKeys = []string{
"trustzone",
"trustzoneAuth",
}
}
return &connectionConfig, nil
}
func IsP2PEnabled() bool {
@ -176,11 +195,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
func ServiceDiscoverer(ctx context.Context, n *node.Node, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
if servicesID == "" {
servicesID = defaultServicesID
}
tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
tunnels, err := discoveryTunnels(ctx, n, servicesID, allocate)
if err != nil {
return err
}
@ -207,7 +226,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
return nil
}
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
func discoveryTunnels(ctx context.Context, n *node.Node, servicesID string, allocate bool) (chan NodeData, error) {
tunnels := make(chan NodeData)
ledger, err := n.Ledger()
@ -316,16 +335,16 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
}
// This is the P2P worker main
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
if servicesID == "" {
servicesID = defaultServicesID
}
func ExposeService(ctx context.Context, p2pCfg p2pConfig.Config, host, port, servicesID string) (*node.Node, error) {
llger := logger.New(log.LevelFatal)
nodeOpts, err := newNodeOpts(token)
nodeOpts, _, err := p2pCfg.ToOpts(llger)
if err != nil {
return nil, err
return nil, fmt.Errorf("parsing config for new node: %w", err)
}
nodeOpts = append(nodeOpts, node.FromBase64(true, p2pCfg.Discovery.DHT, p2pCfg.NetworkToken, nil, nil))
nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)
// generate a random string for the name
name := utils.RandString(10)
@ -347,6 +366,9 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
return n, fmt.Errorf("creating a new node: %w", err)
}
if servicesID == "" {
servicesID = defaultServicesID
}
ledger.Announce(
ctx,
20*time.Second,
@ -364,11 +386,15 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
return n, err
}
func NewNode(token string) (*node.Node, error) {
nodeOpts, err := newNodeOpts(token)
func NewNode(p2pCfg p2pConfig.Config) (*node.Node, error) {
llger := logger.New(log.LevelFatal)
nodeOpts, _, err := p2pCfg.ToOpts(llger)
if err != nil {
return nil, err
return nil, fmt.Errorf("parsing config for new node: %w", err)
}
nodeOpts = append(nodeOpts, node.FromBase64(true, p2pCfg.Discovery.DHT, p2pCfg.NetworkToken, nil, nil))
nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)
n, err := node.New(nodeOpts...)
if err != nil {
@ -378,89 +404,64 @@ func NewNode(token string) (*node.Node, error) {
return n, nil
}
func newNodeOpts(token string) ([]node.Option, error) {
llger := logger.New(log.LevelFatal)
defaultInterval := 10 * time.Second
func NewP2PConfig(p2pCommonFlags cliP2P.P2PCommonFlags) p2pConfig.Config {
pa := p2pCommonFlags.Peer2PeerAuthProvders
d := map[string]map[string]interface{}{}
json.Unmarshal([]byte(pa), &d)
// TODO: move this up, expose more config options when creating a node
noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
noLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true"
var listenMaddrs []string
var bootstrapPeers []string
laddrs := os.Getenv("LOCALAI_P2P_LISTEN_MADDRS")
if laddrs != "" {
listenMaddrs = strings.Split(laddrs, ",")
}
bootmaddr := os.Getenv("LOCALAI_P2P_BOOTSTRAP_PEERS_MADDRS")
if bootmaddr != "" {
bootstrapPeers = strings.Split(bootmaddr, ",")
}
dhtAnnounceMaddrs := stringsToMultiAddr(strings.Split(os.Getenv("LOCALAI_P2P_DHT_ANNOUNCE_MADDRS"), ","))
libp2ploglevel := os.Getenv("LOCALAI_P2P_LIB_LOGLEVEL")
if libp2ploglevel == "" {
libp2ploglevel = "fatal"
}
c := config.Config{
ListenMaddrs: listenMaddrs,
DHTAnnounceMaddrs: dhtAnnounceMaddrs,
Limit: config.ResourceLimit{
Enable: noLimits,
c := p2pConfig.Config{
ListenMaddrs: p2pCommonFlags.Peer2PeerListenAddrs,
DHTAnnounceMaddrs: utils.StringsToMultiAddr(p2pCommonFlags.Peer2PeerDHTAnnounceAddrs),
Limit: p2pConfig.ResourceLimit{
Enable: p2pCommonFlags.Peer2PeerLimit,
MaxConns: 100,
},
NetworkToken: token,
LowProfile: false,
LogLevel: logLevel,
Libp2pLogLevel: libp2ploglevel,
Ledger: config.Ledger{
SyncInterval: defaultInterval,
AnnounceInterval: defaultInterval,
LowProfile: false,
LogLevel: logLevel,
Ledger: p2pConfig.Ledger{
SyncInterval: DefaultInterval,
AnnounceInterval: DefaultInterval,
},
NAT: config.NAT{
NAT: p2pConfig.NAT{
Service: true,
Map: true,
RateLimit: true,
RateLimitGlobal: 100,
RateLimitPeer: 100,
RateLimitInterval: defaultInterval,
RateLimitInterval: DefaultInterval,
},
Discovery: config.Discovery{
DHT: !noDHT,
Discovery: p2pConfig.Discovery{
DHT: !p2pCommonFlags.Peer2PeerNoDHT,
MDNS: true,
Interval: 10 * time.Second,
BootstrapPeers: bootstrapPeers,
Interval: DefaultInterval,
BootstrapPeers: p2pCommonFlags.Peer2PeerBootAddrs,
},
Connection: config.Connection{
Connection: p2pConfig.Connection{
HolePunch: true,
AutoRelay: true,
MaxConnections: 1000,
},
PeerGuard: p2pConfig.PeerGuard{
Enable: p2pCommonFlags.Peer2PeerUsePeerguard,
// Default from edgevpn
SyncInterval: 120 * time.Second,
AuthProviders: d,
},
}
nodeOpts, _, err := c.ToOpts(llger)
if err != nil {
return nil, fmt.Errorf("parsing options: %w", err)
privkey := p2pCommonFlags.Peer2PeerPrivkey
if privkey != "" {
c.Privkey = []byte(privkey)
}
nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)
return nodeOpts, nil
}
func stringsToMultiAddr(peers []string) []multiaddr.Multiaddr {
res := []multiaddr.Multiaddr{}
for _, p := range peers {
addr, err := multiaddr.NewMultiaddr(p)
if err != nil {
continue
}
res = append(res, addr)
libp2ploglevel := p2pCommonFlags.Peer2PeerLibLoglevel
if libp2ploglevel == "" {
libp2ploglevel = "fatal"
}
return res
c.Libp2pLogLevel = libp2ploglevel
return c
}
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {