LocalAI/core/explorer/discovery.go
Ettore Di Giacinto 9e3e892ac7
feat(p2p): add network explorer and community pools (#3125)
* WIP

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Wire up a simple explorer DB

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* wip

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* WIP

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor: group services id so can be identified easily in the ledger table

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(discovery): discovery service now gathers worker information correctly

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(explorer): display network token

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(explorer): display form to add new networks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(explorer): stop from overwriting networks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(explorer): display only networks with active workers

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(explorer): list only clusters in a network if it has online workers

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* remove invalid and inactive networks

If networks have no workers, delete them from the database; do the same
if they are invalid.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* ci: add workflow to deploy new explorer versions automatically

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* build-api: build with p2p tag

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Allow specifying a connection timeout

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* logging

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Better p2p defaults

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Set loglevel

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fix dht enable

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Default to info for loglevel

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Add navbar

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Slightly improve rendering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Allow copying the token easily

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* ci fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2024-08-09 20:12:01 +02:00


package explorer

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/rs/zerolog/log"

	"github.com/mudler/LocalAI/core/p2p"
	"github.com/mudler/edgevpn/pkg/blockchain"
)

// DiscoveryServer keeps the explorer database in sync with the state of the
// p2p networks it knows about.
type DiscoveryServer struct {
	sync.Mutex
	database       *Database
	networkState   *NetworkState
	connectionTime time.Duration
}

// NetworkState is the in-memory view of all networks with active workers.
type NetworkState struct {
	Networks map[string]Network
}

// NetworkState returns the current network state under lock.
func (s *DiscoveryServer) NetworkState() *NetworkState {
	s.Lock()
	defer s.Unlock()
	return s.networkState
}

// NewDiscoveryServer creates a new DiscoveryServer with the given Database.
// It keeps the database state in sync with the network state.
func NewDiscoveryServer(db *Database, dur time.Duration) *DiscoveryServer {
	if dur == 0 {
		dur = 50 * time.Second
	}
	return &DiscoveryServer{
		database:       db,
		connectionTime: dur,
		networkState: &NetworkState{
			Networks: map[string]Network{},
		},
	}
}
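
// The sketch below is illustrative only and not used by the server: it shows
// how a caller could wire up a DiscoveryServer, assuming it already has a
// *Database from elsewhere in this package and owns the context lifetime.
func exampleDiscoveryServerUsage(ctx context.Context, db *Database) {
	// Give each network up to a minute per refresh; passing 0 would fall back
	// to the 50-second default applied in NewDiscoveryServer.
	ds := NewDiscoveryServer(db, time.Minute)

	// Start blocks until the context is cancelled, so run it in a goroutine.
	go func() {
		if err := ds.Start(ctx); err != nil {
			log.Err(err).Msg("discovery server stopped")
		}
	}()

	// The current view of the networks can be read at any time.
	log.Info().Msgf("tracking %d networks", len(ds.NetworkState().Networks))
}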

// Network groups the clusters discovered for a single network token.
type Network struct {
	Clusters []ClusterData
}

// runBackground walks over every known network token, connects to the
// network, collects its cluster data, and updates the in-memory state.
// Networks that cannot be joined or that have no online workers are removed
// from the database.
func (s *DiscoveryServer) runBackground() {
	if len(s.database.TokenList()) == 0 {
		time.Sleep(5 * time.Second) // avoid busy loop
		return
	}

	for _, token := range s.database.TokenList() {
		c, cancel := context.WithTimeout(context.Background(), s.connectionTime)
		defer cancel()

		// Connect to the network, collect its cluster data, and save it into
		// the current state (under the mutex). Networks are processed
		// sequentially, not in parallel.
		n, err := p2p.NewNode(token)
		if err != nil {
			log.Err(err).Msg("Failed to create node")
			s.database.Delete(token)
			continue
		}

		err = n.Start(c)
		if err != nil {
			log.Err(err).Msg("Failed to start node")
			s.database.Delete(token)
			continue
		}

		ledger, err := n.Ledger()
		if err != nil {
			log.Err(err).Msg("Failed to start ledger")
			s.database.Delete(token)
			continue
		}

		networkData := make(chan ClusterData)

		// Get the network data. This takes the whole timeout window, as we
		// might not be connected to the network yet and a few attempts may be
		// needed before bailing out.
		go s.retrieveNetworkData(c, ledger, networkData)

		hasWorkers := false
		ledgerK := []ClusterData{}
		for key := range networkData {
			ledgerK = append(ledgerK, key)
			if len(key.Workers) > 0 {
				hasWorkers = true
			}
		}

		log.Debug().Any("network", token).Msgf("Network has %d clusters", len(ledgerK))
		if len(ledgerK) != 0 {
			for _, k := range ledgerK {
				log.Debug().Any("network", token).Msgf("Clusterdata %+v", k)
			}
		}

		if hasWorkers {
			s.Lock()
			s.networkState.Networks[token] = Network{
				Clusters: ledgerK,
			}
			s.Unlock()
		} else {
			log.Info().Any("network", token).Msg("No workers found in the network. Removing it from the database")
			s.database.Delete(token)
		}
	}
}

// ClusterData describes a single cluster inside a network: the online workers
// it exposes, its type ("worker" or "federated"), and the network ID prefix it
// was advertised under.
type ClusterData struct {
	Workers   []string
	Type      string
	NetworkID string
}
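
// onlineClusters is an illustrative helper, not used by the server: it shows
// how a consumer such as the explorer UI could flatten the current
// NetworkState into the clusters that still advertise online workers, keyed
// by network token.
func onlineClusters(s *DiscoveryServer) map[string][]ClusterData {
	out := map[string][]ClusterData{}
	for token, network := range s.NetworkState().Networks {
		for _, cluster := range network.Clusters {
			if len(cluster.Workers) > 0 {
				out[token] = append(out[token], cluster)
			}
		}
	}
	return out
}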

// retrieveNetworkData polls the ledger until the context expires, collecting
// the clusters that advertise at least one online worker, and streams the
// result over networkData before closing the channel.
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
	clusters := map[string]ClusterData{}

	defer func() {
		for _, n := range clusters {
			networkData <- n
		}
		close(networkData)
	}()

	for {
		select {
		case <-c.Done():
			return
		default:
			time.Sleep(5 * time.Second)

			data := ledger.LastBlock().Storage
		LEDGER:
			for d := range data {
				toScanForWorkers := false
				cd := ClusterData{}
				isWorkerCluster := d == p2p.WorkerID || (strings.Contains(d, "_") && strings.Contains(d, p2p.WorkerID))
				isFederatedCluster := d == p2p.FederatedID || (strings.Contains(d, "_") && strings.Contains(d, p2p.FederatedID))
				switch {
				case isWorkerCluster:
					toScanForWorkers = true
					cd.Type = "worker"
				case isFederatedCluster:
					toScanForWorkers = true
					cd.Type = "federated"
				}

				if strings.Contains(d, "_") {
					cd.NetworkID = strings.Split(d, "_")[0]
				}

				if !toScanForWorkers {
					continue LEDGER
				}

				atLeastOneWorker := false
			DATA:
				for _, v := range data[d] {
					nd := &p2p.NodeData{}
					if err := v.Unmarshal(nd); err != nil {
						continue DATA
					}

					if nd.IsOnline() {
						atLeastOneWorker = true
						cd.Workers = append(cd.Workers, nd.ID)
					}
				}

				if atLeastOneWorker {
					clusters[d] = cd
				}
			}
		}
	}
}
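
// classifyLedgerKey is an illustrative sketch, not used by the server, of the
// key convention retrieveNetworkData relies on: ledger keys are either a bare
// service ID (p2p.WorkerID or p2p.FederatedID) or "<networkID>_<serviceID>",
// as introduced when services were grouped by ID in the ledger table.
func classifyLedgerKey(key string) (networkID, clusterType string) {
	if strings.Contains(key, "_") {
		networkID = strings.Split(key, "_")[0]
	}
	switch {
	case key == p2p.WorkerID || strings.Contains(key, p2p.WorkerID):
		clusterType = "worker"
	case key == p2p.FederatedID || strings.Contains(key, p2p.FederatedID):
		clusterType = "federated"
	}
	return networkID, clusterType
}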

// Start the discovery server. This is meant to be run in a goroutine.
func (s *DiscoveryServer) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled")
		default:
			// Collect data
			s.runBackground()
		}
	}
}