Sort hash entries on append store, added tool to sort databases
This commit is contained in:
parent
cb621c031e
commit
5571319d41
|
@ -6,6 +6,7 @@ import (
|
|||
"git.gammaspectra.live/S.O.N.G/Hibiki/strategy/panako"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
|
@ -153,6 +154,11 @@ func (s *AppendStore) StorePanakoPrints(records []panako.StoreRecord) {
|
|||
length: uint32(len(l)),
|
||||
}, buf)
|
||||
|
||||
//sort entries to allow easier lookups on scan
|
||||
sort.Slice(l, func(i, j int) bool {
|
||||
return l[i].Hash < l[j].Hash
|
||||
})
|
||||
|
||||
for _, v := range l {
|
||||
s.encodeRecord(v, buf)
|
||||
}
|
||||
|
|
317
store/appendconverter.go
Normal file
317
store/appendconverter.go
Normal file
|
@ -0,0 +1,317 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"git.gammaspectra.live/S.O.N.G/Hibiki/strategy/panako"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"math/bits"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Bitfield is a fixed-size set of boolean flags packed into uint64 words.
type Bitfield struct {
	size  uint64
	field []uint64
}

// bitsPerWord is the number of flag positions stored in each backing uint64.
// The original code used binary.Size(uint64(0)) here, which is 8 (bytes),
// not 64 (bits), wasting 7/8 of each word and breaking scans.
const bitsPerWord = 64

// getBitfieldIndex maps a flag position n to (word index, bit offset within word).
func getBitfieldIndex(n uint64) (uint64, uint64) {
	return n / bitsPerWord, n % bitsPerWord
}

// NewBitfield creates a Bitfield able to hold size flags, all initially false.
func NewBitfield(size uint64) *Bitfield {
	return &Bitfield{
		size:  size,
		field: make([]uint64, size/bitsPerWord+1),
	}
}

// Get reports whether flag n is set.
func (b *Bitfield) Get(n uint64) bool {
	index, offset := getBitfieldIndex(n)

	return (b.field[index] & (1 << offset)) != 0
}

// GetNextValue returns the first position >= fromIndex whose flag equals value,
// or math.MaxUint64 if there is none within the field's size.
//
// Flags are set LSB-first (Set uses 1 << offset), so the scan must use
// TrailingZeros64; the original used LeadingZeros64 (an MSB scan) and
// compared against the word size in bytes, which never found the right bit.
func (b *Bitfield) GetNextValue(fromIndex uint64, value bool) uint64 {
	index, offset := getBitfieldIndex(fromIndex)

	for index < uint64(len(b.field)) {
		word := b.field[index]
		if !value {
			// searching for clear bits: invert so they become set bits
			word = ^word
		}
		// ignore bit positions below the starting offset in the first word
		word &= ^uint64(0) << offset

		if word != 0 {
			pos := index*bitsPerWord + uint64(bits.TrailingZeros64(word))
			if pos < b.size {
				return pos
			}
			// hit only padding bits in the last word
			return math.MaxUint64
		}

		offset = 0
		index++
	}

	return math.MaxUint64
}

