METANOIA/store/appendconverter.go

package store

import (
	"bytes"
	"encoding/binary"
	"io"
	"log"
	"math"
	"math/bits"
	"os"
	"sync"
	"sync/atomic"

	"git.gammaspectra.live/S.O.N.G/Hibiki/panako"
	"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore"
)
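
// Bitfield is a fixed-size bit set backed by a slice of uint64 words.
// size is the number of addressable bits.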
type Bitfield struct {
	size  uint64
	field []uint64
}

// bitfieldWordBits is the number of bits held by each uint64 word of the field.
const bitfieldWordBits = 64

func getBitfieldIndex(n uint64) (uint64, uint64) {
	itemIndex := n / bitfieldWordBits
	itemOffset := n % bitfieldWordBits
	return itemIndex, itemOffset
}

func NewBitfield(size uint64) *Bitfield {
	return &Bitfield{
		size:  size,
		field: make([]uint64, size/bitfieldWordBits+1),
	}
}

func (b *Bitfield) Get(n uint64) bool {
	index, offset := getBitfieldIndex(n)
	return (b.field[index] & (1 << offset)) != 0
}

// GetNextValue returns the index of the first bit at or after fromIndex whose
// value matches value, or math.MaxUint64 if no such bit exists.
func (b *Bitfield) GetNextValue(fromIndex uint64, value bool) uint64 {
	index, offset := getBitfieldIndex(fromIndex)
	fieldSize := uint64(len(b.field))
	for index < fieldSize {
		word := b.field[index]
		if !value {
			word = ^word
		}
		// mask off bits below the starting offset within the first word
		word &= ^uint64(0) << offset
		if word != 0 {
			return index*bitfieldWordBits + uint64(bits.TrailingZeros64(word))
		}
		offset = 0
		index++
	}
	return math.MaxUint64
}

func (b *Bitfield) Set(n uint64, value bool) {
	index, offset := getBitfieldIndex(n)
	if value {
		b.field[index] |= 1 << offset
	} else {
		b.field[index] &^= 1 << offset
	}
}
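
// AppendConverter reads a raw append-format fingerprint file and converts its
// contents into other store backends. recordSize and controlSize are derived
// from the hash/value encoding flags; resourceIndex is built by ScanControlData.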
type AppendConverter struct {
	f *os.File
	//writeMutex sync.RWMutex
	useCompactHashes bool
	usePackedValues  bool
	recordSize       int64
	controlSize      int64
	fileSize         int64
	resourceIndex    []AppendConverterControlData
	processedField   *Bitfield
}
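
// NewAppendConverter opens the append-format file at pathName. useCompactHashes
// and usePackedValues must match the flags the file was written with, since they
// determine the on-disk record layout. The whole file is scanned for control
// records before the converter is returned.
//
// A minimal usage sketch; the file name, target store, and worker count are
// illustrative only:
//
//	c, err := NewAppendConverter("prints.bin", true, true)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer c.Close()
//	ts := c.SortHashes(target, 4) // target is an existing *AppendStore
//	ts.Wait()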
func NewAppendConverter(pathName string, useCompactHashes, usePackedValues bool) (*AppendConverter, error) {
	f, err := os.Open(pathName)
	if err != nil {
		return nil, err
	}
	c := &AppendConverter{
		f:                f,
		useCompactHashes: useCompactHashes,
		usePackedValues:  usePackedValues,
	}
	c.recordSize = c.GetEntrySize()
	c.controlSize = c.GetControlRecordSizeInEntries()
	info, err := c.f.Stat()
	if err != nil {
		return nil, err
	}
	c.fileSize = info.Size()
	//c.processedField = NewBitfield(uint64(c.fileSize / c.GetEntrySize()))
	c.ScanControlData()
	return c, nil
}
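
// getRecordsAt reads n raw records starting at the given entry offset into a
// temporary buffer, panicking on read errors; the bytes read are currently unused.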
func (c *AppendConverter) getRecordsAt(offset, n int64) {
	buf := make([]byte, n*c.recordSize)
	_, err := c.f.ReadAt(buf, offset*c.recordSize)
	if err != nil {
		log.Panic(err)
	}
}
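
// decodeRecord decodes a single record from buf for the given resource, honouring
// the compact-hash and packed-value layouts selected at construction time.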
func (c *AppendConverter) decodeRecord(resourceId panako.ResourceId, buf *bytes.Reader) panako.StoreRecord {
	var hash uint64
	if c.useCompactHashes {
		var h uint32
		binary.Read(buf, binary.LittleEndian, &h)
		hash = uint64(h)
	} else {
		binary.Read(buf, binary.LittleEndian, &hash)
	}
	if c.usePackedValues {
		var packed uint32
		binary.Read(buf, binary.LittleEndian, &packed)
		return panako.NewStoreRecordFromCompactPacked(resourceId, uint32(hash), packed)
	} else {
		var time, frequency uint32
		binary.Read(buf, binary.LittleEndian, &time)
		binary.Read(buf, binary.LittleEndian, &frequency)
		return panako.StoreRecord{
			ResourceId: resourceId,
			Hash:       hash,
			Time:       time,
			Frequency:  frequency,
		}
	}
}
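
// AppendConverterControlData describes one resource in the append file: its id,
// the number of records that follow its control block, and the entry index at
// which those records start.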
type AppendConverterControlData struct {
	resourceId        panako.ResourceId
	length            uint32
	recordsStartIndex int64
}

func (c *AppendConverter) decodeControl(buf *bytes.Reader) (result AppendConverterControlData) {
	binary.Read(buf, binary.LittleEndian, &result.resourceId)
	binary.Read(buf, binary.LittleEndian, &result.length)
	return
}
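
// GetEntrySize returns the on-disk size of a single record in bytes: a 4-byte
// (compact) or 8-byte hash, followed by either one packed 4-byte time/frequency
// value or separate 4-byte time and frequency fields.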
func (c *AppendConverter) GetEntrySize() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 4 + 4
	} else if !c.useCompactHashes && c.usePackedValues {
		return 8 + 4
	} else if c.useCompactHashes && !c.usePackedValues {
		return 4 + 8
	} else {
		log.Panic("unsupported combination: full hashes with unpacked values")
		return 0
	}
}
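
// GetControlRecordSizeInEntries returns the size of a control block measured in
// record-sized entries: one all-zero marker entry plus enough entries to hold
// the resource id and record count.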
func (c *AppendConverter) GetControlRecordSizeInEntries() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 1 + 2
	} else if !c.useCompactHashes && c.usePackedValues {
		return 1 + 1
	} else if c.useCompactHashes && !c.usePackedValues {
		return 1 + 1
	} else {
		log.Panic("unsupported combination: full hashes with unpacked values")
		return 0
	}
}
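
// TaskState tracks a background conversion task. Wait blocks until the task has
// finished; Progress reports the number of processed entries and the total.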
type TaskState struct {
	wg        sync.WaitGroup
	total     int64
	processed int64
}

func (t *TaskState) Wait() {
	t.wg.Wait()
}

func (t *TaskState) Progress() (int64, int64) {
	return atomic.LoadInt64(&t.processed), t.total
}
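
// SortHashes re-stores every resource's records into s, splitting the resource
// index into roughly equal slices and processing them on the given number of
// worker goroutines. Progress is counted in file entries (records plus control
// entries).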
func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
	nentries := c.fileSize / c.recordSize
	nrecords := int64(len(c.resourceIndex))
	splitSize := nrecords / int64(workers)
	if splitSize < 1 {
		splitSize = 1
	}
	ts := &TaskState{
		total: nentries,
	}
	for i := int64(0); i < nrecords; i += splitSize {
		l := i + splitSize
		if l > nrecords {
			l = nrecords
		}
		ts.wg.Add(1)
		go func(slice []AppendConverterControlData) {
			defer ts.wg.Done()
			for _, control := range slice {
				records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
				s.StorePanakoPrints(records)
				atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			}
		}(c.resourceIndex[i:l])
	}
	return ts
}
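
// ConvertToBadger writes the whole file into a BadgerStore in hash order. The
// hash key space (2^32 keys for compact hashes, 2^34 otherwise) is processed in
// the given number of iterations; for each range, the matching records of every
// resource are read (resuming from per-resource hints), collected, and stored via
// StorePanakoPrintsUnique. Records of each resource must already be sorted by hash.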
func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, iterations int) *TaskState {
	keySize := 34
	if c.useCompactHashes {
		keySize = 32
	}
	keyTotal := uint64(1 << keySize)
	splitSize := keyTotal / uint64(iterations)
	if splitSize < 1 {
		splitSize = 1
	}
	ts := &TaskState{
		total: int64(keyTotal),
	}
	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		var recordsWritten int64
		var buffer []panako.StoreRecord
		hints := make(map[panako.ResourceId]int64)
		for _, control := range c.resourceIndex {
			hints[control.resourceId] = 0
		}
		for i := uint64(0); i < keyTotal; i += splitSize {
			resourcesNone := 0
			for _, control := range c.resourceIndex {
				if int64(control.length)-hints[control.resourceId] == 0 {
					resourcesNone++
					continue
				}
				//Read with offset
				records := c.ReadSortedRecordsRange(control.resourceId, control.recordsStartIndex+hints[control.resourceId], int64(control.length)-hints[control.resourceId], i, i+splitSize)
				hints[control.resourceId] += int64(len(records))
				buffer = append(buffer, records...)
			}
			log.Printf("storing %d records for iteration %d of step size %d", len(buffer), i/splitSize, splitSize)
			if len(buffer) > 0 {
				recordsWritten += int64(len(buffer))
				s.StorePanakoPrintsUnique(buffer)
			}
			atomic.AddInt64(&ts.processed, int64(splitSize))
			buffer = buffer[:0]
			if resourcesNone == len(c.resourceIndex) {
				log.Printf("early exit: completed %d resources, total %d records", resourcesNone, recordsWritten)
				atomic.AddInt64(&ts.processed, ts.total-ts.processed)
				return
			}
		}
	}()
	return ts
}
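
// ConvertToBPTree streams every resource's records into the B+ tree file store,
// one resource at a time, in a single background goroutine.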
func (c *AppendConverter) ConvertToBPTree(s *specializedstore.BPTreeFileStore) *TaskState {
	nentries := c.fileSize / c.recordSize
	ts := &TaskState{
		total: nentries,
	}
	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		for _, control := range c.resourceIndex {
			records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
			atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			s.StorePanakoPrints(records)
		}
	}()
	return ts
}
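
// ReadRecords reads and decodes n consecutive records for id, starting at the
// entry index recordIndex (measured in records, not bytes). It panics on read
// errors.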
func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}
	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		results = append(results, c.decodeRecord(id, bufReader))
	}
	return
}
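
// ReadSortedRecordsRange reads up to n records starting at recordIndex and
// returns the leading run whose hashes fall in [start, stop). It assumes the
// records are sorted by hash and panics if a hash below start is encountered.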
func (c *AppendConverter) ReadSortedRecordsRange(id panako.ResourceId, recordIndex, n int64, start, stop uint64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}
	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		r := c.decodeRecord(id, bufReader)
		if r.Hash < start { //shouldn't trigger given input is sorted
			log.Panic("input not sorted, or duplicate keys exist")
		}
		if r.Hash >= stop {
			break
		}
		results = append(results, r)
	}
	return
}
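
// ScanControlData rebuilds resourceIndex by walking the file from the start,
// following the length field of each control block to the next one. Every
// control block must begin with an all-zero marker entry.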
func (c *AppendConverter) ScanControlData() {
	c.resourceIndex = c.resourceIndex[:0]
	var recordIndex int64 = 0
	buf := make([]byte, c.recordSize*c.controlSize)
	emptyRecord := make([]byte, c.recordSize)
	for {
		_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Panic(err)
		}
		if !bytes.Equal(buf[0:c.recordSize], emptyRecord) {
			log.Panicf("Found corrupted control entry at %d #%d\n", recordIndex, recordIndex*c.recordSize)
		}
		control := c.decodeControl(bytes.NewReader(buf[c.recordSize:]))
		control.recordsStartIndex = recordIndex + c.controlSize
		/*for i := int64(0); i < c.controlSize; i++ {
			c.processedField.Set(uint64(recordIndex+i), true)
		}*/
		//advance seek using length information
		recordIndex += c.controlSize + int64(control.length)
		c.resourceIndex = append(c.resourceIndex, control)
		if len(c.resourceIndex)%1000 == 0 {
			log.Printf("read %d control records\n", len(c.resourceIndex))
		}
	}
}
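
// Close closes the underlying file.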
func (c *AppendConverter) Close() error {
	return c.f.Close()
}