feat(p2p): Federation and AI swarms (#2723)

* WIP: p2p enhancements

* Get online state

* Pass the token so it can be shown in the dashboard

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Style

* Minor fixups

* Parametrize SearchID

* Refactoring

* Allow exposing/binding more services

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Add federation

* Display federated mode in the WebUI

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Small fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Make federated nodes visible in the WebUI

* Fix version display

* Improve web page

* Live page update

* Visual enhancements

* Enhancements

* Visual enhancements

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Committed by Ettore Di Giacinto on 2024-07-08 22:04:06 +02:00 via GitHub
parent dd95ae130f
commit cca881ec49
23 changed files with 815 additions and 82 deletions
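
For context, a minimal sketch of how the API refactored in this commit (GenerateToken, ExposeService, NewNode, ServiceDiscoverer, all visible in the diff below) could be wired together. The import path, the addresses/ports, and the single-process wiring are illustrative assumptions, not code from this PR:

	package main

	import (
		"context"
		"log"

		// assumed import path for the package edited in this diff
		"github.com/mudler/LocalAI/core/p2p"
	)

	func main() {
		ctx := context.Background()
		// Shared secret; in practice generated once and distributed to all nodes.
		token := p2p.GenerateToken()

		// Worker side: announce a local llama.cpp RPC endpoint on the p2p network.
		// Host and port are placeholders.
		go func() {
			if err := p2p.ExposeService(ctx, "127.0.0.1", "50052", token, ""); err != nil {
				log.Fatal(err)
			}
		}()

		// Federation/server side: create a node and discover announced workers.
		n, err := p2p.NewNode(token)
		if err != nil {
			log.Fatal(err)
		}
		// An empty servicesID falls back to defaultServicesID; discoveryFunc is
		// invoked whenever a new worker tunnel is registered, so the caller can
		// rebuild LLAMACPP_GRPC_SERVERS (or any other derived state) there.
		if err := p2p.ServiceDiscoverer(ctx, n, token, "", func() {
			log.Println("worker list changed")
		}); err != nil {
			log.Fatal(err)
		}

		select {} // keep the process alive
	}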


@@ -10,19 +10,18 @@ import (
"io"
"net"
"os"
"strings"
"sync"
"time"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/services"
"github.com/mudler/edgevpn/pkg/types"
"github.com/phayes/freeport"
"github.com/ipfs/go-log"
"github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/services"
zlog "github.com/rs/zerolog/log"
"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +33,15 @@ func GenerateToken() string {
return newData.Base64()
}
func IsP2PEnabled() bool {
return true
}
func nodeID(s string) string {
hostname, _ := os.Hostname()
return fmt.Sprintf("%s-%s", hostname, s)
}
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
@@ -53,16 +61,16 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)
@@ -80,7 +88,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
continue
}
// ll.Info("New connection from", l.Addr().String())
// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
// Retrieve current ID for ip in the blockchain
@@ -137,24 +144,30 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
tunnels, err := discoveryTunnels(ctx, token)
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func()) error {
if servicesID == "" {
servicesID = defaultServicesID
}
tunnels, err := discoveryTunnels(ctx, n, token, servicesID)
if err != nil {
return err
}
// TODO: discoveryTunnels should return all the nodes that are available?
// In this way we update availableNodes here instead of appending
// e.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels
// each time the node is seen
// In this case the below function should be idempotent and just keep track of the nodes
go func() {
totalTunnels := []string{}
for {
select {
case <-ctx.Done():
zlog.Error().Msg("Discoverer stopped")
return
case tunnel := <-tunnels:
totalTunnels = append(totalTunnels, tunnel)
os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", strings.Join(totalTunnels, ","))
AddNode(servicesID, tunnel)
if discoveryFunc != nil {
discoveryFunc()
}
}
}
}()
@@ -162,19 +175,10 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
return nil
}
func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
tunnels := make(chan string)
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) {
tunnels := make(chan NodeData)
nodeOpts, err := newNodeOpts(token)
if err != nil {
return nil, err
}
n, err := node.New(nodeOpts...)
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(ctx)
err := n.Start(ctx)
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}
@@ -184,8 +188,14 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
}
// get new services, allocate and return to the channel
// TODO:
// a function ensureServices that:
// - starts a service if not started, if the worker is Online
// - checks that workers are Online, if not cancel the context of allocateLocalService
// - discoveryTunnels should return all the nodes and addresses associated with it
// - the caller should now take care of the fact that we always return fresh information
go func() {
emitted := map[string]bool{}
for {
select {
case <-ctx.Done():
@@ -195,20 +205,20 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
time.Sleep(5 * time.Second)
zlog.Debug().Msg("Searching for workers")
data := ledger.LastBlock().Storage["services_localai"]
for k := range data {
data := ledger.LastBlock().Storage[servicesID]
for k, v := range data {
zlog.Info().Msgf("Found worker %s", k)
if _, found := emitted[k]; !found {
emitted[k] = true
//discoveredPeers <- k
port, err := freeport.GetFreePort()
if err != nil {
fmt.Print(err)
}
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
go allocateLocalService(ctx, n, tunnelAddress, k)
tunnels <- tunnelAddress
nd := &NodeData{}
if err := v.Unmarshal(nd); err != nil {
zlog.Error().Msg("cannot unmarshal node data")
continue
}
ensureService(ctx, n, nd, k)
muservice.Lock()
if _, ok := service[nd.Name]; ok {
tunnels <- service[nd.Name].NodeData
}
muservice.Unlock()
}
}
}
@@ -217,8 +227,60 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
return tunnels, err
}
type nodeServiceData struct {
NodeData NodeData
CancelFunc context.CancelFunc
}
var service = map[string]nodeServiceData{}
var muservice sync.Mutex
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
muservice.Lock()
defer muservice.Unlock()
if ndService, found := service[nd.Name]; !found {
if !nd.IsOnline() {
// if node is offline and not present, do nothing
return
}
newCtxm, cancel := context.WithCancel(ctx)
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
fmt.Print(err)
}
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
service[nd.Name] = nodeServiceData{
NodeData: *nd,
CancelFunc: cancel,
}
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
} else {
// Check if the service is still alive
// if not cancel the context
if !nd.IsOnline() && !ndService.NodeData.IsOnline() {
ndService.CancelFunc()
delete(service, nd.Name)
zlog.Info().Msgf("Node %s is offline, deleting", nd.ID)
} else if nd.IsOnline() {
// update last seen inside service
nd.TunnelAddress = ndService.NodeData.TunnelAddress
service[nd.Name] = nodeServiceData{
NodeData: *nd,
CancelFunc: ndService.CancelFunc,
}
zlog.Debug().Msgf("Node %s is still online", nd.ID)
}
}
}
// This is the P2P worker main
func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
func ExposeService(ctx context.Context, host, port, token, servicesID string) error {
if servicesID == "" {
servicesID = defaultServicesID
}
llger := logger.New(log.LevelFatal)
nodeOpts, err := newNodeOpts(token)
@@ -248,22 +310,40 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
ledger.Announce(
ctx,
10*time.Second,
20*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
_, found := ledger.GetKey("services_localai", name)
//_, found := ledger.GetKey("services_localai", name)
// If mismatch, update the blockchain
if !found {
updatedMap := map[string]interface{}{}
updatedMap[name] = "p2p"
ledger.Add("services_localai", updatedMap)
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[name] = &NodeData{
Name: name,
LastSeen: time.Now(),
ID: nodeID(name),
}
ledger.Add(servicesID, updatedMap)
// }
},
)
return err
}
func NewNode(token string) (*node.Node, error) {
nodeOpts, err := newNodeOpts(token)
if err != nil {
return nil, err
}
n, err := node.New(nodeOpts...)
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}
return n, nil
}
func newNodeOpts(token string) ([]node.Option, error) {
llger := logger.New(log.LevelFatal)
defaultInterval := 10 * time.Second
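
The hunks above also rely on a NodeData type (and helpers such as AddNode) defined elsewhere in this PR rather than in this file. A minimal sketch of NodeData, inferred from the fields and methods used in the diff; the exact definition and the online-detection window are assumptions:

	package p2p

	import "time"

	// NodeData is the record announced on the ledger for each worker/service.
	type NodeData struct {
		Name          string    // service name announced on the ledger
		ID            string    // hostname-derived identifier (see nodeID above)
		TunnelAddress string    // local 127.0.0.1:<port> tunnel allocated for the service
		LastSeen      time.Time // refreshed on every announce
	}

	// IsOnline reports whether the node announced itself recently enough to be
	// considered alive. The 40-second window is an assumption (twice the
	// 20-second announce interval used in the diff).
	func (d NodeData) IsOnline() bool {
		return time.Since(d.LastSeen) < 40*time.Second
	}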