Sync with 20ab505937c31615d8f385737ccb3706848016bb

AnimeBytes 2023-06-13 03:08:32 +02:00
parent 4e88c8babd
commit 8d901abcb8
18 changed files with 267 additions and 281 deletions

Binary file not shown.

View file

@ -21,6 +21,7 @@ import (
"bytes"
"database/sql"
"fmt"
"github.com/viney-shih/go-lock"
"os"
"sync"
"sync/atomic"
@ -41,9 +42,8 @@ type Connection struct {
}
type Database struct {
TorrentsSemaphore util.Semaphore
ClientsSemaphore util.Semaphore
UsersSemaphore util.Semaphore
TorrentsLock lock.RWMutex
UsersLock lock.RWMutex
snatchChannel chan *bytes.Buffer
transferHistoryChannel chan *bytes.Buffer
@ -70,7 +70,7 @@ type Database struct {
bufferPool *util.BufferPool
transferHistorySemaphore util.Semaphore
transferHistoryLock lock.Mutex
terminate bool
waitGroup sync.WaitGroup
@ -96,10 +96,9 @@ func (db *Database) Init() {
db.mainConn = Open()
// Initializing semaphore channels
db.TorrentsSemaphore = util.NewSemaphore()
db.UsersSemaphore = util.NewSemaphore()
db.ClientsSemaphore = util.NewSemaphore()
// Initializing locks
db.TorrentsLock = lock.NewCASMutex()
db.UsersLock = lock.NewCASMutex()
// Used for recording updates, so the max required size should be < 128 bytes. See queue.go for details
db.bufferPool = util.NewBufferPool(128)
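The three single-slot semaphores on Database collapse into two read-write locks from github.com/viney-shih/go-lock (plus a plain lock.Mutex for the transfer-history flush further down). A minimal sketch of the replacement primitive, restricted to the calls that actually appear in this diff; the context-aware RTryLockWithContext used by the HTTP handlers is shown in a later example:

```go
package main

import (
	"github.com/viney-shih/go-lock"
)

func main() {
	// NewCASMutex returns a value satisfying go-lock's RWMutex interface,
	// the type the Database fields above are declared as.
	var torrentsLock lock.RWMutex = lock.NewCASMutex()

	// Write lock: exclusive, taken when db.Torrents or db.Users is replaced.
	torrentsLock.Lock()
	torrentsLock.Unlock()

	// Read lock: unlike the removed one-slot semaphore, any number of readers
	// (announce, scrape, metrics, serialize) can hold it at the same time.
	torrentsLock.RLock()
	torrentsLock.RUnlock()
}
```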

View file

@ -298,7 +298,17 @@ func TestUnPrune(t *testing.T) {
prepareTestDatabase()
h := cdb.TorrentHash{114, 239, 32, 237, 220, 181, 67, 143, 115, 182, 216, 141, 120, 196, 223, 193, 102, 123, 137, 56}
torrent := *db.Torrents[h]
torrent := cdb.Torrent{
Seeders: db.Torrents[h].Seeders,
Leechers: db.Torrents[h].Leechers,
Group: db.Torrents[h].Group,
ID: db.Torrents[h].ID,
Snatched: db.Torrents[h].Snatched,
Status: db.Torrents[h].Status,
LastAction: db.Torrents[h].LastAction,
UpMultiplier: db.Torrents[h].UpMultiplier,
DownMultiplier: db.Torrents[h].DownMultiplier,
}
torrent.Status = 0
db.UnPrune(db.Torrents[h])
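The whole-struct copy is replaced by a field-by-field copy because cdb.Torrent now embeds a sync.RWMutex (see the types change further down): dereferencing the pointer would copy that lock along with the data, which go vet reports (copylocks) and which is unsafe while the lock is in use. A minimal reproduction with a hypothetical trimmed type:

```go
package main

import "sync"

// trimmedTorrent stands in for cdb.Torrent: data fields plus an embedded lock.
type trimmedTorrent struct {
	Snatched uint16
	lock     sync.RWMutex
}

func main() {
	orig := &trimmedTorrent{Snatched: 3}

	bad := *orig // go vet (copylocks): this copies the embedded RWMutex as well
	_ = bad

	good := trimmedTorrent{Snatched: orig.Snatched} // copy only the plain fields
	_ = good
}
```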

View file

@ -20,13 +20,13 @@ package database
import (
"bytes"
"errors"
"github.com/viney-shih/go-lock"
"time"
"chihaya/collectors"
"chihaya/config"
cdb "chihaya/database/types"
"chihaya/log"
"chihaya/util"
)
var (
@ -82,7 +82,7 @@ func (db *Database) startFlushing() {
db.transferIpsChannel = make(chan *bytes.Buffer, transferIpsFlushBufferSize)
db.snatchChannel = make(chan *bytes.Buffer, snatchFlushBufferSize)
db.transferHistorySemaphore = util.NewSemaphore()
db.transferHistoryLock = lock.NewCASMutex()
go db.flushTorrents()
go db.flushUsers()
@ -289,8 +289,8 @@ func (db *Database) flushTransferHistory() {
for {
length, err := func() (int, error) {
util.TakeSemaphore(db.transferHistorySemaphore)
defer util.ReturnSemaphore(db.transferHistorySemaphore)
db.transferHistoryLock.Lock()
defer db.transferHistoryLock.Unlock()
query.Reset()
query.WriteString("INSERT INTO transfer_history (uid, fid, uploaded, downloaded, " +
@ -495,36 +495,42 @@ func (db *Database) purgeInactivePeers() {
// First, remove inactive peers from memory
func() {
util.TakeSemaphore(db.TorrentsSemaphore)
defer util.ReturnSemaphore(db.TorrentsSemaphore)
db.TorrentsLock.RLock()
defer db.TorrentsLock.RUnlock()
for _, torrent := range db.Torrents {
countThisTorrent := count
func() {
// Take the write lock before operating on this torrent's peer entries
torrent.Lock()
defer torrent.Unlock()
for id, peer := range torrent.Leechers {
if peer.LastAnnounce < oldestActive {
delete(torrent.Leechers, id)
count++
countThisTorrent := count
for id, peer := range torrent.Leechers {
if peer.LastAnnounce < oldestActive {
delete(torrent.Leechers, id)
count++
}
}
}
if countThisTorrent != count && len(torrent.Leechers) == 0 {
/* Deallocate the previous map, since Go does not free a map's backing storage when keys are deleted.
We do this only for Leechers, as the potential gain from freeing one or two Seeders is
virtually nil, while a drained Leechers map can retain significant memory from the initial swarm activity. */
torrent.Leechers = make(map[cdb.PeerKey]*cdb.Peer)
}
for id, peer := range torrent.Seeders {
if peer.LastAnnounce < oldestActive {
delete(torrent.Seeders, id)
count++
if countThisTorrent != count && len(torrent.Leechers) == 0 {
/* Deallocate the previous map, since Go does not free a map's backing storage when keys are deleted.
We do this only for Leechers, as the potential gain from freeing one or two Seeders is
virtually nil, while a drained Leechers map can retain significant memory from the initial swarm activity. */
torrent.Leechers = make(map[cdb.PeerKey]*cdb.Peer)
}
}
if countThisTorrent != count {
db.QueueTorrent(torrent, 0)
}
for id, peer := range torrent.Seeders {
if peer.LastAnnounce < oldestActive {
delete(torrent.Seeders, id)
count++
}
}
if countThisTorrent != count {
db.QueueTorrent(torrent, 0)
}
}()
}
}()
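The comment about deallocating the Leechers map relies on a Go property worth spelling out: deleting keys empties a map but never shrinks its bucket array, so a torrent that briefly had a large swarm keeps that memory until the map value itself is replaced. A small standalone sketch that makes the effect visible (the exact numbers will vary):

```go
package main

import (
	"fmt"
	"runtime"
)

func heapMiB() float64 {
	var m runtime.MemStats

	runtime.GC()
	runtime.ReadMemStats(&m)

	return float64(m.HeapAlloc) / (1 << 20)
}

func main() {
	peers := make(map[int][64]byte)
	for i := 0; i < 1_000_000; i++ {
		peers[i] = [64]byte{}
	}

	fmt.Printf("after fill:    %6.1f MiB\n", heapMiB())

	for k := range peers {
		delete(peers, k) // keys are gone, but the buckets are retained
	}

	fmt.Printf("after delete:  %6.1f MiB\n", heapMiB())

	peers = make(map[int][64]byte) // what the purge does for drained Leechers maps

	fmt.Printf("after realloc: %6.1f MiB\n", heapMiB())
}
```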
@ -538,8 +544,8 @@ func (db *Database) purgeInactivePeers() {
defer db.waitGroup.Done()
// Wait to prevent a race condition where the user has announced but their announce time hasn't been flushed yet
util.TakeSemaphore(db.transferHistorySemaphore)
defer util.ReturnSemaphore(db.transferHistorySemaphore)
db.transferHistoryLock.Lock()
defer db.transferHistoryLock.Unlock()
db.mainConn.mutex.Lock()
defer db.mainConn.mutex.Unlock()

View file

@ -49,7 +49,13 @@ func (db *Database) QueueTorrent(torrent *cdb.Torrent, deltaSnatch uint8) {
tq.WriteString(strconv.FormatInt(torrent.LastAction, 10))
tq.WriteString(")")
db.torrentChannel <- tq
select {
case db.torrentChannel <- tq:
default:
go func() {
db.torrentChannel <- tq
}()
}
}
func (db *Database) QueueUser(user *cdb.User, rawDeltaUp, rawDeltaDown, deltaUp, deltaDown int64) {
@ -67,7 +73,13 @@ func (db *Database) QueueUser(user *cdb.User, rawDeltaUp, rawDeltaDown, deltaUp,
uq.WriteString(strconv.FormatInt(rawDeltaUp, 10))
uq.WriteString(")")
db.userChannel <- uq
select {
case db.userChannel <- uq:
default:
go func() {
db.userChannel <- uq
}()
}
}
func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDown, deltaTime, deltaSeedTime int64,
@ -100,7 +112,13 @@ func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDow
th.WriteString(strconv.FormatUint(peer.Left, 10))
th.WriteString(")")
db.transferHistoryChannel <- th
select {
case db.transferHistoryChannel <- th:
default:
go func() {
db.transferHistoryChannel <- th
}()
}
}
func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int64) {
@ -126,7 +144,13 @@ func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int
ti.WriteString(strconv.FormatInt(peer.LastAnnounce, 10))
ti.WriteString(")")
db.transferIpsChannel <- ti
select {
case db.transferIpsChannel <- ti:
default:
go func() {
db.transferIpsChannel <- ti
}()
}
}
func (db *Database) QueueSnatch(peer *cdb.Peer, now int64) {
@ -140,7 +164,13 @@ func (db *Database) QueueSnatch(peer *cdb.Peer, now int64) {
sn.WriteString(strconv.FormatInt(now, 10))
sn.WriteString(")")
db.snatchChannel <- sn
select {
case db.snatchChannel <- sn:
default:
go func() {
db.snatchChannel <- sn
}()
}
}
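Every Queue* helper now follows the same shape: try a non-blocking send first, and only when the buffered channel is full hand the blocking send to a goroutine, so the announce path never stalls behind a slow flush. A generic sketch of the pattern (the enqueue helper is illustrative, not part of the code):

```go
package main

import (
	"bytes"
	"fmt"
)

// enqueue tries the fast, non-blocking path first; when the buffered channel
// is full it hands the blocking send to a goroutine so the caller returns
// immediately instead of waiting for a flush.
func enqueue(ch chan<- *bytes.Buffer, b *bytes.Buffer) {
	select {
	case ch <- b:
		// fast path: the channel had room
	default:
		go func() {
			ch <- b // slow path: blocks until the flusher drains the channel
		}()
	}
}

func main() {
	ch := make(chan *bytes.Buffer, 1)

	enqueue(ch, bytes.NewBufferString("a")) // fast path: fills the buffer
	enqueue(ch, bytes.NewBufferString("b")) // buffer full: handed to a goroutine

	fmt.Println((<-ch).String()) // "a"
	fmt.Println((<-ch).String()) // "b", once the goroutine's send completes
}
```

The trade-off is mild reordering: a record that takes the slow path is delivered by its own goroutine, so it can land behind later fast-path sends.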
func (db *Database) UnPrune(torrent *cdb.Torrent) {

View file

@ -25,7 +25,6 @@ import (
"chihaya/config"
cdb "chihaya/database/types"
"chihaya/log"
"chihaya/util"
)
var (
@ -66,8 +65,8 @@ func (db *Database) startReloading() {
}
func (db *Database) loadUsers() {
util.TakeSemaphore(db.UsersSemaphore)
defer util.ReturnSemaphore(db.UsersSemaphore)
db.UsersLock.Lock()
defer db.UsersLock.Unlock()
db.mainConn.mutex.Lock()
defer db.mainConn.mutex.Unlock()
@ -168,75 +167,87 @@ func (db *Database) loadHitAndRuns() {
}
func (db *Database) loadTorrents() {
util.TakeSemaphore(db.TorrentsSemaphore)
defer util.ReturnSemaphore(db.TorrentsSemaphore)
var start time.Time
db.mainConn.mutex.Lock()
defer db.mainConn.mutex.Unlock()
start := time.Now()
newTorrents := make(map[cdb.TorrentHash]*cdb.Torrent)
rows := db.mainConn.query(db.loadTorrentsStmt)
if rows == nil {
log.Error.Print("Failed to load torrents from database")
log.WriteStack()
func() {
db.TorrentsLock.RLock()
defer db.TorrentsLock.RUnlock()
return
}
db.mainConn.mutex.Lock()
defer db.mainConn.mutex.Unlock()
defer func() {
_ = rows.Close()
}()
start = time.Now()
for rows.Next() {
var (
infoHash cdb.TorrentHash
id uint32
downMultiplier, upMultiplier float64
snatched uint16
status uint8
group cdb.TorrentGroup
)
if err := rows.Scan(
&id,
&infoHash,
&downMultiplier,
&upMultiplier,
&snatched,
&status,
&group.GroupID,
&group.TorrentType,
); err != nil {
log.Error.Printf("Error scanning torrent row: %s", err)
rows := db.mainConn.query(db.loadTorrentsStmt)
if rows == nil {
log.Error.Print("Failed to load torrents from database")
log.WriteStack()
return
}
if old, exists := db.Torrents[infoHash]; exists && old != nil {
old.ID = id
old.DownMultiplier = downMultiplier
old.UpMultiplier = upMultiplier
old.Snatched = snatched
old.Status = status
old.Group = group
defer func() {
_ = rows.Close()
}()
newTorrents[infoHash] = old
} else {
newTorrents[infoHash] = &cdb.Torrent{
ID: id,
UpMultiplier: upMultiplier,
DownMultiplier: downMultiplier,
Snatched: snatched,
Status: status,
Group: group,
for rows.Next() {
var (
infoHash cdb.TorrentHash
id uint32
downMultiplier, upMultiplier float64
snatched uint16
status uint8
group cdb.TorrentGroup
)
Seeders: make(map[cdb.PeerKey]*cdb.Peer),
Leechers: make(map[cdb.PeerKey]*cdb.Peer),
if err := rows.Scan(
&id,
&infoHash,
&downMultiplier,
&upMultiplier,
&snatched,
&status,
&group.GroupID,
&group.TorrentType,
); err != nil {
log.Error.Printf("Error scanning torrent row: %s", err)
log.WriteStack()
}
if old, exists := db.Torrents[infoHash]; exists && old != nil {
func() {
old.Lock()
defer old.Unlock()
old.ID = id
old.DownMultiplier = downMultiplier
old.UpMultiplier = upMultiplier
old.Snatched = snatched
old.Status = status
old.Group = group
}()
newTorrents[infoHash] = old
} else {
newTorrents[infoHash] = &cdb.Torrent{
ID: id,
UpMultiplier: upMultiplier,
DownMultiplier: downMultiplier,
Snatched: snatched,
Status: status,
Group: group,
Seeders: make(map[cdb.PeerKey]*cdb.Peer),
Leechers: make(map[cdb.PeerKey]*cdb.Peer),
}
}
}
}
}()
db.TorrentsLock.Lock()
defer db.TorrentsLock.Unlock()
db.Torrents = newTorrents
elapsedTime := time.Since(start)
@ -316,9 +327,6 @@ func (db *Database) loadConfig() {
}
func (db *Database) loadClients() {
util.TakeSemaphore(db.ClientsSemaphore)
defer util.ReturnSemaphore(db.ClientsSemaphore)
db.mainConn.mutex.Lock()
defer db.mainConn.mutex.Unlock()
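loadTorrents now works in two phases: it reads the existing entries under a read lock while building a replacement map, and takes the write lock only for the final swap, so announces are blocked for little more than one map assignment. A simplified sketch of that shape, using hypothetical stand-in types and a plain sync.RWMutex in place of go-lock:

```go
package main

import "sync"

// Hypothetical, trimmed stand-ins for cdb.Torrent and a scanned DB row.
type torrent struct{ id uint32 }

type row struct {
	hash string
	id   uint32
}

var (
	torrentsLock sync.RWMutex // the real field is a go-lock RWMutex
	torrents     = map[string]*torrent{}
)

func reloadTorrents(rows []row) {
	fresh := make(map[string]*torrent, len(rows))

	// Phase 1: read the live map under a read lock while building the new one.
	torrentsLock.RLock()
	for _, r := range rows {
		if old, ok := torrents[r.hash]; ok {
			old.id = r.id       // the real code updates fields under the per-torrent lock
			fresh[r.hash] = old // reuse the entry so its peer maps survive the reload
		} else {
			fresh[r.hash] = &torrent{id: r.id}
		}
	}
	torrentsLock.RUnlock()

	// Phase 2: swap the map in under a short write lock.
	torrentsLock.Lock()
	torrents = fresh
	torrentsLock.Unlock()
}

func main() {
	reloadTorrents([]row{{hash: "aa", id: 1}})
}
```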

View file

@ -26,7 +26,6 @@ import (
"chihaya/config"
cdb "chihaya/database/types"
"chihaya/log"
"chihaya/util"
)
var serializeInterval int
@ -69,8 +68,8 @@ func (db *Database) serialize() {
torrentFile.Close()
}()
util.TakeSemaphore(db.TorrentsSemaphore)
defer util.ReturnSemaphore(db.TorrentsSemaphore)
db.TorrentsLock.RLock()
defer db.TorrentsLock.RUnlock()
if err = cdb.WriteTorrents(torrentFile, db.Torrents); err != nil {
log.Error.Print("Failed to encode torrents for serialization: ", err)
@ -97,8 +96,8 @@ func (db *Database) serialize() {
userFile.Close()
}()
util.TakeSemaphore(db.UsersSemaphore)
defer util.ReturnSemaphore(db.UsersSemaphore)
db.UsersLock.RLock()
defer db.UsersLock.RUnlock()
if err = cdb.WriteUsers(userFile, db.Users); err != nil {
log.Error.Print("Failed to encode users for serialization: ", err)
@ -135,8 +134,8 @@ func (db *Database) deserialize() {
//goland:noinspection GoUnhandledErrorResult
defer torrentFile.Close()
util.TakeSemaphore(db.TorrentsSemaphore)
defer util.ReturnSemaphore(db.TorrentsSemaphore)
db.TorrentsLock.Lock()
defer db.TorrentsLock.Unlock()
if err = cdb.LoadTorrents(torrentFile, db.Torrents); err != nil {
log.Error.Print("Failed to deserialize torrent cache: ", err)
@ -154,8 +153,8 @@ func (db *Database) deserialize() {
//goland:noinspection GoUnhandledErrorResult
defer userFile.Close()
util.TakeSemaphore(db.UsersSemaphore)
defer util.ReturnSemaphore(db.UsersSemaphore)
db.UsersLock.Lock()
defer db.UsersLock.Unlock()
if err = cdb.LoadUsers(userFile, db.Users); err != nil {
log.Error.Print("Failed to deserialize user cache: ", err)
@ -163,20 +162,25 @@ func (db *Database) deserialize() {
}
}()
util.TakeSemaphore(db.TorrentsSemaphore)
defer util.ReturnSemaphore(db.TorrentsSemaphore)
util.TakeSemaphore(db.UsersSemaphore)
defer util.ReturnSemaphore(db.UsersSemaphore)
db.TorrentsLock.RLock()
defer db.TorrentsLock.RUnlock()
torrents := len(db.Torrents)
users := len(db.Users)
peers := 0
for _, t := range db.Torrents {
peers += len(t.Leechers) + len(t.Seeders)
func() {
t.RLock()
defer t.RUnlock()
peers += len(t.Leechers) + len(t.Seeders)
}()
}
db.UsersLock.RLock()
defer db.UsersLock.RUnlock()
users := len(db.Users)
log.Info.Printf("Loaded %d users, %d torrents and %d peers (%s)",
users, torrents, peers, time.Since(start).String())
}

View file

@ -24,6 +24,7 @@ import (
"errors"
"io"
"math"
"sync"
)
const TorrentHashSize = 20
@ -128,6 +129,26 @@ type Torrent struct {
UpMultiplier float64
DownMultiplier float64
// lock must be taken whenever fields on this torrent are read or written.
// A single sync.Mutex would be simpler to handle, but it would also block concurrent reads.
lock sync.RWMutex
}
func (t *Torrent) Lock() {
t.lock.Lock()
}
func (t *Torrent) Unlock() {
t.lock.Unlock()
}
func (t *Torrent) RLock() {
t.lock.RLock()
}
func (t *Torrent) RUnlock() {
t.lock.RUnlock()
}
func (t *Torrent) Load(version uint64, reader readerAndByteReader) (err error) {
@ -202,6 +223,9 @@ func (t *Torrent) Load(version uint64, reader readerAndByteReader) (err error) {
}
func (t *Torrent) Append(preAllocatedBuffer []byte) (buf []byte) {
t.RLock()
defer t.RUnlock()
buf = preAllocatedBuffer
buf = binary.AppendUvarint(buf, uint64(len(t.Seeders)))
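Every call site in this commit nests the new per-torrent lock inside the map-level TorrentsLock (announce, the peer purge, metrics, deserialize), never the other way around, which keeps the lock order consistent and rules out deadlocks between the two levels. A trimmed, hypothetical sketch of that two-level layout:

```go
package main

import (
	"fmt"
	"sync"
)

// A map-level lock guards membership of the torrents map; a per-torrent lock
// guards the fields of each entry, mirroring the layout in this commit.
type torrent struct {
	seeders, leechers int
	mu                sync.RWMutex
}

type db struct {
	torrentsMu sync.RWMutex // a go-lock RWMutex in the real code
	torrents   map[string]*torrent
}

// countPeers takes the map-level lock first and the per-torrent lock second,
// the same order used by announce, the purge and serialization.
func (d *db) countPeers() (peers int) {
	d.torrentsMu.RLock()
	defer d.torrentsMu.RUnlock()

	for _, t := range d.torrents {
		t.mu.RLock()
		peers += t.seeders + t.leechers
		t.mu.RUnlock()
	}

	return peers
}

func main() {
	d := &db{torrents: map[string]*torrent{"aa": {seeders: 2, leechers: 1}}}
	fmt.Println(d.countPeers())
}
```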

go.mod (2 lines changed)
View file

@ -8,6 +8,7 @@ require (
github.com/jinzhu/copier v0.3.5
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/common v0.44.0
github.com/viney-shih/go-lock v1.1.2
github.com/zeebo/bencode v1.0.0
)
@ -33,6 +34,7 @@ require (
github.com/shopspring/decimal v1.3.1 // indirect
go.opentelemetry.io/otel v1.15.0 // indirect
go.opentelemetry.io/otel/trace v1.15.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum (7 lines changed)
View file

@ -84,9 +84,12 @@ github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/viney-shih/go-lock v1.1.2 h1:3TdGTiHZCPqBdTvFbQZQN/TRZzKF3KWw2rFEyKz3YqA=
github.com/viney-shih/go-lock v1.1.2/go.mod h1:Yijm78Ljteb3kRiJrbLAxVntkUukGu5uzSxq/xV7OO8=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
@ -115,8 +118,11 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -147,6 +153,7 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -29,7 +29,7 @@ import (
)
var (
enabled = false
enabled = false // global for testing purposes
initialized = false
channel chan []byte
)
@ -39,7 +39,7 @@ func getFile(t time.Time) (*os.File, error) {
}
func Init() {
if enabled, _ = config.GetBool("record", enabled); !enabled {
if enabled, _ := config.GetBool("record", enabled); !enabled {
return
}
@ -92,7 +92,7 @@ func Record(
up,
down,
left uint64) {
if enabled, _ = config.GetBool("record", enabled); !enabled {
if enabled, _ := config.GetBool("record", enabled); !enabled {
return
}
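The one-character change from = to := changes which variable is written: with = the if-initializer assigns to the package-level enabled, while := declares a new local that shadows it for the duration of the if statement, leaving the package-level flag (kept around for tests) untouched. A minimal demonstration:

```go
package main

import "fmt"

var enabled = false

// lookup stands in for config.GetBool and always reports true here.
func lookup() (bool, error) { return true, nil }

func withAssign() {
	if enabled, _ = lookup(); !enabled {
		return
	}
	// the package-level "enabled" is now true
}

func withShadow() {
	if enabled, _ := lookup(); !enabled {
		return
	}
	// the local "enabled" has gone out of scope; the package-level one is unchanged
}

func main() {
	withShadow()
	fmt.Println(enabled) // false

	withAssign()
	fmt.Println(enabled) // true
}
```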

View file

@ -215,10 +215,10 @@ func announce(ctx context.Context, qs string, header http.Header, remoteAddr str
return http.StatusOK // Required by torrent clients to interpret failure response
}
if !util.TryTakeSemaphore(ctx, db.TorrentsSemaphore) {
if !db.TorrentsLock.RTryLockWithContext(ctx) {
return http.StatusRequestTimeout
}
defer util.ReturnSemaphore(db.TorrentsSemaphore)
defer db.TorrentsLock.RUnlock()
torrent, exists := db.Torrents[infoHashes[0]]
if !exists {
@ -226,6 +226,10 @@ func announce(ctx context.Context, qs string, header http.Header, remoteAddr str
return http.StatusOK // Required by torrent clients to interpret failure response
}
// Take the torrent's lock before reading or writing its fields to prevent race conditions
torrent.Lock()
defer torrent.Unlock()
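The handler now layers the two locks introduced above: a context-aware read lock on the torrent map, failing fast with 408 when the request context runs out, followed by the torrent's own write lock for the fields the announce is about to mutate. A sketch of that shape with hypothetical, trimmed handler plumbing:

```go
package main

import (
	"context"
	"net/http"
	"sync"

	"github.com/viney-shih/go-lock"
)

// Hypothetical trimmed types mirroring the tracker's layout.
type torrent struct {
	Status uint8
	mu     sync.RWMutex
}

type database struct {
	TorrentsLock lock.RWMutex
	Torrents     map[string]*torrent
}

func announce(ctx context.Context, db *database, hash string) int {
	// Give up instead of queueing behind a long reload: the client gets 408.
	if !db.TorrentsLock.RTryLockWithContext(ctx) {
		return http.StatusRequestTimeout
	}
	defer db.TorrentsLock.RUnlock()

	t, ok := db.Torrents[hash]
	if !ok {
		return http.StatusOK // a failure payload would be written here
	}

	// Per-torrent write lock for the fields this announce will modify.
	t.mu.Lock()
	defer t.mu.Unlock()

	t.Status = 0

	return http.StatusOK
}

func main() {
	db := &database{
		TorrentsLock: lock.NewCASMutex(),
		Torrents:     map[string]*torrent{"aa": {Status: 1}},
	}

	_ = announce(context.Background(), db, "aa")
}
```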
if torrent.Status == 1 && left == 0 {
log.Info.Printf("Unpruning torrent %d", torrent.ID)
@ -395,10 +399,11 @@ func announce(ctx context.Context, qs string, header http.Header, remoteAddr str
peer.ClientID = clientID
// If the channels are already full, block until a flush occurs
go db.QueueTorrent(torrent, deltaSnatch)
go db.QueueTransferHistory(peer, rawDeltaUpload, rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
go db.QueueUser(user, rawDeltaUpload, rawDeltaDownload, deltaUpload, deltaDownload)
go db.QueueTransferIP(peer, rawDeltaUpload, rawDeltaDownload)
db.QueueTorrent(torrent, deltaSnatch)
db.QueueTransferHistory(peer, rawDeltaUpload, rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
db.QueueUser(user, rawDeltaUpload, rawDeltaDownload, deltaUpload, deltaDownload)
db.QueueTransferIP(peer, rawDeltaUpload, rawDeltaDownload)
go record.Record(
peer.TorrentID,
user.ID,

View file

@ -27,8 +27,6 @@ import (
"chihaya/config"
"chihaya/database"
"chihaya/log"
"chihaya/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)
@ -36,19 +34,24 @@ import (
var bearerPrefix = "Bearer "
func metrics(ctx context.Context, auth string, db *database.Database, buf *bytes.Buffer) int {
if !util.TryTakeSemaphore(ctx, db.UsersSemaphore) {
if !db.UsersLock.RTryLockWithContext(ctx) {
return http.StatusRequestTimeout
}
defer util.ReturnSemaphore(db.UsersSemaphore)
defer db.UsersLock.RUnlock()
if !util.TryTakeSemaphore(ctx, db.TorrentsSemaphore) {
if !db.TorrentsLock.RTryLockWithContext(ctx) {
return http.StatusRequestTimeout
}
defer util.ReturnSemaphore(db.TorrentsSemaphore)
defer db.TorrentsLock.RUnlock()
peers := 0
for _, t := range db.Torrents {
peers += len(t.Leechers) + len(t.Seeders)
func() {
t.RLock()
defer t.RUnlock()
peers += len(t.Leechers) + len(t.Seeders)
}()
}
// Early exit before response write

View file

@ -26,8 +26,6 @@ import (
"chihaya/database"
cdb "chihaya/database/types"
"chihaya/server/params"
"chihaya/util"
"github.com/zeebo/bencode"
)
@ -39,6 +37,9 @@ func init() {
}
func writeScrapeInfo(torrent *cdb.Torrent) map[string]interface{} {
torrent.RLock()
defer torrent.RUnlock()
ret := make(map[string]interface{})
ret["complete"] = len(torrent.Seeders)
ret["downloaded"] = torrent.Snatched
@ -56,10 +57,10 @@ func scrape(ctx context.Context, qs string, user *cdb.User, db *database.Databas
scrapeData := make(map[string]interface{})
fileData := make(map[cdb.TorrentHash]interface{})
if !util.TryTakeSemaphore(ctx, db.TorrentsSemaphore) {
if !db.TorrentsLock.RTryLockWithContext(ctx) {
return http.StatusRequestTimeout
}
defer util.ReturnSemaphore(db.TorrentsSemaphore)
defer db.TorrentsLock.RUnlock()
if qp.InfoHashes() != nil {
for _, infoHash := range qp.InfoHashes() {

View file

@ -24,7 +24,6 @@ import (
"chihaya/database"
cdb "chihaya/database/types"
"chihaya/util"
"github.com/zeebo/bencode"
)
@ -44,9 +43,6 @@ func failure(err string, buf *bytes.Buffer, interval time.Duration) {
}
func clientApproved(peerID string, db *database.Database) (uint16, bool) {
util.TakeSemaphore(db.ClientsSemaphore)
defer util.ReturnSemaphore(db.ClientsSemaphore)
var (
widLen, i int
matched bool
@ -74,10 +70,10 @@ func clientApproved(peerID string, db *database.Database) (uint16, bool) {
}
func isPasskeyValid(ctx context.Context, passkey string, db *database.Database) (*cdb.User, error) {
if !util.TryTakeSemaphore(ctx, db.UsersSemaphore) {
if !db.UsersLock.RTryLockWithContext(ctx) {
return nil, ctx.Err()
}
defer util.ReturnSemaphore(db.UsersSemaphore)
defer db.UsersLock.RUnlock()
user, exists := db.Users[passkey]
if !exists {

View file

@ -1,53 +0,0 @@
/*
* This file is part of Chihaya.
*
* Chihaya is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chihaya is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chihaya. If not, see <http://www.gnu.org/licenses/>.
*/
package util
import (
"context"
)
type Semaphore chan struct{}
func NewSemaphore() (s Semaphore) {
s = make(Semaphore, 1)
s <- struct{}{}
return
}
func TakeSemaphore(s Semaphore) {
<-s
}
func TryTakeSemaphore(ctx context.Context, s Semaphore) bool {
select {
case <-s:
return true
case <-ctx.Done():
return false
}
}
func ReturnSemaphore(s Semaphore) {
select {
case s <- struct{}{}:
return
default:
panic("Attempting to return semaphore to an already full channel")
}
}

View file

@ -1,81 +0,0 @@
/*
* This file is part of Chihaya.
*
* Chihaya is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Chihaya is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Chihaya. If not, see <http://www.gnu.org/licenses/>.
*/
package util
import (
"context"
"testing"
"time"
)
func TestTakeReturnSemaphore(t *testing.T) {
var (
testSemaphore = NewSemaphore()
panickedOnFull = false
)
TakeSemaphore(testSemaphore)
if len(testSemaphore) != 0 {
t.Fatalf(
"Semaphore channel length incorrect after consuming synchronously: is %v but should be 0",
len(testSemaphore))
}
ReturnSemaphore(testSemaphore)
if len(testSemaphore) != 1 {
t.Fatalf(
"Semaphore channel length incorrect after returning synchronously: is %v but should be 1",
len(testSemaphore))
}
defer func() {
if r := recover(); r != nil {
panickedOnFull = true
}
}()
ReturnSemaphore(testSemaphore)
if !panickedOnFull {
t.Fatalf("ReturnSemaphore must panic when attempting to return to an already full channel")
}
}
func TestTryTakeSemaphore(t *testing.T) {
var (
testSemaphore = NewSemaphore()
ctx context.Context
cancelFunc context.CancelFunc
)
ctx, cancelFunc = context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancelFunc()
if !TryTakeSemaphore(ctx, testSemaphore) {
t.Fatalf("Failed to consume semaphore: %v; channel length at %v", ctx.Err(), len(testSemaphore))
}
if len(testSemaphore) != 0 {
t.Fatalf(
"Semaphore channel length incorrect after taking asynchronously: is %v but should be 0",
len(testSemaphore))
}
}

View file

@ -19,31 +19,56 @@ package util
import (
unsafeRandom "math/rand"
"sync"
"time"
)
var randomSource = unsafeRandom.New(unsafeRandom.NewSource(time.Now().Unix()))
var randomSourcePool sync.Pool
func init() {
randomSourcePool.New = func() any {
return unsafeRandom.New(unsafeRandom.NewSource(time.Now().UnixNano()))
}
}
func UnsafeInt() int {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Int()
}
func UnsafeIntn(n int) int {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Intn(n)
}
func UnsafeUint32() uint32 {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Uint32()
}
func UnsafeUint64() uint64 {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Uint64()
}
func UnsafeRand(min int, max int) int {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Intn(max-min+1) + min
}
func UnsafeReadRand(p []byte) (n int, err error) {
randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
defer randomSourcePool.Put(randomSource)
return randomSource.Read(p)
}
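For context on the pool: a private *rand.Rand is not safe for concurrent use, so the previous single package-level source could race once several goroutines hit the Unsafe* helpers at the same time, while a sync.Pool hands each call its own source without funnelling everything through one mutex. A small self-contained sketch of the same pattern, safe under go run -race:

```go
package main

import (
	"fmt"
	unsafeRandom "math/rand"
	"sync"
	"time"
)

var randomSourcePool = sync.Pool{
	New: func() any {
		return unsafeRandom.New(unsafeRandom.NewSource(time.Now().UnixNano()))
	},
}

// unsafeIntn mirrors the helpers above: borrow a source, use it, return it.
func unsafeIntn(n int) int {
	randomSource := randomSourcePool.Get().(*unsafeRandom.Rand)
	defer randomSourcePool.Put(randomSource)

	return randomSource.Intn(n)
}

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			_ = unsafeIntn(1000) // no shared *rand.Rand, so no data race
		}()
	}

	wg.Wait()
	fmt.Println("done")
}
```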