METANOIA/store/appendconverter.go
2022-02-04 10:30:27 +01:00

318 lines
6.9 KiB
Go

package store
import (
"bytes"
"encoding/binary"
"git.gammaspectra.live/S.O.N.G/Hibiki/strategy/panako"
"io"
"log"
"math"
"math/bits"
"os"
"sync"
"sync/atomic"
)
// Bitfield is a fixed-size set of bits addressed by index.
//
// NOTE(review): getBitfieldIndex divides by binary.Size(uint64(0)), which
// is the word size in BYTES (8), not bits (64) — so only the low 8 bits of
// each 64-bit word are ever used. This wastes 8x memory, but every method
// agrees on the convention, so behavior is consistent; changing it would
// alter only the in-memory layout.
type Bitfield struct {
	size  uint64   // number of addressable bits
	field []uint64 // backing words; 8 bits used per word (see note above)
}

// getBitfieldIndex maps a bit index to (word index, bit offset within word).
func getBitfieldIndex(n uint64) (uint64, uint64) {
	bitsPerWord := uint64(binary.Size(uint64(0))) // 8: byte size, not bit size
	return n / bitsPerWord, n % bitsPerWord
}

// NewBitfield allocates a Bitfield able to hold size bits, all false.
func NewBitfield(size uint64) *Bitfield {
	return &Bitfield{
		size:  size,
		field: make([]uint64, size/uint64(binary.Size(uint64(0)))+1),
	}
}

// Get reports whether bit n is set.
func (b *Bitfield) Get(n uint64) bool {
	index, offset := getBitfieldIndex(n)
	return (b.field[index] & (1 << offset)) != 0
}

// GetNextValue returns the index of the first bit at or after fromIndex
// whose value equals value, or math.MaxUint64 when there is none.
//
// Fixes relative to the original:
//   - Set stores bits at 1<<offset (low-order), so the scan must use
//     bits.TrailingZeros64; the original used LeadingZeros64 and scanned
//     from the wrong end of the word.
//   - The original compared the zero count against binary.Size (8) to
//     detect an exhausted word, but LeadingZeros64 returns values up to
//     64, so empty words were never skipped correctly.
//   - The `else if n == int(offset)` branch was unreachable after the
//     preceding `n >= int(offset)` check.
//   - Bits below fromIndex in the first word are now masked off up front
//     instead of being compared against the offset after the fact.
func (b *Bitfield) GetNextValue(fromIndex uint64, value bool) uint64 {
	index, offset := getBitfieldIndex(fromIndex)
	bitsPerWord := uint64(binary.Size(uint64(0))) // bits used per word (8)
	fieldSize := uint64(len(b.field))
	usedMask := uint64(1)<<bitsPerWord - 1
	for ; index < fieldSize; index++ {
		word := b.field[index]
		if !value {
			// Searching for a clear bit: invert so clear bits read as set.
			word = ^word
		}
		// Restrict to the bits this word actually uses, and on the first
		// word also drop everything below the starting offset.
		word &= usedMask &^ (uint64(1)<<offset - 1)
		offset = 0
		if word != 0 {
			// NOTE(review): for value == false this can report a spare bit
			// past b.size in the final word — confirm callers bound the
			// result by size.
			return index*bitsPerWord + uint64(bits.TrailingZeros64(word))
		}
	}
	return math.MaxUint64
}

