database/types: Peer is now 48 bytes, removed TorrentID / UserID / ID, made LastAnnounce relative
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
DataHoarder 2023-06-16 22:20:39 +02:00
parent f39eb883f9
commit 4135b8d1a5
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
7 changed files with 200 additions and 159 deletions

View file

@ -159,30 +159,30 @@ func main() {
for _, torrent := range t {
newSeeders := make(map[cdb.PeerKey]*cdb.Peer)
for _, s := range torrent.Seeders {
s.UserID = anonUserMapping[s.UserID]
for k, s := range torrent.Seeders {
userID := anonUserMapping[k.ID()]
// Replace IP
binary.BigEndian.PutUint32(s.Addr[:], util.UnsafeUint32())
// Replace Port with valid random port
binary.BigEndian.PutUint16(s.Addr[4:], uint16(util.UnsafeRand(1024, math.MaxUint16-1)))
// Replaces userID in map key
newSeeders[cdb.NewPeerKey(s.UserID, s.ID)] = s
newSeeders[cdb.NewPeerKey(userID, k.PeerID())] = s
}
torrent.Seeders = newSeeders
newLeechers := make(map[cdb.PeerKey]*cdb.Peer)
for _, s := range torrent.Leechers {
s.UserID = anonUserMapping[s.UserID]
for k, s := range torrent.Leechers {
userID := anonUserMapping[k.ID()]
// Replace IP
binary.BigEndian.PutUint32(s.Addr[:], util.UnsafeUint32())
// Replace Port with valid random port
binary.BigEndian.PutUint16(s.Addr[4:], uint16(util.UnsafeRand(1024, math.MaxUint16-1)))
// Replaces userID in map key
newLeechers[cdb.NewPeerKey(s.UserID, s.ID)] = s
newLeechers[cdb.NewPeerKey(userID, k.PeerID())] = s
}
torrent.Leechers = newLeechers

View file

@ -438,13 +438,13 @@ func TestRecordAndFlushUsers(t *testing.T) {
func TestRecordAndFlushTransferHistory(t *testing.T) {
prepareTestDatabase()
userID := uint32(1)
torrentID := uint32(1)
testPeer := &cdb.Peer{
UserID: 1,
TorrentID: 1,
Seeding: true,
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
Left: 65000,
Seeding: true,
StartTime: time.Now().Unix(),
Left: 65000,
}
var (
@ -476,7 +476,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
deltaSeedTime = 15
row := db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
err := row.Scan(&initRawUpload, &initRawDownload, &initActiveTime, &initSeedTime, &initActive, &initSnatch)
if err != nil {
@ -484,6 +484,8 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
}
db.QueueTransferHistory(testPeer,
userID,
torrentID,
deltaRawUpload,
deltaRawDownload,
deltaActiveTime,
@ -497,7 +499,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
time.Sleep(200 * time.Millisecond)
row = db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
err = row.Scan(&rawUpload, &rawDownload, &activeTime, &seedTime, &active, &snatch)
if err != nil {
@ -550,21 +552,21 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
// Check if existing peer being updated properly
gotPeer := &cdb.Peer{
UserID: testPeer.UserID,
TorrentID: testPeer.TorrentID,
StartTime: testPeer.StartTime,
}
var gotStartTime int64
var gotStartTime, gotLastAnnounce int64
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
err = row.Scan(&gotPeer.Seeding, &gotStartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
err = row.Scan(&gotPeer.Seeding, &gotStartTime, gotLastAnnounce, &gotPeer.Left)
if err != nil {
panic(err)
}
gotPeer.SetLastAnnounce(gotLastAnnounce)
if !reflect.DeepEqual(testPeer, gotPeer) {
t.Fatal(fixtureFailure("Existing peer incorrectly updated in the database", testPeer, gotPeer))
}
@ -573,18 +575,20 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
t.Fatal(fixtureFailure("Start time incorrectly updated for existing peer", 1584996101, gotStartTime))
}
userID = 0
torrentID = 2
// Now test for new peer not in database
testPeer = &cdb.Peer{
UserID: 0,
TorrentID: 2,
Seeding: true,
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
Left: 65000,
Seeding: true,
StartTime: time.Now().Unix(),
Left: 65000,
}
db.QueueTransferHistory(
testPeer,
userID,
torrentID,
0,
1000,
1,
@ -592,10 +596,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
1,
true)
gotPeer = &cdb.Peer{
UserID: testPeer.UserID,
TorrentID: testPeer.TorrentID,
}
gotPeer = &cdb.Peer{}
for len(db.transferHistoryChannel) > 0 {
time.Sleep(time.Second)
@ -603,13 +604,15 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
time.Sleep(200 * time.Millisecond)
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
err = row.Scan(&gotPeer.Seeding, &gotPeer.StartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
err = row.Scan(&gotPeer.Seeding, &gotPeer.StartTime, &gotLastAnnounce, &gotPeer.Left)
if err != nil {
panic(err)
}
gotPeer.SetLastAnnounce(gotLastAnnounce)
if !reflect.DeepEqual(testPeer, gotPeer) {
t.Fatal(fixtureFailure("New peer incorrectly inserted in the database", testPeer, gotPeer))
}
@ -618,13 +621,13 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
func TestRecordAndFlushTransferIP(t *testing.T) {
prepareTestDatabase()
userID := uint32(0)
torrentID := uint32(0)
testPeer := &cdb.Peer{
UserID: 0,
TorrentID: 0,
ClientID: 1,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
ClientID: 1,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
}
var (
@ -641,14 +644,14 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
row := db.conn.QueryRow("SELECT uploaded, downloaded "+
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
err := row.Scan(&initUpload, &initDownload)
if err != nil {
panic(err)
}
db.QueueTransferIP(testPeer, deltaUpload, deltaDownload)
db.QueueTransferIP(testPeer, userID, torrentID, deltaUpload, deltaDownload)
for len(db.transferIpsChannel) > 0 {
time.Sleep(time.Second)
@ -657,7 +660,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
row = db.conn.QueryRow("SELECT uploaded, downloaded "+
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
err = row.Scan(&upload, &download)
if err != nil {
@ -682,26 +685,26 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
// Check if existing peer being updated properly
gotPeer := &cdb.Peer{
UserID: testPeer.UserID,
TorrentID: testPeer.TorrentID,
ClientID: testPeer.ClientID,
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
StartTime: testPeer.StartTime,
}
var gotStartTime int64
var gotStartTime, gotLastAnnounce int64
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
var port uint16
err = row.Scan(&port, &gotStartTime, &gotPeer.LastAnnounce)
err = row.Scan(&port, &gotStartTime, &gotLastAnnounce)
if err != nil {
panic(err)
}
gotPeer.SetLastAnnounce(gotLastAnnounce)
gotPeer.Addr = cdb.NewPeerAddressFromIPPort(gotPeer.Addr.IP(), port)
if !reflect.DeepEqual(testPeer, gotPeer) {
@ -712,17 +715,17 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
t.Fatal(fixtureFailure("Start time incorrectly updated for existing peer", 1584802402, gotStartTime))
}
userID = 1
torrentID = 2
// Now test for new peer not in database
testPeer = &cdb.Peer{
UserID: 1,
TorrentID: 2,
ClientID: 2,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
ClientID: 2,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
}
db.QueueTransferIP(testPeer, 0, 0)
db.QueueTransferIP(testPeer, userID, torrentID, 0, 0)
for len(db.transferIpsChannel) > 0 {
time.Sleep(time.Second)
@ -730,21 +733,21 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
time.Sleep(200 * time.Millisecond)
gotPeer = &cdb.Peer{
UserID: testPeer.UserID,
TorrentID: testPeer.TorrentID,
ClientID: testPeer.ClientID,
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
ClientID: testPeer.ClientID,
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
}
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
err = row.Scan(&port, &gotPeer.StartTime, &gotPeer.LastAnnounce)
err = row.Scan(&port, &gotPeer.StartTime, &gotLastAnnounce)
if err != nil {
panic(err)
}
gotPeer.SetLastAnnounce(gotLastAnnounce)
gotPeer.Addr = cdb.NewPeerAddressFromIPPort(gotPeer.Addr.IP(), port)
if !reflect.DeepEqual(testPeer, gotPeer) {
@ -755,10 +758,8 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
func TestRecordAndFlushSnatch(t *testing.T) {
prepareTestDatabase()
testPeer := &cdb.Peer{
UserID: 1,
TorrentID: 1,
}
userID := uint32(1)
torrentID := uint32(1)
var (
snatchTime int64
@ -767,7 +768,7 @@ func TestRecordAndFlushSnatch(t *testing.T) {
currTime = time.Now().Unix()
db.QueueSnatch(testPeer, currTime)
db.QueueSnatch(userID, torrentID, currTime)
for len(db.snatchChannel) > 0 {
time.Sleep(time.Second)
@ -775,7 +776,7 @@ func TestRecordAndFlushSnatch(t *testing.T) {
time.Sleep(200 * time.Millisecond)
row := db.conn.QueryRow("SELECT snatched_time "+
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
err := row.Scan(&snatchTime)
if err != nil {
@ -797,18 +798,12 @@ func TestRecordAndFlushTorrents(t *testing.T) {
torrent := (*db.Torrents.Load())[h]
torrent.LastAction.Store(time.Now().Unix())
torrent.Seeders[cdb.NewPeerKey(1, cdb.PeerIDFromRawString("test_peer_id_num_one"))] = &cdb.Peer{
UserID: 1,
TorrentID: torrent.ID.Load(),
ClientID: 1,
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
ClientID: 1,
StartTime: time.Now().Unix(),
}
torrent.Leechers[cdb.NewPeerKey(3, cdb.PeerIDFromRawString("test_peer_id_num_two"))] = &cdb.Peer{
UserID: 3,
TorrentID: torrent.ID.Load(),
ClientID: 2,
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
ClientID: 2,
StartTime: time.Now().Unix(),
}
torrent.SeedersLength.Store(uint32(len(torrent.Seeders)))
torrent.LeechersLength.Store(uint32(len(torrent.Leechers)))

View file

@ -436,7 +436,7 @@ func (db *Database) purgeInactivePeers() {
countThisTorrent := count
for id, peer := range torrent.Leechers {
if peer.LastAnnounce < oldestActive {
if peer.LastAnnounce() < oldestActive {
delete(torrent.Leechers, id)
count++
}
@ -450,7 +450,7 @@ func (db *Database) purgeInactivePeers() {
}
for id, peer := range torrent.Seeders {
if peer.LastAnnounce < oldestActive {
if peer.LastAnnounce() < oldestActive {
delete(torrent.Seeders, id)
count++
}

View file

@ -87,14 +87,16 @@ func (db *Database) QueueUser(
}
}
func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDown, deltaTime, deltaSeedTime int64,
func (db *Database) QueueTransferHistory(
peer *cdb.Peer, userID, torrentID uint32,
rawDeltaUp, rawDeltaDown, deltaTime, deltaSeedTime int64,
deltaSnatch uint8, active bool) {
th := db.bufferPool.Take()
th.WriteString("(")
th.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
th.WriteString(strconv.FormatUint(uint64(userID), 10))
th.WriteString(",")
th.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
th.WriteString(strconv.FormatUint(uint64(torrentID), 10))
th.WriteString(",")
th.WriteString(strconv.FormatInt(rawDeltaUp, 10))
th.WriteString(",")
@ -104,7 +106,7 @@ func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDow
th.WriteString(",")
th.WriteString(strconv.FormatInt(peer.StartTime, 10))
th.WriteString(",")
th.WriteString(strconv.FormatInt(peer.LastAnnounce, 10))
th.WriteString(strconv.FormatInt(peer.LastAnnounce(), 10))
th.WriteString(",")
th.WriteString(strconv.FormatInt(deltaTime, 10))
th.WriteString(",")
@ -126,13 +128,13 @@ func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDow
}
}
func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int64) {
func (db *Database) QueueTransferIP(peer *cdb.Peer, userID, torrentID uint32, rawDeltaUp, rawDeltaDown int64) {
ti := db.bufferPool.Take()
ti.WriteString("(")
ti.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
ti.WriteString(strconv.FormatUint(uint64(userID), 10))
ti.WriteString(",")
ti.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
ti.WriteString(strconv.FormatUint(uint64(torrentID), 10))
ti.WriteString(",")
ti.WriteString(strconv.FormatUint(uint64(peer.ClientID), 10))
ti.WriteString(",")
@ -146,7 +148,7 @@ func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int
ti.WriteString(",")
ti.WriteString(strconv.FormatInt(peer.StartTime, 10))
ti.WriteString(",")
ti.WriteString(strconv.FormatInt(peer.LastAnnounce, 10))
ti.WriteString(strconv.FormatInt(peer.LastAnnounce(), 10))
ti.WriteString(")")
select {
@ -158,13 +160,13 @@ func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int
}
}
func (db *Database) QueueSnatch(peer *cdb.Peer, now int64) {
func (db *Database) QueueSnatch(userID, torrentID uint32, now int64) {
sn := db.bufferPool.Take()
sn.WriteString("(")
sn.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
sn.WriteString(strconv.FormatUint(uint64(userID), 10))
sn.WriteString(",")
sn.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
sn.WriteString(strconv.FormatUint(uint64(torrentID), 10))
sn.WriteString(",")
sn.WriteString(strconv.FormatInt(now, 10))
sn.WriteString(")")

View file

@ -44,17 +44,13 @@ func TestSerializer(t *testing.T) {
testUsers["mUztWMpBYNCqzmge6vGeEUGSrctJbgpQ"] = testUser
testPeer := &cdb.Peer{
UserID: 12,
TorrentID: 10,
ClientID: 4,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
LastAnnounce: time.Now().Unix(),
Seeding: true,
Left: 0,
Uploaded: 100,
Downloaded: 1000,
ID: cdb.PeerIDFromRawString("12-10-2130706433-4"),
ClientID: 4,
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
StartTime: time.Now().Unix(),
Seeding: true,
Left: 0,
Uploaded: 100,
Downloaded: 1000,
}
testTorrentHash := cdb.TorrentHash{

View file

@ -56,23 +56,35 @@ func (k PeerKey) PeerID() (id PeerID) {
//goland:noinspection GoMixedReceiverTypes
func (k PeerKey) MarshalText() ([]byte, error) {
var buf [(4 + 20) * 2]byte
peerIDBuffer, err := k.PeerID().MarshalText()
if err != nil {
return nil, err
}
hex.Encode(buf[:], k[:])
return buf[:], nil
return []byte(strconv.FormatUint(uint64(k.ID()), 10) + "-" + string(peerIDBuffer)), nil
}
//goland:noinspection GoMixedReceiverTypes
func (k *PeerKey) UnmarshalText(b []byte) error {
if len(b) != (4+20)*2 {
s := bytes.Split(b, []byte{'-'})
if len(s) != 2 {
return errWrongPeerKeySize
}
if _, err := hex.Decode(k[:], b[:]); err != nil {
n, err := strconv.ParseUint(string(s[0]), 10, 32)
if err != nil {
return err
}
var peerID PeerID
if err = peerID.UnmarshalText(s[1]); err != nil {
return err
}
binary.LittleEndian.PutUint32(k[:], uint32(n))
copy(k[4:], peerID[:])
return nil
}
@ -206,34 +218,41 @@ func (a *PeerAddress) UnmarshalText(b []byte) error {
}
// Peer
// Theoretical min layout size: 6 + 8 + 8 + 8 + 8 + 8 + 8 + 4 + 4 + 6 + 2 + 1 = 71 bytes
// Current layout size go1.20.4: 80 bytes via unsafe.Sizeof(Peer{})
// Theoretical min layout size: 45 bytes
// Current layout size go1.20.4: 48 bytes via unsafe.Sizeof(Peer{})
type Peer struct {
Addr PeerAddress
Uploaded uint64
Downloaded uint64
Left uint64
StartTime int64 // unix time
LastAnnounce int64
StartTime int64 // unix time
// RelativeLastAnnounce This is the difference of absolute value relative to StartTime
// To get the absolute value use LastAnnounce()
RelativeLastAnnounce int32
TorrentID uint32
UserID uint32
// ID placed here so in-memory layout is smaller
ID PeerID
Addr PeerAddress
ClientID uint16
Seeding bool
}
func (p *Peer) SetLastAnnounce(lastAnnounce int64) {
p.RelativeLastAnnounce = int32(lastAnnounce - p.StartTime)
}
func (p *Peer) LastAnnounce() int64 {
return p.StartTime + int64(p.RelativeLastAnnounce)
}
var errInvalidAddrLength = errors.New("invalid Addr length")
func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
if _, err = io.ReadFull(reader, p.ID[:]); err != nil {
return err
if version <= 3 {
var peerID PeerID
if _, err = io.ReadFull(reader, peerID[:]); err != nil {
return err
}
}
if version == 1 {
@ -297,16 +316,29 @@ func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
return err
}
if err = binary.Read(reader, binary.LittleEndian, &p.LastAnnounce); err != nil {
return err
}
if version <= 3 {
var (
torrentID, userID uint32
lastAnnounce int64
)
if err = binary.Read(reader, binary.LittleEndian, &p.TorrentID); err != nil {
return err
}
if err = binary.Read(reader, binary.LittleEndian, lastAnnounce); err != nil {
return err
}
if err = binary.Read(reader, binary.LittleEndian, &p.UserID); err != nil {
return err
p.SetLastAnnounce(lastAnnounce)
if err = binary.Read(reader, binary.LittleEndian, &torrentID); err != nil {
return err
}
if err = binary.Read(reader, binary.LittleEndian, &userID); err != nil {
return err
}
} else {
if err = binary.Read(reader, binary.LittleEndian, &p.RelativeLastAnnounce); err != nil {
return err
}
}
if err = binary.Read(reader, binary.LittleEndian, &p.ClientID); err != nil {
@ -318,15 +350,12 @@ func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
func (p *Peer) Append(preAllocatedBuffer []byte) (buf []byte) {
buf = preAllocatedBuffer
buf = append(buf, p.ID[:]...)
buf = append(buf, p.Addr[:]...)
buf = binary.LittleEndian.AppendUint64(buf, p.Uploaded)
buf = binary.LittleEndian.AppendUint64(buf, p.Downloaded)
buf = binary.LittleEndian.AppendUint64(buf, p.Left)
buf = binary.LittleEndian.AppendUint64(buf, uint64(p.StartTime))
buf = binary.LittleEndian.AppendUint64(buf, uint64(p.LastAnnounce))
buf = binary.LittleEndian.AppendUint32(buf, p.TorrentID)
buf = binary.LittleEndian.AppendUint32(buf, p.UserID)
buf = binary.LittleEndian.AppendUint32(buf, uint32(p.RelativeLastAnnounce))
buf = binary.LittleEndian.AppendUint16(buf, p.ClientID)
if p.Seeding {

View file

@ -256,7 +256,8 @@ func announce(
event := qp.Params.Event
completed := event == "completed"
peerKey := cdb.NewPeerKey(user.ID.Load(), cdb.PeerIDFromRawString(peerID))
userID := user.ID.Load()
peerKey := cdb.NewPeerKey(userID, cdb.PeerIDFromRawString(peerID))
// Take torrent peers lock to read/write on it to prevent race conditions
if !torrent.TryLockWithContext(ctx) {
@ -321,11 +322,8 @@ func announce(
// Update peer info/stats
if newPeer {
peer.ID = peerKey.PeerID()
peer.UserID = user.ID.Load()
peer.TorrentID = torrent.ID.Load()
peer.StartTime = now
peer.LastAnnounce = now
peer.RelativeLastAnnounce = 0
peer.Uploaded = uploaded
peer.Downloaded = downloaded
}
@ -362,7 +360,7 @@ func announce(
peer.Downloaded = downloaded
peer.Left = left
peer.Seeding = seeding
deltaTime := now - peer.LastAnnounce
deltaTime := now - peer.LastAnnounce()
if deltaTime > int64(peerInactivityInterval) {
deltaTime = 0
@ -370,14 +368,14 @@ func announce(
var deltaSeedTime int64
if seeding {
deltaSeedTime = now - peer.LastAnnounce
deltaSeedTime = now - peer.LastAnnounce()
}
if deltaSeedTime > int64(peerInactivityInterval) {
deltaSeedTime = 0
}
peer.LastAnnounce = now
peer.SetLastAnnounce(now)
/* Update torrent last_action only if announced action is seeding.
This allows dead torrents without seeder but with leecher to be proeprly pruned */
if seeding {
@ -400,7 +398,7 @@ func announce(
active = false
} else if completed {
db.QueueSnatch(peer, now)
db.QueueSnatch(userID, torrent.ID.Load(), now)
deltaSnatch = 1
}
@ -414,13 +412,15 @@ func announce(
// If the channels are already full, block until a flush occurs
db.QueueTorrent(torrent, deltaSnatch)
db.QueueTransferHistory(peer, rawDeltaUpload, rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
db.QueueTransferHistory(
peer, userID, torrent.ID.Load(), rawDeltaUpload,
rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
db.QueueUser(user, rawDeltaUpload, rawDeltaDownload, deltaUpload, deltaDownload)
db.QueueTransferIP(peer, rawDeltaUpload, rawDeltaDownload)
db.QueueTransferIP(peer, userID, torrent.ID.Load(), rawDeltaUpload, rawDeltaDownload)
go record.Record(
peer.TorrentID,
user.ID.Load(),
torrent.ID.Load(),
userID,
ipAddr,
port,
event,
@ -460,7 +460,12 @@ func announce(
peerCount = util.Min(int(numWant), leechCount+seedCount-1)
}
peersToSend := make([]*cdb.Peer, 0, peerCount)
type peerToSend struct {
Peer *cdb.Peer
Key cdb.PeerKey
}
peersToSend := make([]peerToSend, 0, peerCount)
/*
* The iteration is already "random", so we don't need to randomize ourselves:
@ -468,41 +473,54 @@ func announce(
* - Each time you range over the map, it starts at a random offset into the map's elements
*/
if seeding {
for _, leech := range torrent.Leechers {
for k, leech := range torrent.Leechers {
if len(peersToSend) >= int(numWant) {
break
}
if leech.UserID == peer.UserID {
if k.ID() == userID {
continue
}
peersToSend = append(peersToSend, leech)
peersToSend = append(peersToSend, peerToSend{
Peer: leech,
Key: k,
})
}
} else {
/* Send only one peer per user. This is to ensure that users seeding at multiple locations don't end up
exclusively acting as peers. */
uniqueSeeders := make(map[uint32]*cdb.Peer)
for _, seed := range torrent.Seeders {
uniqueSeeders := make(map[uint32]struct{})
for k, seed := range torrent.Seeders {
if len(peersToSend) >= int(numWant) {
break
}
if seed.UserID == peer.UserID {
seedUserID := k.ID()
if seedUserID == userID {
continue
}
if _, exists = uniqueSeeders[seed.UserID]; !exists {
uniqueSeeders[seed.UserID] = seed
peersToSend = append(peersToSend, seed)
if _, exists = uniqueSeeders[seedUserID]; !exists {
uniqueSeeders[seedUserID] = struct{}{}
peersToSend = append(peersToSend, peerToSend{
Peer: seed,
Key: k,
})
}
}
for _, leech := range torrent.Leechers {
for k, leech := range torrent.Leechers {
if len(peersToSend) >= int(numWant) {
break
}
if leech.UserID == peer.UserID {
if k.ID() == userID {
continue
}
peersToSend = append(peersToSend, leech)
peersToSend = append(peersToSend, peerToSend{
Peer: leech,
Key: k,
})
}
}
@ -510,7 +528,7 @@ func announce(
peerBuff := make([]byte, 0, len(peersToSend)*cdb.PeerAddressSize)
for _, other := range peersToSend {
peerBuff = append(peerBuff, other.Addr[:]...)
peerBuff = append(peerBuff, other.Peer.Addr[:]...)
}
response["peers"] = peerBuff
@ -518,10 +536,11 @@ func announce(
peerList := make([]map[string]interface{}, len(peersToSend))
for i, other := range peersToSend {
peerMap := make(map[string]interface{})
peerMap["ip"] = other.Addr.IPString()
peerMap["port"] = other.Addr.Port()
peerMap["ip"] = other.Peer.Addr.IPString()
peerMap["port"] = other.Peer.Addr.Port()
if !noPeerID {
peerMap["peer id"] = other.ID[:]
thisPeerID := other.Key.PeerID()
peerMap["peer id"] = thisPeerID[:]
}
peerList[i] = peerMap
}