// Set assigns flag n to value.
func (b *Bitfield) Set(n uint64, value bool) {
	index, offset := getBitfieldIndex(n)

	if value {
		b.field[index] |= 1 << offset
	} else {
		b.field[index] &^= 1 << offset
	}
}
|
||||
|
||||
// AppendConverter reads an existing append-log database so its contents can
// be decoded and re-written (e.g. sorted) into a fresh AppendStore.
type AppendConverter struct {
	f *os.File
	//writeMutex sync.RWMutex
	useCompactHashes bool // hashes stored on disk as uint32 instead of uint64
	usePackedValues  bool // record values stored as a single packed uint32 instead of time+frequency
	recordSize       int64 // size of one record in bytes (see GetEntrySize)
	controlSize      int64 // size of a control block, measured in records (see GetControlRecordSizeInEntries)
	fileSize         int64 // total size of the log file in bytes
	resourceIndex    []AppendConverterControlData // one entry per resource, rebuilt by ScanControlData
	processedField   *Bitfield // marks record indexes already accounted for
}
|
||||
|
||||
func NewAppendConverter(pathName string, useCompactHashes, usePackedValues bool) (*AppendConverter, error) {
|
||||
f, err := os.Open(pathName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &AppendConverter{
|
||||
f: f,
|
||||
useCompactHashes: useCompactHashes,
|
||||
usePackedValues: usePackedValues,
|
||||
}
|
||||
c.recordSize = c.GetEntrySize()
|
||||
c.controlSize = c.GetControlRecordSizeInEntries()
|
||||
info, err := c.f.Stat()
|
||||
c.fileSize = info.Size()
|
||||
c.processedField = NewBitfield(uint64(c.fileSize / c.GetEntrySize()))
|
||||
c.ScanControlData()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *AppendConverter) getRecordsAt(offset, n int64) {
|
||||
buf := make([]byte, n*c.recordSize)
|
||||
_, err := c.f.ReadAt(buf, offset*c.recordSize)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AppendConverter) decodeRecord(resourceId panako.ResourceId, buf *bytes.Reader) panako.StoreRecord {
|
||||
var hash uint64
|
||||
if c.useCompactHashes {
|
||||
var h uint32
|
||||
binary.Read(buf, binary.LittleEndian, &h)
|
||||
hash = uint64(h)
|
||||
} else {
|
||||
binary.Read(buf, binary.LittleEndian, &hash)
|
||||
}
|
||||
if c.usePackedValues {
|
||||
var packed uint32
|
||||
binary.Read(buf, binary.LittleEndian, &packed)
|
||||
|
||||
return panako.NewStoreRecordFromCompactPacked(resourceId, uint32(hash), packed)
|
||||
} else {
|
||||
var time, frequency uint32
|
||||
binary.Read(buf, binary.LittleEndian, &time)
|
||||
binary.Read(buf, binary.LittleEndian, &frequency)
|
||||
|
||||
return panako.StoreRecord{
|
||||
ResourceId: resourceId,
|
||||
Hash: hash,
|
||||
Time: time,
|
||||
Frequency: frequency,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AppendConverterControlData is the decoded form of one control entry in the
// append log: which resource the following records belong to and how many
// there are.
type AppendConverterControlData struct {
	resourceId panako.ResourceId
	length     uint32 // number of data records that follow the control block
	recordsStartIndex int64 // record index of the first data record; not stored on disk, filled in by ScanControlData
}
|
||||
|
||||
func (c *AppendConverter) decodeControl(buf *bytes.Reader) (result AppendConverterControlData) {
|
||||
|
||||
binary.Read(buf, binary.LittleEndian, &result.resourceId)
|
||||
binary.Read(buf, binary.LittleEndian, &result.length)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *AppendConverter) GetEntrySize() int64 {
|
||||
if c.useCompactHashes && c.usePackedValues {
|
||||
return 4 + 4
|
||||
} else if !c.useCompactHashes && c.usePackedValues {
|
||||
return 8 + 4
|
||||
} else if c.useCompactHashes && !c.usePackedValues {
|
||||
return 4 + 8
|
||||
} else {
|
||||
log.Panic()
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AppendConverter) GetControlRecordSizeInEntries() int64 {
|
||||
if c.useCompactHashes && c.usePackedValues {
|
||||
return 1 + 2
|
||||
} else if !c.useCompactHashes && c.usePackedValues {
|
||||
return 1 + 1
|
||||
} else if c.useCompactHashes && !c.usePackedValues {
|
||||
return 1 + 1
|
||||
} else {
|
||||
log.Panic()
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// TaskState tracks a background bulk task: workers register on the wait
// group and atomically bump the processed counter as they go.
type TaskState struct {
	wg        sync.WaitGroup
	total     int64 // total entries to process; written once before workers start
	processed int64 // entries handled so far; updated atomically by workers
}

// Wait blocks until every worker of the task has finished.
func (t *TaskState) Wait() {
	t.wg.Wait()
}

// Progress reports how many entries have been processed and the total.
func (t *TaskState) Progress() (int64, int64) {
	processed := atomic.LoadInt64(&t.processed)
	return processed, t.total
}
|
||||
|
||||
func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
|
||||
nentries := c.fileSize / c.GetEntrySize()
|
||||
|
||||
nrecords := int64(len(c.resourceIndex))
|
||||
splitSize := nrecords / int64(workers)
|
||||
if splitSize < 1 {
|
||||
splitSize = 1
|
||||
}
|
||||
|
||||
ts := &TaskState{
|
||||
total: nentries,
|
||||
}
|
||||
|
||||
for i := int64(0); i < nrecords; i += splitSize {
|
||||
l := i + splitSize
|
||||
if l > nentries {
|
||||
l = nentries
|
||||
}
|
||||
|
||||
ts.wg.Add(1)
|
||||
go func(slice []AppendConverterControlData) {
|
||||
defer ts.wg.Done()
|
||||
for _, control := range slice {
|
||||
records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
|
||||
sort.Slice(records, func(i, j int) bool {
|
||||
return records[i].Hash < records[j].Hash
|
||||
})
|
||||
s.StorePanakoPrints(records)
|
||||
atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
|
||||
}
|
||||
}(c.resourceIndex[i:l])
|
||||
}
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) {
|
||||
c.resourceIndex = c.resourceIndex[:0]
|
||||
|
||||
buf := make([]byte, c.recordSize*n)
|
||||
_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
|
||||
bufReader := bytes.NewReader(buf)
|
||||
for i := int64(0); i < n; i++ {
|
||||
results = append(results, c.decodeRecord(id, bufReader))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *AppendConverter) ScanControlData() {
|
||||
c.resourceIndex = c.resourceIndex[:0]
|
||||
|
||||
var recordIndex int64 = 0
|
||||
buf := make([]byte, c.recordSize*c.controlSize)
|
||||
emptyRecord := make([]byte, c.recordSize)
|
||||
for {
|
||||
|
||||
_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
log.Panic(err)
|
||||
}
|
||||
|
||||
if bytes.Compare(buf[0:c.recordSize], emptyRecord) != 0 {
|
||||
log.Panicf("Found corrupted control entry at %d #%d\n", recordIndex, recordIndex*c.recordSize)
|
||||
}
|
||||
|
||||
control := c.decodeControl(bytes.NewReader(buf[c.recordSize:]))
|
||||
control.recordsStartIndex = recordIndex + c.controlSize
|
||||
|
||||
for i := int64(0); i < c.controlSize; i++ {
|
||||
c.processedField.Set(uint64(recordIndex+i), true)
|
||||
}
|
||||
|
||||
//advance seek using length information
|
||||
recordIndex += c.controlSize + int64(control.length)
|
||||
c.resourceIndex = append(c.resourceIndex, control)
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the underlying append log file. The converter must not be
// used after Close.
func (c *AppendConverter) Close() error {
	return c.f.Close()
}
|
46
store/cli/cli.go
Normal file
46
store/cli/cli.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"git.gammaspectra.live/S.O.N.G/METANOIA/store"
|
||||
"log"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
command := flag.String("cmd", "sort", "Commands: sort")
|
||||
nworkers := flag.Int("workers", runtime.NumCPU(), "NUmber of workers for bulk tasks")
|
||||
sourcePath := flag.String("src", "", "Source database append log path")
|
||||
destinationPath := flag.String("dst", "", "Destination database append log path")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *command == "sort" {
|
||||
source, err := store.NewAppendConverter(*sourcePath, true, true)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
|
||||
destination, err := store.NewAppendStore(*destinationPath, true, true)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
|
||||
taskState := source.SortHashes(destination, *nworkers)
|
||||
|
||||
for {
|
||||
p, t := taskState.Progress()
|
||||
|
||||
log.Printf("%.03f%% %d/%d", float64(p*100)/float64(t), p, t)
|
||||
|
||||
if p >= t {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
|
||||
taskState.Wait()
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue