From 291b0ca96c51aab57904bdec9dfc656f3c03cab7 Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Fri, 4 Feb 2022 11:45:53 +0100 Subject: [PATCH] Add badger converter --- store/appendconverter.go | 34 ++++++++++++++++++++++++++++++++++ store/cli/cli.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/store/appendconverter.go b/store/appendconverter.go index ad0a3e3..cc416aa 100644 --- a/store/appendconverter.go +++ b/store/appendconverter.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "git.gammaspectra.live/S.O.N.G/Hibiki/strategy/panako" + "git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore" "io" "log" "math" @@ -258,6 +259,39 @@ func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState { return ts } +func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, batchSize int) *TaskState { + nentries := c.fileSize / c.recordSize + + nrecords := int64(len(c.resourceIndex)) + splitSize := nrecords / int64(batchSize) + if splitSize < 1 { + splitSize = 1 + } + + ts := &TaskState{ + total: nentries, + } + + ts.wg.Add(1) + go func() { + defer ts.wg.Done() + for i := int64(0); i < nrecords; i += splitSize { + l := i + splitSize + if l > nrecords { + l = nrecords + } + + for _, control := range c.resourceIndex[i:l] { + records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length)) + s.StorePanakoPrints(records) + atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize) + } + } + }() + + return ts +} + func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) { c.resourceIndex = c.resourceIndex[:0] diff --git a/store/cli/cli.go b/store/cli/cli.go index bd07e23..1b04db2 100644 --- a/store/cli/cli.go +++ b/store/cli/cli.go @@ -2,6 +2,7 @@ package main import ( "flag" + "git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore" "git.gammaspectra.live/S.O.N.G/METANOIA/store" "log" "runtime" @@ -10,7 +11,7 @@ import ( func main() { command := flag.String("cmd", "sort", "Commands: sort") - nworkers := flag.Int("workers", runtime.NumCPU(), "NUmber of workers for bulk tasks") + nworkers := flag.Int("workers", runtime.NumCPU(), "Number of splits for bulk tasks") sourcePath := flag.String("src", "", "Source database append log path") destinationPath := flag.String("dst", "", "Destination database append log path") @@ -41,6 +42,32 @@ func main() { time.Sleep(time.Second * 5) } + taskState.Wait() + } else if *command == "badger" { + source, err := store.NewAppendConverter(*sourcePath, true, true) + if err != nil { + log.Panic(err) + } + + destination, err := specializedstore.NewBadgerStore(*destinationPath, true, false, 500000) + if err != nil { + log.Panic(err) + } + + taskState := source.ConvertToBadger(destination, *nworkers) + + for { + p, t := taskState.Progress() + + log.Printf("%.03f%% %d/%d", float64(p*100)/float64(t), p, t) + + if p >= t { + break + } + + time.Sleep(time.Second * 5) + } + taskState.Wait() } }