// chihaya/database/reload.go

/*
* This file is part of Chihaya.
*
* Chihaya is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chihaya is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chihaya. If not, see <http://www.gnu.org/licenses/>.
*/
package database
import (
"math"
"sync/atomic"
"time"
"chihaya/collectors"
"chihaya/config"
cdb "chihaya/database/types"
"chihaya/log"
"chihaya/util"
)
var (
	// reloadInterval is the number of seconds between cache reload cycles,
	// read from the "database_reload" key of the "intervals" config section
	// (default 45) in init below.
	reloadInterval int

	// GlobalFreeleech indicates whether site is now in freeleech mode (takes precedence over torrent-specific multipliers)
	GlobalFreeleech atomic.Bool
)
// init reads the cache reload interval (seconds) from the "intervals"
// configuration section, falling back to 45 when unset.
func init() {
	reloadInterval, _ = config.Section("intervals").GetInt("database_reload", 45)
}
/*
* Reloading is performed synchronously for each cache to lower database thrashing.
*
* Cache synchronization is handled by using sync.RWMutex, which has a bunch of advantages:
* - The number of simultaneous readers is arbitrarily high
* - Writing is blocked until all current readers release the mutex
* - Once a writer locks the mutex, new readers block until the writer unlocks it
*/
// startReloading launches a background goroutine that periodically refreshes
// every in-memory cache from the database. The tick period is reloadInterval
// seconds; util.ContextTick stops ticking when db.ctx is cancelled.
func (db *Database) startReloading() {
	go func() {
		util.ContextTick(db.ctx, time.Duration(reloadInterval)*time.Second, func() {
			// Register in-flight work so shutdown can wait for an
			// in-progress reload cycle to complete.
			db.waitGroup.Add(1)
			defer db.waitGroup.Done()

			// Caches are reloaded one after another (see the comment block
			// above) to limit simultaneous database load.
			db.loadUsers()
			db.loadHitAndRuns()
			db.loadTorrents()
			db.loadGroupsFreeleech()
			db.loadConfig()
			db.loadClients()
		})
	}()
}
// loadUsers refreshes the in-memory user cache from the database.
//
// Users already present in the cache are updated in place via their atomic
// fields, so pointers held by concurrent readers remain valid; new rows get
// freshly allocated entries. The rebuilt map is swapped in atomically at the
// end, which also drops users no longer returned by the query.
func (db *Database) loadUsers() {
	start := time.Now()

	dbUsers := *db.Users.Load()
	newUsers := make(map[string]*cdb.User, len(dbUsers))

	rows := db.query(db.loadUsersStmt)
	if rows == nil {
		// Fixed copy-paste bug: this previously reported "hit and runs".
		log.Error.Print("Failed to load users from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var (
			id                           uint32
			torrentPass                  string
			downMultiplier, upMultiplier float64
			disableDownload, trackerHide bool
		)

		if err := rows.Scan(&id, &torrentPass, &downMultiplier, &upMultiplier, &disableDownload, &trackerHide); err != nil {
			log.Error.Printf("Error scanning user row: %s", err)
			log.WriteStack()

			// Skip the row: its fields are zero-valued/partial and would
			// otherwise be cached under an empty or bogus torrentPass key.
			continue
		}

		if old, exists := dbUsers[torrentPass]; exists && old != nil {
			// Update in place so existing references stay current.
			old.ID.Store(id)
			old.DownMultiplier.Store(math.Float64bits(downMultiplier))
			old.UpMultiplier.Store(math.Float64bits(upMultiplier))
			old.DisableDownload.Store(disableDownload)
			old.TrackerHide.Store(trackerHide)

			newUsers[torrentPass] = old
		} else {
			u := &cdb.User{}
			u.ID.Store(id)
			u.DownMultiplier.Store(math.Float64bits(downMultiplier))
			u.UpMultiplier.Store(math.Float64bits(upMultiplier))
			u.DisableDownload.Store(disableDownload)
			u.TrackerHide.Store(trackerHide)

			newUsers[torrentPass] = u
		}
	}

	db.Users.Store(&newUsers)

	elapsedTime := time.Since(start)
	collectors.UpdateReloadTime("users", elapsedTime)
	log.Info.Printf("User load complete (%d rows, %s)", len(newUsers), elapsedTime.String())
}
// loadHitAndRuns rebuilds the hit-and-run set from the database and swaps it
// in atomically. The set is keyed by (user ID, torrent ID) pairs with
// zero-width struct{} values.
func (db *Database) loadHitAndRuns() {
	start := time.Now()

	newHnr := make(map[cdb.UserTorrentPair]struct{})

	rows := db.query(db.loadHnrStmt)
	if rows == nil {
		log.Error.Print("Failed to load hit and runs from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var uid, fid uint32

		if err := rows.Scan(&uid, &fid); err != nil {
			log.Error.Printf("Error scanning hit and run row: %s", err)
			log.WriteStack()

			// Skip the row: falling through would insert a phantom
			// {0, 0} hit-and-run entry.
			continue
		}

		newHnr[cdb.UserTorrentPair{
			UserID:    uid,
			TorrentID: fid,
		}] = struct{}{}
	}

	db.HitAndRuns.Store(&newHnr)

	elapsedTime := time.Since(start)
	collectors.UpdateReloadTime("hit_and_runs", elapsedTime)
	log.Info.Printf("Hit and run load complete (%d rows, %s)", len(newHnr), elapsedTime.String())
}
// loadTorrents refreshes the in-memory torrent cache from the database.
//
// Torrents already present are updated in place through their atomic fields
// (preserving their peer maps and any pointers held by concurrent readers);
// new info-hashes get freshly allocated entries with empty seeder/leecher
// maps. The rebuilt map is swapped in atomically at the end.
func (db *Database) loadTorrents() {
	dbTorrents := *db.Torrents.Load()
	newTorrents := make(map[cdb.TorrentHash]*cdb.Torrent, len(dbTorrents))

	start := time.Now()

	rows := db.query(db.loadTorrentsStmt)
	if rows == nil {
		log.Error.Print("Failed to load torrents from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var (
			infoHash                     cdb.TorrentHash
			id                           uint32
			downMultiplier, upMultiplier float64
			snatched                     uint16
			status                       uint8
			groupID                      uint32
			torrentType                  string
		)

		if err := rows.Scan(
			&id,
			&infoHash,
			&downMultiplier,
			&upMultiplier,
			&snatched,
			&status,
			&groupID,
			&torrentType,
		); err != nil {
			log.Error.Printf("Error scanning torrent row: %s", err)
			log.WriteStack()

			// Skip the row: its fields are zero-valued/partial and would
			// otherwise be cached under a zero info-hash.
			continue
		}

		torrentTypeUint64, err := cdb.TorrentTypeFromString(torrentType)
		if err != nil {
			log.Error.Printf("Error storing torrent row: %s", err)
			log.WriteStack()

			// Skip the row: we cannot represent its torrent type, so
			// storing it would cache a bogus group association.
			continue
		}

		if old, exists := dbTorrents[infoHash]; exists && old != nil {
			// Update in place so peer maps and existing references survive.
			old.ID.Store(id)
			old.DownMultiplier.Store(math.Float64bits(downMultiplier))
			old.UpMultiplier.Store(math.Float64bits(upMultiplier))
			old.Snatched.Store(uint32(snatched))
			old.Status.Store(uint32(status))
			old.Group.TorrentType.Store(torrentTypeUint64)
			old.Group.GroupID.Store(groupID)

			newTorrents[infoHash] = old
		} else {
			t := &cdb.Torrent{
				Seeders:  make(map[cdb.PeerKey]*cdb.Peer),
				Leechers: make(map[cdb.PeerKey]*cdb.Peer),
			}

			t.InitializeLock()
			t.ID.Store(id)
			t.DownMultiplier.Store(math.Float64bits(downMultiplier))
			t.UpMultiplier.Store(math.Float64bits(upMultiplier))
			t.Snatched.Store(uint32(snatched))
			t.Status.Store(uint32(status))
			t.Group.TorrentType.Store(torrentTypeUint64)
			t.Group.GroupID.Store(groupID)

			newTorrents[infoHash] = t
		}
	}

	db.Torrents.Store(&newTorrents)

	elapsedTime := time.Since(start)
	collectors.UpdateReloadTime("torrents", elapsedTime)
	log.Info.Printf("Torrent load complete (%d rows, %s)", len(newTorrents), elapsedTime.String())
}
// loadGroupsFreeleech rebuilds the per-torrent-group freeleech multiplier
// cache from the database and swaps it in atomically.
func (db *Database) loadGroupsFreeleech() {
	start := time.Now()

	newTorrentGroupFreeleech := make(map[cdb.TorrentGroupKey]*cdb.TorrentGroupFreeleech)

	rows := db.query(db.loadTorrentGroupFreeleechStmt)
	if rows == nil {
		log.Error.Print("Failed to load torrent group freeleech data from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var (
			downMultiplier, upMultiplier float64
			groupID                      uint32
			torrentType                  string
		)

		if err := rows.Scan(&groupID, &torrentType, &downMultiplier, &upMultiplier); err != nil {
			log.Error.Printf("Error scanning torrent group freeleech row: %s", err)
			log.WriteStack()

			// Skip the row: its fields are zero-valued/partial.
			continue
		}

		k, err := cdb.TorrentGroupKeyFromString(torrentType, groupID)
		if err != nil {
			log.Error.Printf("Error storing torrent group freeleech row: %s", err)
			log.WriteStack()

			// Skip the row: without a valid key the entry would be
			// cached under a garbage key.
			continue
		}

		newTorrentGroupFreeleech[k] = &cdb.TorrentGroupFreeleech{
			UpMultiplier:   upMultiplier,
			DownMultiplier: downMultiplier,
		}
	}

	db.TorrentGroupFreeleech.Store(&newTorrentGroupFreeleech)

	elapsedTime := time.Since(start)
	collectors.UpdateReloadTime("groups_freeleech", elapsedTime)
	log.Info.Printf("Group freeleech load complete (%d rows, %s)", len(newTorrentGroupFreeleech), elapsedTime.String())
}
// loadConfig refreshes database-driven configuration — currently only the
// global freeleech flag, which is published via the GlobalFreeleech atomic.
func (db *Database) loadConfig() {
	rows := db.query(db.loadFreeleechStmt)
	if rows == nil {
		log.Error.Print("Failed to load config from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var globalFreeleech bool

		if err := rows.Scan(&globalFreeleech); err != nil {
			log.Error.Printf("Error scanning config row: %s", err)
			log.WriteStack()

			// Skip the row: storing the zero value here would silently
			// disable global freeleech on a transient scan error.
			continue
		}

		GlobalFreeleech.Store(globalFreeleech)
	}
}
// loadClients rebuilds the approved-client list (client ID -> peer ID
// prefix) from the database and swaps it in atomically.
func (db *Database) loadClients() {
	start := time.Now()

	newClients := make(map[uint16]string)

	rows := db.query(db.loadClientsStmt)
	if rows == nil {
		log.Error.Print("Failed to load clients from database")
		log.WriteStack()

		return
	}

	defer func() {
		_ = rows.Close()
	}()

	for rows.Next() {
		var (
			id     uint16
			peerID string
		)

		if err := rows.Scan(&id, &peerID); err != nil {
			log.Error.Printf("Error scanning client list row: %s", err)
			log.WriteStack()

			// Skip the row: falling through would insert a bogus
			// newClients[0] = "" entry.
			continue
		}

		newClients[id] = peerID
	}

	db.Clients.Store(&newClients)

	elapsedTime := time.Since(start)
	collectors.UpdateReloadTime("clients", elapsedTime)
	log.Info.Printf("Client list load complete (%d rows, %s)", len(newClients), elapsedTime.String())
}