package store

import (
	"bytes"
	"encoding/binary"
	"git.gammaspectra.live/S.O.N.G/Hibiki/panako"
	"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore"
	"io"
	"log"
	"math"
	"math/bits"
	"os"
	"sync"
	"sync/atomic"
)

// bitsPerWord is the number of bits held by each uint64 word of a Bitfield.
const bitsPerWord = 64

type Bitfield struct {
	size  uint64
	field []uint64
}

func getBitfieldIndex(n uint64) (uint64, uint64) {
	itemIndex := n / bitsPerWord
	itemOffset := n % bitsPerWord
	return itemIndex, itemOffset
}

func NewBitfield(size uint64) *Bitfield {
	return &Bitfield{
		size:  size,
		field: make([]uint64, size/bitsPerWord+1),
	}
}

func (b *Bitfield) Get(n uint64) bool {
	index, offset := getBitfieldIndex(n)
	return (b.field[index] & (1 << offset)) != 0
}

// GetNextValue returns the index of the first bit at or after fromIndex whose
// value matches the requested value, or math.MaxUint64 if no such bit exists.
func (b *Bitfield) GetNextValue(fromIndex uint64, value bool) uint64 {
	index, offset := getBitfieldIndex(fromIndex)
	fieldSize := uint64(len(b.field))
	for ; index < fieldSize; index++ {
		word := b.field[index]
		if !value {
			//searching for a cleared bit: invert so the scan below looks for a set bit
			word = ^word
		}
		//ignore bits below the starting offset in the first word examined
		word &^= (1 << offset) - 1
		if word != 0 {
			n := index*bitsPerWord + uint64(bits.TrailingZeros64(word))
			if n >= b.size {
				break
			}
			return n
		}
		offset = 0
	}
	return math.MaxUint64
}

func (b *Bitfield) Set(n uint64, value bool) {
	index, offset := getBitfieldIndex(n)
	if value {
		b.field[index] |= 1 << offset
	} else {
		b.field[index] &^= 1 << offset
	}
}
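// exampleBitfieldUsage is an illustrative sketch, not part of the original
// code: it shows how the Bitfield above is meant to be used to track which
// entries have been processed and to find the next unprocessed one. The
// function name and the sizes are assumptions for demonstration only.
func exampleBitfieldUsage() {
	processed := NewBitfield(1024)

	//mark the first two entries as done
	processed.Set(0, true)
	processed.Set(1, true)

	//find the first entry that has not been processed yet (2 here)
	next := processed.GetNextValue(0, false)
	if next == math.MaxUint64 {
		log.Printf("all entries processed")
		return
	}
	log.Printf("next unprocessed entry: %d", next)
}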
type AppendConverter struct {
	f *os.File
	//writeMutex sync.RWMutex

	useCompactHashes bool
	usePackedValues  bool

	recordSize  int64
	controlSize int64
	fileSize    int64

	resourceIndex []AppendConverterControlData

	processedField *Bitfield
}

func NewAppendConverter(pathName string, useCompactHashes, usePackedValues bool) (*AppendConverter, error) {
	f, err := os.Open(pathName)
	if err != nil {
		return nil, err
	}

	c := &AppendConverter{
		f:                f,
		useCompactHashes: useCompactHashes,
		usePackedValues:  usePackedValues,
	}
	c.recordSize = c.GetEntrySize()
	c.controlSize = c.GetControlRecordSizeInEntries()

	info, err := c.f.Stat()
	if err != nil {
		return nil, err
	}
	c.fileSize = info.Size()

	//c.processedField = NewBitfield(uint64(c.fileSize / c.GetEntrySize()))

	c.ScanControlData()

	return c, nil
}

func (c *AppendConverter) getRecordsAt(offset, n int64) {
	buf := make([]byte, n*c.recordSize)
	_, err := c.f.ReadAt(buf, offset*c.recordSize)
	if err != nil {
		log.Panic(err)
	}
}

func (c *AppendConverter) decodeRecord(resourceId panako.ResourceId, buf *bytes.Reader) panako.StoreRecord {
	var hash uint64
	if c.useCompactHashes {
		var h uint32
		binary.Read(buf, binary.LittleEndian, &h)
		hash = uint64(h)
	} else {
		binary.Read(buf, binary.LittleEndian, &hash)
	}

	if c.usePackedValues {
		var packed uint32
		binary.Read(buf, binary.LittleEndian, &packed)
		return panako.NewStoreRecordFromCompactPacked(resourceId, uint32(hash), packed)
	} else {
		var time, frequency uint32
		binary.Read(buf, binary.LittleEndian, &time)
		binary.Read(buf, binary.LittleEndian, &frequency)
		return panako.StoreRecord{
			ResourceId: resourceId,
			Hash:       hash,
			Time:       time,
			Frequency:  frequency,
		}
	}
}

// AppendConverterControlData describes one resource in the append file: its
// identifier, how many data records follow its control block, and the entry
// index at which those records start.
type AppendConverterControlData struct {
	resourceId        panako.ResourceId
	length            uint32
	recordsStartIndex int64
}

func (c *AppendConverter) decodeControl(buf *bytes.Reader) (result AppendConverterControlData) {
	binary.Read(buf, binary.LittleEndian, &result.resourceId)
	binary.Read(buf, binary.LittleEndian, &result.length)
	return
}

// GetEntrySize returns the on-disk size in bytes of a single record, which
// depends on whether hashes are stored compact (4 bytes instead of 8) and
// whether time/frequency are packed into a single 4-byte value.
func (c *AppendConverter) GetEntrySize() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 4 + 4
	} else if !c.useCompactHashes && c.usePackedValues {
		return 8 + 4
	} else if c.useCompactHashes && !c.usePackedValues {
		return 4 + 8
	} else {
		log.Panic("unsupported combination of hash/value encoding")
		return 0
	}
}

// GetControlRecordSizeInEntries returns how many entries a control block
// occupies, including the leading all-zero marker entry.
func (c *AppendConverter) GetControlRecordSizeInEntries() int64 {
	if c.useCompactHashes && c.usePackedValues {
		return 1 + 2
	} else if !c.useCompactHashes && c.usePackedValues {
		return 1 + 1
	} else if c.useCompactHashes && !c.usePackedValues {
		return 1 + 1
	} else {
		log.Panic("unsupported combination of hash/value encoding")
		return 0
	}
}

type TaskState struct {
	wg        sync.WaitGroup
	total     int64
	processed int64
}

func (t *TaskState) Wait() {
	t.wg.Wait()
}

func (t *TaskState) Progress() (int64, int64) {
	return atomic.LoadInt64(&t.processed), t.total
}

// SortHashes splits the resource index across the given number of workers and
// stores each resource's records into the AppendStore.
func (c *AppendConverter) SortHashes(s *AppendStore, workers int) *TaskState {
	nentries := c.fileSize / c.recordSize
	nrecords := int64(len(c.resourceIndex))

	splitSize := nrecords / int64(workers)
	if splitSize < 1 {
		splitSize = 1
	}

	ts := &TaskState{
		total: nentries,
	}

	for i := int64(0); i < nrecords; i += splitSize {
		l := i + splitSize
		if l > nrecords {
			l = nrecords
		}
		ts.wg.Add(1)
		go func(slice []AppendConverterControlData) {
			defer ts.wg.Done()
			for _, control := range slice {
				records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
				s.StorePanakoPrints(records)
				atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			}
		}(c.resourceIndex[i:l])
	}

	return ts
}

// ConvertToBadger walks the whole hash key space in a fixed number of
// iterations; on each pass it pulls, from every resource, the records whose
// hashes fall into the current key range (records within a resource are
// expected to be sorted by hash) and stores them as one batch.
func (c *AppendConverter) ConvertToBadger(s *specializedstore.BadgerStore, iterations int) *TaskState {
	keySize := 34
	if c.useCompactHashes {
		keySize = 32
	}
	keyTotal := uint64(1 << keySize)

	splitSize := keyTotal / uint64(iterations)
	if splitSize < 1 {
		splitSize = 1
	}

	ts := &TaskState{
		total: int64(keyTotal),
	}

	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		var recordsWritten int64
		var buffer []panako.StoreRecord

		//per-resource count of records already consumed in previous passes
		hints := make(map[panako.ResourceId]int64)
		for _, control := range c.resourceIndex {
			hints[control.resourceId] = 0
		}

		for i := uint64(0); i < keyTotal; i += splitSize {
			resourcesNone := 0
			for _, control := range c.resourceIndex {
				if int64(control.length)-hints[control.resourceId] == 0 {
					resourcesNone++
					continue
				}
				//Read with offset
				records := c.ReadSortedRecordsRange(control.resourceId, control.recordsStartIndex+hints[control.resourceId], int64(control.length)-hints[control.resourceId], i, i+splitSize)
				hints[control.resourceId] += int64(len(records))
				buffer = append(buffer, records...)
			}

			log.Printf("storing %d records for iteration %d of step size %d", len(buffer), i/splitSize, splitSize)
			if len(buffer) > 0 {
				recordsWritten += int64(len(buffer))
				s.StorePanakoPrintsUnique(buffer)
			}
			atomic.AddInt64(&ts.processed, int64(splitSize))
			buffer = buffer[:0]

			if resourcesNone == len(c.resourceIndex) {
				log.Printf("early exit: completed %d resources, total %d records", resourcesNone, recordsWritten)
				atomic.StoreInt64(&ts.processed, ts.total)
				return
			}
		}
	}()

	return ts
}
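// exampleConvertToBadger is an illustrative sketch, not part of the original
// code: it shows the intended driving pattern for the converter, namely open
// the append file, start a conversion, wait for the background task, and
// report progress. The file name "prints.append" and the iteration count are
// assumptions for demonstration only.
func exampleConvertToBadger(s *specializedstore.BadgerStore) error {
	c, err := NewAppendConverter("prints.append", true, true)
	if err != nil {
		return err
	}
	defer c.Close()

	ts := c.ConvertToBadger(s, 1024)
	ts.Wait()

	processed, total := ts.Progress()
	log.Printf("converted %d of %d keys", processed, total)
	return nil
}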
func (c *AppendConverter) ConvertToBPTree(s *specializedstore.BPTreeFileStore) *TaskState {
	nentries := c.fileSize / c.recordSize

	ts := &TaskState{
		total: nentries,
	}

	ts.wg.Add(1)
	go func() {
		defer ts.wg.Done()
		for _, control := range c.resourceIndex {
			records := c.ReadRecords(control.resourceId, control.recordsStartIndex, int64(control.length))
			atomic.AddInt64(&ts.processed, int64(len(records))+c.controlSize)
			s.StorePanakoPrints(records)
		}
	}()

	return ts
}

// ReadRecords reads and decodes n records belonging to the given resource,
// starting at the given entry index.
func (c *AppendConverter) ReadRecords(id panako.ResourceId, recordIndex, n int64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}

	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		results = append(results, c.decodeRecord(id, bufReader))
	}

	return
}

// ReadSortedRecordsRange reads up to n records starting at recordIndex and
// returns those whose hash falls in [start, stop); records are expected to be
// sorted by hash, so decoding stops at the first hash >= stop.
func (c *AppendConverter) ReadSortedRecordsRange(id panako.ResourceId, recordIndex, n int64, start, stop uint64) (results []panako.StoreRecord) {
	buf := make([]byte, c.recordSize*n)
	_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
	if err != nil {
		log.Panic(err)
	}

	bufReader := bytes.NewReader(buf)
	for i := int64(0); i < n; i++ {
		r := c.decodeRecord(id, bufReader)
		if r.Hash < start {
			//shouldn't trigger given input is sorted
			log.Panic("input not sorted, or duplicate keys exist")
		}
		if r.Hash >= stop {
			break
		}
		results = append(results, r)
	}

	return
}

// ScanControlData walks the whole file, decoding every control block into
// resourceIndex and skipping over the data records it announces.
func (c *AppendConverter) ScanControlData() {
	c.resourceIndex = c.resourceIndex[:0]

	var recordIndex int64 = 0

	buf := make([]byte, c.recordSize*c.controlSize)
	emptyRecord := make([]byte, c.recordSize)

	for {
		_, err := c.f.ReadAt(buf, recordIndex*c.recordSize)
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Panic(err)
		}

		//a control block starts with an all-zero marker entry
		if !bytes.Equal(buf[0:c.recordSize], emptyRecord) {
			log.Panicf("Found corrupted control entry at %d #%d\n", recordIndex, recordIndex*c.recordSize)
		}

		control := c.decodeControl(bytes.NewReader(buf[c.recordSize:]))
		control.recordsStartIndex = recordIndex + c.controlSize

		/*for i := int64(0); i < c.controlSize; i++ {
			c.processedField.Set(uint64(recordIndex+i), true)
		}*/

		//advance seek using length information
		recordIndex += c.controlSize + int64(control.length)

		c.resourceIndex = append(c.resourceIndex, control)

		if len(c.resourceIndex)%1000 == 0 {
			log.Printf("read %d control records\n", len(c.resourceIndex))
		}
	}
}

func (c *AppendConverter) Close() error {
	return c.f.Close()
}