METANOIA/METANOIA.go

631 lines
16 KiB
Go

package main
import (
"database/sql"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
"git.gammaspectra.live/S.O.N.G/Hibiki/panako"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Hibiki/utilities/specializedstore"
"git.gammaspectra.live/S.O.N.G/METANOIA/database"
"git.gammaspectra.live/S.O.N.G/METANOIA/store"
"git.gammaspectra.live/S.O.N.G/MakyuuIchaival/httputils"
"git.gammaspectra.live/S.O.N.G/MakyuuIchaival/tlsutils"
"github.com/dhowden/tag"
"github.com/minio/sha256-simd"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
)
// Process-wide state shared by the commands, the scan workers and the HTTP handler.
var (
	// panakoInstance supplies the decoder block size and builds fingerprint strategies.
	panakoInstance *panako.Instance
	// printStrategy stores and queries fingerprints in the store opened for the current command.
	printStrategy *panako.Strategy
	// db is the metadata database holding releases and resources.
	db *database.Database
	// resourceCreationMutex serializes the lookup-by-hash and insert of resources
	// so concurrent workers cannot create the same resource twice.
	resourceCreationMutex sync.Mutex
	// fdlimit is a counting semaphore bounding how many files are open at once;
	// see OpenFileWithLimit and CloseFileWithLimit.
	fdlimit chan bool
)
// OpenFileWithLimit opens filePath read-only after acquiring a slot from the
// fdlimit semaphore, blocking while every slot is taken. If the open fails the
// slot is handed back immediately; on success the caller owns the slot and must
// release it through CloseFileWithLimit.
func OpenFileWithLimit(filePath string) (*os.File, error) {
	fdlimit <- true
	file, openErr := os.Open(filePath)
	if openErr == nil {
		return file, nil
	}
	<-fdlimit
	return file, openErr
}
// CloseFileWithLimit closes f and returns its slot to the fdlimit semaphore.
// The slot is released only when Close succeeds, unless force is true, in which
// case it is released whatever Close reports. The Close error is returned.
func CloseFileWithLimit(f *os.File, force bool) error {
	closeErr := f.Close()
	releaseSlot := force || closeErr == nil
	if releaseSlot {
		<-fdlimit
	}
	return closeErr
}
// mimeTypesByExtension maps a lower-case file extension (without its leading
// dot) to the MIME type reported by GetMimeTypeFromExtension.
var mimeTypesByExtension = map[string]string{
	// Audio types
	"flac": "audio/flac",
	"mp3":  "audio/mpeg;codecs=mp3",
	"m4a":  "audio/mp4",
	"mka":  "audio/x-matroska",
	"ogg":  "audio/ogg",
	"opus": "audio/opus",
	"tta":  "audio/tta",
	"aac":  "audio/aac",
	"alac": "audio/alac",
	"wav":  "audio/wav",
	"ape":  "audio/ape",
	// Image types
	"png":  "image/png",
	"jfif": "image/jpeg",
	"jpeg": "image/jpeg",
	"jpg":  "image/jpeg",
	"gif":  "image/gif",
	"svg":  "image/svg+xml",
	"tiff": "image/tiff",
	"tif":  "image/tiff",
	"webp": "image/webp",
	"bmp":  "image/bmp",
	// Text types
	"txt":     "text/plain",
	"log":     "text/x-log",
	"accurip": "text/x-accurip",
	"cue":     "text/x-cue",
	"toc":     "text/x-toc",
	// Text subtitles
	"lrc": "text/x-subtitle-lrc",
	"ssa": "text/x-subtitle-ssa",
	"ass": "text/x-subtitle-ass",
	"srt": "text/x-subtitle-subrip",
	// Web types
	"js":    "text/javascript",
	"wasm":  "application/wasm",
	"html":  "text/html",
	"css":   "text/css",
	"ttf":   "font/ttf",
	"otf":   "font/otf",
	"woff":  "font/woff",
	"woff2": "font/woff2",
}

