database: keep single sql.DB instead of multiple custom Connection, close spawned goroutines via context tickers, remove TEMPORARY table dependency, alter config to take DSN directly
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
9cd8bdbe37
commit
f39eb883f9
|
@ -58,12 +58,7 @@ Configuration is done in `config.json`, which you'll need to create with the fol
|
|||
```json
|
||||
{
|
||||
"database": {
|
||||
"username": "chihaya",
|
||||
"password": "",
|
||||
"database": "chihaya",
|
||||
"proto": "tcp",
|
||||
"addr": "127.0.0.1:3306",
|
||||
|
||||
"dsn": "chihaya:@tcp(127.0.0.1:3306)/chihaya",
|
||||
"deadlock_pause": 1,
|
||||
"deadlock_retries": 5
|
||||
},
|
||||
|
|
|
@ -40,18 +40,16 @@ func TestMain(m *testing.M) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
f, err := os.OpenFile("config.json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
f, err := os.OpenFile(configFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
configTest = make(Map)
|
||||
dbConfig := map[string]interface{}{
|
||||
"username": "chihaya",
|
||||
"password": "",
|
||||
"proto": "tcp",
|
||||
"addr": "127.0.0.1:3306",
|
||||
"database": "chihaya",
|
||||
"dsn": "chihaya:@tcp(127.0.0.1:3306)/chihaya",
|
||||
"deadlock_pause": json.Number("1"),
|
||||
"deadlock_retries": json.Number("5"),
|
||||
}
|
||||
configTest["database"] = dbConfig
|
||||
configTest["addr"] = ":34000"
|
||||
|
@ -148,5 +146,5 @@ func TestSection(t *testing.T) {
|
|||
}
|
||||
|
||||
func cleanup() {
|
||||
_ = os.Remove("config.json")
|
||||
_ = os.Remove(configFile)
|
||||
}
|
||||
|
|
|
@ -19,8 +19,8 @@ package database
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -35,11 +35,6 @@ import (
|
|||
"github.com/go-sql-driver/mysql"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
sqlDb *sql.DB
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
type Database struct {
|
||||
snatchChannel chan *bytes.Buffer
|
||||
transferHistoryChannel chan *bytes.Buffer
|
||||
|
@ -62,13 +57,15 @@ type Database struct {
|
|||
TorrentGroupFreeleech atomic.Pointer[map[cdb.TorrentGroupKey]*cdb.TorrentGroupFreeleech]
|
||||
Clients atomic.Pointer[map[uint16]string]
|
||||
|
||||
mainConn *Connection // Used for reloading and misc queries
|
||||
|
||||
bufferPool *util.BufferPool
|
||||
|
||||
transferHistoryLock sync.Mutex
|
||||
|
||||
conn *sql.DB
|
||||
|
||||
terminate atomic.Bool
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
|
@ -77,71 +74,66 @@ var (
|
|||
maxDeadlockRetries int
|
||||
)
|
||||
|
||||
var defaultDsn = map[string]string{
|
||||
"username": "chihaya",
|
||||
"password": "",
|
||||
"proto": "tcp",
|
||||
"addr": "127.0.0.1:3306",
|
||||
"database": "chihaya",
|
||||
}
|
||||
const defaultDsn = "chihaya:@tcp(127.0.0.1:3306)/chihaya"
|
||||
|
||||
func (db *Database) Init() {
|
||||
db.terminate.Store(false)
|
||||
db.ctx, db.ctxCancel = context.WithCancel(context.Background())
|
||||
|
||||
log.Info.Print("Opening database connection...")
|
||||
|
||||
db.mainConn = Open()
|
||||
db.conn = Open()
|
||||
|
||||
// Used for recording updates, so the max required size should be < 128 bytes. See queue.go for details
|
||||
db.bufferPool = util.NewBufferPool(128)
|
||||
|
||||
var err error
|
||||
|
||||
db.loadUsersStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadUsersStmt, err = db.conn.Prepare(
|
||||
"SELECT ID, torrent_pass, DownMultiplier, UpMultiplier, DisableDownload, TrackerHide " +
|
||||
"FROM users_main WHERE Enabled = '1'")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.loadHnrStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadHnrStmt, err = db.conn.Prepare(
|
||||
"SELECT h.uid, h.fid FROM transfer_history AS h " +
|
||||
"JOIN users_main AS u ON u.ID = h.uid WHERE h.hnr = 1 AND u.Enabled = '1'")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.loadTorrentsStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadTorrentsStmt, err = db.conn.Prepare(
|
||||
"SELECT ID, info_hash, DownMultiplier, UpMultiplier, Snatched, Status, GroupID, TorrentType FROM torrents")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.loadTorrentGroupFreeleechStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadTorrentGroupFreeleechStmt, err = db.conn.Prepare(
|
||||
"SELECT GroupID, `Type`, DownMultiplier, UpMultiplier FROM torrent_group_freeleech")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.loadClientsStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadClientsStmt, err = db.conn.Prepare(
|
||||
"SELECT id, peer_id FROM approved_clients WHERE archived = 0")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.loadFreeleechStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.loadFreeleechStmt, err = db.conn.Prepare(
|
||||
"SELECT mod_setting FROM mod_core WHERE mod_option = 'global_freeleech'")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.cleanStalePeersStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.cleanStalePeersStmt, err = db.conn.Prepare(
|
||||
"UPDATE transfer_history SET active = 0 WHERE last_announce < ? AND active = 1")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.unPruneTorrentStmt, err = db.mainConn.sqlDb.Prepare(
|
||||
db.unPruneTorrentStmt, err = db.conn.Prepare(
|
||||
"UPDATE torrents SET Status = 0 WHERE ID = ?")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -180,6 +172,7 @@ func (db *Database) Terminate() {
|
|||
log.Info.Print("Terminating database connection...")
|
||||
|
||||
db.terminate.Store(true)
|
||||
db.ctxCancel()
|
||||
|
||||
log.Info.Print("Closing all flush channels...")
|
||||
db.closeFlushChannels()
|
||||
|
@ -190,13 +183,11 @@ func (db *Database) Terminate() {
|
|||
}()
|
||||
|
||||
db.waitGroup.Wait()
|
||||
db.mainConn.mutex.Lock()
|
||||
_ = db.mainConn.Close()
|
||||
db.mainConn.mutex.Unlock()
|
||||
_ = db.conn.Close()
|
||||
db.serialize()
|
||||
}
|
||||
|
||||
func Open() *Connection {
|
||||
func Open() *sql.DB {
|
||||
databaseConfig := config.Section("database")
|
||||
deadlockWaitTime, _ = databaseConfig.GetInt("deadlock_pause", 1)
|
||||
maxDeadlockRetries, _ = databaseConfig.GetInt("deadlock_retries", 5)
|
||||
|
@ -212,18 +203,7 @@ func Open() *Connection {
|
|||
// First try to load the DSN from environment. USeful for tests.
|
||||
databaseDsn := os.Getenv("DB_DSN")
|
||||
if databaseDsn == "" {
|
||||
dbUsername, _ := databaseConfig.Get("username", defaultDsn["username"])
|
||||
dbPassword, _ := databaseConfig.Get("password", defaultDsn["password"])
|
||||
dbProto, _ := databaseConfig.Get("proto", defaultDsn["proto"])
|
||||
dbAddr, _ := databaseConfig.Get("addr", defaultDsn["addr"])
|
||||
dbDatabase, _ := databaseConfig.Get("database", defaultDsn["database"])
|
||||
databaseDsn = fmt.Sprintf("%s:%s@%s(%s)/%s",
|
||||
dbUsername,
|
||||
dbPassword,
|
||||
dbProto,
|
||||
dbAddr,
|
||||
dbDatabase,
|
||||
)
|
||||
databaseDsn, _ = databaseConfig.Get("dsn", defaultDsn)
|
||||
}
|
||||
|
||||
sqlDb, err := sql.Open("mysql", databaseDsn)
|
||||
|
@ -235,16 +215,10 @@ func Open() *Connection {
|
|||
log.Fatal.Fatalf("Couldn't ping database - %s", err)
|
||||
}
|
||||
|
||||
return &Connection{
|
||||
sqlDb: sqlDb,
|
||||
}
|
||||
return sqlDb
|
||||
}
|
||||
|
||||
func (db *Connection) Close() error {
|
||||
return db.sqlDb.Close()
|
||||
}
|
||||
|
||||
func (db *Connection) query(stmt *sql.Stmt, args ...interface{}) *sql.Rows { //nolint:unparam
|
||||
func (db *Database) query(stmt *sql.Stmt, args ...interface{}) *sql.Rows { //nolint:unparam
|
||||
rows, _ := perform(func() (interface{}, error) {
|
||||
return stmt.Query(args...)
|
||||
}).(*sql.Rows)
|
||||
|
@ -252,7 +226,7 @@ func (db *Connection) query(stmt *sql.Stmt, args ...interface{}) *sql.Rows { //n
|
|||
return rows
|
||||
}
|
||||
|
||||
func (db *Connection) execute(stmt *sql.Stmt, args ...interface{}) sql.Result {
|
||||
func (db *Database) execute(stmt *sql.Stmt, args ...interface{}) sql.Result {
|
||||
result, _ := perform(func() (interface{}, error) {
|
||||
return stmt.Exec(args...)
|
||||
}).(sql.Result)
|
||||
|
@ -260,9 +234,9 @@ func (db *Connection) execute(stmt *sql.Stmt, args ...interface{}) sql.Result {
|
|||
return result
|
||||
}
|
||||
|
||||
func (db *Connection) exec(query *bytes.Buffer, args ...interface{}) sql.Result { //nolint:unparam
|
||||
func (db *Database) exec(query *bytes.Buffer, args ...interface{}) sql.Result { //nolint:unparam
|
||||
result, _ := perform(func() (interface{}, error) {
|
||||
return db.sqlDb.Exec(query.String(), args...)
|
||||
return db.conn.Exec(query.String(), args...)
|
||||
}).(sql.Result)
|
||||
|
||||
return result
|
||||
|
|
|
@ -46,7 +46,7 @@ func TestMain(m *testing.M) {
|
|||
db.Init()
|
||||
|
||||
fixtures, err = testfixtures.New(
|
||||
testfixtures.Database(db.mainConn.sqlDb),
|
||||
testfixtures.Database(db.conn),
|
||||
testfixtures.Dialect("mariadb"),
|
||||
testfixtures.Directory("fixtures"),
|
||||
testfixtures.DangerousSkipTestDatabaseCheck(),
|
||||
|
@ -188,6 +188,10 @@ func TestLoadTorrents(t *testing.T) {
|
|||
{89, 252, 84, 49, 177, 28, 118, 28, 148, 205, 62, 185, 8, 37, 234, 110, 109, 200, 165, 241}: t3,
|
||||
}
|
||||
|
||||
for k := range torrents {
|
||||
torrents[k].InitializeLock()
|
||||
}
|
||||
|
||||
// Test with fresh data
|
||||
db.loadTorrents()
|
||||
|
||||
|
@ -375,7 +379,7 @@ func TestRecordAndFlushUsers(t *testing.T) {
|
|||
deltaDownload = int64(float64(deltaRawDownload) * math.Float64frombits(testUser.DownMultiplier.Load()))
|
||||
deltaUpload = int64(float64(deltaRawUpload) * math.Float64frombits(testUser.UpMultiplier.Load()))
|
||||
|
||||
row := db.mainConn.sqlDb.QueryRow("SELECT Uploaded, Downloaded, rawup, rawdl "+
|
||||
row := db.conn.QueryRow("SELECT Uploaded, Downloaded, rawup, rawdl "+
|
||||
"FROM users_main WHERE ID = ?", testUser.ID.Load())
|
||||
|
||||
err := row.Scan(&initUpload, &initDownload, &initRawUpload, &initRawDownload)
|
||||
|
@ -390,7 +394,7 @@ func TestRecordAndFlushUsers(t *testing.T) {
|
|||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT Uploaded, Downloaded, rawup, rawdl "+
|
||||
row = db.conn.QueryRow("SELECT Uploaded, Downloaded, rawup, rawdl "+
|
||||
"FROM users_main WHERE ID = ?", testUser.ID.Load())
|
||||
|
||||
err = row.Scan(&upload, &download, &rawUpload, &rawDownload)
|
||||
|
@ -471,7 +475,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
deltaActiveTime = 267
|
||||
deltaSeedTime = 15
|
||||
|
||||
row := db.mainConn.sqlDb.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
row := db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
|
||||
err := row.Scan(&initRawUpload, &initRawDownload, &initActiveTime, &initSeedTime, &initActive, &initSnatch)
|
||||
|
@ -492,7 +496,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
row = db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
|
||||
err = row.Scan(&rawUpload, &rawDownload, &activeTime, &seedTime, &active, &snatch)
|
||||
|
@ -553,7 +557,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
|
||||
var gotStartTime int64
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
|
||||
|
||||
err = row.Scan(&gotPeer.Seeding, &gotStartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
|
||||
|
@ -598,7 +602,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
|
||||
|
||||
err = row.Scan(&gotPeer.Seeding, &gotPeer.StartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
|
||||
|
@ -635,7 +639,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
deltaDownload = 236
|
||||
deltaUpload = 3262
|
||||
|
||||
row := db.mainConn.sqlDb.QueryRow("SELECT uploaded, downloaded "+
|
||||
row := db.conn.QueryRow("SELECT uploaded, downloaded "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
|
@ -651,7 +655,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT uploaded, downloaded "+
|
||||
row = db.conn.QueryRow("SELECT uploaded, downloaded "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
|
@ -687,7 +691,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
|
||||
var gotStartTime int64
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT port, starttime, last_announce "+
|
||||
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
|
@ -732,7 +736,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
|
||||
}
|
||||
|
||||
row = db.mainConn.sqlDb.QueryRow("SELECT port, starttime, last_announce "+
|
||||
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
|
@ -770,7 +774,7 @@ func TestRecordAndFlushSnatch(t *testing.T) {
|
|||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row := db.mainConn.sqlDb.QueryRow("SELECT snatched_time "+
|
||||
row := db.conn.QueryRow("SELECT snatched_time "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
|
||||
err := row.Scan(&snatchTime)
|
||||
|
@ -823,7 +827,7 @@ func TestRecordAndFlushTorrents(t *testing.T) {
|
|||
numSeeders int
|
||||
)
|
||||
|
||||
row := db.mainConn.sqlDb.QueryRow("SELECT Snatched, last_action, Seeders, Leechers "+
|
||||
row := db.conn.QueryRow("SELECT Snatched, last_action, Seeders, Leechers "+
|
||||
"FROM torrents WHERE ID = ?", torrent.ID.Load())
|
||||
err := row.Scan(&snatched, &lastAction, &numSeeders, &numLeechers)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package database
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"chihaya/util"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
|
@ -111,25 +112,9 @@ func (db *Database) flushTorrents() {
|
|||
count int
|
||||
)
|
||||
|
||||
conn := Open()
|
||||
|
||||
for {
|
||||
query.Reset()
|
||||
query.WriteString("CREATE TEMPORARY TABLE IF NOT EXISTS flush_torrents (" +
|
||||
"ID int unsigned NOT NULL, " +
|
||||
"Snatched int unsigned NOT NULL DEFAULT 0, " +
|
||||
"Seeders int unsigned NOT NULL DEFAULT 0, " +
|
||||
"Leechers int unsigned NOT NULL DEFAULT 0, " +
|
||||
"last_action int NOT NULL DEFAULT 0, " +
|
||||
"PRIMARY KEY (ID)) ENGINE=MEMORY")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("TRUNCATE flush_torrents")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("INSERT INTO flush_torrents VALUES ")
|
||||
query.WriteString("INSERT IGNORE INTO torrents (ID, Snatched, Seeders, Leechers, last_action) VALUES ")
|
||||
|
||||
length := len(db.torrentChannel)
|
||||
|
||||
|
@ -158,16 +143,7 @@ func (db *Database) flushTorrents() {
|
|||
query.WriteString(" ON DUPLICATE KEY UPDATE Snatched = Snatched + VALUE(Snatched), " +
|
||||
"Seeders = VALUE(Seeders), Leechers = VALUE(Leechers), " +
|
||||
"last_action = IF(last_action < VALUE(last_action), VALUE(last_action), last_action)")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("UPDATE torrents t, flush_torrents ft SET " +
|
||||
"t.Snatched = t.Snatched + ft.Snatched, " +
|
||||
"t.Seeders = ft.Seeders, " +
|
||||
"t.Leechers = ft.Leechers, " +
|
||||
"t.last_action = IF(t.last_action < ft.last_action, ft.last_action, t.last_action)" +
|
||||
"WHERE t.ID = ft.ID")
|
||||
conn.exec(&query)
|
||||
db.exec(&query)
|
||||
|
||||
if !db.terminate.Load() {
|
||||
elapsedTime := time.Since(startTime)
|
||||
|
@ -184,8 +160,6 @@ func (db *Database) flushTorrents() {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (db *Database) flushUsers() {
|
||||
|
@ -197,25 +171,9 @@ func (db *Database) flushUsers() {
|
|||
count int
|
||||
)
|
||||
|
||||
conn := Open()
|
||||
|
||||
for {
|
||||
query.Reset()
|
||||
query.WriteString("CREATE TEMPORARY TABLE IF NOT EXISTS flush_users (" +
|
||||
"ID int unsigned NOT NULL, " +
|
||||
"Uploaded bigint unsigned NOT NULL DEFAULT 0, " +
|
||||
"Downloaded bigint unsigned NOT NULL DEFAULT 0, " +
|
||||
"rawdl bigint unsigned NOT NULL DEFAULT 0, " +
|
||||
"rawup bigint unsigned NOT NULL DEFAULT 0, " +
|
||||
"PRIMARY KEY (ID)) ENGINE=MEMORY")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("TRUNCATE flush_users")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("INSERT INTO flush_users VALUES ")
|
||||
query.WriteString("INSERT IGNORE INTO users_main (ID, Uploaded, Downloaded, rawdl, rawup) VALUES ")
|
||||
|
||||
length := len(db.userChannel)
|
||||
|
||||
|
@ -243,16 +201,7 @@ func (db *Database) flushUsers() {
|
|||
|
||||
query.WriteString(" ON DUPLICATE KEY UPDATE Uploaded = Uploaded + VALUE(Uploaded), " +
|
||||
"Downloaded = Downloaded + VALUE(Downloaded), rawdl = rawdl + VALUE(rawdl), rawup = rawup + VALUE(rawup)")
|
||||
conn.exec(&query)
|
||||
|
||||
query.Reset()
|
||||
query.WriteString("UPDATE users_main u, flush_users fu SET " +
|
||||
"u.Uploaded = u.Uploaded + fu.Uploaded, " +
|
||||
"u.Downloaded = u.Downloaded + fu.Downloaded, " +
|
||||
"u.rawdl = u.rawdl + fu.rawdl, " +
|
||||
"u.rawup = u.rawup + fu.rawup " +
|
||||
"WHERE u.ID = fu.ID")
|
||||
conn.exec(&query)
|
||||
db.exec(&query)
|
||||
|
||||
if !db.terminate.Load() {
|
||||
elapsedTime := time.Since(startTime)
|
||||
|
@ -269,8 +218,6 @@ func (db *Database) flushUsers() {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (db *Database) flushTransferHistory() {
|
||||
|
@ -282,8 +229,6 @@ func (db *Database) flushTransferHistory() {
|
|||
count int
|
||||
)
|
||||
|
||||
conn := Open()
|
||||
|
||||
for {
|
||||
length, err := func() (int, error) {
|
||||
db.transferHistoryLock.Lock()
|
||||
|
@ -323,7 +268,7 @@ func (db *Database) flushTransferHistory() {
|
|||
"seedtime = seedtime + VALUE(seedtime), last_announce = VALUE(last_announce), " +
|
||||
"active = VALUE(active), snatched = snatched + VALUE(snatched);")
|
||||
|
||||
conn.exec(&query)
|
||||
db.exec(&query)
|
||||
|
||||
if !db.terminate.Load() {
|
||||
elapsedTime := time.Since(startTime)
|
||||
|
@ -347,8 +292,6 @@ func (db *Database) flushTransferHistory() {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (db *Database) flushTransferIps() {
|
||||
|
@ -360,8 +303,6 @@ func (db *Database) flushTransferIps() {
|
|||
count int
|
||||
)
|
||||
|
||||
conn := Open()
|
||||
|
||||
for {
|
||||
query.Reset()
|
||||
query.WriteString("INSERT INTO transfer_ips (uid, fid, client_id, ip, port, uploaded, downloaded, " +
|
||||
|
@ -394,7 +335,7 @@ func (db *Database) flushTransferIps() {
|
|||
// todo: port should be part of PK
|
||||
query.WriteString("\nON DUPLICATE KEY UPDATE port = VALUE(port), downloaded = downloaded + VALUE(downloaded), " +
|
||||
"uploaded = uploaded + VALUE(uploaded), last_announce = VALUE(last_announce)")
|
||||
conn.exec(&query)
|
||||
db.exec(&query)
|
||||
|
||||
if !db.terminate.Load() {
|
||||
elapsedTime := time.Since(startTime)
|
||||
|
@ -411,8 +352,6 @@ func (db *Database) flushTransferIps() {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (db *Database) flushSnatches() {
|
||||
|
@ -424,8 +363,6 @@ func (db *Database) flushSnatches() {
|
|||
count int
|
||||
)
|
||||
|
||||
conn := Open()
|
||||
|
||||
for {
|
||||
query.Reset()
|
||||
query.WriteString("INSERT INTO transfer_history (uid, fid, snatched_time) VALUES\n")
|
||||
|
@ -455,7 +392,7 @@ func (db *Database) flushSnatches() {
|
|||
startTime := time.Now()
|
||||
|
||||
query.WriteString("\nON DUPLICATE KEY UPDATE snatched_time = VALUE(snatched_time)")
|
||||
conn.exec(&query)
|
||||
db.exec(&query)
|
||||
|
||||
if !db.terminate.Load() {
|
||||
elapsedTime := time.Since(startTime)
|
||||
|
@ -472,8 +409,6 @@ func (db *Database) flushSnatches() {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func (db *Database) purgeInactivePeers() {
|
||||
|
@ -483,7 +418,7 @@ func (db *Database) purgeInactivePeers() {
|
|||
count int
|
||||
)
|
||||
|
||||
for !db.terminate.Load() {
|
||||
util.ContextTick(db.ctx, time.Duration(purgeInactivePeersInterval)*time.Second, func() {
|
||||
start = time.Now()
|
||||
now = start.Unix()
|
||||
count = 0
|
||||
|
@ -544,11 +479,8 @@ func (db *Database) purgeInactivePeers() {
|
|||
db.transferHistoryLock.Lock()
|
||||
defer db.transferHistoryLock.Unlock()
|
||||
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start = time.Now()
|
||||
result := db.mainConn.execute(db.cleanStalePeersStmt, oldestActive)
|
||||
result := db.execute(db.cleanStalePeersStmt, oldestActive)
|
||||
|
||||
if result != nil {
|
||||
rows, err := result.RowsAffected()
|
||||
|
@ -559,7 +491,5 @@ func (db *Database) purgeInactivePeers() {
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(time.Duration(purgeInactivePeersInterval) * time.Second)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -179,7 +179,5 @@ func (db *Database) QueueSnatch(peer *cdb.Peer, now int64) {
|
|||
}
|
||||
|
||||
func (db *Database) UnPrune(torrent *cdb.Torrent) {
|
||||
db.mainConn.mutex.Lock()
|
||||
db.mainConn.execute(db.unPruneTorrentStmt, torrent.ID.Load())
|
||||
db.mainConn.mutex.Unlock()
|
||||
db.execute(db.unPruneTorrentStmt, torrent.ID.Load())
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"chihaya/config"
|
||||
cdb "chihaya/database/types"
|
||||
"chihaya/log"
|
||||
"chihaya/util"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -50,31 +51,26 @@ func init() {
|
|||
*/
|
||||
func (db *Database) startReloading() {
|
||||
go func() {
|
||||
for !db.terminate.Load() {
|
||||
time.Sleep(time.Duration(reloadInterval) * time.Second)
|
||||
|
||||
util.ContextTick(db.ctx, time.Duration(reloadInterval)*time.Second, func() {
|
||||
db.waitGroup.Add(1)
|
||||
defer db.waitGroup.Done()
|
||||
db.loadUsers()
|
||||
db.loadHitAndRuns()
|
||||
db.loadTorrents()
|
||||
db.loadGroupsFreeleech()
|
||||
db.loadConfig()
|
||||
db.loadClients()
|
||||
db.waitGroup.Done()
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
func (db *Database) loadUsers() {
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
dbUsers := *db.Users.Load()
|
||||
newUsers := make(map[string]*cdb.User, len(dbUsers))
|
||||
|
||||
rows := db.mainConn.query(db.loadUsersStmt)
|
||||
rows := db.query(db.loadUsersStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load hit and runs from database")
|
||||
log.WriteStack()
|
||||
|
@ -126,13 +122,10 @@ func (db *Database) loadUsers() {
|
|||
}
|
||||
|
||||
func (db *Database) loadHitAndRuns() {
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
newHnr := make(map[cdb.UserTorrentPair]struct{})
|
||||
|
||||
rows := db.mainConn.query(db.loadHnrStmt)
|
||||
rows := db.query(db.loadHnrStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load hit and runs from database")
|
||||
log.WriteStack()
|
||||
|
@ -173,12 +166,9 @@ func (db *Database) loadTorrents() {
|
|||
|
||||
newTorrents := make(map[cdb.TorrentHash]*cdb.Torrent, len(dbTorrents))
|
||||
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start = time.Now()
|
||||
|
||||
rows := db.mainConn.query(db.loadTorrentsStmt)
|
||||
rows := db.query(db.loadTorrentsStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load torrents from database")
|
||||
log.WriteStack()
|
||||
|
@ -261,13 +251,10 @@ func (db *Database) loadTorrents() {
|
|||
}
|
||||
|
||||
func (db *Database) loadGroupsFreeleech() {
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
newTorrentGroupFreeleech := make(map[cdb.TorrentGroupKey]*cdb.TorrentGroupFreeleech)
|
||||
|
||||
rows := db.mainConn.query(db.loadTorrentGroupFreeleechStmt)
|
||||
rows := db.query(db.loadTorrentGroupFreeleechStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load torrent group freeleech data from database")
|
||||
log.WriteStack()
|
||||
|
@ -311,10 +298,7 @@ func (db *Database) loadGroupsFreeleech() {
|
|||
}
|
||||
|
||||
func (db *Database) loadConfig() {
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
rows := db.mainConn.query(db.loadFreeleechStmt)
|
||||
rows := db.query(db.loadFreeleechStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load config from database")
|
||||
log.WriteStack()
|
||||
|
@ -339,13 +323,10 @@ func (db *Database) loadConfig() {
|
|||
}
|
||||
|
||||
func (db *Database) loadClients() {
|
||||
db.mainConn.mutex.Lock()
|
||||
defer db.mainConn.mutex.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
newClients := make(map[uint16]string)
|
||||
|
||||
rows := db.mainConn.query(db.loadClientsStmt)
|
||||
rows := db.query(db.loadClientsStmt)
|
||||
if rows == nil {
|
||||
log.Error.Print("Failed to load clients from database")
|
||||
log.WriteStack()
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"chihaya/util"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
@ -37,10 +38,9 @@ func init() {
|
|||
|
||||
func (db *Database) startSerializing() {
|
||||
go func() {
|
||||
for !db.terminate.Load() {
|
||||
time.Sleep(time.Duration(serializeInterval) * time.Second)
|
||||
util.ContextTick(db.ctx, time.Duration(serializeInterval)*time.Second, func() {
|
||||
db.serialize()
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
20
util/context_ticker.go
Normal file
20
util/context_ticker.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ContextTick(ctx context.Context, d time.Duration, onTick func()) {
|
||||
ticker := time.NewTicker(d)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
onTick()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue