database: made terminate atomic.Bool

This commit is contained in:
DataHoarder 2023-06-09 14:54:47 +02:00
parent 4e9c8cae0e
commit cbdbc7f410
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 21 additions and 21 deletions

View file

@ -68,7 +68,7 @@ type Database struct {
transferHistoryLock sync.Mutex
terminate bool
terminate atomic.Bool
waitGroup sync.WaitGroup
}
@ -86,7 +86,7 @@ var defaultDsn = map[string]string{
}
func (db *Database) Init() {
db.terminate = false
db.terminate.Store(false)
log.Info.Print("Opening database connection...")
@ -179,7 +179,7 @@ func (db *Database) Init() {
func (db *Database) Terminate() {
log.Info.Print("Terminating database connection...")
db.terminate = true
db.terminate.Store(true)
log.Info.Print("Closing all flush channels...")
db.closeFlushChannels()

View file

@ -149,7 +149,7 @@ func (db *Database) flushTorrents() {
if count > 0 {
logFlushes, _ := config.GetBool("log_flushes", true)
if logFlushes && !db.terminate {
if logFlushes && !db.terminate.Load() {
log.Info.Printf("{torrents} Flushing %d", count)
}
@ -169,7 +169,7 @@ func (db *Database) flushTorrents() {
"WHERE t.ID = ft.ID")
conn.exec(&query)
if !db.terminate {
if !db.terminate.Load() {
elapsedTime := time.Since(startTime)
collectors.UpdateFlushTime("torrents", elapsedTime)
collectors.UpdateChannelsLen("torrents", count)
@ -178,7 +178,7 @@ func (db *Database) flushTorrents() {
if length < (torrentFlushBufferSize >> 1) {
time.Sleep(time.Duration(flushSleepInterval) * time.Second)
}
} else if db.terminate {
} else if db.terminate.Load() {
break
} else {
time.Sleep(time.Second)
@ -235,7 +235,7 @@ func (db *Database) flushUsers() {
if count > 0 {
logFlushes, _ := config.GetBool("log_flushes", true)
if logFlushes && !db.terminate {
if logFlushes && !db.terminate.Load() {
log.Info.Printf("{users_main} Flushing %d", count)
}
@ -254,7 +254,7 @@ func (db *Database) flushUsers() {
"WHERE u.ID = fu.ID")
conn.exec(&query)
if !db.terminate {
if !db.terminate.Load() {
elapsedTime := time.Since(startTime)
collectors.UpdateFlushTime("users", elapsedTime)
collectors.UpdateChannelsLen("users", count)
@ -263,7 +263,7 @@ func (db *Database) flushUsers() {
if length < (userFlushBufferSize >> 1) {
time.Sleep(time.Duration(flushSleepInterval) * time.Second)
}
} else if db.terminate {
} else if db.terminate.Load() {
break
} else {
time.Sleep(time.Second)
@ -311,7 +311,7 @@ func (db *Database) flushTransferHistory() {
if count > 0 {
logFlushes, _ := config.GetBool("log_flushes", true)
if logFlushes && !db.terminate {
if logFlushes && !db.terminate.Load() {
log.Info.Printf("{transfer_history} Flushing %d", count)
}
@ -325,14 +325,14 @@ func (db *Database) flushTransferHistory() {
conn.exec(&query)
if !db.terminate {
if !db.terminate.Load() {
elapsedTime := time.Since(startTime)
collectors.UpdateFlushTime("transfer_history", elapsedTime)
collectors.UpdateChannelsLen("transfer_history", count)
}
return length, nil
} else if db.terminate {
} else if db.terminate.Load() {
return 0, errDbTerminate
}
@ -385,7 +385,7 @@ func (db *Database) flushTransferIps() {
if count > 0 {
logFlushes, _ := config.GetBool("log_flushes", true)
if logFlushes && !db.terminate {
if logFlushes && !db.terminate.Load() {
log.Info.Printf("{transfer_ips} Flushing %d", count)
}
@ -396,7 +396,7 @@ func (db *Database) flushTransferIps() {
"uploaded = uploaded + VALUE(uploaded), last_announce = VALUE(last_announce)")
conn.exec(&query)
if !db.terminate {
if !db.terminate.Load() {
elapsedTime := time.Since(startTime)
collectors.UpdateFlushTime("transfer_ips", elapsedTime)
collectors.UpdateChannelsLen("transfer_ips", count)
@ -405,7 +405,7 @@ func (db *Database) flushTransferIps() {
if length < (transferIpsFlushBufferSize >> 1) {
time.Sleep(time.Duration(flushSleepInterval) * time.Second)
}
} else if db.terminate {
} else if db.terminate.Load() {
break
} else {
time.Sleep(time.Second)
@ -448,7 +448,7 @@ func (db *Database) flushSnatches() {
if count > 0 {
logFlushes, _ := config.GetBool("log_flushes", true)
if logFlushes && !db.terminate {
if logFlushes && !db.terminate.Load() {
log.Info.Printf("{snatches} Flushing %d", count)
}
@ -457,7 +457,7 @@ func (db *Database) flushSnatches() {
query.WriteString("\nON DUPLICATE KEY UPDATE snatched_time = VALUE(snatched_time)")
conn.exec(&query)
if !db.terminate {
if !db.terminate.Load() {
elapsedTime := time.Since(startTime)
collectors.UpdateFlushTime("snatches", elapsedTime)
collectors.UpdateChannelsLen("snatches", count)
@ -466,7 +466,7 @@ func (db *Database) flushSnatches() {
if length < (snatchFlushBufferSize >> 1) {
time.Sleep(time.Duration(flushSleepInterval) * time.Second)
}
} else if db.terminate {
} else if db.terminate.Load() {
break
} else {
time.Sleep(time.Second)
@ -483,7 +483,7 @@ func (db *Database) purgeInactivePeers() {
count int
)
for !db.terminate {
for !db.terminate.Load() {
start = time.Now()
now = start.Unix()
count = 0

View file

@ -50,7 +50,7 @@ func init() {
*/
func (db *Database) startReloading() {
go func() {
for !db.terminate {
for !db.terminate.Load() {
time.Sleep(time.Duration(reloadInterval) * time.Second)
db.waitGroup.Add(1)

View file

@ -37,7 +37,7 @@ func init() {
func (db *Database) startSerializing() {
go func() {
for !db.terminate {
for !db.terminate.Load() {
time.Sleep(time.Duration(serializeInterval) * time.Second)
db.serialize()
}