package store

import (
	"bytes"
	"encoding/binary"
	"git.gammaspectra.live/S.O.N.G/Hibiki/panako"
	"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore"
	"io"
	"log"
	"math"
	"math/bits"
	"os"
	"sync"
	"sync/atomic"
)

// Bitfield is a fixed-size bit set backed by 64-bit words.
type Bitfield struct {
	size  uint64
	field []uint64
}

// bitfieldWordBits is the number of bits held by each 64-bit word in field.
const bitfieldWordBits = 64

func getBitfieldIndex(n uint64) (uint64, uint64) {
	itemIndex := n / bitfieldWordBits
	itemOffset := n % bitfieldWordBits

	return itemIndex, itemOffset
}

func NewBitfield(size uint64) *Bitfield {
	return &Bitfield{
		size:  size,
		field: make([]uint64, size/bitfieldWordBits+1),
	}
}

func (b *Bitfield) Get(n uint64) bool {
	index, offset := getBitfieldIndex(n)

	return (b.field[index] & (1 << offset)) != 0
}

// GetNextValue returns the index of the first bit equal to value at or after
// fromIndex, or math.MaxUint64 if no such bit exists in the backing words.
func (b *Bitfield) GetNextValue(fromIndex uint64, value bool) uint64 {
	index, offset := getBitfieldIndex(fromIndex)

	fieldSize := uint64(len(b.field))

	for index < fieldSize {
		word := b.field[index]
		if !value {
			// searching for a clear bit: look for a set bit in the complement
			word = ^word
		}
		// ignore bits below the starting offset in the first word examined
		word &= ^uint64(0) << offset

		if word != 0 {
			return index*bitfieldWordBits + uint64(bits.TrailingZeros64(word))
		}

		offset = 0
		index++
	}

	return math.MaxUint64
}

func (b *Bitfield) Set(n uint64, value bool) {
	index, offset := getBitfieldIndex(n)

	if value {
		b.field[index] |= 1 << offset
	} else {
		b.field[index] &^= 1 << offset
	}
}
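
// Hedged usage sketch: nothing in this file currently populates a Bitfield
// (the processedField initialization below is commented out), so
// exampleWalkUnprocessed is a hypothetical helper shown only to illustrate the
// intended API: it visits every clear bit in order and marks it processed.
func exampleWalkUnprocessed(b *Bitfield, visit func(uint64)) {
	for i := b.GetNextValue(0, false); i != math.MaxUint64 && i < b.size; i = b.GetNextValue(i+1, false) {
		visit(i)
		b.Set(i, true)
	}
}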

type AppendConverter struct {
	f *os.File
	//writeMutex sync.RWMutex
	useCompactHashes bool
	usePackedValues  bool
	recordSize       int64
	controlSize      int64
	fileSize         int64
	resourceIndex    []AppendConverterControlData
	processedField   *Bitfield
}

// NewAppendConverter opens an append-store file for conversion and scans its
// control records into memory.
func NewAppendConverter(pathName string, useCompactHashes, usePackedValues bool) (*AppendConverter, error) {
	f, err := os.Open(pathName)
	if err != nil {
		return nil, err
	}

	c := &AppendConverter{
		f:                f,
		useCompactHashes: useCompactHashes,
		usePackedValues:  usePackedValues,
	}
	c.recordSize = c.GetEntrySize()
	c.controlSize = c.GetControlRecordSizeInEntries()

	info, err := c.f.Stat()
	if err != nil {
		f.Close()
		return nil, err
	}
	c.fileSize = info.Size()
	//c.processedField = NewBitfield(uint64(c.fileSize / c.GetEntrySize()))
	c.ScanControlData()
	return c, nil
}

// getRecordsAt reads n records starting at the given record offset and
// discards the data; it only verifies that the range is readable.
func (c *AppendConverter) getRecordsAt(offset, n int64) {
	buf := make([]byte, n*c.recordSize)
	_, err := c.f.ReadAt(buf, offset*c.recordSize)
	if err != nil {
		log.Panic(err)
	}
}

// decodeRecord reads one fingerprint entry from buf and returns it as a
// panako.StoreRecord, according to the compact-hash and packed-value flags.
func (c *AppendConverter) decodeRecord(resourceId panako.ResourceId, buf *bytes.Reader) panako.StoreRecord {
	var hash uint64
	if c.useCompactHashes {
		var h uint32
		binary.Read(buf, binary.LittleEndian, &h)
		hash = uint64(h)
	} else {
		binary.Read(buf, binary.LittleEndian, &hash)
	}
	if c.usePackedValues {
		var packed uint32
		binary.Read(buf, binary.LittleEndian, &packed)

		return panako.NewStoreRecordFromCompactPacked(resourceId, uint32(hash), packed)
	} else {
		var time, frequency uint32
		binary.Read(buf, binary.LittleEndian, &time)
		binary.Read(buf, binary.LittleEndian, &frequency)

		return panako.StoreRecord{
			ResourceId: resourceId,
			Hash:       hash,
			Time:       time,
			Frequency:  frequency,
		}
	}
}

// AppendConverterControlData describes one resource block in the append file:
// which resource it belongs to, how many record entries follow, and where
// those entries start (counted in entry-sized slots).
type AppendConverterControlData struct {
	resourceId        panako.ResourceId
	length            uint32
	recordsStartIndex int64
}

func (c *AppendConverter) decodeControl(buf *bytes.Reader) (result AppendConverterControlData) {
	binary.Read(buf, binary.LittleEndian, &result.resourceId)
	binary.Read(buf, binary.LittleEndian, &result.length)

	return
}

// GetEntrySize returns the on-disk size in bytes of a single entry: the hash
// (4 bytes compact, 8 bytes full) followed by either a packed 4-byte value or
// separate 4-byte time and frequency fields.
func (c *AppendConverter) GetEntrySize() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 4 + 4
	} else if !c.useCompactHashes && c.usePackedValues {
		return 8 + 4
	} else if c.useCompactHashes && !c.usePackedValues {
		return 4 + 8
	} else {
		log.Panic("unsupported combination: full hashes with unpacked values")
		return 0
	}
}

// GetControlRecordSizeInEntries returns how many entry-sized slots a control
// record occupies, including its leading zeroed marker entry.
func (c *AppendConverter) GetControlRecordSizeInEntries() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 1 + 2
	} else if !c.useCompactHashes && c.usePackedValues {
		return 1 + 1
	} else if c.useCompactHashes && !c.usePackedValues {
		return 1 + 1
	} else {
		log.Panic("unsupported combination: full hashes with unpacked values")
		return 0
	}
}
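
// Layout note, inferred from ScanControlData and decodeControl rather than
// from a format specification: the append file appears to be a sequence of
// blocks of the form
//
//	[one zeroed entry] [resourceId, length] [length fingerprint entries]
//
// where all integers are little-endian, each slot is GetEntrySize() bytes
// wide, and the control header spans GetControlRecordSizeInEntries() slots.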

// TaskState tracks a background conversion task: Wait blocks until it
// finishes, Progress reports processed versus total entries.
type TaskState struct {
	wg        sync.WaitGroup
	total     int64
	processed int64
}

func (t *TaskState) Wait() {
	t.wg.Wait()
}

func (t *TaskState) Progress() (int64, int64) {
	return atomic.LoadInt64(&t.processed), t.total
}
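
// Progress may be polled from any goroutine while the task is still running:
// processed is read atomically and total is fixed when the TaskState is
// created, so callers can report completion percentages without calling Wait.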

func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
	nentries := c.fileSize / c.recordSize

	nrecords := int64(len(c.resourceIndex))
	splitSize := nrecords / int64(workers)
	if splitSize < 1 {
		splitSize = 1
	}

	ts := &TaskState{
		total: nentries,
	}

	for i := int64(0); i < nrecords; i += splitSize {
		l := i + splitSize
		if l > nrecords {
			l = nrecords
		}

		ts.wg.Add(1)
		go func(slice []AppendConverterControlData) {
			defer ts.wg.Done()
			for _, control := range slice {
				records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
				s.StorePanakoPrints(records)
				atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			}
		}(c.resourceIndex[i:l])
	}

	return ts
}
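
// Note on the split in SortHashes: the control index is sharded into
// contiguous slices of roughly nrecords/workers resources, with one goroutine
// per slice re-reading and storing whole resources independently. This
// assumes (not verified here) that the destination AppendStore is safe for
// concurrent StorePanakoPrints calls.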

// ConvertToBadger streams the append file into a Badger-backed store in
// passes over the hash key space, storing one batch of records per pass.
func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, iterations int) *TaskState {
	keySize := 34
	if c.useCompactHashes {
		keySize = 32
	}

	keyTotal := uint64(1 << keySize)

	splitSize := keyTotal / uint64(iterations)
	if splitSize < 1 {
		splitSize = 1
	}

	ts := &TaskState{
		total: int64(keyTotal),
	}

	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		var recordsWritten int64
		var buffer []panako.StoreRecord

		hints := make(map[panako.ResourceId]int64)
		for _, control := range c.resourceIndex {
			hints[control.resourceId] = 0
		}

		for i := uint64(0); i < keyTotal; i += splitSize {
			resourcesNone := 0
			for _, control := range c.resourceIndex {
				if int64(control.length)-hints[control.resourceId] == 0 {
					resourcesNone++
					continue
				}
				//Read with offset
				records := c.ReadSortedRecordsRange(control.resourceId, control.recordsStartIndex+hints[control.resourceId], int64(control.length)-hints[control.resourceId], i, i+splitSize)
				hints[control.resourceId] += int64(len(records))
				buffer = append(buffer, records...)
			}
			log.Printf("storing %d records for iteration %d of step size %d", len(buffer), i/splitSize, splitSize)
			if len(buffer) > 0 {
				recordsWritten += int64(len(buffer))
				s.StorePanakoPrintsUnique(buffer)
			}
			atomic.AddInt64(&ts.processed, int64(splitSize))
			buffer = buffer[:0]

			if resourcesNone == len(c.resourceIndex) {
				log.Printf("early exit: completed %d resources, total %d records", resourcesNone, recordsWritten)

				// every resource is exhausted; mark the task fully processed
				atomic.StoreInt64(&ts.processed, ts.total)
				return
			}
		}
	}()

	return ts
}
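
// Note on the hint map in ConvertToBadger: each resource's records are
// assumed to be sorted by hash on disk (ReadSortedRecordsRange panics if they
// are not), so a per-resource hint of how many records earlier passes already
// consumed lets every entry be decoded exactly once across all passes.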

// ConvertToBPTree streams every resource's records, in file order, into a
// B+tree file store.
func (c *AppendConverter) ConvertToBPTree(s *specializedstore.BPTreeFileStore) *TaskState {
	nentries := c.fileSize / c.recordSize

	ts := &TaskState{
		total: nentries,
	}

	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		for _, control := range c.resourceIndex {
			records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
			atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			s.StorePanakoPrints(records)
		}
	}()

	return ts
}
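
// Hedged end-to-end sketch: exampleConvertFile is a hypothetical helper, not
// part of the converter itself. It takes the destination store as a parameter
// because the specializedstore constructors are not shown in this file; the
// compact-hash/packed-value flags passed to NewAppendConverter must match how
// the append file was written.
func exampleConvertFile(pathName string, s *specializedstore.BPTreeFileStore) error {
	c, err := NewAppendConverter(pathName, true, true)
	if err != nil {
		return err
	}
	defer c.Close()

	ts := c.ConvertToBPTree(s)
	ts.Wait()

	processed, total := ts.Progress()
	log.Printf("converted %d/%d entries from %s", processed, total, pathName)
	return nil
}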

// ReadRecords decodes n consecutive entries belonging to id, starting at
// recordIndex (counted in entry-sized slots from the start of the file).
func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}

	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		results = append(results, c.decodeRecord(id, bufReader))
	}

	return
}

// ReadSortedRecordsRange decodes up to n consecutive entries belonging to id,
// starting at recordIndex, and returns only those whose hash lies in
// [start, stop). It relies on the entries being sorted by hash and stops at
// the first hash >= stop.
func (c *AppendConverter) ReadSortedRecordsRange(id panako.ResourceId, recordIndex, n int64, start, stop uint64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}

	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		r := c.decodeRecord(id, bufReader)
		if r.Hash < start { //shouldn't trigger given input is sorted
			log.Panic("input not sorted, or duplicate keys exist")
		}
		if r.Hash >= stop {
			break
		}
		results = append(results, r)
	}

	return
}

// ScanControlData walks the whole file, reading every control record and
// rebuilding resourceIndex; each control record must start with a zeroed
// marker entry, followed by the resource id and its record count.
func (c *AppendConverter) ScanControlData() {
	c.resourceIndex = c.resourceIndex[:0]

	var recordIndex int64 = 0
	buf := make([]byte, c.recordSize*c.controlSize)
	emptyRecord := make([]byte, c.recordSize)
	for {
		_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Panic(err)
		}

		if !bytes.Equal(buf[0:c.recordSize], emptyRecord) {
			log.Panicf("Found corrupted control entry at %d #%d\n", recordIndex, recordIndex*c.recordSize)
		}

		control := c.decodeControl(bytes.NewReader(buf[c.recordSize:]))
		control.recordsStartIndex = recordIndex + c.controlSize

		/*for i := int64(0); i < c.controlSize; i++ {
			c.processedField.Set(uint64(recordIndex+i), true)
		}*/

		//advance seek using length information
		recordIndex += c.controlSize + int64(control.length)
		c.resourceIndex = append(c.resourceIndex, control)

		if len(c.resourceIndex)%1000 == 0 {
			log.Printf("read %d control records\n", len(c.resourceIndex))
		}
	}
}

func (c *AppendConverter) Close() error {
	return c.f.Close()
}