diff --git a/.github/workflows/comment-pr.yaml b/.github/workflows/comment-pr.yaml index 90eefd88..3c20b21b 100644 --- a/.github/workflows/comment-pr.yaml +++ b/.github/workflows/comment-pr.yaml @@ -8,9 +8,10 @@ jobs: MODEL_NAME: hermes-2-theta-llama-3-8b runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - name: Checkout code + uses: actions/checkout@v3 with: - fetch-depth: 0 # needed to checkout all branches for this Action to work + ref: "${{ github.event.pull_request.merge_commit_sha }}" - uses: mudler/localai-github-action@v1 with: model: 'hermes-2-theta-llama-3-8b' # Any from models.localai.io, or from huggingface.com with: "huggingface:///file" @@ -21,6 +22,7 @@ jobs: json_diff_file_output: diff.json raw_diff_file_output: diff.txt file_output_only: "true" + base_branch: ${{ github.event.pull_request.base.sha }} - name: Show diff env: DIFF: ${{ steps.git-diff-action.outputs.raw-diff-path }} diff --git a/Makefile b/Makefile index 183ead8e..df13cbfb 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ DETECT_LIBS?=true # llama.cpp versions GOLLAMA_REPO?=https://github.com/go-skynet/go-llama.cpp GOLLAMA_VERSION?=2b57a8ae43e4699d3dc5d1496a1ccd42922993be -CPPLLAMA_VERSION?=b3283448ce9a5098226afe1d8648ccc578511fe4 +CPPLLAMA_VERSION?=705b7ecf60e667ced57c15d67aa86865e3cc7aa7 # gpt4all version GPT4ALL_REPO?=https://github.com/nomic-ai/gpt4all diff --git a/core/cli/federated.go b/core/cli/federated.go index b1de1840..32f0fa87 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -2,135 +2,20 @@ package cli import ( "context" - "errors" - "fmt" - "io" - "net" - "time" - - "math/rand/v2" cliContext "github.com/mudler/LocalAI/core/cli/context" "github.com/mudler/LocalAI/core/p2p" - "github.com/mudler/edgevpn/pkg/node" - "github.com/mudler/edgevpn/pkg/protocol" - "github.com/mudler/edgevpn/pkg/types" - "github.com/rs/zerolog/log" ) type FederatedCLI struct { Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API 
server" group:"api"` Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` + LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"` } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { - n, err := p2p.NewNode(f.Peer2PeerToken) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } - err = n.Start(context.Background()) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } + fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced) - if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) { - log.Debug().Msgf("Discovered node: %s", tunnel.ID) - }); err != nil { - return err - } - - return Proxy(context.Background(), n, f.Address, p2p.FederatedID) -} - -func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { - - log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) - // Open local port for listening - l, err := net.Listen("tcp", listenAddr) - if err != nil { - log.Error().Err(err).Msg("Error listening") - return err - } - // ll.Info("Binding local port on", srcaddr) - - ledger, _ := node.Ledger() - - // Announce ourselves so nodes accepts our connection - ledger.Announce( - ctx, - 10*time.Second, - func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) - // If mismatch, update the blockchain - //if !found { - updatedMap := map[string]interface{}{} - updatedMap[node.Host().ID().String()] = &types.User{ - PeerID: node.Host().ID().String(), - Timestamp: time.Now().String(), - } - ledger.Add(protocol.UsersLedgerKey, updatedMap) - // } - }, - ) - - defer l.Close() - for { - select { - case <-ctx.Done(): - return errors.New("context canceled") - 
import "sync"

// FederatedID is the service identifier the federated server passes to
// service discovery and node lookup.
const FederatedID = "federated"

// FederatedServer holds the settings of a federated proxy together with a
// per-node request counter used to pick the least used node when load
// balancing is enabled.
type FederatedServer struct {
	listenAddr, service, p2ptoken string

	// mu guards requestTable: the counters are updated from the goroutines
	// spawned for each proxied connection, and unsynchronised map writes
	// abort the process.
	mu           sync.Mutex
	requestTable map[string]int
	loadBalanced bool
}

// NewFederatedServer returns a FederatedServer listening on listenAddr for
// the given service, joining the p2p network with p2pToken. When loadBalanced
// is true, requests are routed to the least used node instead of a random one.
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
	return &FederatedServer{
		listenAddr:   listenAddr,
		service:      service,
		p2ptoken:     p2pToken,
		requestTable: map[string]int{},
		loadBalanced: loadBalanced,
	}
}

// SelectLeastUsedServer returns the node ID with the lowest recorded request
// count, or an empty string when no node has been recorded yet. When several
// nodes share the lowest count, which one is returned is unspecified (it
// follows map iteration order).
func (fs *FederatedServer) SelectLeastUsedServer() string {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	var (
		leastKey   string
		leastCount int
		found      bool
	)
	for k, v := range fs.requestTable {
		// Track "nothing seen yet" explicitly: a count of 0 is a valid
		// minimum and must not be treated as an unset value.
		if !found || v < leastCount {
			leastKey, leastCount, found = k, v, true
		}
	}
	return leastKey
}

// RecordRequest increments the request counter of nodeID, creating the entry
// if it does not exist yet.
func (fs *FederatedServer) RecordRequest(nodeID string) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	if fs.requestTable == nil {
		// Writing to a nil map panics; allow use of a zero-value server.
		fs.requestTable = map[string]int{}
	}
	fs.requestTable[nodeID]++
}

// EnsureRecordExist registers nodeID with a counter of 0 unless it is already
// tracked; an existing counter is left untouched.
func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	if fs.requestTable == nil {
		fs.requestTable = map[string]int{}
	}
	if _, ok := fs.requestTable[nodeID]; !ok {
		fs.requestTable[nodeID] = 0
	}
}
service '%s' on: %s", fs.service, fs.listenAddr) + // Open local port for listening + l, err := net.Listen("tcp", fs.listenAddr) + if err != nil { + log.Error().Err(err).Msg("Error listening") + return err + } + // ll.Info("Binding local port on", srcaddr) + + ledger, _ := node.Ledger() + + // Announce ourselves so nodes accepts our connection + ledger.Announce( + ctx, + 10*time.Second, + func() { + // Retrieve current ID for ip in the blockchain + //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) + // If mismatch, update the blockchain + //if !found { + updatedMap := map[string]interface{}{} + updatedMap[node.Host().ID().String()] = &types.User{ + PeerID: node.Host().ID().String(), + Timestamp: time.Now().String(), + } + ledger.Add(protocol.UsersLedgerKey, updatedMap) + // } + }, + ) + + defer l.Close() + for { + select { + case <-ctx.Done(): + return errors.New("context canceled") + default: + log.Debug().Msg("New for connection") + // Listen for an incoming connection. 
+ conn, err := l.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + continue + } + + // Handle connections in a new goroutine, forwarding to the p2p service + go func() { + var tunnelAddresses []string + for _, v := range GetAvailableNodes(fs.service) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + + if len(tunnelAddresses) == 0 { + log.Error().Msg("No available nodes yet") + return + } + + tunnelAddr := "" + + if fs.loadBalanced { + for _, t := range tunnelAddresses { + fs.EnsureRecordExist(t) + } + + tunnelAddr = fs.SelectLeastUsedServer() + log.Debug().Msgf("Selected tunnel %s", tunnelAddr) + if tunnelAddr == "" { + tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))] + } + + fs.RecordRequest(tunnelAddr) + } else { + tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))] + } + + tunnelConn, err := net.Dial("tcp", tunnelAddr) + if err != nil { + log.Error().Err(err).Msg("Error connecting to tunnel") + return + } + + log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) + closer := make(chan struct{}, 2) + go copyStream(closer, tunnelConn, conn) + go copyStream(closer, conn, tunnelConn) + <-closer + + tunnelConn.Close() + conn.Close() + // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) + }() + } + } + +} diff --git a/core/p2p/node.go b/core/p2p/node.go index 1d5356e6..6394498f 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -6,7 +6,6 @@ import ( ) const defaultServicesID = "services_localai" -const FederatedID = "federated" type NodeData struct { Name string diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index 9b71f7de..927f0e24 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -137,11 +137,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv } -func copyStream(closer chan struct{}, dst io.Writer, src 
io.Reader) { - defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy - io.Copy(dst, src) -} - // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { @@ -396,3 +391,8 @@ func newNodeOpts(token string) ([]node.Option, error) { return nodeOpts, nil } + +func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { + defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy + io.Copy(dst, src) +} diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index b1d1d04a..ab1d69dc 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -14,6 +14,10 @@ func GenerateToken() string { return "not implemented" } +func (f *FederatedServer) Start(ctx context.Context) error { + return fmt.Errorf("not implemented") +} + func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error { return fmt.Errorf("not implemented") } diff --git a/swagger/docs.go b/swagger/docs.go index d5ffbc09..9a5a1784 100644 --- a/swagger/docs.go +++ b/swagger/docs.go @@ -700,18 +700,6 @@ const docTemplate = `{ } } }, - "functions.Argument": { - "type": "object", - "properties": { - "properties": { - "type": "object", - "additionalProperties": true - }, - "type": { - "type": "string" - } - } - }, "functions.Function": { "type": "object", "properties": { @@ -727,48 +715,19 @@ const docTemplate = `{ } } }, - "functions.FunctionName": { - "type": "object", - "properties": { - "const": { - "type": "string" - } - } - }, - "functions.FunctionProperties": { - "type": "object", - "properties": { - "arguments": { - "$ref": "#/definitions/functions.Argument" - }, - "function": { - "$ref": 
"#/definitions/functions.FunctionName" - } - } - }, - "functions.ItemFunction": { + "functions.Item": { "type": "object", "properties": { "properties": { - "$ref": "#/definitions/functions.FunctionProperties" + "type": "object", + "additionalProperties": true }, "type": { "type": "string" } } }, - "functions.ItemName": { - "type": "object", - "properties": { - "properties": { - "$ref": "#/definitions/functions.NameProperties" - }, - "type": { - "type": "string" - } - } - }, - "functions.JSONFunctionStructureFunction": { + "functions.JSONFunctionStructure": { "type": "object", "properties": { "$defs": { @@ -778,49 +737,17 @@ const docTemplate = `{ "anyOf": { "type": "array", "items": { - "$ref": "#/definitions/functions.ItemFunction" + "$ref": "#/definitions/functions.Item" } }, "oneOf": { "type": "array", "items": { - "$ref": "#/definitions/functions.ItemFunction" + "$ref": "#/definitions/functions.Item" } } } }, - "functions.JSONFunctionStructureName": { - "type": "object", - "properties": { - "$defs": { - "type": "object", - "additionalProperties": true - }, - "anyOf": { - "type": "array", - "items": { - "$ref": "#/definitions/functions.ItemName" - } - }, - "oneOf": { - "type": "array", - "items": { - "$ref": "#/definitions/functions.ItemName" - } - } - } - }, - "functions.NameProperties": { - "type": "object", - "properties": { - "arguments": { - "$ref": "#/definitions/functions.Argument" - }, - "name": { - "$ref": "#/definitions/functions.FunctionName" - } - } - }, "functions.Tool": { "type": "object", "properties": { @@ -1488,10 +1415,7 @@ const docTemplate = `{ "type": "string" }, "grammar_json_functions": { - "$ref": "#/definitions/functions.JSONFunctionStructureFunction" - }, - "grammar_json_name": { - "$ref": "#/definitions/functions.JSONFunctionStructureName" + "$ref": "#/definitions/functions.JSONFunctionStructure" }, "ignore_eos": { "type": "boolean" diff --git a/swagger/swagger.json b/swagger/swagger.json index 71feb8bb..9d53fbbe 100644 --- 
a/swagger/swagger.json +++ b/swagger/swagger.json @@ -693,18 +693,6 @@ } } }, - "functions.Argument": { - "type": "object", - "properties": { - "properties": { - "type": "object", - "additionalProperties": true - }, - "type": { - "type": "string" - } - } - }, "functions.Function": { "type": "object", "properties": { @@ -720,48 +708,19 @@ } } }, - "functions.FunctionName": { - "type": "object", - "properties": { - "const": { - "type": "string" - } - } - }, - "functions.FunctionProperties": { - "type": "object", - "properties": { - "arguments": { - "$ref": "#/definitions/functions.Argument" - }, - "function": { - "$ref": "#/definitions/functions.FunctionName" - } - } - }, - "functions.ItemFunction": { + "functions.Item": { "type": "object", "properties": { "properties": { - "$ref": "#/definitions/functions.FunctionProperties" + "type": "object", + "additionalProperties": true }, "type": { "type": "string" } } }, - "functions.ItemName": { - "type": "object", - "properties": { - "properties": { - "$ref": "#/definitions/functions.NameProperties" - }, - "type": { - "type": "string" - } - } - }, - "functions.JSONFunctionStructureFunction": { + "functions.JSONFunctionStructure": { "type": "object", "properties": { "$defs": { @@ -771,49 +730,17 @@ "anyOf": { "type": "array", "items": { - "$ref": "#/definitions/functions.ItemFunction" + "$ref": "#/definitions/functions.Item" } }, "oneOf": { "type": "array", "items": { - "$ref": "#/definitions/functions.ItemFunction" + "$ref": "#/definitions/functions.Item" } } } }, - "functions.JSONFunctionStructureName": { - "type": "object", - "properties": { - "$defs": { - "type": "object", - "additionalProperties": true - }, - "anyOf": { - "type": "array", - "items": { - "$ref": "#/definitions/functions.ItemName" - } - }, - "oneOf": { - "type": "array", - "items": { - "$ref": "#/definitions/functions.ItemName" - } - } - } - }, - "functions.NameProperties": { - "type": "object", - "properties": { - "arguments": { - "$ref": 
"#/definitions/functions.Argument" - }, - "name": { - "$ref": "#/definitions/functions.FunctionName" - } - } - }, "functions.Tool": { "type": "object", "properties": { @@ -1481,10 +1408,7 @@ "type": "string" }, "grammar_json_functions": { - "$ref": "#/definitions/functions.JSONFunctionStructureFunction" - }, - "grammar_json_name": { - "$ref": "#/definitions/functions.JSONFunctionStructureName" + "$ref": "#/definitions/functions.JSONFunctionStructure" }, "ignore_eos": { "type": "boolean" diff --git a/swagger/swagger.yaml b/swagger/swagger.yaml index e40ef119..2d628566 100644 --- a/swagger/swagger.yaml +++ b/swagger/swagger.yaml @@ -7,14 +7,6 @@ definitions: url: type: string type: object - functions.Argument: - properties: - properties: - additionalProperties: true - type: object - type: - type: string - type: object functions.Function: properties: description: @@ -25,67 +17,28 @@ definitions: additionalProperties: true type: object type: object - functions.FunctionName: - properties: - const: - type: string - type: object - functions.FunctionProperties: - properties: - arguments: - $ref: '#/definitions/functions.Argument' - function: - $ref: '#/definitions/functions.FunctionName' - type: object - functions.ItemFunction: + functions.Item: properties: properties: - $ref: '#/definitions/functions.FunctionProperties' + additionalProperties: true + type: object type: type: string type: object - functions.ItemName: - properties: - properties: - $ref: '#/definitions/functions.NameProperties' - type: - type: string - type: object - functions.JSONFunctionStructureFunction: + functions.JSONFunctionStructure: properties: $defs: additionalProperties: true type: object anyOf: items: - $ref: '#/definitions/functions.ItemFunction' + $ref: '#/definitions/functions.Item' type: array oneOf: items: - $ref: '#/definitions/functions.ItemFunction' + $ref: '#/definitions/functions.Item' type: array type: object - functions.JSONFunctionStructureName: - properties: - $defs: - 
additionalProperties: true - type: object - anyOf: - items: - $ref: '#/definitions/functions.ItemName' - type: array - oneOf: - items: - $ref: '#/definitions/functions.ItemName' - type: array - type: object - functions.NameProperties: - properties: - arguments: - $ref: '#/definitions/functions.Argument' - name: - $ref: '#/definitions/functions.FunctionName' - type: object functions.Tool: properties: function: @@ -538,9 +491,7 @@ definitions: description: A grammar to constrain the LLM output type: string grammar_json_functions: - $ref: '#/definitions/functions.JSONFunctionStructureFunction' - grammar_json_name: - $ref: '#/definitions/functions.JSONFunctionStructureName' + $ref: '#/definitions/functions.JSONFunctionStructure' ignore_eos: type: boolean input: {}