database/types: Peer is now 48 bytes, removed TorrentID / UserID / ID, made LastAnnounce relative
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
parent
f39eb883f9
commit
baeac751cb
|
@ -159,30 +159,30 @@ func main() {
|
|||
for _, torrent := range t {
|
||||
newSeeders := make(map[cdb.PeerKey]*cdb.Peer)
|
||||
|
||||
for _, s := range torrent.Seeders {
|
||||
s.UserID = anonUserMapping[s.UserID]
|
||||
for k, s := range torrent.Seeders {
|
||||
userID := anonUserMapping[k.ID()]
|
||||
// Replace IP
|
||||
binary.BigEndian.PutUint32(s.Addr[:], util.UnsafeUint32())
|
||||
// Replace Port with valid random port
|
||||
binary.BigEndian.PutUint16(s.Addr[4:], uint16(util.UnsafeRand(1024, math.MaxUint16-1)))
|
||||
|
||||
// Replaces userID in map key
|
||||
newSeeders[cdb.NewPeerKey(s.UserID, s.ID)] = s
|
||||
newSeeders[cdb.NewPeerKey(userID, k.PeerID())] = s
|
||||
}
|
||||
|
||||
torrent.Seeders = newSeeders
|
||||
|
||||
newLeechers := make(map[cdb.PeerKey]*cdb.Peer)
|
||||
|
||||
for _, s := range torrent.Leechers {
|
||||
s.UserID = anonUserMapping[s.UserID]
|
||||
for k, s := range torrent.Leechers {
|
||||
userID := anonUserMapping[k.ID()]
|
||||
// Replace IP
|
||||
binary.BigEndian.PutUint32(s.Addr[:], util.UnsafeUint32())
|
||||
// Replace Port with valid random port
|
||||
binary.BigEndian.PutUint16(s.Addr[4:], uint16(util.UnsafeRand(1024, math.MaxUint16-1)))
|
||||
|
||||
// Replaces userID in map key
|
||||
newLeechers[cdb.NewPeerKey(s.UserID, s.ID)] = s
|
||||
newLeechers[cdb.NewPeerKey(userID, k.PeerID())] = s
|
||||
}
|
||||
|
||||
torrent.Leechers = newLeechers
|
||||
|
|
|
@ -438,13 +438,13 @@ func TestRecordAndFlushUsers(t *testing.T) {
|
|||
func TestRecordAndFlushTransferHistory(t *testing.T) {
|
||||
prepareTestDatabase()
|
||||
|
||||
userID := uint32(1)
|
||||
torrentID := uint32(1)
|
||||
|
||||
testPeer := &cdb.Peer{
|
||||
UserID: 1,
|
||||
TorrentID: 1,
|
||||
Seeding: true,
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
Left: 65000,
|
||||
Seeding: true,
|
||||
StartTime: time.Now().Unix(),
|
||||
Left: 65000,
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -476,7 +476,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
deltaSeedTime = 15
|
||||
|
||||
row := db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
|
||||
|
||||
err := row.Scan(&initRawUpload, &initRawDownload, &initActiveTime, &initSeedTime, &initActive, &initSnatch)
|
||||
if err != nil {
|
||||
|
@ -484,6 +484,8 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
}
|
||||
|
||||
db.QueueTransferHistory(testPeer,
|
||||
userID,
|
||||
torrentID,
|
||||
deltaRawUpload,
|
||||
deltaRawDownload,
|
||||
deltaActiveTime,
|
||||
|
@ -497,7 +499,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.conn.QueryRow("SELECT uploaded, downloaded, activetime, seedtime, active, snatched "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
|
||||
|
||||
err = row.Scan(&rawUpload, &rawDownload, &activeTime, &seedTime, &active, &snatch)
|
||||
if err != nil {
|
||||
|
@ -550,21 +552,21 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
|
||||
// Check if existing peer being updated properly
|
||||
gotPeer := &cdb.Peer{
|
||||
UserID: testPeer.UserID,
|
||||
TorrentID: testPeer.TorrentID,
|
||||
StartTime: testPeer.StartTime,
|
||||
}
|
||||
|
||||
var gotStartTime int64
|
||||
var gotStartTime, gotLastAnnounce int64
|
||||
|
||||
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
|
||||
|
||||
err = row.Scan(&gotPeer.Seeding, &gotStartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
|
||||
err = row.Scan(&gotPeer.Seeding, &gotStartTime, &gotLastAnnounce, &gotPeer.Left)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gotPeer.SetLastAnnounce(gotLastAnnounce)
|
||||
|
||||
if !reflect.DeepEqual(testPeer, gotPeer) {
|
||||
t.Fatal(fixtureFailure("Existing peer incorrectly updated in the database", testPeer, gotPeer))
|
||||
}
|
||||
|
@ -573,18 +575,20 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
t.Fatal(fixtureFailure("Start time incorrectly updated for existing peer", 1584996101, gotStartTime))
|
||||
}
|
||||
|
||||
userID = 0
|
||||
torrentID = 2
|
||||
|
||||
// Now test for new peer not in database
|
||||
testPeer = &cdb.Peer{
|
||||
UserID: 0,
|
||||
TorrentID: 2,
|
||||
Seeding: true,
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
Left: 65000,
|
||||
Seeding: true,
|
||||
StartTime: time.Now().Unix(),
|
||||
Left: 65000,
|
||||
}
|
||||
|
||||
db.QueueTransferHistory(
|
||||
testPeer,
|
||||
userID,
|
||||
torrentID,
|
||||
0,
|
||||
1000,
|
||||
1,
|
||||
|
@ -592,10 +596,7 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
1,
|
||||
true)
|
||||
|
||||
gotPeer = &cdb.Peer{
|
||||
UserID: testPeer.UserID,
|
||||
TorrentID: testPeer.TorrentID,
|
||||
}
|
||||
gotPeer = &cdb.Peer{}
|
||||
|
||||
for len(db.transferHistoryChannel) > 0 {
|
||||
time.Sleep(time.Second)
|
||||
|
@ -603,13 +604,15 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row = db.conn.QueryRow("SELECT seeding, starttime, last_announce, remaining "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", gotPeer.UserID, gotPeer.TorrentID)
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
|
||||
|
||||
err = row.Scan(&gotPeer.Seeding, &gotPeer.StartTime, &gotPeer.LastAnnounce, &gotPeer.Left)
|
||||
err = row.Scan(&gotPeer.Seeding, &gotPeer.StartTime, &gotLastAnnounce, &gotPeer.Left)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gotPeer.SetLastAnnounce(gotLastAnnounce)
|
||||
|
||||
if !reflect.DeepEqual(testPeer, gotPeer) {
|
||||
t.Fatal(fixtureFailure("New peer incorrectly inserted in the database", testPeer, gotPeer))
|
||||
}
|
||||
|
@ -618,13 +621,13 @@ func TestRecordAndFlushTransferHistory(t *testing.T) {
|
|||
func TestRecordAndFlushTransferIP(t *testing.T) {
|
||||
prepareTestDatabase()
|
||||
|
||||
userID := uint32(0)
|
||||
torrentID := uint32(0)
|
||||
|
||||
testPeer := &cdb.Peer{
|
||||
UserID: 0,
|
||||
TorrentID: 0,
|
||||
ClientID: 1,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
ClientID: 1,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -641,14 +644,14 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
|
||||
row := db.conn.QueryRow("SELECT uploaded, downloaded "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
err := row.Scan(&initUpload, &initDownload)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db.QueueTransferIP(testPeer, deltaUpload, deltaDownload)
|
||||
db.QueueTransferIP(testPeer, userID, torrentID, deltaUpload, deltaDownload)
|
||||
|
||||
for len(db.transferIpsChannel) > 0 {
|
||||
time.Sleep(time.Second)
|
||||
|
@ -657,7 +660,7 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
|
||||
row = db.conn.QueryRow("SELECT uploaded, downloaded "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
err = row.Scan(&upload, &download)
|
||||
if err != nil {
|
||||
|
@ -682,26 +685,26 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
|
||||
// Check if existing peer being updated properly
|
||||
gotPeer := &cdb.Peer{
|
||||
UserID: testPeer.UserID,
|
||||
TorrentID: testPeer.TorrentID,
|
||||
ClientID: testPeer.ClientID,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
|
||||
StartTime: testPeer.StartTime,
|
||||
}
|
||||
|
||||
var gotStartTime int64
|
||||
var gotStartTime, gotLastAnnounce int64
|
||||
|
||||
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
var port uint16
|
||||
|
||||
err = row.Scan(&port, &gotStartTime, &gotPeer.LastAnnounce)
|
||||
err = row.Scan(&port, &gotStartTime, &gotLastAnnounce)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gotPeer.SetLastAnnounce(gotLastAnnounce)
|
||||
|
||||
gotPeer.Addr = cdb.NewPeerAddressFromIPPort(gotPeer.Addr.IP(), port)
|
||||
|
||||
if !reflect.DeepEqual(testPeer, gotPeer) {
|
||||
|
@ -712,17 +715,17 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
t.Fatal(fixtureFailure("Start time incorrectly updated for existing peer", 1584802402, gotStartTime))
|
||||
}
|
||||
|
||||
userID = 1
|
||||
torrentID = 2
|
||||
|
||||
// Now test for new peer not in database
|
||||
testPeer = &cdb.Peer{
|
||||
UserID: 1,
|
||||
TorrentID: 2,
|
||||
ClientID: 2,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
ClientID: 2,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
}
|
||||
|
||||
db.QueueTransferIP(testPeer, 0, 0)
|
||||
db.QueueTransferIP(testPeer, userID, torrentID, 0, 0)
|
||||
|
||||
for len(db.transferIpsChannel) > 0 {
|
||||
time.Sleep(time.Second)
|
||||
|
@ -730,21 +733,21 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
gotPeer = &cdb.Peer{
|
||||
UserID: testPeer.UserID,
|
||||
TorrentID: testPeer.TorrentID,
|
||||
ClientID: testPeer.ClientID,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
|
||||
ClientID: testPeer.ClientID,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(testPeer.Addr.IP(), 0),
|
||||
}
|
||||
|
||||
row = db.conn.QueryRow("SELECT port, starttime, last_announce "+
|
||||
"FROM transfer_ips WHERE uid = ? AND fid = ? AND ip = ? AND client_id = ?",
|
||||
testPeer.UserID, testPeer.TorrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
userID, torrentID, testPeer.Addr.IPNumeric(), testPeer.ClientID)
|
||||
|
||||
err = row.Scan(&port, &gotPeer.StartTime, &gotPeer.LastAnnounce)
|
||||
err = row.Scan(&port, &gotPeer.StartTime, &gotLastAnnounce)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gotPeer.SetLastAnnounce(gotLastAnnounce)
|
||||
|
||||
gotPeer.Addr = cdb.NewPeerAddressFromIPPort(gotPeer.Addr.IP(), port)
|
||||
|
||||
if !reflect.DeepEqual(testPeer, gotPeer) {
|
||||
|
@ -755,10 +758,8 @@ func TestRecordAndFlushTransferIP(t *testing.T) {
|
|||
func TestRecordAndFlushSnatch(t *testing.T) {
|
||||
prepareTestDatabase()
|
||||
|
||||
testPeer := &cdb.Peer{
|
||||
UserID: 1,
|
||||
TorrentID: 1,
|
||||
}
|
||||
userID := uint32(1)
|
||||
torrentID := uint32(1)
|
||||
|
||||
var (
|
||||
snatchTime int64
|
||||
|
@ -767,7 +768,7 @@ func TestRecordAndFlushSnatch(t *testing.T) {
|
|||
|
||||
currTime = time.Now().Unix()
|
||||
|
||||
db.QueueSnatch(testPeer, currTime)
|
||||
db.QueueSnatch(userID, torrentID, currTime)
|
||||
|
||||
for len(db.snatchChannel) > 0 {
|
||||
time.Sleep(time.Second)
|
||||
|
@ -775,7 +776,7 @@ func TestRecordAndFlushSnatch(t *testing.T) {
|
|||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
row := db.conn.QueryRow("SELECT snatched_time "+
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", testPeer.UserID, testPeer.TorrentID)
|
||||
"FROM transfer_history WHERE uid = ? AND fid = ?", userID, torrentID)
|
||||
|
||||
err := row.Scan(&snatchTime)
|
||||
if err != nil {
|
||||
|
@ -797,18 +798,12 @@ func TestRecordAndFlushTorrents(t *testing.T) {
|
|||
torrent := (*db.Torrents.Load())[h]
|
||||
torrent.LastAction.Store(time.Now().Unix())
|
||||
torrent.Seeders[cdb.NewPeerKey(1, cdb.PeerIDFromRawString("test_peer_id_num_one"))] = &cdb.Peer{
|
||||
UserID: 1,
|
||||
TorrentID: torrent.ID.Load(),
|
||||
ClientID: 1,
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
ClientID: 1,
|
||||
StartTime: time.Now().Unix(),
|
||||
}
|
||||
torrent.Leechers[cdb.NewPeerKey(3, cdb.PeerIDFromRawString("test_peer_id_num_two"))] = &cdb.Peer{
|
||||
UserID: 3,
|
||||
TorrentID: torrent.ID.Load(),
|
||||
ClientID: 2,
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
ClientID: 2,
|
||||
StartTime: time.Now().Unix(),
|
||||
}
|
||||
torrent.SeedersLength.Store(uint32(len(torrent.Seeders)))
|
||||
torrent.LeechersLength.Store(uint32(len(torrent.Leechers)))
|
||||
|
|
|
@ -436,7 +436,7 @@ func (db *Database) purgeInactivePeers() {
|
|||
countThisTorrent := count
|
||||
|
||||
for id, peer := range torrent.Leechers {
|
||||
if peer.LastAnnounce < oldestActive {
|
||||
if peer.LastAnnounce() < oldestActive {
|
||||
delete(torrent.Leechers, id)
|
||||
count++
|
||||
}
|
||||
|
@ -450,7 +450,7 @@ func (db *Database) purgeInactivePeers() {
|
|||
}
|
||||
|
||||
for id, peer := range torrent.Seeders {
|
||||
if peer.LastAnnounce < oldestActive {
|
||||
if peer.LastAnnounce() < oldestActive {
|
||||
delete(torrent.Seeders, id)
|
||||
count++
|
||||
}
|
||||
|
|
|
@ -87,14 +87,16 @@ func (db *Database) QueueUser(
|
|||
}
|
||||
}
|
||||
|
||||
func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDown, deltaTime, deltaSeedTime int64,
|
||||
func (db *Database) QueueTransferHistory(
|
||||
peer *cdb.Peer, userID, torrentID uint32,
|
||||
rawDeltaUp, rawDeltaDown, deltaTime, deltaSeedTime int64,
|
||||
deltaSnatch uint8, active bool) {
|
||||
th := db.bufferPool.Take()
|
||||
|
||||
th.WriteString("(")
|
||||
th.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
|
||||
th.WriteString(strconv.FormatUint(uint64(userID), 10))
|
||||
th.WriteString(",")
|
||||
th.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
|
||||
th.WriteString(strconv.FormatUint(uint64(torrentID), 10))
|
||||
th.WriteString(",")
|
||||
th.WriteString(strconv.FormatInt(rawDeltaUp, 10))
|
||||
th.WriteString(",")
|
||||
|
@ -104,7 +106,7 @@ func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDow
|
|||
th.WriteString(",")
|
||||
th.WriteString(strconv.FormatInt(peer.StartTime, 10))
|
||||
th.WriteString(",")
|
||||
th.WriteString(strconv.FormatInt(peer.LastAnnounce, 10))
|
||||
th.WriteString(strconv.FormatInt(peer.LastAnnounce(), 10))
|
||||
th.WriteString(",")
|
||||
th.WriteString(strconv.FormatInt(deltaTime, 10))
|
||||
th.WriteString(",")
|
||||
|
@ -126,13 +128,13 @@ func (db *Database) QueueTransferHistory(peer *cdb.Peer, rawDeltaUp, rawDeltaDow
|
|||
}
|
||||
}
|
||||
|
||||
func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int64) {
|
||||
func (db *Database) QueueTransferIP(peer *cdb.Peer, userID, torrentID uint32, rawDeltaUp, rawDeltaDown int64) {
|
||||
ti := db.bufferPool.Take()
|
||||
|
||||
ti.WriteString("(")
|
||||
ti.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
|
||||
ti.WriteString(strconv.FormatUint(uint64(userID), 10))
|
||||
ti.WriteString(",")
|
||||
ti.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
|
||||
ti.WriteString(strconv.FormatUint(uint64(torrentID), 10))
|
||||
ti.WriteString(",")
|
||||
ti.WriteString(strconv.FormatUint(uint64(peer.ClientID), 10))
|
||||
ti.WriteString(",")
|
||||
|
@ -146,7 +148,7 @@ func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int
|
|||
ti.WriteString(",")
|
||||
ti.WriteString(strconv.FormatInt(peer.StartTime, 10))
|
||||
ti.WriteString(",")
|
||||
ti.WriteString(strconv.FormatInt(peer.LastAnnounce, 10))
|
||||
ti.WriteString(strconv.FormatInt(peer.LastAnnounce(), 10))
|
||||
ti.WriteString(")")
|
||||
|
||||
select {
|
||||
|
@ -158,13 +160,13 @@ func (db *Database) QueueTransferIP(peer *cdb.Peer, rawDeltaUp, rawDeltaDown int
|
|||
}
|
||||
}
|
||||
|
||||
func (db *Database) QueueSnatch(peer *cdb.Peer, now int64) {
|
||||
func (db *Database) QueueSnatch(userID, torrentID uint32, now int64) {
|
||||
sn := db.bufferPool.Take()
|
||||
|
||||
sn.WriteString("(")
|
||||
sn.WriteString(strconv.FormatUint(uint64(peer.UserID), 10))
|
||||
sn.WriteString(strconv.FormatUint(uint64(userID), 10))
|
||||
sn.WriteString(",")
|
||||
sn.WriteString(strconv.FormatUint(uint64(peer.TorrentID), 10))
|
||||
sn.WriteString(strconv.FormatUint(uint64(torrentID), 10))
|
||||
sn.WriteString(",")
|
||||
sn.WriteString(strconv.FormatInt(now, 10))
|
||||
sn.WriteString(")")
|
||||
|
|
|
@ -44,17 +44,13 @@ func TestSerializer(t *testing.T) {
|
|||
testUsers["mUztWMpBYNCqzmge6vGeEUGSrctJbgpQ"] = testUser
|
||||
|
||||
testPeer := &cdb.Peer{
|
||||
UserID: 12,
|
||||
TorrentID: 10,
|
||||
ClientID: 4,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
LastAnnounce: time.Now().Unix(),
|
||||
Seeding: true,
|
||||
Left: 0,
|
||||
Uploaded: 100,
|
||||
Downloaded: 1000,
|
||||
ID: cdb.PeerIDFromRawString("12-10-2130706433-4"),
|
||||
ClientID: 4,
|
||||
Addr: cdb.NewPeerAddressFromIPPort(net.IP{127, 0, 0, 1}, 63448),
|
||||
StartTime: time.Now().Unix(),
|
||||
Seeding: true,
|
||||
Left: 0,
|
||||
Uploaded: 100,
|
||||
Downloaded: 1000,
|
||||
}
|
||||
|
||||
testTorrentHash := cdb.TorrentHash{
|
||||
|
|
|
@ -56,23 +56,35 @@ func (k PeerKey) PeerID() (id PeerID) {
|
|||
|
||||
//goland:noinspection GoMixedReceiverTypes
|
||||
func (k PeerKey) MarshalText() ([]byte, error) {
|
||||
var buf [(4 + 20) * 2]byte
|
||||
peerIDBuffer, err := k.PeerID().MarshalText()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hex.Encode(buf[:], k[:])
|
||||
|
||||
return buf[:], nil
|
||||
return []byte(strconv.FormatUint(uint64(k.ID()), 10) + "-" + string(peerIDBuffer)), nil
|
||||
}
|
||||
|
||||
//goland:noinspection GoMixedReceiverTypes
|
||||
func (k *PeerKey) UnmarshalText(b []byte) error {
|
||||
if len(b) != (4+20)*2 {
|
||||
s := bytes.Split(b, []byte{'-'})
|
||||
if len(s) != 2 {
|
||||
return errWrongPeerKeySize
|
||||
}
|
||||
|
||||
if _, err := hex.Decode(k[:], b[:]); err != nil {
|
||||
n, err := strconv.ParseUint(string(s[0]), 10, 32)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var peerID PeerID
|
||||
|
||||
if err = peerID.UnmarshalText(s[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
binary.LittleEndian.PutUint32(k[:], uint32(n))
|
||||
copy(k[4:], peerID[:])
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -206,34 +218,41 @@ func (a *PeerAddress) UnmarshalText(b []byte) error {
|
|||
}
|
||||
|
||||
// Peer
|
||||
// Theoretical min layout size: 6 + 8 + 8 + 8 + 8 + 8 + 8 + 4 + 4 + 6 + 2 + 1 = 71 bytes
|
||||
// Current layout size go1.20.4: 80 bytes via unsafe.Sizeof(Peer{})
|
||||
// Theoretical min layout size: 45 bytes
|
||||
// Current layout size go1.20.4: 48 bytes via unsafe.Sizeof(Peer{})
|
||||
type Peer struct {
|
||||
Addr PeerAddress
|
||||
|
||||
Uploaded uint64
|
||||
Downloaded uint64
|
||||
Left uint64
|
||||
|
||||
StartTime int64 // unix time
|
||||
LastAnnounce int64
|
||||
StartTime int64 // unix time
|
||||
// RelativeLastAnnounce This is the difference of absolute value relative to StartTime
|
||||
// To get the absolute value use LastAnnounce()
|
||||
RelativeLastAnnounce int32
|
||||
|
||||
TorrentID uint32
|
||||
UserID uint32
|
||||
|
||||
// ID placed here so in-memory layout is smaller
|
||||
ID PeerID
|
||||
Addr PeerAddress
|
||||
|
||||
ClientID uint16
|
||||
|
||||
Seeding bool
|
||||
}
|
||||
|
||||
func (p *Peer) SetLastAnnounce(lastAnnounce int64) {
|
||||
p.RelativeLastAnnounce = int32(lastAnnounce - p.StartTime)
|
||||
}
|
||||
|
||||
func (p *Peer) LastAnnounce() int64 {
|
||||
return p.StartTime + int64(p.RelativeLastAnnounce)
|
||||
}
|
||||
|
||||
var errInvalidAddrLength = errors.New("invalid Addr length")
|
||||
|
||||
func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
|
||||
if _, err = io.ReadFull(reader, p.ID[:]); err != nil {
|
||||
return err
|
||||
if version <= 3 {
|
||||
var peerID PeerID
|
||||
if _, err = io.ReadFull(reader, peerID[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if version == 1 {
|
||||
|
@ -297,16 +316,29 @@ func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &p.LastAnnounce); err != nil {
|
||||
return err
|
||||
}
|
||||
if version <= 3 {
|
||||
var (
|
||||
torrentID, userID uint32
|
||||
lastAnnounce int64
|
||||
)
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &p.TorrentID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = binary.Read(reader, binary.LittleEndian, &lastAnnounce); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &p.UserID); err != nil {
|
||||
return err
|
||||
p.SetLastAnnounce(lastAnnounce)
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &torrentID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &userID); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err = binary.Read(reader, binary.LittleEndian, &p.RelativeLastAnnounce); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = binary.Read(reader, binary.LittleEndian, &p.ClientID); err != nil {
|
||||
|
@ -318,15 +350,12 @@ func (p *Peer) Load(version uint64, reader readerAndByteReader) (err error) {
|
|||
|
||||
func (p *Peer) Append(preAllocatedBuffer []byte) (buf []byte) {
|
||||
buf = preAllocatedBuffer
|
||||
buf = append(buf, p.ID[:]...)
|
||||
buf = append(buf, p.Addr[:]...)
|
||||
buf = binary.LittleEndian.AppendUint64(buf, p.Uploaded)
|
||||
buf = binary.LittleEndian.AppendUint64(buf, p.Downloaded)
|
||||
buf = binary.LittleEndian.AppendUint64(buf, p.Left)
|
||||
buf = binary.LittleEndian.AppendUint64(buf, uint64(p.StartTime))
|
||||
buf = binary.LittleEndian.AppendUint64(buf, uint64(p.LastAnnounce))
|
||||
buf = binary.LittleEndian.AppendUint32(buf, p.TorrentID)
|
||||
buf = binary.LittleEndian.AppendUint32(buf, p.UserID)
|
||||
buf = binary.LittleEndian.AppendUint32(buf, uint32(p.RelativeLastAnnounce))
|
||||
buf = binary.LittleEndian.AppendUint16(buf, p.ClientID)
|
||||
|
||||
if p.Seeding {
|
||||
|
|
|
@ -256,7 +256,8 @@ func announce(
|
|||
event := qp.Params.Event
|
||||
completed := event == "completed"
|
||||
|
||||
peerKey := cdb.NewPeerKey(user.ID.Load(), cdb.PeerIDFromRawString(peerID))
|
||||
userID := user.ID.Load()
|
||||
peerKey := cdb.NewPeerKey(userID, cdb.PeerIDFromRawString(peerID))
|
||||
|
||||
// Take torrent peers lock to read/write on it to prevent race conditions
|
||||
if !torrent.TryLockWithContext(ctx) {
|
||||
|
@ -321,11 +322,8 @@ func announce(
|
|||
|
||||
// Update peer info/stats
|
||||
if newPeer {
|
||||
peer.ID = peerKey.PeerID()
|
||||
peer.UserID = user.ID.Load()
|
||||
peer.TorrentID = torrent.ID.Load()
|
||||
peer.StartTime = now
|
||||
peer.LastAnnounce = now
|
||||
peer.RelativeLastAnnounce = 0
|
||||
peer.Uploaded = uploaded
|
||||
peer.Downloaded = downloaded
|
||||
}
|
||||
|
@ -362,7 +360,7 @@ func announce(
|
|||
peer.Downloaded = downloaded
|
||||
peer.Left = left
|
||||
peer.Seeding = seeding
|
||||
deltaTime := now - peer.LastAnnounce
|
||||
deltaTime := now - peer.LastAnnounce()
|
||||
|
||||
if deltaTime > int64(peerInactivityInterval) {
|
||||
deltaTime = 0
|
||||
|
@ -370,14 +368,14 @@ func announce(
|
|||
|
||||
var deltaSeedTime int64
|
||||
if seeding {
|
||||
deltaSeedTime = now - peer.LastAnnounce
|
||||
deltaSeedTime = now - peer.LastAnnounce()
|
||||
}
|
||||
|
||||
if deltaSeedTime > int64(peerInactivityInterval) {
|
||||
deltaSeedTime = 0
|
||||
}
|
||||
|
||||
peer.LastAnnounce = now
|
||||
peer.SetLastAnnounce(now)
|
||||
/* Update torrent last_action only if announced action is seeding.
|
||||
This allows dead torrents without seeder but with leecher to be proeprly pruned */
|
||||
if seeding {
|
||||
|
@ -400,7 +398,7 @@ func announce(
|
|||
|
||||
active = false
|
||||
} else if completed {
|
||||
db.QueueSnatch(peer, now)
|
||||
db.QueueSnatch(userID, torrent.ID.Load(), now)
|
||||
deltaSnatch = 1
|
||||
}
|
||||
|
||||
|
@ -414,13 +412,15 @@ func announce(
|
|||
|
||||
// If the channels are already full, block until a flush occurs
|
||||
db.QueueTorrent(torrent, deltaSnatch)
|
||||
db.QueueTransferHistory(peer, rawDeltaUpload, rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
|
||||
db.QueueTransferHistory(
|
||||
peer, userID, torrent.ID.Load(), rawDeltaUpload,
|
||||
rawDeltaDownload, deltaTime, deltaSeedTime, deltaSnatch, active)
|
||||
db.QueueUser(user, rawDeltaUpload, rawDeltaDownload, deltaUpload, deltaDownload)
|
||||
db.QueueTransferIP(peer, rawDeltaUpload, rawDeltaDownload)
|
||||
db.QueueTransferIP(peer, userID, torrent.ID.Load(), rawDeltaUpload, rawDeltaDownload)
|
||||
|
||||
go record.Record(
|
||||
peer.TorrentID,
|
||||
user.ID.Load(),
|
||||
torrent.ID.Load(),
|
||||
userID,
|
||||
ipAddr,
|
||||
port,
|
||||
event,
|
||||
|
@ -460,7 +460,12 @@ func announce(
|
|||
peerCount = util.Min(int(numWant), leechCount+seedCount-1)
|
||||
}
|
||||
|
||||
peersToSend := make([]*cdb.Peer, 0, peerCount)
|
||||
type peerToSend struct {
|
||||
Peer *cdb.Peer
|
||||
Key cdb.PeerKey
|
||||
}
|
||||
|
||||
peersToSend := make([]peerToSend, 0, peerCount)
|
||||
|
||||
/*
|
||||
* The iteration is already "random", so we don't need to randomize ourselves:
|
||||
|
@ -468,41 +473,54 @@ func announce(
|
|||
* - Each time you range over the map, it starts at a random offset into the map's elements
|
||||
*/
|
||||
if seeding {
|
||||
for _, leech := range torrent.Leechers {
|
||||
for k, leech := range torrent.Leechers {
|
||||
if len(peersToSend) >= int(numWant) {
|
||||
break
|
||||
}
|
||||
|
||||
if leech.UserID == peer.UserID {
|
||||
if k.ID() == userID {
|
||||
continue
|
||||
}
|
||||
|
||||
peersToSend = append(peersToSend, leech)
|
||||
peersToSend = append(peersToSend, peerToSend{
|
||||
Peer: leech,
|
||||
Key: k,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
/* Send only one peer per user. This is to ensure that users seeding at multiple locations don't end up
|
||||
exclusively acting as peers. */
|
||||
uniqueSeeders := make(map[uint32]*cdb.Peer)
|
||||
for _, seed := range torrent.Seeders {
|
||||
uniqueSeeders := make(map[uint32]struct{})
|
||||
for k, seed := range torrent.Seeders {
|
||||
if len(peersToSend) >= int(numWant) {
|
||||
break
|
||||
}
|
||||
if seed.UserID == peer.UserID {
|
||||
|
||||
seedUserID := k.ID()
|
||||
|
||||
if seedUserID == userID {
|
||||
continue
|
||||
}
|
||||
if _, exists = uniqueSeeders[seed.UserID]; !exists {
|
||||
uniqueSeeders[seed.UserID] = seed
|
||||
peersToSend = append(peersToSend, seed)
|
||||
|
||||
if _, exists = uniqueSeeders[seedUserID]; !exists {
|
||||
uniqueSeeders[seedUserID] = struct{}{}
|
||||
peersToSend = append(peersToSend, peerToSend{
|
||||
Peer: seed,
|
||||
Key: k,
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, leech := range torrent.Leechers {
|
||||
for k, leech := range torrent.Leechers {
|
||||
if len(peersToSend) >= int(numWant) {
|
||||
break
|
||||
}
|
||||
if leech.UserID == peer.UserID {
|
||||
if k.ID() == userID {
|
||||
continue
|
||||
}
|
||||
peersToSend = append(peersToSend, leech)
|
||||
peersToSend = append(peersToSend, peerToSend{
|
||||
Peer: leech,
|
||||
Key: k,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -510,7 +528,7 @@ func announce(
|
|||
peerBuff := make([]byte, 0, len(peersToSend)*cdb.PeerAddressSize)
|
||||
|
||||
for _, other := range peersToSend {
|
||||
peerBuff = append(peerBuff, other.Addr[:]...)
|
||||
peerBuff = append(peerBuff, other.Peer.Addr[:]...)
|
||||
}
|
||||
|
||||
response["peers"] = peerBuff
|
||||
|
@ -518,10 +536,11 @@ func announce(
|
|||
peerList := make([]map[string]interface{}, len(peersToSend))
|
||||
for i, other := range peersToSend {
|
||||
peerMap := make(map[string]interface{})
|
||||
peerMap["ip"] = other.Addr.IPString()
|
||||
peerMap["port"] = other.Addr.Port()
|
||||
peerMap["ip"] = other.Peer.Addr.IPString()
|
||||
peerMap["port"] = other.Peer.Addr.Port()
|
||||
if !noPeerID {
|
||||
peerMap["peer id"] = other.ID[:]
|
||||
thisPeerID := other.Key.PeerID()
|
||||
peerMap["peer id"] = thisPeerID[:]
|
||||
}
|
||||
peerList[i] = peerMap
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue