Stores to chromem (WIP)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto 2025-01-21 10:35:01 +01:00
parent 2f09aa1b85
commit a1d5462ad0
16 changed files with 50 additions and 489 deletions

View file

@ -4,101 +4,36 @@ package main
// It is meant to be used by the main executable that is the server for the specific backend type (falcon, gpt3, etc)
import (
"container/heap"
"context"
"fmt"
"math"
"slices"
"runtime"
"github.com/mudler/LocalAI/pkg/grpc/base"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
chromem "github.com/philippgille/chromem-go"
"github.com/rs/zerolog/log"
)
type Store struct {
base.SingleThread
// The sorted keys
keys [][]float32
// The sorted values
values [][]byte
// If for every K it holds that ||k||^2 = 1, then we can use the normalized distance functions
// TODO: Should we normalize incoming keys if they are not instead?
keysAreNormalized bool
// The first key decides the length of the keys
keyLen int
}
// TODO: Only used for sorting using Go's builtin implementation. The interfaces are columnar because
// that's theoretically best for memory layout and cache locality, but this isn't optimized yet.
type Pair struct {
Key []float32
Value []byte
*chromem.DB
*chromem.Collection
}
func NewStore() *Store {
return &Store{
keys: make([][]float32, 0),
values: make([][]byte, 0),
keysAreNormalized: true,
keyLen: -1,
}
}
func compareSlices(k1, k2 []float32) int {
assert(len(k1) == len(k2), fmt.Sprintf("compareSlices: len(k1) = %d, len(k2) = %d", len(k1), len(k2)))
return slices.Compare(k1, k2)
}
func hasKey(unsortedSlice [][]float32, target []float32) bool {
return slices.ContainsFunc(unsortedSlice, func(k []float32) bool {
return compareSlices(k, target) == 0
})
}
func findInSortedSlice(sortedSlice [][]float32, target []float32) (int, bool) {
return slices.BinarySearchFunc(sortedSlice, target, func(k, t []float32) int {
return compareSlices(k, t)
})
}
func isSortedPairs(kvs []Pair) bool {
for i := 1; i < len(kvs); i++ {
if compareSlices(kvs[i-1].Key, kvs[i].Key) > 0 {
return false
}
}
return true
}
func isSortedKeys(keys [][]float32) bool {
for i := 1; i < len(keys); i++ {
if compareSlices(keys[i-1], keys[i]) > 0 {
return false
}
}
return true
}
func sortIntoKeySlicese(keys []*pb.StoresKey) [][]float32 {
ks := make([][]float32, len(keys))
for i, k := range keys {
ks[i] = k.Floats
}
slices.SortFunc(ks, compareSlices)
assert(len(ks) == len(keys), fmt.Sprintf("len(ks) = %d, len(keys) = %d", len(ks), len(keys)))
assert(isSortedKeys(ks), "keys are not sorted")
return ks
return &Store{}
}
func (s *Store) Load(opts *pb.ModelOptions) error {
db := chromem.NewDB()
collection, err := db.CreateCollection("all-documents", nil, nil)
if err != nil {
return err
}
s.DB = db
s.Collection = collection
return nil
}
@ -111,156 +46,25 @@ func (s *Store) StoresSet(opts *pb.StoresSetOptions) error {
if len(opts.Keys) != len(opts.Values) {
return fmt.Errorf("len(keys) = %d, len(values) = %d", len(opts.Keys), len(opts.Values))
}
if s.keyLen == -1 {
s.keyLen = len(opts.Keys[0].Floats)
} else {
if len(opts.Keys[0].Floats) != s.keyLen {
return fmt.Errorf("Try to add key with length %d when existing length is %d", len(opts.Keys[0].Floats), s.keyLen)
}
}
kvs := make([]Pair, len(opts.Keys))
docs := []chromem.Document{}
for i, k := range opts.Keys {
if s.keysAreNormalized && !isNormalized(k.Floats) {
s.keysAreNormalized = false
var sample []float32
if len(s.keys) > 5 {
sample = k.Floats[:5]
} else {
sample = k.Floats
}
log.Debug().Msgf("Key is not normalized: %v", sample)
}
kvs[i] = Pair{
Key: k.Floats,
Value: opts.Values[i].Bytes,
}
docs = append(docs, chromem.Document{
ID: k.String(),
Content: opts.Values[i].String(),
})
}
slices.SortFunc(kvs, func(a, b Pair) int {
return compareSlices(a.Key, b.Key)
})
assert(len(kvs) == len(opts.Keys), fmt.Sprintf("len(kvs) = %d, len(opts.Keys) = %d", len(kvs), len(opts.Keys)))
assert(isSortedPairs(kvs), "keys are not sorted")
l := len(kvs) + len(s.keys)
merge_ks := make([][]float32, 0, l)
merge_vs := make([][]byte, 0, l)
i, j := 0, 0
for {
if i+j >= l {
break
}
if i >= len(kvs) {
merge_ks = append(merge_ks, s.keys[j])
merge_vs = append(merge_vs, s.values[j])
j++
continue
}
if j >= len(s.keys) {
merge_ks = append(merge_ks, kvs[i].Key)
merge_vs = append(merge_vs, kvs[i].Value)
i++
continue
}
c := compareSlices(kvs[i].Key, s.keys[j])
if c < 0 {
merge_ks = append(merge_ks, kvs[i].Key)
merge_vs = append(merge_vs, kvs[i].Value)
i++
} else if c > 0 {
merge_ks = append(merge_ks, s.keys[j])
merge_vs = append(merge_vs, s.values[j])
j++
} else {
merge_ks = append(merge_ks, kvs[i].Key)
merge_vs = append(merge_vs, kvs[i].Value)
i++
j++
}
}
assert(len(merge_ks) == l, fmt.Sprintf("len(merge_ks) = %d, l = %d", len(merge_ks), l))
assert(isSortedKeys(merge_ks), "merge keys are not sorted")
s.keys = merge_ks
s.values = merge_vs
return nil
return s.Collection.AddDocuments(context.Background(), docs, runtime.NumCPU())
}
func (s *Store) StoresDelete(opts *pb.StoresDeleteOptions) error {
if len(opts.Keys) == 0 {
return fmt.Errorf("no keys to delete")
func (s *Store) StoresReset(opts *pb.StoresResetOptions) error {
err := s.DB.DeleteCollection("all-documents")
if err != nil {
return err
}
if len(opts.Keys) == 0 {
return fmt.Errorf("no keys to add")
}
if s.keyLen == -1 {
s.keyLen = len(opts.Keys[0].Floats)
} else {
if len(opts.Keys[0].Floats) != s.keyLen {
return fmt.Errorf("Trying to delete key with length %d when existing length is %d", len(opts.Keys[0].Floats), s.keyLen)
}
}
ks := sortIntoKeySlicese(opts.Keys)
l := len(s.keys) - len(ks)
merge_ks := make([][]float32, 0, l)
merge_vs := make([][]byte, 0, l)
tail_ks := s.keys
tail_vs := s.values
for _, k := range ks {
j, found := findInSortedSlice(tail_ks, k)
if found {
merge_ks = append(merge_ks, tail_ks[:j]...)
merge_vs = append(merge_vs, tail_vs[:j]...)
tail_ks = tail_ks[j+1:]
tail_vs = tail_vs[j+1:]
} else {
assert(!hasKey(s.keys, k), fmt.Sprintf("Key exists, but was not found: t=%d, %v", len(tail_ks), k))
}
log.Debug().Msgf("Delete: found = %v, t = %d, j = %d, len(merge_ks) = %d, len(merge_vs) = %d", found, len(tail_ks), j, len(merge_ks), len(merge_vs))
}
merge_ks = append(merge_ks, tail_ks...)
merge_vs = append(merge_vs, tail_vs...)
assert(len(merge_ks) <= len(s.keys), fmt.Sprintf("len(merge_ks) = %d, len(s.keys) = %d", len(merge_ks), len(s.keys)))
s.keys = merge_ks
s.values = merge_vs
assert(len(s.keys) >= l, fmt.Sprintf("len(s.keys) = %d, l = %d", len(s.keys), l))
assert(isSortedKeys(s.keys), "keys are not sorted")
assert(func() bool {
for _, k := range ks {
if _, found := findInSortedSlice(s.keys, k); found {
return false
}
}
return true
}(), "Keys to delete still present")
if len(s.keys) != l {
log.Debug().Msgf("Delete: Some keys not found: len(s.keys) = %d, l = %d", len(s.keys), l)
}
return nil
s.Collection, err = s.CreateCollection("all-documents", nil, nil)
return err
}
func (s *Store) StoresGet(opts *pb.StoresGetOptions) (pb.StoresGetResult, error) {