From 2ee3c8311ae9a3b3bf5cc070c7c7fa03592ade5e Mon Sep 17 00:00:00 2001 From: mintyleaf Date: Thu, 3 Apr 2025 03:08:57 +0400 Subject: [PATCH] add net.Conn http request checker --- core/http/routes/peerguard.go | 80 -------- core/p2p/federated_server.go | 331 ++++++++++++++++++++++++++++------ go.mod | 2 +- go.sum | 8 +- 4 files changed, 279 insertions(+), 142 deletions(-) delete mode 100644 core/http/routes/peerguard.go diff --git a/core/http/routes/peerguard.go b/core/http/routes/peerguard.go deleted file mode 100644 index dd576656..00000000 --- a/core/http/routes/peerguard.go +++ /dev/null @@ -1,80 +0,0 @@ -package routes - -import ( - "context" - "time" - - "github.com/gofiber/fiber/v2" - "github.com/mudler/edgevpn/pkg/node" -) - -const DefaultInterval = 5 * time.Second -const Timeout = 20 * time.Second - -// TODO connect routes and write a middleware for authorization based on p2p auth providers private keys -func RegisterPeerguardAuthRoutes(app *fiber.App, e *node.Node) { - app.Get("ledger/:bucket/:key", func(c *fiber.Ctx) error { - bucket := c.Params("bucket") - key := c.Params("key") - - ledger, err := e.Ledger() - if err != nil { - return err - } - - return c.JSON(ledger.CurrentData()[bucket][key]) - }) - - app.Get("ledger/:bucket", func(c *fiber.Ctx) error { - bucket := c.Params("bucket") - - ledger, err := e.Ledger() - if err != nil { - return err - } - - return c.JSON(ledger.CurrentData()[bucket]) - }) - - announcing := struct{ State string }{"Announcing"} - - // Store arbitrary data - app.Get("ledger/:bucket/:key/:value", func(c *fiber.Ctx) error { - bucket := c.Params("bucket") - key := c.Params("key") - value := c.Params("value") - - ledger, err := e.Ledger() - if err != nil { - return err - } - - ledger.Persist(context.Background(), DefaultInterval, Timeout, bucket, key, value) - return c.JSON(announcing) - }) - // Delete data from ledger - app.Get("ledger/:bucket", func(c *fiber.Ctx) error { - bucket := c.Params("bucket") - - ledger, err := e.Ledger() - if err != nil { - return err - } - - ledger.AnnounceDeleteBucket(context.Background(), DefaultInterval, Timeout, bucket) - return c.JSON(announcing) - }) - - app.Get("ledger/:bucket/:key", func(c *fiber.Ctx) error { - bucket := c.Params("bucket") - key := c.Params("key") - - ledger, err := e.Ledger() - if err != nil { - return err - } - - ledger.AnnounceDeleteBucketKey(context.Background(), DefaultInterval, Timeout, bucket, key) - return c.JSON(announcing) - }) -} diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index 081bd134..81fccaa3 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -4,17 +4,36 @@ package p2p import ( + "bufio" "context" + "encoding/json" "errors" "fmt" "io" "net" + "net/http" + "slices" + "strings" + "time" + logP2P "github.com/ipfs/go-log/v2" cliP2P "github.com/mudler/LocalAI/core/cli/p2p" + edgevpnConfig "github.com/mudler/edgevpn/pkg/config" + "github.com/mudler/edgevpn/pkg/logger" "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/edgevpn/pkg/trustzone" "github.com/rs/zerolog/log" ) +const Timeout = 20 * time.Second + +const ( + peekBufferSize = 512 + authHeader = "X-Auth-Token" + headerEnd = "\r\n\r\n" + lineEnd = "\r\n" +) + func (fs *FederatedServer) Start(ctx context.Context, p2pCommonFlags cliP2P.P2PCommonFlags) error { p2pCfg := NewP2PConfig(p2pCommonFlags) p2pCfg.NetworkToken = fs.p2ptoken @@ -36,11 +55,26 @@ func (fs *FederatedServer) Start(ctx context.Context, p2pCommonFlags cliP2P.P2PC return err } - return fs.proxy(ctx, n) + lvl, err := logP2P.LevelFromString(p2pCfg.LogLevel) + if err != nil { + lvl = logP2P.LevelError + } + llger := logger.New(lvl) + + aps := []trustzone.AuthProvider{} + for ap, providerOpts := range p2pCfg.PeerGuard.AuthProviders { + a, err := edgevpnConfig.AuthProvider(llger, ap, providerOpts) + if err != nil { + log.Warn().Msgf("invalid authprovider: %v", err) + continue + } + aps = append(aps, a) + } + + return fs.listener(ctx, n, aps) } -func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { - +func (fs *FederatedServer) listener(ctx context.Context, node *node.Node, aps []trustzone.AuthProvider) error { log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr) // Open local port for listening l, err := net.Listen("tcp", fs.listenAddr) @@ -57,6 +91,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { nodeAnnounce(ctx, node) defer l.Close() + for { select { case <-ctx.Done(): @@ -70,62 +105,243 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { continue } - // Handle connections in a new goroutine, forwarding to the p2p service go func() { - workerID := "" - if fs.workerTarget != "" { - workerID = fs.workerTarget - } else if fs.loadBalanced { - log.Debug().Msgf("Load balancing request") - - workerID = fs.SelectLeastUsedServer() - if workerID == "" { - log.Debug().Msgf("Least used server not found, selecting random") - workerID = fs.RandomServer() + if len(aps) > 0 { + if fs.handleHTTP(conn, node, aps) { + return } - } else { - workerID = fs.RandomServer() - } - - if workerID == "" { - log.Error().Msg("No available nodes yet") - fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") - return - } - - log.Debug().Msgf("Selected node %s", workerID) - nodeData, exists := GetNode(fs.service, workerID) - if !exists { - log.Error().Msgf("Node %s not found", workerID) - fs.sendHTMLResponse(conn, 404, "Node not found") - return - } - - proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) - if fs.loadBalanced { - fs.RecordRequest(workerID) } + fs.proxy(ctx, node, conn) }() } } } -// sendHTMLResponse sends a basic HTML response with a status code and a message. -// This is extracted to make the HTML content maintainable. -func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, message string) { +func (fs *FederatedServer) handleHTTP(conn net.Conn, node *node.Node, aps []trustzone.AuthProvider) bool { + defer func() { + if r := recover(); r != nil { + log.Debug().Msgf("Recovered from panic: %v", r) + conn.Close() + } + }() + + r, err := testForHTTPRequest(conn) + if err != nil { + return false + } + defer r.Body.Close() + pathParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/ledger/"), "/") + announcing := struct{ State string }{"Announcing"} + + // TODO deal with AuthProviders + // pubKey := r.Header.Get(authHeader) + + switch r.Method { + case http.MethodGet: + switch len(pathParts) { + case 2: // /ledger/:bucket/:key + bucket := pathParts[0] + key := pathParts[1] + + ledger, err := node.Ledger() + if err != nil { + fs.sendRawResponse(conn, http.StatusInternalServerError, "text/plain", []byte(err.Error())) + return true + } + + fs.sendJSONResponse(conn, http.StatusOK, ledger.CurrentData()[bucket][key]) + + case 1: // /ledger/:bucket + bucket := pathParts[0] + + ledger, err := node.Ledger() + if err != nil { + fs.sendRawResponse(conn, http.StatusInternalServerError, "text/plain", []byte(err.Error())) + return true + } + + fs.sendJSONResponse(conn, http.StatusOK, ledger.CurrentData()[bucket]) + + default: + fs.sendRawResponse(conn, http.StatusNotFound, "text/plain", []byte("not found")) + + } + + case http.MethodPut: + if len(pathParts) == 3 { // /ledger/:bucket/:key/:value + bucket := pathParts[0] + key := pathParts[1] + value := pathParts[2] + + ledger, err := node.Ledger() + if err != nil { + fs.sendRawResponse(conn, http.StatusInternalServerError, "text/plain", []byte(err.Error())) + return true + } + + ledger.Persist(context.Background(), DefaultInterval, Timeout, bucket, key, value) + fs.sendJSONResponse(conn, http.StatusOK, announcing) + + } else { + fs.sendRawResponse(conn, http.StatusNotFound, "text/plain", []byte("not found")) + } + + case http.MethodDelete: + switch len(pathParts) { + case 1: // /ledger/:bucket + bucket := pathParts[0] + + ledger, err := node.Ledger() + if err != nil { + fs.sendRawResponse(conn, http.StatusInternalServerError, "text/plain", []byte(err.Error())) + return true + } + + ledger.AnnounceDeleteBucket(context.Background(), DefaultInterval, Timeout, bucket) + fs.sendJSONResponse(conn, http.StatusOK, announcing) + + case 2: // /ledger/:bucket/:key + bucket := pathParts[0] + key := pathParts[1] + + ledger, err := node.Ledger() + if err != nil { + fs.sendRawResponse(conn, http.StatusInternalServerError, "text/plain", []byte(err.Error())) + return true + } + + ledger.AnnounceDeleteBucketKey(context.Background(), DefaultInterval, Timeout, bucket, key) + fs.sendJSONResponse(conn, http.StatusOK, announcing) + + default: + fs.sendRawResponse(conn, http.StatusNotFound, "text/plain", []byte("not found")) + + } + } + + return true +} + +// testForHTTPRequest peeking the first N bytes from the accepted conn, and trying to match it +// against the supported http methods, then against the supported route, then if there is auth header +func testForHTTPRequest(conn net.Conn) (*http.Request, error) { + reader := bufio.NewReader(conn) + + peekedData, err := reader.Peek(peekBufferSize) + if err != nil && err != bufio.ErrBufferFull { + log.Debug().Msgf("Error peeking data: %v", err) + return nil, err + } + peekedString := string(peekedData) + + // 1. Parse Request Line + firstLineEnd := strings.Index(peekedString, lineEnd) + if firstLineEnd == -1 { + log.Debug().Msg("Could not find request line end") + return nil, err + } + requestLine := peekedString[:firstLineEnd] + parts := strings.Split(requestLine, " ") + if len(parts) != 3 { + log.Debug().Msg("Invalid request line format") + return nil, err + } + method := parts[0] + uri := parts[1] + + if !slices.Contains([]string{ + http.MethodGet, + http.MethodPut, + http.MethodDelete, + }, method) { + log.Debug().Msg("Unsupported HTTP method") + return nil, err + } + if !strings.HasPrefix(uri, "/ledger") { + log.Debug().Msg("Unsupported HTTP route") + return nil, err + } + + headersPart := peekedString[firstLineEnd+len(lineEnd):] + headerEndIndex := strings.Index(headersPart, headerEnd) + if headerEndIndex == -1 { + log.Debug().Msg("Could not find end of headers within peek buffer") + return nil, err + } + headersString := headersPart[:headerEndIndex] + headers := strings.Split(headersString, lineEnd) + + foundAuth := false + for _, header := range headers { + if strings.HasPrefix(header, authHeader+":") { + parts := strings.SplitN(header, ":", 2) + if len(parts) == 2 { + foundAuth = true + break + } + } + } + + if !foundAuth { + log.Debug().Msgf("Required header '%s' not found.", authHeader) + return nil, err + } + + req, err := http.ReadRequest(reader) + if err != nil { + log.Debug().Msgf("Error reading full request: %v", err) + return nil, err + } + return req, nil +} + +func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node, conn net.Conn) { + workerID := "" + if fs.workerTarget != "" { + workerID = fs.workerTarget + } else if fs.loadBalanced { + log.Debug().Msgf("Load balancing request") + + workerID = fs.SelectLeastUsedServer() + if workerID == "" { + log.Debug().Msgf("Least used server not found, selecting random") + workerID = fs.RandomServer() + } + } else { + workerID = fs.RandomServer() + } + + if workerID == "" { + log.Error().Msg("No available nodes yet") + fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") + return + } + + log.Debug().Msgf("Selected node %s", workerID) + nodeData, exists := GetNode(fs.service, workerID) + if !exists { + log.Error().Msgf("Node %s not found", workerID) + fs.sendHTMLResponse(conn, 404, "Node not found") + return + } + + proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) + if fs.loadBalanced { + fs.RecordRequest(workerID) + } +} + +// sendRawResponse sends whatever provided byte data with provided content type header +func (fs *FederatedServer) sendRawResponse(conn net.Conn, statusCode int, contentType string, data []byte) { defer conn.Close() - // Define the HTML content separately for easier maintenance. - htmlContent := fmt.Sprintf("

