diff --git a/core/p2p/federated.go b/core/p2p/federated.go index 454ddc1b..6475715e 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -40,9 +40,9 @@ func (fs *FederatedServer) RandomServer() string { var tunnelAddresses []string for _, v := range GetAvailableNodes(fs.service) { if v.IsOnline() { - tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + tunnelAddresses = append(tunnelAddresses, v.ID) } else { - delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked + delete(fs.requestTable, v.ID) // make sure it's not tracked log.Info().Msgf("Node %s is offline", v.ID) } } @@ -61,8 +61,8 @@ func (fs *FederatedServer) syncTableStatus() { for _, v := range GetAvailableNodes(fs.service) { if v.IsOnline() { - fs.ensureRecordExist(v.TunnelAddress) - currentTunnels[v.TunnelAddress] = struct{}{} + fs.ensureRecordExist(v.ID) + currentTunnels[v.ID] = struct{}{} } } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index c356ae96..87df633f 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -8,16 +8,12 @@ import ( "errors" "fmt" "net" - "time" "github.com/mudler/edgevpn/pkg/node" - "github.com/mudler/edgevpn/pkg/protocol" - "github.com/mudler/edgevpn/pkg/types" "github.com/rs/zerolog/log" ) func (f *FederatedServer) Start(ctx context.Context) error { - n, err := NewNode(f.p2ptoken) if err != nil { return fmt.Errorf("creating a new node: %w", err) @@ -29,7 +25,7 @@ func (f *FederatedServer) Start(ctx context.Context) error { if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { log.Debug().Msgf("Discovered node: %s", tunnel.ID) - }, true); err != nil { + }, false); err != nil { return err } @@ -50,21 +46,8 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { <-ctx.Done() l.Close() }() - ledger, _ := node.Ledger() - // Announce ourselves so nodes accepts our connection - ledger.Announce( - ctx, - 10*time.Second, - func() { - updatedMap := map[string]interface{}{} - updatedMap[node.Host().ID().String()] = &types.User{ - PeerID: node.Host().ID().String(), - Timestamp: time.Now().String(), - } - ledger.Add(protocol.UsersLedgerKey, updatedMap) - }, - ) + nodeAnnounce(ctx, node) defer l.Close() for { @@ -82,52 +65,36 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { // Handle connections in a new goroutine, forwarding to the p2p service go func() { - tunnelAddr := "" - + workerID := "" if fs.workerTarget != "" { - for _, v := range GetAvailableNodes(fs.service) { - if v.ID == fs.workerTarget { - tunnelAddr = v.TunnelAddress - break - } - } + workerID = fs.workerTarget } else if fs.loadBalanced { log.Debug().Msgf("Load balancing request") - tunnelAddr = fs.SelectLeastUsedServer() - if tunnelAddr == "" { + workerID = fs.SelectLeastUsedServer() + if workerID == "" { log.Debug().Msgf("Least used server not found, selecting random") - tunnelAddr = fs.RandomServer() + workerID = fs.RandomServer() } - } else { - tunnelAddr = fs.RandomServer() + workerID = fs.RandomServer() } - if tunnelAddr == "" { + if workerID == "" { log.Error().Msg("No available nodes yet") return } - log.Debug().Msgf("Selected tunnel %s", tunnelAddr) - - tunnelConn, err := net.Dial("tcp", tunnelAddr) - if err != nil { - log.Error().Err(err).Msg("Error connecting to tunnel") + log.Debug().Msgf("Selected node %s", workerID) + nodeData, exists := GetNode(fs.service, workerID) + if !exists { + log.Error().Msgf("Node %s not found", workerID) return } - log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) - closer := make(chan struct{}, 2) - go copyStream(closer, tunnelConn, conn) - go copyStream(closer, conn, tunnelConn) - <-closer - - tunnelConn.Close() - conn.Close() - + proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) if fs.loadBalanced { - fs.RecordRequest(tunnelAddr) + fs.RecordRequest(workerID) } }() } diff --git a/core/p2p/node.go b/core/p2p/node.go index b89bb7c6..6c43dde0 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -14,6 +14,7 @@ type NodeData struct { Name string ID string TunnelAddress string + ServiceID string LastSeen time.Time } @@ -39,6 +40,19 @@ func GetAvailableNodes(serviceID string) []NodeData { return availableNodes } +func GetNode(serviceID, nodeID string) (NodeData, bool) { + if serviceID == "" { + serviceID = defaultServicesID + } + mu.Lock() + defer mu.Unlock() + if _, ok := nodes[serviceID]; !ok { + return NodeData{}, false + } + nd, exists := nodes[serviceID][nodeID] + return nd, exists +} + func AddNode(serviceID string, node NodeData) { if serviceID == "" { serviceID = defaultServicesID diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index a5e7715d..1bc46e7d 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -66,22 +66,7 @@ func nodeID(s string) string { return fmt.Sprintf("%s-%s", hostname, s) } -func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error { - - zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) - // Open local port for listening - l, err := net.Listen("tcp", listenAddr) - if err != nil { - zlog.Error().Err(err).Msg("Error listening") - return err - } - go func() { - <-ctx.Done() - l.Close() - }() - - // ll.Info("Binding local port on", srcaddr) - +func nodeAnnounce(ctx context.Context, node *node.Node) { ledger, _ := node.Ledger() // Announce ourselves so nodes accepts our connection @@ -97,6 +82,66 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv ledger.Add(protocol.UsersLedgerKey, updatedMap) }, ) +} + +func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) { + ledger, _ := node.Ledger() + // Retrieve current ID for ip in the blockchain + existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID) + service := &types.Service{} + existingValue.Unmarshal(service) + // If mismatch, update the blockchain + if !found { + zlog.Error().Msg("Service not found on blockchain") + conn.Close() + // ll.Debugf("service '%s' not found on blockchain", serviceID) + return + } + + // Decode the Peer + d, err := peer.Decode(service.PeerID) + if err != nil { + zlog.Error().Msg("cannot decode peer") + + conn.Close() + // ll.Debugf("could not decode peer '%s'", service.PeerID) + return + } + + // Open a stream + stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID()) + if err != nil { + zlog.Error().Err(err).Msg("cannot open stream peer") + + conn.Close() + // ll.Debugf("could not open stream '%s'", err.Error()) + return + } + // ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String()) + zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String()) + closer := make(chan struct{}, 2) + go copyStream(closer, stream, conn) + go copyStream(closer, conn, stream) + <-closer + + stream.Close() + conn.Close() +} + +func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error { + zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) + // Open local port for listening + l, err := net.Listen("tcp", listenAddr) + if err != nil { + zlog.Error().Err(err).Msg("Error listening") + return err + } + go func() { + <-ctx.Done() + l.Close() + }() + + nodeAnnounce(ctx, node) defer l.Close() for { @@ -114,47 +159,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv // Handle connections in a new goroutine, forwarding to the p2p service go func() { - // Retrieve current ID for ip in the blockchain - existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service) - service := &types.Service{} - existingValue.Unmarshal(service) - // If mismatch, update the blockchain - if !found { - zlog.Error().Msg("Service not found on blockchain") - conn.Close() - // ll.Debugf("service '%s' not found on blockchain", serviceID) - return - } - - // Decode the Peer - d, err := peer.Decode(service.PeerID) - if err != nil { - zlog.Error().Msg("cannot decode peer") - - conn.Close() - // ll.Debugf("could not decode peer '%s'", service.PeerID) - return - } - - // Open a stream - stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID()) - if err != nil { - zlog.Error().Msg("cannot open stream peer") - - conn.Close() - // ll.Debugf("could not open stream '%s'", err.Error()) - return - } - // ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String()) - zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String()) - closer := make(chan struct{}, 2) - go copyStream(closer, stream, conn) - go copyStream(closer, conn, stream) - <-closer - - stream.Close() - conn.Close() - // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) + proxyP2PConnection(ctx, node, service, conn) }() } } @@ -258,6 +263,7 @@ var muservice sync.Mutex func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) { muservice.Lock() defer muservice.Unlock() + nd.ServiceID = sserv if ndService, found := service[nd.Name]; !found { if !nd.IsOnline() { // if node is offline and not present, do nothing