// Set sets (value == true) or clears (value == false) bit n.
func (b *Bitfield) Set(n uint64, value bool) {
	index, offset := getBitfieldIndex(n)
	if value {
		b.field[index] |= 1 << offset
	} else {
		b.field[index] &^= 1 << offset
	}
}
// AppendConverter reads a raw append-store dump file and replays its
// contents into another store (see SortHashes / ReadRecords).
type AppendConverter struct {
	f *os.File
	//writeMutex sync.RWMutex
	// useCompactHashes selects 32-bit hashes on disk instead of 64-bit.
	useCompactHashes bool
	// usePackedValues selects one packed 32-bit value on disk instead of
	// separate 32-bit time and frequency fields.
	usePackedValues bool
	recordSize  int64 // size of one record in bytes (GetEntrySize)
	controlSize int64 // size of a control block, measured in records
	fileSize    int64 // total file size in bytes, captured at open time
	// resourceIndex holds one entry per resource, filled by ScanControlData.
	resourceIndex []AppendConverterControlData
	// processedField is currently unused; its construction is commented
	// out in NewAppendConverter.
	processedField *Bitfield
}
// NewAppendConverter opens the append-store file at pathName read-only and
// scans its control records into memory so they can be replayed later.
//
// useCompactHashes / usePackedValues must match the flags the file was
// written with; they determine the on-disk record layout (GetEntrySize).
func NewAppendConverter(pathName string, useCompactHashes, usePackedValues bool) (*AppendConverter, error) {
	f, err := os.Open(pathName)
	if err != nil {
		return nil, err
	}
	c := &AppendConverter{
		f:                f,
		useCompactHashes: useCompactHashes,
		usePackedValues:  usePackedValues,
	}
	c.recordSize = c.GetEntrySize()
	c.controlSize = c.GetControlRecordSizeInEntries()
	info, err := c.f.Stat()
	if err != nil {
		// The original ignored this error and would have dereferenced a
		// nil FileInfo below; close the handle and report it instead.
		f.Close()
		return nil, err
	}
	c.fileSize = info.Size()
	//c.processedField = NewBitfield(uint64(c.fileSize / c.GetEntrySize()))
	c.ScanControlData()
	return c, nil
}
// getRecordsAt reads n consecutive records starting at record index offset
// and returns their raw bytes. Panics on read failure.
//
// The original read into a buffer and discarded it, making the call a
// no-op apart from the error path; it now returns the data. Adding a
// return value is call-compatible for existing statement-style callers.
func (c *AppendConverter) getRecordsAt(offset, n int64) []byte {
	buf := make([]byte, n*c.recordSize)
	if _, err := c.f.ReadAt(buf, offset*c.recordSize); err != nil {
		log.Panic(err)
	}
	return buf
}
// decodeRecord decodes one on-disk record for resourceId from buf,
// honoring the hash-width and value-packing flags the converter was
// opened with.
func (c *AppendConverter) decodeRecord(resourceId panako.ResourceId, buf *bytes.Reader) panako.StoreRecord {
	// read decodes one little-endian field, panicking on short or failed
	// reads. The original silently ignored binary.Read errors and could
	// return zeroed records from a truncated buffer.
	read := func(v interface{}) {
		if err := binary.Read(buf, binary.LittleEndian, v); err != nil {
			log.Panic(err)
		}
	}
	var hash uint64
	if c.useCompactHashes {
		var h uint32
		read(&h)
		hash = uint64(h)
	} else {
		read(&hash)
	}
	if c.usePackedValues {
		var packed uint32
		read(&packed)
		return panako.NewStoreRecordFromCompactPacked(resourceId, uint32(hash), packed)
	}
	var time, frequency uint32
	read(&time)
	read(&frequency)
	return panako.StoreRecord{
		ResourceId: resourceId,
		Hash:       hash,
		Time:       time,
		Frequency:  frequency,
	}
}
// AppendConverterControlData is the in-memory form of one control record:
// which resource the following data records belong to, how many there
// are, and where (in record units) they start in the file.
type AppendConverterControlData struct {
	resourceId panako.ResourceId
	// length is the number of data records following the control block.
	length uint32
	// recordsStartIndex is the absolute record index of the first data
	// record; it is derived (not stored on disk) by ScanControlData.
	recordsStartIndex int64
}
// decodeControl decodes the on-disk payload of a control block (resource
// id followed by record count). recordsStartIndex is left for the caller
// to fill in. Panics on short or failed reads — the original ignored the
// binary.Read errors and could return a zeroed result.
func (c *AppendConverter) decodeControl(buf *bytes.Reader) (result AppendConverterControlData) {
	if err := binary.Read(buf, binary.LittleEndian, &result.resourceId); err != nil {
		log.Panic(err)
	}
	if err := binary.Read(buf, binary.LittleEndian, &result.length); err != nil {
		log.Panic(err)
	}
	return
}
// GetEntrySize returns the on-disk size in bytes of one record: a 4- or
// 8-byte hash (useCompactHashes) plus either one packed 4-byte value or
// separate 4-byte time and frequency fields (usePackedValues); see
// decodeRecord. Panics on the unsupported full-hash/unpacked combination.
func (c *AppendConverter) GetEntrySize() int64 {
	switch {
	case c.useCompactHashes && c.usePackedValues:
		return 4 + 4
	case !c.useCompactHashes && c.usePackedValues:
		return 8 + 4
	case c.useCompactHashes && !c.usePackedValues:
		return 4 + 8
	default:
		// The original called log.Panic() with no message at all.
		log.Panic("unsupported layout: full hashes with unpacked values")
		return 0
	}
}
// GetControlRecordSizeInEntries returns the size of a control block
// measured in records (not bytes): one all-zero marker record plus the
// control payload. With 8-byte records that is 1+2 entries, with 12-byte
// records 1+1 — in every supported layout the block spans 24 bytes.
// Panics on the unsupported full-hash/unpacked combination, matching
// GetEntrySize.
func (c *AppendConverter) GetControlRecordSizeInEntries() int64 {
	switch {
	case c.useCompactHashes && c.usePackedValues:
		return 1 + 2
	case !c.useCompactHashes && c.usePackedValues:
		return 1 + 1
	case c.useCompactHashes && !c.usePackedValues:
		return 1 + 1
	default:
		// The original called log.Panic() with no message at all.
		log.Panic("unsupported layout: full hashes with unpacked values")
		return 0
	}
}
// TaskState tracks the progress of a background conversion started by
// SortHashes.
type TaskState struct {
	wg sync.WaitGroup
	// total is the number of file entries to process; set once before the
	// workers start and never mutated afterwards.
	total int64
	// processed counts entries handled so far; updated atomically by the
	// worker goroutines.
	processed int64
}