%s

\r\n", message) - - // Create the HTTP response with dynamic status code and content. response := fmt.Sprintf( "HTTP/1.1 %d %s\r\n"+ - "Content-Type: text/html\r\n"+ + "Content-Type: %s\r\n"+ "Connection: close\r\n"+ "\r\n"+ "%s", - statusCode, getHTTPStatusText(statusCode), htmlContent, + statusCode, http.StatusText(statusCode), contentType, data, ) // Write the response to the client connection. @@ -135,16 +351,21 @@ func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, messa } } -// getHTTPStatusText returns a textual representation of HTTP status codes. -func getHTTPStatusText(statusCode int) string { - switch statusCode { - case 503: - return "Service Unavailable" - case 404: - return "Not Found" - case 200: - return "OK" - default: - return "Unknown Status" +// sendJSONResponse marshals provided data to JSON and sends it +func (fs *FederatedServer) sendJSONResponse(conn net.Conn, statusCode int, v any) { + data, err := json.Marshal(v) + if err != nil { + log.Error().Err(err).Msg("Error JSON marshaling") } + + fs.sendRawResponse(conn, statusCode, "application/json", data) +} + +// sendHTMLResponse sends a basic HTML response with a status code and a message. +// This is extracted to make the HTML content maintainable. +func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, message string) { + // Define the HTML content separately for easier maintenance. + htmlContent := fmt.Sprintf("

