METANOIA/METANOIA.go

394 lines
8.6 KiB
Go

package main
import (
"database/sql"
"errors"
"flag"
"fmt"
"git.gammaspectra.live/S.O.N.G/Hibiki/strategy/panako"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore"
"git.gammaspectra.live/S.O.N.G/METANOIA/database"
"github.com/dhowden/tag"
"github.com/minio/sha256-simd"
"io"
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
// Process-wide state shared by the add and query commands.
var (
	// printDb is the fingerprint store; main opens it from the -printdb flag.
	printDb *specializedstore.BadgerStore

	// printStrategy is the panako strategy main builds on top of printDb.
	printStrategy *panako.Strategy

	// db is the database handle; main opens it from the -connstr flag.
	db *database.Database

	// resourceCreationMutex serializes the lookup-by-hash / insert sequence
	// performed in AddAudioToDatabase.
	resourceCreationMutex sync.Mutex

	// fdlimit is used as a counting semaphore with 64 slots:
	// OpenFileWithLimit sends to take a slot and CloseFileWithLimit receives
	// to give it back.
	fdlimit = make(chan bool, 64)
)
// OpenFileWithLimit opens filePath for reading after reserving a slot in
// fdlimit, blocking while every slot is taken. When the open fails the slot
// is handed back immediately; on success it stays reserved until the caller
// releases it through CloseFileWithLimit.
func OpenFileWithLimit(filePath string) (*os.File, error) {
	fdlimit <- true // reserve a slot; blocks while the channel is full

	file, openErr := os.Open(filePath)
	if openErr == nil {
		return file, nil
	}

	<-fdlimit // nothing was opened, so free the slot again
	return file, openErr
}
// CloseFileWithLimit closes f and returns the error reported by Close.
// The fdlimit slot is released when the close succeeded or when force is
// true; a failed, non-forced close leaves the slot reserved.
func CloseFileWithLimit(f *os.File, force bool) error {
	closeErr := f.Close()
	if closeErr != nil && !force {
		// Failed close without force: keep the slot reserved.
		return closeErr
	}

	<-fdlimit
	return closeErr
}
// GetMimeTypeFromExtension maps a file extension to the MIME type string used
// by this program.
//
// ext is expected in the form produced by path.Ext: its first byte (normally
// the dot) is dropped unconditionally and the remainder is matched
// case-insensitively. An empty or unrecognised extension yields
// "application/octet-stream".
func GetMimeTypeFromExtension(ext string) string {
	const fallback = "application/octet-stream"

	if ext == "" {
		return fallback
	}

	switch strings.ToLower(ext[1:]) {
	// Audio types
	case "flac":
		return "audio/flac"
	case "mp3":
		return "audio/mpeg;codecs=mp3"
	case "m4a":
		return "audio/mp4"
	case "mka":
		return "audio/x-matroska"
	case "ogg":
		return "audio/ogg"
	case "opus":
		return "audio/opus"
	case "tta":
		return "audio/tta"
	case "aac":
		return "audio/aac"
	case "alac":
		return "audio/alac"
	case "wav":
		return "audio/wav"
	case "ape":
		return "audio/ape"

	// Image types
	case "png":
		return "image/png"
	case "jfif", "jpeg", "jpg":
		return "image/jpeg"
	case "gif":
		return "image/gif"
	case "svg":
		return "image/svg+xml"
	case "tiff", "tif":
		return "image/tiff"
	case "webp":
		return "image/webp"
	case "bmp":
		return "image/bmp"

	// Text types
	case "txt":
		return "text/plain"
	case "log":
		return "text/x-log"
	case "accurip":
		return "text/x-accurip"
	case "cue":
		return "text/x-cue"
	case "toc":
		return "text/x-toc"

	// Text subtitles
	case "lrc":
		return "text/x-subtitle-lrc"
	case "ssa":
		return "text/x-subtitle-ssa"
	case "ass":
		return "text/x-subtitle-ass"
	case "srt":
		return "text/x-subtitle-subrip"
	}

	return fallback
}
// PendingTransaction pairs a SQL transaction with the WaitGroup used to wait
// for the work started on its behalf.
// It holds a sync.WaitGroup by value and is therefore handled through a
// pointer (*PendingTransaction) by the functions in this file.
type PendingTransaction struct {
// Tx is the transaction handle; main fills it from db.GetTx().
Tx *sql.Tx
// WaitGroup tracks outstanding work: callers Add(1) before each
// Add*ToDatabase call, the callee defers Done, and main blocks on Wait.
WaitGroup sync.WaitGroup
}
// AddFolderToDatabase walks the directory pathEntry recursively and hands
// every recognised file to AddAudioToDatabase (audio/* types) or
// AddFileToDatabase (text/* and image/* types). Files whose extension maps to
// any other type, including the generic "application/octet-stream", are
// skipped.
//
// The caller must have called tx.WaitGroup.Add(1) for this invocation; the
// matching Done is deferred here. The same convention is applied to every
// call this function makes.
//
// A directory that cannot be read is logged and skipped rather than being
// dropped silently.
func AddFolderToDatabase(release *database.Release, tx *PendingTransaction, pathEntry string) {
	defer tx.WaitGroup.Done()

	entries, err := ioutil.ReadDir(pathEntry)
	if err != nil {
		log.Printf("error for %s: %s", pathEntry, err)
		return
	}

	for _, entry := range entries {
		// filepath.Join avoids the doubled separator that plain
		// concatenation produces when pathEntry is the filesystem root.
		entryPath := filepath.Join(pathEntry, entry.Name())

		if entry.IsDir() {
			tx.WaitGroup.Add(1)
			AddFolderToDatabase(release, tx, entryPath)
			continue
		}

		mime := GetMimeTypeFromExtension(path.Ext(entry.Name()))
		switch {
		case strings.HasPrefix(mime, "audio/"):
			tx.WaitGroup.Add(1)
			AddAudioToDatabase(release, tx, entry.Size(), mime, entryPath)
		case strings.HasPrefix(mime, "text/"), strings.HasPrefix(mime, "image/"):
			tx.WaitGroup.Add(1)
			AddFileToDatabase(release, tx, entry.Size(), mime, entryPath)
		}
		// Anything else (e.g. application/octet-stream) is not stored.
	}
}
// AddFileToDatabase is the hook AddFolderToDatabase calls for text and image
// files. It is currently a placeholder: nothing is stored and none of the
// other arguments are used. It only signals completion on tx.WaitGroup,
// balancing the Add(1) performed by the caller.
func AddFileToDatabase(release *database.Release, tx *PendingTransaction, size int64, mime, pathEntry string) {
	tx.WaitGroup.Done()
}
// AddAudioToDatabase registers the audio file at pathEntry as a resource of
// release and stores its fingerprints through printStrategy, unless a
// resource with the same content hash already existed with at least one
// release.
//
// The caller must have called tx.WaitGroup.Add(1) for this invocation; the
// matching Done is deferred here. Files whose path is already known to the
// database are skipped. Otherwise the file is opened synchronously through
// OpenFileWithLimit and then hashed, tagged and fingerprinted in a goroutine
// that is itself tracked by tx.WaitGroup and that closes the file when done.
//
// Read and seek failures are logged and abort processing of the file, so a
// partially read file is never stored under a hash of truncated content.
// Failing to open the file or to create the resource still panics.
func AddAudioToDatabase(release *database.Release, tx *PendingTransaction, size int64, mime, pathEntry string) {
	log.Printf("handling %s", pathEntry)
	defer tx.WaitGroup.Done()

	if database.GetResourceFromDatabaseByPath(db, pathEntry) != nil {
		return
	}

	f, err := OpenFileWithLimit(pathEntry)
	if err != nil { //TODO
		log.Panic(err)
	}

	tx.WaitGroup.Add(1)
	go func() {
		defer tx.WaitGroup.Done()
		defer CloseFileWithLimit(f, true)

		// Hash the whole file; a read error would otherwise yield the
		// digest of a truncated stream.
		hasher := sha256.New()
		if _, err := io.Copy(hasher, f); err != nil {
			log.Printf("error for %s: %s", pathEntry, err)
			return
		}
		var hash database.ResourceHashIdentifier
		copy(hash[:], hasher.Sum(nil))

		// Lookup and insert happen under one lock so two goroutines hashing
		// identical content do not both insert.
		resourceCreationMutex.Lock()
		resource := database.GetResourceFromDatabaseByHash(db, hash)
		exists := resource != nil && len(resource.GetReleases()) > 0
		if resource == nil {
			//insert
			resource = database.InsertResourceToDatabase(db, hash, size, []byte(pathEntry), mime)
		}
		resourceCreationMutex.Unlock()

		if resource == nil {
			log.Panic(errors.New("failed to create resource")) //TODO: not panic
		}

		// Deferred so the release is linked on every exit path below,
		// including the ones where fingerprinting is skipped or fails.
		defer resource.AddRelease(release)

		log.Printf("hash %x path %s", hash, pathEntry)

		if _, err := f.Seek(0, io.SeekStart); err != nil {
			log.Printf("error for %s: %s", pathEntry, err)
			return
		}
		if meta, err := tag.ReadFrom(f); err == nil {
			log.Print(meta)
		} else {
			// Missing or unreadable metadata is not fatal: log and carry on.
			log.Print(err)
		}

		if exists {
			return
		}

		//Fingerprint
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			log.Printf("error for %s: %s", pathEntry, err)
			return
		}

		var stream *audio.Stream
		var decodeErr error
		switch mime { //TODO: change this to scan style
		case "audio/flac":
			//TODO: add CRC32 scan
			flacFormat := flac.NewFormat()
			stream, decodeErr = flacFormat.Open(f)
		case "audio/mpeg;codecs=mp3":
			mp3Format := mp3.NewFormat()
			stream, decodeErr = mp3Format.Open(f)
		case "audio/ogg", "audio/opus":
			opusFormat := opus.NewFormat()
			stream, decodeErr = opusFormat.Open(f)
		}

		if decodeErr != nil { //cannot decode
			log.Printf("error for %s: %s", pathEntry, decodeErr)
			return //TODO
		}
		if stream == nil { //no known decoder
			log.Printf("error for %s: no known decoder", pathEntry)
			return
		}

		printStrategy.StoreStream(panako.ResourceId(resource.GetId()), stream)
		log.Printf("finished fingerprinting %s", pathEntry)
	}()
}
// main parses the command line, opens the database and the fingerprint store,
// and runs the command selected with -cmd:
//
// add indexes every supported file below -path under the release identified
// by "test", creating that release when it does not exist yet.
//
// query fingerprints the single file at -path and logs the matching
// resources.
//
// Both -connstr and -printdb are required; when either is missing the flag
// defaults are printed and the process exits with status 0.
func main() {
	pgConnStr := flag.String("connstr", "", "Postgres connection string for postgres database")
	printDbOption := flag.String("printdb", "", "Fingerprint database path. Fast storage recommended.")
	cmdOption := flag.String("cmd", "add", "Command: add, query")
	pathOption := flag.String("path", "", "Path for commands.")
	flag.Parse()

	var err error

	if *printDbOption == "" || *pgConnStr == "" {
		flag.PrintDefaults()
		os.Exit(0)
	}

	db, err = database.OpenDatabase(*pgConnStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	printDb, err = specializedstore.NewBadgerStore(*printDbOption, true, *cmdOption == "query")
	if err != nil {
		// log.Fatal exits without running deferred calls, so close the
		// already opened database explicitly.
		db.Close()
		log.Fatal(err)
	}

	// Periodic garbage collection of the fingerprint store: on every tick,
	// keep collecting until RunGarbageCollection reports an error.
	dbGcTicker := time.NewTicker(15 * time.Minute)
	go func() {
		for range dbGcTicker.C {
			for printDb.RunGarbageCollection() == nil {
			}
		}
	}()

	// Deferred calls run in reverse order: stop the ticker, run a final
	// garbage collection, close the fingerprint store, then close db.
	defer printDb.Close()
	defer func() {
		for printDb.RunGarbageCollection() == nil {
		}
	}()
	defer dbGcTicker.Stop()

	//TODO: check if mutex is correct!
	printStrategy = panako.NewStrategy(printDb, true)

	if *cmdOption == "add" {
		var release *database.Release

		releases := database.GetReleasesFromDatabaseByIdentifier(db, "test")
		if len(releases) == 0 {
			release = database.InsertReleaseToDatabase(db, []string{"test"})
		} else {
			release = releases[0]
		}

		// NOTE(review): nothing visible in this function commits or rolls
		// back tx — confirm whether that is intended.
		tx, err := db.GetTx()
		if err != nil {
			log.Panic(err)
		}

		pending := &PendingTransaction{
			Tx: tx,
		} //TODO

		filePath, err := filepath.Abs(*pathOption)
		if err != nil {
			log.Panic(err)
		}

		// AddFolderToDatabase defers the Done matching this Add; Wait then
		// blocks until every goroutine started during the walk has finished.
		pending.WaitGroup.Add(1)
		AddFolderToDatabase(release, pending, filePath)
		pending.WaitGroup.Wait()

	} else if *cmdOption == "query" {
		pathEntry, err := filepath.Abs(*pathOption)
		if err != nil {
			log.Panic(err)
		}

		f, err := OpenFileWithLimit(pathEntry)
		if err != nil { //TODO
			log.Panic(err)
		}
		// Close the file and release its fdlimit slot on every exit path,
		// including the panics below.
		defer CloseFileWithLimit(f, true)

		var stream *audio.Stream
		switch GetMimeTypeFromExtension(path.Ext(pathEntry)) { //TODO: change this to scan style
		case "audio/flac":
			//TODO: add CRC32 scan
			flacFormat := flac.NewFormat()
			stream, err = flacFormat.Open(f)
		case "audio/mpeg;codecs=mp3":
			mp3Format := mp3.NewFormat()
			stream, err = mp3Format.Open(f)
		case "audio/ogg", "audio/opus":
			opusFormat := opus.NewFormat()
			stream, err = opusFormat.Open(f)
		}

		if err != nil { //cannot decode
			log.Panic(err) //TODO: not panic
		}
		if stream == nil { //no known decoder
			log.Panic(fmt.Errorf("no known decoder for %s", pathEntry))
		}

		start := time.Now()
		prints := printStrategy.StreamToFingerprints(stream)
		printT := time.Now()
		results := printStrategy.QueryFingerprints(prints)
		resultT := time.Now()

		log.Printf("decode+fingerprint = %dms, query = %dms", printT.Sub(start).Milliseconds(), resultT.Sub(printT).Milliseconds())

		for _, result := range results {
			log.Printf("result %#v\n", result)
			resource := database.GetResourceFromDatabase(db, int64(result.ReferenceResourceId))
			if resource != nil {
				log.Printf("matched %s score %d\n", resource.GetPath(), result.Score)
			}
		}

		log.Printf("finished querying %s", pathEntry)
	}
}