batched worker badger mode

This commit is contained in:
DataHoarder 2022-02-04 15:37:31 +01:00
parent c925b3ae68
commit 8635605d22
2 changed files with 19 additions and 22 deletions

View file

@ -259,11 +259,11 @@ func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
return ts
}
func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, batchSize int) *TaskState {
func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, workers int) *TaskState {
nentries := c.fileSize / c.recordSize
nrecords := int64(len(c.resourceIndex))
splitSize := int64(batchSize)
splitSize := nrecords / int64(workers)
if splitSize < 1 {
splitSize = 1
}
@ -271,26 +271,22 @@ func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, batch
ts := &TaskState{
total: nentries,
}
ts.wg.Add(1)
go func() {
defer ts.wg.Done()
for i := int64(0); i < nrecords; i += splitSize {
l := i + splitSize
if l > nrecords {
l = nrecords
}
records := make([]panako.StoreRecord, 0, int64(c.resourceIndex[i].length)*splitSize)
for _, control := range c.resourceIndex[i:l] {
records = append(records, c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))...)
}
atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize*(l-i))
s.StorePanakoPrints(records)
s.RunGarbageCollection()
for i := int64(0); i < nrecords; i += splitSize {
l := i + splitSize
if l > nrecords {
l = nrecords
}
}()
ts.wg.Add(1)
go func(slice []AppendConverterControlData) {
defer ts.wg.Done()
for _, control := range slice {
records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
s.StorePanakoPrints(records)
atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
}
}(c.resourceIndex[i:l])
}
return ts
}

View file

@ -52,7 +52,7 @@ func main() {
}
defer source.Close()
destination, err := specializedstore.NewBadgerStore(*destinationPath, true, false, 500000)
destination, err := specializedstore.NewBadgerStore(*destinationPath, true, false, 5000000)
if err != nil {
log.Panic(err)
}
@ -64,6 +64,7 @@ func main() {
p, t := taskState.Progress()
log.Printf("%.03f%% %d/%d", float64(p*100)/float64(t), p, t)
destination.RunGarbageCollection()
if p >= t {
break