Ignite/cli/encode-server/main.go

390 lines
9 KiB
Go

package main
import (
"encoding/json"
"flag"
encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils"
"git.gammaspectra.live/S.O.N.G/Ignite/utilities"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
"gopkg.in/yaml.v3"
"io"
"log"
"net/http"
"net/url"
"path"
"runtime"
"slices"
"strconv"
"strings"
"time"
)
func main() {
maxJobs := flag.Uint64("max-jobs", 8, "Maximum number of active jobs")
authKeysStr := flag.String("authKeys", "", "Allowed auth keys, comma separated. Leave empty to disable")
listenAddr := flag.String("listen", "0.0.0.0:8383", "Listen address for the server")
flag.Parse()
serveMux := http.NewServeMux()
var authKeys []string
if strings.TrimSpace(*authKeysStr) != "" {
authKeys = strings.Split(*authKeysStr, ",")
}
writeHeaders := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("x-encoder-max-jobs", strconv.FormatUint(*maxJobs, 10))
w.Header().Set("x-encoder-current-jobs", strconv.FormatUint(uint64(len(jobs)), 10))
}
serveMux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
var statusData encode_utils.StatusData
for _, e := range Encoders {
statusData.Encoders = append(statusData.Encoders, encode_utils.StatusEncoderData{
Name: e.Name,
Version: e.Version(),
})
}
cpuInfo, err := cpu.Info()
if err != nil {
//fallback
statusData.CPU = make([]cpu.InfoStat, runtime.NumCPU())
} else {
statusData.CPU = cpuInfo
}
hostInfo, err := host.Info()
if err == nil {
statusData.Host = hostInfo
}
loadInfo, err := load.Avg()
if err == nil {
statusData.Load = loadInfo
}
memInfo, err := mem.VirtualMemory()
if err == nil {
statusData.Memory = memInfo
}
data, err := json.Marshal(statusData)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
JobsMutex.Lock()
defer JobsMutex.Unlock()
writeHeaders(w, r)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
})
serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
buf, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
var cfg encode_utils.JobConfig
err = yaml.Unmarshal(buf, &cfg)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
if cfg.TimecodesV1 != "" {
cfg.Timecodes, err = utilities.ParseTimecodesV1(strings.NewReader(cfg.TimecodesV1))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
}
if job := func() *Job {
JobsMutex.Lock()
defer JobsMutex.Unlock()
if uint64(len(jobs)) >= *maxJobs {
writeHeaders(w, r)
w.WriteHeader(http.StatusTooManyRequests)
return nil
}
job, err := CreateJob(cfg)
if err != nil {
writeHeaders(w, r)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return nil
}
writeHeaders(w, r)
return job
}(); job == nil {
return
} else {
job.Lock()
defer job.Unlock()
data, err := json.Marshal(job)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
log.Printf("[job %s] created", job.Id)
return
}
})
serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
query := r.URL.Query()
jobId := query.Get("jobId")
job := GetJob(JobId(jobId))
if job == nil {
w.WriteHeader(http.StatusNotFound)
return
}
if !job.TryLock() {
w.WriteHeader(http.StatusLocked)
return
}
defer job.Unlock()
reader, err := handleDecompress(r.Header.Get("Content-Encoding"), r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
defer r.Body.Close()
log.Printf("[job %s] started POST", job.Id)
encodeFromReader(reader, job, r.Header.Get("Content-Type"), w)
})
serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
query := r.URL.Query()
jobId := query.Get("jobId")
job := GetJob(JobId(jobId))
if job == nil {
w.WriteHeader(http.StatusNotFound)
return
}
if !job.TryLock() {
w.WriteHeader(http.StatusLocked)
return
}
defer job.Unlock()
urlVal, err := url.Parse(query.Get("url"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: urlVal,
Header: http.Header{
"Accept-Encoding": {"gzip, deflate, bzip2, xz"},
},
})
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
defer io.ReadAll(response.Body)
defer response.Body.Close()
reader := response.Body
if response.Header.Get("Content-Encoding") != "" {
reader, err = handleDecompress(response.Header.Get("Content-Encoding"), reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
}
// Handle filenames
switch strings.ToLower(path.Ext(urlVal.Path)) {
case ".gz":
reader, err = handleDecompress("gz", reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
case ".bz2", ".bzip2":
reader, err = handleDecompress("bz2", reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
case ".xz":
reader, err = handleDecompress("xz", reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
}
fname := strings.TrimSuffix(urlVal.Path, path.Ext(urlVal.Path))
inputMimeType := response.Header.Get("Content-Type")
switch strings.ToLower(path.Ext(fname)) {
case ".y4m":
inputMimeType = "video/x-yuv4mpeg2"
case ".av1":
inputMimeType = "video/x-ivf"
case ".h264", ".x264":
inputMimeType = "video/h264"
}
log.Printf("[job %s] started URL", job.Id)
encodeFromReader(reader, job, inputMimeType, w)
})
serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
query := r.URL.Query()
jobId := query.Get("jobId")
job := GetJob(JobId(jobId))
if job == nil {
w.WriteHeader(http.StatusNotFound)
return
}
var jobData struct {
Start uint64 `json:"start"`
Read uint64 `json:"in"`
Processed uint64 `json:"out"`
}
jobData.Start = job.Status.Start.Load()
jobData.Read = job.Status.Read.Load()
jobData.Processed = job.Status.Processed.Load()
data, err := json.Marshal(jobData)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
})
serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) {
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
w.WriteHeader(http.StatusForbidden)
return
}
query := r.URL.Query()
jobId := query.Get("jobId")
job := GetJob(JobId(jobId))
if job == nil {
w.WriteHeader(http.StatusNotFound)
return
}
if !job.TryLock() {
w.WriteHeader(http.StatusLocked)
return
}
defer job.Unlock()
err := job.Close()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
err = RemoveJob(job.Id)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf("[job %s] removed", job.Id)
})
s := http.Server{
ReadTimeout: 0,
IdleTimeout: time.Second * 60,
WriteTimeout: 0,
Addr: *listenAddr,
Handler: serveMux,
}
s.SetKeepAlivesEnabled(true)
if err := s.ListenAndServe(); err != nil {
panic(err)
}
defer s.Close()
}