pgo-collector/collector.go

141 lines
4.3 KiB
Go

package main
import (
"flag"
"fmt"
pprofile "github.com/google/pprof/profile"
"io"
"log"
"net/http"
"os"
"path"
"slices"
"strings"
"sync"
"time"
)
// stringList is a flag.Value that collects every occurrence of a repeatable
// command-line flag, in the order the values were given.
type stringList []string

// String renders the collected values as a single comma-separated string.
func (s *stringList) String() string {
	values := []string(*s)
	return strings.Join(values, ", ")
}

// Set appends value to the list. It never fails.
func (s *stringList) Set(value string) error {
	extended := append(*s, value)
	*s = extended
	return nil
}
func main() {
profileInterval := flag.Duration("profile-interval", time.Hour, "Profile every given duration")
profileDuration := flag.Duration("profile-duration", time.Second*30, "Profile each time for this given duration")
var endpoints stringList
flag.Var(&endpoints, "endpoint", "Endpoint where a pprof HTTP API is exposed at, for example, http://127.0.0.1:6060. Can specify multiple")
profileDirectory := flag.String("profile-directory", "/tmp", "Place where to dump generated merged profiles")
profileName := flag.String("profile-name", "pgo-profile-latest.pprof", "Prefix to give latest generated file")
flag.Parse()
log.Printf("Profiling for %s every %s", *profileDuration, *profileInterval)
var compactProfile *pprofile.Profile
for range time.Tick(*profileInterval) {
var wg sync.WaitGroup
profiles := make([]*pprofile.Profile, len(endpoints))
for i, endpoint := range endpoints {
wg.Add(1)
go func(i int, endpoint string) {
defer wg.Done()
log.Printf("attempting profile collection on %s", endpoint)
if response, err := http.Get(fmt.Sprintf("%s/debug/pprof/profile?seconds=%d", endpoint, uint64(profileDuration.Seconds()))); err != nil {
log.Printf("error collecting profile on %s: %s", endpoint, err)
return
} else {
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
if response.Header.Get("X-Go-Pprof") != "" && strings.Contains(response.Header.Get("Content-Type"), "text/plain") {
// error is from pprof endpoint
if body, err := io.ReadAll(response.Body); err == nil {
log.Printf("error collecting profile on %s: got %d %s - %s", endpoint, response.StatusCode, response.Status, string(body))
return
}
}
log.Printf("error collecting profile on %s: expected status %d, got %d %s", endpoint, http.StatusOK, response.StatusCode, response.Status)
return
} else if profile, err := pprofile.Parse(response.Body); err != nil {
log.Printf("error collecting profile on %s while reading: %s", endpoint, err)
return
} else {
profiles[i] = profile
}
}
}(i, endpoint)
}
wg.Wait()
//remove any errored ones
for i := len(profiles) - 1; i >= 0; i-- {
if profiles[i] == nil {
profiles = slices.Delete(profiles, i, i+1)
}
}
if mergedProfile, err := pprofile.Merge(profiles); err != nil {
log.Printf("could not merge profiles: %s", err)
} else {
if compactProfile != nil {
if mergedWithPreviousProfile, err := pprofile.Merge([]*pprofile.Profile{mergedProfile, compactProfile}); err != nil {
log.Printf("could not merge with previous profile: %s", err)
//maybe redundant
compactProfile = mergedProfile.Compact()
} else {
//maybe redundant
compactProfile = mergedWithPreviousProfile.Compact()
}
} else {
//maybe redundant
compactProfile = mergedProfile.Compact()
}
func() {
temporaryFile, err := os.CreateTemp("/tmp", "pgo-*")
if err != nil {
log.Printf("error opening temporary output profile: %s", err)
} else {
defer os.Remove(temporaryFile.Name())
func() {
defer temporaryFile.Sync()
if err = compactProfile.Write(temporaryFile); err != nil {
log.Printf("error writing temporary output profile: %s", err)
}
}()
//copies to latest atomically
log.Printf("writing profile at %d", time.Now().UTC().Unix())
if err := func() error {
if _, err = temporaryFile.Seek(0, io.SeekStart); err != nil {
return err
}
w, err := os.Create(path.Join(*profileDirectory, *profileName+".tmp"))
if err != nil {
return err
}
defer w.Close()
if _, err := io.Copy(w, temporaryFile); err != nil {
return err
}
return nil
}(); err != nil {
log.Printf("error writing latest profile: %s", err)
} else {
_ = os.Rename(path.Join(*profileDirectory, *profileName+".tmp"), path.Join(*profileDirectory, *profileName))
}
}
}()
}
}
}