// Wait blocks until all workers have finished.
func (t *TaskState) Wait() {
	t.wg.Wait()
}

// Progress returns (processed, total) in file entries. It is safe to call
// concurrently with the workers; total needs no atomic load because it is
// immutable after creation.
func (t *TaskState) Progress() (int64, int64) {
	return atomic.LoadInt64(&t.processed), t.total
}
// SortHashes replays every indexed resource into s, splitting the resource
// index into contiguous chunks handled by roughly `workers` goroutines.
// The returned TaskState can be waited on and polled for progress, which
// is measured in file entries (data records plus control-block records).
func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
	entryCount := c.fileSize / c.recordSize
	resourceCount := int64(len(c.resourceIndex))
	chunk := resourceCount / int64(workers)
	if chunk < 1 {
		chunk = 1
	}
	state := &TaskState{
		total: entryCount,
	}
	for start := int64(0); start < resourceCount; start += chunk {
		end := start + chunk
		if end > resourceCount {
			end = resourceCount
		}
		state.wg.Add(1)
		// Each worker owns one contiguous slice of the index; the slice is
		// passed as an argument so no loop variable is captured.
		go func(part []AppendConverterControlData) {
			defer state.wg.Done()
			for _, ctl := range part {
				prints := c.ReadRecords(ctl.resourceId, ctl.recordsStartIndex, int64(ctl.length))
				s.StorePanakoPrints(prints)
				atomic.AddInt64(&state.processed, int64(len(prints))+c.controlSize)
			}
		}(c.resourceIndex[start:end])
	}
	return state
}
// ReadRecords reads and decodes n consecutive records belonging to
// resource id, starting at absolute record index recordIndex. Panics on
// read failure.
//
// NOTE: the original started with `c.resourceIndex = c.resourceIndex[:0]`
// — an apparent copy-paste leftover from ScanControlData. ReadRecords is
// called concurrently from SortHashes workers that are iterating slices
// of that very index, so the truncation was a data race on the slice
// header and destroyed the index for any later use; it has been removed.
func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	if _, err := c.f.ReadAt(buf, recordIndex*c.recordSize); err != nil {
		log.Panic(err)
	}
	// The record count is known up front, so pre-size the result slice.
	results = make([]panako.StoreRecord, 0, n)
	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		results = append(results, c.decodeRecord(id, bufReader))
	}
	return
}
// ScanControlData walks the file from the beginning, collecting every
// control record into c.resourceIndex. Each control block starts with one
// all-zero record as a marker, followed by the control payload; the data
// records it describes follow immediately and are skipped using the
// decoded length. Stops at EOF and panics on any other read error or on
// a missing marker (corruption).
func (c *AppendConverter) ScanControlData() {
	c.resourceIndex = c.resourceIndex[:0]
	var recordIndex int64 = 0
	buf := make([]byte, c.recordSize*c.controlSize)
	emptyRecord := make([]byte, c.recordSize)
	for {
		_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Panic(err)
		}
		// A valid control block begins with an all-zero marker record.
		// (!bytes.Equal replaces the original bytes.Compare(...) != 0 —
		// staticcheck S1004.)
		if !bytes.Equal(buf[0:c.recordSize], emptyRecord) {
			log.Panicf("Found corrupted control entry at %d #%d\n", recordIndex, recordIndex*c.recordSize)
		}
		control := c.decodeControl(bytes.NewReader(buf[c.recordSize:]))
		control.recordsStartIndex = recordIndex + c.controlSize
		/*for i := int64(0); i < c.controlSize; i++ {
			c.processedField.Set(uint64(recordIndex+i), true)
		}*/
		//advance seek using length information
		recordIndex += c.controlSize + int64(control.length)
		c.resourceIndex = append(c.resourceIndex, control)
		if len(c.resourceIndex)%1000 == 0 {
			log.Printf("read %d control records\n", len(c.resourceIndex))
		}
	}
}
// Close releases the underlying file handle. The converter must not be
// used afterwards.
func (c *AppendConverter) Close() error {
	return c.f.Close()
}