%s

\r\n", message) + + fs.sendRawResponse(conn, statusCode, "text/html", []byte(htmlContent)) } diff --git a/go.mod b/go.mod index 45385a49..c1d88669 100644 --- a/go.mod +++ b/go.mod @@ -303,4 +303,4 @@ require ( lukechampine.com/blake3 v1.3.0 // indirect ) -replace github.com/mudler/edgevpn => github.com/swarmind/edgevpn v0.0.0-20250329011455-c0a96483b1ff +replace github.com/mudler/edgevpn => github.com/swarmind/edgevpn v0.0.0-20250331231759-326a9e7360b0 diff --git a/go.sum b/go.sum index 6e8bbb5e..51154aa1 100644 --- a/go.sum +++ b/go.sum @@ -147,8 +147,6 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/ggerganov/whisper.cpp/bindings/go v0.0.0-20240626202019-c118733a29ad h1:dQ93Vd6i25o+zH9vvnZ8mu7jtJQ6jT3D+zE3V8Q49n0= -github.com/ggerganov/whisper.cpp/bindings/go v0.0.0-20240626202019-c118733a29ad/go.mod h1:QIjZ9OktHFG7p+/m3sMvrAJKKdWrr1fZIK0rM6HZlyo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= @@ -459,8 +457,6 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA= -github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig= github.com/mudler/go-processmanager v0.0.0-20240820160718-8b802d3ecf82 h1:FVT07EI8njvsD4tC2Hw8Xhactp5AWhsQWD4oTeQuSAU= github.com/mudler/go-processmanager v0.0.0-20240820160718-8b802d3ecf82/go.mod h1:Urp7LG5jylKoDq0663qeBh0pINGcRl35nXdKx82PSoU= github.com/mudler/water v0.0.0-20221010214108-8c7313014ce0 h1:Qh6ghkMgTu6siFbTf7L3IszJmshMhXxNL4V+t7IIA6w= @@ -703,8 +699,8 @@ github.com/swaggo/files/v2 v2.0.0 h1:hmAt8Dkynw7Ssz46F6pn8ok6YmGZqHSVLZ+HQM7i0kw github.com/swaggo/files/v2 v2.0.0/go.mod h1:24kk2Y9NYEJ5lHuCra6iVwkMjIekMCaFq/0JQj66kyM= github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg= github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk= -github.com/swarmind/edgevpn v0.0.0-20250329011455-c0a96483b1ff h1:tAZL+yhXDkwSkkjmPWMk+iBJ/t1MP43ntJiQA4Q5+kw= -github.com/swarmind/edgevpn v0.0.0-20250329011455-c0a96483b1ff/go.mod h1:bGUdGQzwLOuMs3SII1N6SazoI1qQ1ekxdxNatOCS5ZM= +github.com/swarmind/edgevpn v0.0.0-20250331231759-326a9e7360b0 h1:1IzdOtFh9IubHt/7kkSO56chnwHF3Yp7DsWSpXTaMgE= +github.com/swarmind/edgevpn v0.0.0-20250331231759-326a9e7360b0/go.mod h1:bGUdGQzwLOuMs3SII1N6SazoI1qQ1ekxdxNatOCS5ZM= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/thxcode/gguf-parser-go v0.1.0 h1:J4QruXyEQGjrAKeKZFlsD2na9l4XF5+bjR194d+wJS4= github.com/thxcode/gguf-parser-go v0.1.0/go.mod h1:Tn1PsO/YDEtLIxm1+QDCjIIH9L/9Sr7+KpxZKm0sEuE=