390 lines
9 KiB
Go
390 lines
9 KiB
Go
package main
|
|
|
|
import (
	"crypto/subtle"
	"encoding/json"
	"flag"
	"io"
	"log"
	"net/http"
	"net/url"
	"path"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"time"

	encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils"
	"git.gammaspectra.live/S.O.N.G/Ignite/utilities"
	"github.com/shirou/gopsutil/cpu"
	"github.com/shirou/gopsutil/host"
	"github.com/shirou/gopsutil/load"
	"github.com/shirou/gopsutil/mem"
	"gopkg.in/yaml.v3"
)
|
|
|
|
func main() {
|
|
maxJobs := flag.Uint64("max-jobs", 8, "Maximum number of active jobs")
|
|
authKeysStr := flag.String("authKeys", "", "Allowed auth keys, comma separated. Leave empty to disable")
|
|
listenAddr := flag.String("listen", "0.0.0.0:8383", "Listen address for the server")
|
|
flag.Parse()
|
|
|
|
serveMux := http.NewServeMux()
|
|
|
|
var authKeys []string
|
|
if strings.TrimSpace(*authKeysStr) != "" {
|
|
authKeys = strings.Split(*authKeysStr, ",")
|
|
}
|
|
|
|
writeHeaders := func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("x-encoder-max-jobs", strconv.FormatUint(*maxJobs, 10))
|
|
w.Header().Set("x-encoder-current-jobs", strconv.FormatUint(uint64(len(jobs)), 10))
|
|
}
|
|
|
|
serveMux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
var statusData encode_utils.StatusData
|
|
|
|
for _, e := range Encoders {
|
|
statusData.Encoders = append(statusData.Encoders, encode_utils.StatusEncoderData{
|
|
Name: e.Name,
|
|
Version: e.Version(),
|
|
})
|
|
}
|
|
|
|
cpuInfo, err := cpu.Info()
|
|
if err != nil {
|
|
//fallback
|
|
statusData.CPU = make([]cpu.InfoStat, runtime.NumCPU())
|
|
} else {
|
|
statusData.CPU = cpuInfo
|
|
}
|
|
|
|
hostInfo, err := host.Info()
|
|
if err == nil {
|
|
statusData.Host = hostInfo
|
|
}
|
|
|
|
loadInfo, err := load.Avg()
|
|
if err == nil {
|
|
statusData.Load = loadInfo
|
|
}
|
|
|
|
memInfo, err := mem.VirtualMemory()
|
|
if err == nil {
|
|
statusData.Memory = memInfo
|
|
}
|
|
|
|
data, err := json.Marshal(statusData)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
JobsMutex.Lock()
|
|
defer JobsMutex.Unlock()
|
|
writeHeaders(w, r)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(data)
|
|
})
|
|
|
|
serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
buf, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
|
|
var cfg encode_utils.JobConfig
|
|
|
|
err = yaml.Unmarshal(buf, &cfg)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
|
|
if cfg.TimecodesV1 != "" {
|
|
cfg.Timecodes, err = utilities.ParseTimecodesV1(strings.NewReader(cfg.TimecodesV1))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
}
|
|
|
|
if job := func() *Job {
|
|
JobsMutex.Lock()
|
|
defer JobsMutex.Unlock()
|
|
if uint64(len(jobs)) >= *maxJobs {
|
|
writeHeaders(w, r)
|
|
w.WriteHeader(http.StatusTooManyRequests)
|
|
return nil
|
|
}
|
|
|
|
job, err := CreateJob(cfg)
|
|
if err != nil {
|
|
writeHeaders(w, r)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return nil
|
|
}
|
|
|
|
writeHeaders(w, r)
|
|
return job
|
|
}(); job == nil {
|
|
return
|
|
} else {
|
|
job.Lock()
|
|
defer job.Unlock()
|
|
data, err := json.Marshal(job)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(data)
|
|
log.Printf("[job %s] created", job.Id)
|
|
return
|
|
}
|
|
})
|
|
|
|
serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
query := r.URL.Query()
|
|
|
|
jobId := query.Get("jobId")
|
|
job := GetJob(JobId(jobId))
|
|
if job == nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
if !job.TryLock() {
|
|
w.WriteHeader(http.StatusLocked)
|
|
return
|
|
}
|
|
defer job.Unlock()
|
|
|
|
reader, err := handleDecompress(r.Header.Get("Content-Encoding"), r.Body)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
defer r.Body.Close()
|
|
|
|
log.Printf("[job %s] started POST", job.Id)
|
|
encodeFromReader(reader, job, r.Header.Get("Content-Type"), w)
|
|
})
|
|
|
|
serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
if r.Method != "GET" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
query := r.URL.Query()
|
|
|
|
jobId := query.Get("jobId")
|
|
job := GetJob(JobId(jobId))
|
|
if job == nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
if !job.TryLock() {
|
|
w.WriteHeader(http.StatusLocked)
|
|
return
|
|
}
|
|
defer job.Unlock()
|
|
|
|
urlVal, err := url.Parse(query.Get("url"))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
|
|
response, err := http.DefaultClient.Do(&http.Request{
|
|
Method: "GET",
|
|
URL: urlVal,
|
|
Header: http.Header{
|
|
"Accept-Encoding": {"gzip, deflate, bzip2, xz"},
|
|
},
|
|
})
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
defer io.ReadAll(response.Body)
|
|
defer response.Body.Close()
|
|
|
|
reader := response.Body
|
|
if response.Header.Get("Content-Encoding") != "" {
|
|
reader, err = handleDecompress(response.Header.Get("Content-Encoding"), reader)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
}
|
|
|
|
// Handle filenames
|
|
switch strings.ToLower(path.Ext(urlVal.Path)) {
|
|
case ".gz":
|
|
reader, err = handleDecompress("gz", reader)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
case ".bz2", ".bzip2":
|
|
reader, err = handleDecompress("bz2", reader)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
case ".xz":
|
|
reader, err = handleDecompress("xz", reader)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
}
|
|
|
|
fname := strings.TrimSuffix(urlVal.Path, path.Ext(urlVal.Path))
|
|
inputMimeType := response.Header.Get("Content-Type")
|
|
switch strings.ToLower(path.Ext(fname)) {
|
|
case ".y4m":
|
|
inputMimeType = "video/x-yuv4mpeg2"
|
|
case ".av1":
|
|
inputMimeType = "video/x-ivf"
|
|
case ".h264", ".x264":
|
|
inputMimeType = "video/h264"
|
|
}
|
|
|
|
log.Printf("[job %s] started URL", job.Id)
|
|
encodeFromReader(reader, job, inputMimeType, w)
|
|
})
|
|
|
|
serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
query := r.URL.Query()
|
|
|
|
jobId := query.Get("jobId")
|
|
job := GetJob(JobId(jobId))
|
|
if job == nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
var jobData struct {
|
|
Start uint64 `json:"start"`
|
|
Read uint64 `json:"in"`
|
|
Processed uint64 `json:"out"`
|
|
}
|
|
jobData.Start = job.Status.Start.Load()
|
|
jobData.Read = job.Status.Read.Load()
|
|
jobData.Processed = job.Status.Processed.Load()
|
|
|
|
data, err := json.Marshal(jobData)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(data)
|
|
})
|
|
|
|
serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) {
|
|
if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
query := r.URL.Query()
|
|
|
|
jobId := query.Get("jobId")
|
|
job := GetJob(JobId(jobId))
|
|
if job == nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
if !job.TryLock() {
|
|
w.WriteHeader(http.StatusLocked)
|
|
return
|
|
}
|
|
defer job.Unlock()
|
|
|
|
err := job.Close()
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
err = RemoveJob(job.Id)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
log.Printf("[job %s] removed", job.Id)
|
|
})
|
|
|
|
s := http.Server{
|
|
ReadTimeout: 0,
|
|
IdleTimeout: time.Second * 60,
|
|
WriteTimeout: 0,
|
|
Addr: *listenAddr,
|
|
Handler: serveMux,
|
|
}
|
|
s.SetKeepAlivesEnabled(true)
|
|
|
|
if err := s.ListenAndServe(); err != nil {
|
|
panic(err)
|
|
}
|
|
defer s.Close()
|
|
|
|
}
|