// GetMimeTypeFromExtension returns the MIME type for a file extension as
// produced by path.Ext (e.g. ".flac"). The first character is assumed to be the
// dot and is dropped; the rest is matched case-insensitively. Unknown or empty
// extensions yield "application/octet-stream".
func GetMimeTypeFromExtension(ext string) string {
	if ext == "" {
		return "application/octet-stream"
	}
	if mime, known := mimeTypesByExtension[strings.ToLower(ext[1:])]; known {
		return mime
	}
	return "application/octet-stream"
}
// PendingTransaction bundles the database transaction of an "add" run with a
// WaitGroup tracking all the folder walks, file handlers and background hashing
// goroutines started for that run.
type PendingTransaction struct {
	// Tx is the transaction obtained from db.GetTx for the run.
	// NOTE(review): none of the visible code reads Tx yet — confirm intended use.
	Tx *sql.Tx

	// WaitGroup gets Add(1) before every AddFolderToDatabase, AddAudioToDatabase
	// and AddFileToDatabase call (and background goroutine) and Done when it ends.
	WaitGroup sync.WaitGroup
}
// AddFolderToDatabase walks the directory pathEntry recursively and dispatches
// each file by the MIME type of its extension: audio files go to
// AddAudioToDatabase, text and image files go to AddFileToDatabase, and
// everything else is skipped. Skipped files include unknown extensions, fonts,
// wasm, and the web text types html/css/javascript.
//
// The caller must have called tx.WaitGroup.Add(1) for this call; Done is
// signalled when this directory has been walked. Each subdirectory or
// dispatched file receives its own Add(1) before its handler is called.
// Unreadable directories are logged and skipped.
func AddFolderToDatabase(release *database.Release, tx *PendingTransaction, pathEntry string) {
	defer tx.WaitGroup.Done()
	entries, err := ioutil.ReadDir(pathEntry)
	if err != nil {
		log.Printf("error reading directory %s: %s", pathEntry, err)
		return
	}
	for _, entry := range entries {
		entryPath := filepath.Join(pathEntry, entry.Name())
		if entry.IsDir() {
			tx.WaitGroup.Add(1)
			AddFolderToDatabase(release, tx, entryPath)
			continue
		}
		mime := GetMimeTypeFromExtension(path.Ext(entry.Name()))
		switch {
		case strings.HasPrefix(mime, "audio/"):
			tx.WaitGroup.Add(1)
			AddAudioToDatabase(release, tx, entry.Size(), mime, entryPath)
		case strings.HasPrefix(mime, "image/"),
			// Web text types are served assets, not release files worth storing.
			strings.HasPrefix(mime, "text/") && mime != "text/html" && mime != "text/css" && mime != "text/javascript":
			tx.WaitGroup.Add(1)
			AddFileToDatabase(release, tx, entry.Size(), mime, entryPath)
		}
		// Anything else (application/octet-stream, fonts, wasm) is not stored.
	}
}
// AddFileToDatabase is the placeholder handler for non-audio files (text and
// images). It does not store anything yet; it only marks the unit of work that
// the caller registered with tx.WaitGroup.Add(1) as done.
func AddFileToDatabase(release *database.Release, tx *PendingTransaction, size int64, mime, pathEntry string) {
	tx.WaitGroup.Done()
}
// AddAudioToDatabase registers the audio file at pathEntry as a resource of
// release and fingerprints it into printStrategy when no release referenced the
// resource before.
//
// The caller must have called tx.WaitGroup.Add(1) for this call. If a resource
// with this path already has a release, it returns immediately. Otherwise the
// file is opened and a goroutine, tracked on tx.WaitGroup, does the rest. That
// goroutine hashes the content, looks up or inserts the resource by hash, logs
// the tags and decodes the stream. mime selects the decoder
// ("audio/flac", "audio/mpeg;codecs=mp3", "audio/ogg", "audio/opus"). The
// goroutine holds one fdlimit slot until the file is closed.
// Unreadable files are logged and skipped.
func AddAudioToDatabase(release *database.Release, tx *PendingTransaction, size int64, mime, pathEntry string) {
	log.Printf("handling %s", pathEntry)
	defer tx.WaitGroup.Done()

	resource := database.GetResourceFromDatabaseByPath(db, pathEntry)
	if resource != nil && len(resource.GetReleases()) > 0 {
		return
	}

	f, err := OpenFileWithLimit(pathEntry)
	if err != nil {
		// One unreadable file must not abort the whole scan.
		log.Printf("error for %s: %s", pathEntry, err)
		return
	}

	tx.WaitGroup.Add(1)
	go func() {
		defer tx.WaitGroup.Done()
		defer CloseFileWithLimit(f, true)

		hasher := sha256.New()
		if _, err := io.Copy(hasher, f); err != nil {
			// A partial read would give a hash that does not identify this file.
			log.Printf("error for %s: %s", pathEntry, err)
			return
		}
		var hash database.ResourceHashIdentifier
		copy(hash[:], hasher.Sum(nil))

		// Lookup and insert are done under one lock so two workers hashing
		// identical content cannot both insert a resource.
		resourceCreationMutex.Lock()
		resource = database.GetResourceFromDatabaseByHash(db, hash)
		exists := resource != nil && len(resource.GetReleases()) > 0
		if resource == nil {
			resource = database.InsertResourceToDatabase(db, hash, size, []byte(pathEntry), mime)
		}
		resourceCreationMutex.Unlock()
		if resource == nil {
			log.Panic(errors.New("failed to create resource")) //TODO: not panic
		}
		log.Printf("hash %x path %s", hash, pathEntry)

		// Attach the release when this goroutine returns, including on the
		// early returns below.
		defer resource.AddRelease(release)

		// Tags are only logged for now; failing to read them is not fatal.
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			log.Print(err)
		} else if meta, err := tag.ReadFrom(f); err != nil {
			log.Print(err)
		} else {
			log.Print(meta)
		}

		if exists {
			// The resource already had a release; presumably its fingerprints
			// were stored when that release was added.
			return
		}

		if _, err := f.Seek(0, io.SeekStart); err != nil {
			log.Printf("error for %s: %s", pathEntry, err)
			return
		}
		var (
			stream *audio.Stream
			err    error
		)
		switch mime { //TODO: change this to scan style
		case "audio/flac":
			//TODO: add CRC32 scan
			flacFormat := flac.NewFormat()
			stream, err = flacFormat.Open(f, panakoInstance.BlockSize)
		case "audio/mpeg;codecs=mp3":
			mp3Format := mp3.NewFormat()
			stream, err = mp3Format.Open(f, panakoInstance.BlockSize)
		case "audio/ogg", "audio/opus":
			opusFormat := opus.NewFormat()
			stream, err = opusFormat.Open(f, panakoInstance.BlockSize)
		}
		if err != nil { //cannot decode
			log.Printf("error for %s: %s", pathEntry, err)
			return //TODO
		}
		if stream == nil { //no known decoder
			log.Printf("error for %s: no known decoder", pathEntry)
			return
		}
		printStrategy.StoreStream(panako.ResourceId(resource.GetId()), stream)
		log.Printf("finished fingerprinting %s", pathEntry)
	}()
}
func main() {
pgConnStr := flag.String("connstr", "", "Postgres connection string for postgres database")
printDbOption := flag.String("printdb", "", "Fingerprint database path. Fast storage recommended.")
cmdOption := flag.String("cmd", "add", "Command: add, query, server")
pathOption := flag.String("path", "", "Path for commands.")
listenAddress := flag.String("listen", ":7777", "Address/port to lisent on.")
certificatePath := flag.String("certificate", "", "Path to SSL certificate file.")
keypairPath := flag.String("keypair", "", "Path to SSL key file.")
sniAddressOption := flag.String("sni", "", "Define SNI address if desired. Empty will serve any requests regardless.")
fdlimitOption := flag.Int("fdlimit", 128, "Set file operation and process limit.")
goProcs := flag.Int("maxprocs", runtime.NumCPU(), "Set GOMAXPROCS.")
flag.Parse()
fdlimit = make(chan bool, *fdlimitOption)
runtime.GOMAXPROCS(*goProcs)
var err error
if *printDbOption == "" || *pgConnStr == "" {
flag.PrintDefaults()
os.Exit(0)
}
db, err = database.OpenDatabase(*pgConnStr)
if err != nil {
log.Fatal(err)
}
defer db.Close()
//TODO: check if mutex is correct!
panakoInstance = panako.NewDefaultPackedInstance()
if *cmdOption == "add" {
printDb, err := store.NewAppendStore(*printDbOption, true, true)
if err != nil {
log.Fatal(err)
}
defer printDb.Close()
printStrategy = panakoInstance.GetStrategy(printDb)
var release *database.Release
releases := database.GetReleasesFromDatabaseByIdentifier(db, "test")
if len(releases) == 0 {
release = database.InsertReleaseToDatabase(db, []string{"test"})
} else {
release = releases[0]
}
tx, err := db.GetTx()
if err != nil {
log.Panic(err)
}
pending := &PendingTransaction{
Tx: tx,
} //TODO
filePath, err := filepath.Abs(*pathOption)
if err != nil {
log.Panic(err)
}
pending.WaitGroup.Add(1)
AddFolderToDatabase(release, pending, filePath)
pending.WaitGroup.Wait()
} else if *cmdOption == "query" {
printDb, err := specializedstore.NewBadgerStore(*printDbOption, true, true, 100000)
if err != nil {
log.Fatal(err)
}
defer printDb.Close()
printStrategy = panakoInstance.GetStrategy(printDb)
pathEntry, err := filepath.Abs(*pathOption)
if err != nil {
log.Panic(err)
}
f, err := OpenFileWithLimit(pathEntry)
if err != nil { //TODO
log.Panic(err)
return
}
var stream *audio.Stream
switch GetMimeTypeFromExtension(path.Ext(pathEntry)) { //TODO: change this to scan style
case "audio/flac":
//TODO: add CRC32 scan
flacFormat := flac.NewFormat()
stream, err = flacFormat.Open(f, panakoInstance.BlockSize)
case "audio/mpeg;codecs=mp3":
mp3Format := mp3.NewFormat()
stream, err = mp3Format.Open(f, panakoInstance.BlockSize)
case "audio/ogg":
fallthrough
case "audio/opus":
opusFormat := opus.NewFormat()
stream, err = opusFormat.Open(f, panakoInstance.BlockSize)
}
if err != nil { //cannot decode
log.Panic(err) //TODO: not panic
}
if stream == nil { //no known decoder
log.Panic(fmt.Errorf("no known decoder for %s", pathEntry))
}
start := time.Now()
prints := printStrategy.StreamToFingerprints(stream)
printT := time.Now()
results := printStrategy.QueryFingerprints(prints)
resultT := time.Now()
log.Printf("decode+fingerprint = %dms, query = %dms", printT.Sub(start).Milliseconds(), resultT.Sub(printT).Milliseconds())
for _, result := range results {
log.Printf("result %#v\n", result)
resource := database.GetResourceFromDatabase(db, int64(result.ReferenceResourceId))
if resource != nil {
log.Printf("matched %s score %d\n", resource.GetPath(), result.Score)
}
}
log.Printf("finished querying %s", pathEntry)
} else if *cmdOption == "server" {
printDb, err := specializedstore.NewBadgerStore(*printDbOption, true, true, 100000)
if err != nil {
log.Fatal(err)
}
defer printDb.Close()
printStrategy = panakoInstance.GetStrategy(printDb)
tlsConfiguration, err := tlsutils.NewTLSConfiguration(*certificatePath, *keypairPath, *sniAddressOption)
if err != nil {
log.Fatal(err)
}
filePath, err := filepath.Abs(*pathOption)
if err != nil {
log.Panic(err)
}
server := &httputils.Server{
ListenAddress: *listenAddress,
TLSConfig: tlsConfiguration,
EnableHTTP2: false,
EnableHTTP3: false,
Handler: func(ctx *httputils.RequestContext) {
if len(ctx.GetRequestHeader("Host")) > 0 && ctx.GetRequestHeader("Host") == ctx.GetTLSServerName() { //Prevents rebinding / DNS stuff
ctx.SetResponseCode(http.StatusNotFound)
return
}
if ctx.IsGet() || ctx.IsHead() {
pathName := ctx.GetPath()
if pathName == "/" {
pathName = "/index.html"
}
ctx.SetResponseHeader("Content-Type", GetMimeTypeFromExtension(path.Ext(filePath+pathName)))
ctx.ServeFile(filePath + pathName)
} else if ctx.IsPost() {
if ctx.GetPath() == "/query" {
setOtherHeaders(ctx)
setCORSHeaders(ctx)
var records []panako.StoreRecord
body := ctx.GetBody()
for {
var hash uint32
err := binary.Read(body, binary.BigEndian, &hash)
if err != nil {
break
}
var packed uint32
err = binary.Read(body, binary.LittleEndian, &packed)
if err != nil {
ctx.SetResponseCode(http.StatusBadRequest)
return
}
records = append(records, panako.NewStoreRecordFromCompactPacked(0, hash, packed))
}
type Result struct {
MatchId int64 `json:"matchId"`
FileHash string `json:"fileHash"`
FileName string `json:"fileName"`
FileSize int `json:"fileSize"`
MatchStart float64 `json:"matchStart"`
MatchEnd float64 `json:"matchEnd"`
QueryStart float64 `json:"queryStart"`
QueryEnd float64 `json:"queryEnd"`
Segment int `json:"segment"`
Score int `json:"score"`
TimeFactor float64 `json:"timeFactor"`
FrequencyFactor float64 `json:"frequencyFactor"`
PercentOfSecondsWithMatches float64 `json:"percentMatch"`
}
results := make([]Result, 0, 1)
for _, m := range printStrategy.QueryStoreRecords(records) {
resource := database.GetResourceFromDatabase(db, int64(m.ReferenceResourceId))
if resource == nil {
continue
}
results = append(results, Result{
MatchId: resource.GetId(),
FileHash: hex.EncodeToString(resource.GetHash().ToBytes()),
FileName: path.Base(string(resource.GetPath())),
FileSize: int(resource.GetSize()),
MatchStart: m.ReferenceStart.Seconds(),
MatchEnd: m.ReferenceStop.Seconds(),
QueryStart: m.QueryStart.Seconds(),
QueryEnd: m.QueryStop.Seconds(),
Segment: -1,
Score: m.Score,
TimeFactor: m.TimeFactor,
FrequencyFactor: m.FrequencyFactor,
PercentOfSecondsWithMatches: m.PercentOfSecondsWithMatches,
})
}
if len(records) > 0 {
splitBy := len(records) / 10
for i := 0; i < len(records); i += splitBy {
if i+splitBy >= len(records) {
continue
}
for _, m := range printStrategy.QueryStoreRecords(records[i : i+splitBy]) {
exists := false
for _, v := range results {
if v.MatchId == int64(m.ReferenceResourceId) {
exists = true
break
}
}
if exists {
continue
}
resource := database.GetResourceFromDatabase(db, int64(m.ReferenceResourceId))
if resource == nil {
continue
}
results = append(results, Result{
MatchId: resource.GetId(),
FileHash: hex.EncodeToString(resource.GetHash().ToBytes()),
FileName: path.Base(string(resource.GetPath())),
FileSize: int(resource.GetSize()),
MatchStart: m.ReferenceStart.Seconds(),
MatchEnd: m.ReferenceStop.Seconds(),
QueryStart: m.QueryStart.Seconds(),
QueryEnd: m.QueryStop.Seconds(),
Segment: i,
Score: m.Score,
TimeFactor: m.TimeFactor,
FrequencyFactor: m.FrequencyFactor,
PercentOfSecondsWithMatches: m.PercentOfSecondsWithMatches,
})
}
}
}
sort.Slice(results, func(i, j int) bool {
return results[j].Score < results[i].Score
})
ctx.SetResponseHeader("Content-Type", "application/json")
b, _ := json.MarshalIndent(results, "", " ")
ctx.ServeBytes(b)
ctx.SetResponseCode(http.StatusOK)
} else {
ctx.SetResponseCode(http.StatusNotFound)
}
} else if ctx.IsOptions() {
setOtherHeaders(ctx)
setCORSHeaders(ctx)
ctx.SetResponseCode(http.StatusNoContent)
} else {
ctx.SetResponseCode(http.StatusNotImplemented)
}
},
Debug: false,
}
server.Serve()
}
}
// setOtherHeaders sets the generic response headers used by the API endpoints.
// These are server identification, cache variance, MIME-sniffing protection,
// crawler indexing restrictions and the referrer policy.
func setOtherHeaders(ctx *httputils.RequestContext) {
	ctx.SetResponseHeader("Server", "METANOIA")
	// Vary must list request headers; Accept-Encoding (not the Content-Encoding
	// response header) is what selects between encoded representations.
	ctx.SetResponseHeader("Vary", "Accept-Encoding")
	ctx.SetResponseHeader("X-Content-Type-Options", "nosniff")
	// The header crawlers recognise is X-Robots-Tag (singular).
	ctx.SetResponseHeader("X-Robots-Tag", "noindex, nofollow, notranslate")
	ctx.SetResponseHeader("Referrer-Policy", "origin")
}
// setCORSHeaders sets permissive cross-origin headers so the endpoints can be
// called from any origin. It also sets the CORP/COEP/COOP policies.
//
// NOTE(review): browsers refuse credentialed requests when
// Access-Control-Allow-Origin is "*", so Allow-Credentials has no effect here;
// confirm whether credentials are actually needed.
func setCORSHeaders(ctx *httputils.RequestContext) {
	headers := [...][2]string{
		{"Access-Control-Allow-Credentials", "true"},
		// Firefox caps this to 86400, Chrome to 7200. Default is 5 seconds (!!!)
		{"Access-Control-Max-Age", "7200"},
		{"Access-Control-Allow-Methods", "GET,HEAD,POST,OPTIONS"},
		{"Access-Control-Allow-Headers", "DNT,ETag,Origin,Accept,Accept-Language,X-Requested-With,Range"},
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Expose-Headers", "*"},
		// CORP, COEP, COOP
		{"Cross-Origin-Embedder-Policy", "require-corp"},
		{"Cross-Origin-Resource-Policy", "cross-origin"},
		{"Cross-Origin-Opener-Policy", "unsafe-none"},
	}
	for _, header := range headers {
		ctx.SetResponseHeader(header[0], header[1])
	}
}