package main import ( "encoding/json" "flag" encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils" "git.gammaspectra.live/S.O.N.G/Ignite/utilities" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/load" "github.com/shirou/gopsutil/mem" "gopkg.in/yaml.v3" "io" "log" "net/http" "net/url" "path" "runtime" "slices" "strconv" "strings" "time" ) func main() { maxJobs := flag.Uint64("max-jobs", 8, "Maximum number of active jobs") authKeysStr := flag.String("authKeys", "", "Allowed auth keys, comma separated. Leave empty to disable") listenAddr := flag.String("listen", "0.0.0.0:8383", "Listen address for the server") flag.Parse() serveMux := http.NewServeMux() var authKeys []string if strings.TrimSpace(*authKeysStr) != "" { authKeys = strings.Split(*authKeysStr, ",") } writeHeaders := func(w http.ResponseWriter, r *http.Request) { w.Header().Set("x-encoder-max-jobs", strconv.FormatUint(*maxJobs, 10)) w.Header().Set("x-encoder-current-jobs", strconv.FormatUint(uint64(len(jobs)), 10)) } serveMux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } var statusData encode_utils.StatusData for _, e := range Encoders { statusData.Encoders = append(statusData.Encoders, encode_utils.StatusEncoderData{ Name: e.Name, Version: e.Version(), }) } cpuInfo, err := cpu.Info() if err != nil { //fallback statusData.CPU = make([]cpu.InfoStat, runtime.NumCPU()) } else { statusData.CPU = cpuInfo } hostInfo, err := host.Info() if err == nil { statusData.Host = hostInfo } loadInfo, err := load.Avg() if err == nil { statusData.Load = loadInfo } memInfo, err := mem.VirtualMemory() if err == nil { statusData.Memory = memInfo } data, err := json.Marshal(statusData) if err != nil { w.WriteHeader(http.StatusBadRequest) return } JobsMutex.Lock() defer JobsMutex.Unlock() writeHeaders(w, r) w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10)) w.WriteHeader(http.StatusOK) _, _ = w.Write(data) }) serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } buf, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } var cfg encode_utils.JobConfig err = yaml.Unmarshal(buf, &cfg) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } if cfg.TimecodesV1 != "" { cfg.Timecodes, err = utilities.ParseTimecodesV1(strings.NewReader(cfg.TimecodesV1)) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } } if job := func() *Job { JobsMutex.Lock() defer JobsMutex.Unlock() if uint64(len(jobs)) >= *maxJobs { writeHeaders(w, r) w.WriteHeader(http.StatusTooManyRequests) return nil } job, err := CreateJob(cfg) if err != nil { writeHeaders(w, r) w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return nil } writeHeaders(w, r) return job }(); job == nil { return } else { job.Lock() defer job.Unlock() data, err := json.Marshal(job) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10)) w.WriteHeader(http.StatusOK) _, _ = w.Write(data) log.Printf("[job %s] created", job.Id) return } }) serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } query := r.URL.Query() jobId := query.Get("jobId") job := GetJob(JobId(jobId)) if job == nil { w.WriteHeader(http.StatusNotFound) return } if !job.TryLock() { w.WriteHeader(http.StatusLocked) return } defer job.Unlock() reader, err := handleDecompress(r.Header.Get("Content-Encoding"), r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } defer r.Body.Close() log.Printf("[job %s] started POST", job.Id) encodeFromReader(reader, job, r.Header.Get("Content-Type"), w) }) serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) return } query := r.URL.Query() jobId := query.Get("jobId") job := GetJob(JobId(jobId)) if job == nil { w.WriteHeader(http.StatusNotFound) return } if !job.TryLock() { w.WriteHeader(http.StatusLocked) return } defer job.Unlock() urlVal, err := url.Parse(query.Get("url")) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } response, err := http.DefaultClient.Do(&http.Request{ Method: "GET", URL: urlVal, Header: http.Header{ "Accept-Encoding": {"gzip, deflate, bzip2, xz"}, }, }) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } defer io.ReadAll(response.Body) defer response.Body.Close() reader := response.Body if response.Header.Get("Content-Encoding") != "" { reader, err = handleDecompress(response.Header.Get("Content-Encoding"), reader) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } } // Handle filenames switch strings.ToLower(path.Ext(urlVal.Path)) { case ".gz": reader, err = handleDecompress("gz", reader) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } case ".bz2", ".bzip2": reader, err = handleDecompress("bz2", reader) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } case ".xz": reader, err = handleDecompress("xz", reader) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } } fname := strings.TrimSuffix(urlVal.Path, path.Ext(urlVal.Path)) inputMimeType := response.Header.Get("Content-Type") switch strings.ToLower(path.Ext(fname)) { case ".y4m": inputMimeType = "video/x-yuv4mpeg2" case ".av1": inputMimeType = "video/x-ivf" case ".h264", ".x264": inputMimeType = "video/h264" } log.Printf("[job %s] started URL", job.Id) encodeFromReader(reader, job, inputMimeType, w) }) serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } query := r.URL.Query() jobId := query.Get("jobId") job := GetJob(JobId(jobId)) if job == nil { w.WriteHeader(http.StatusNotFound) return } var jobData struct { Start uint64 `json:"start"` Read uint64 `json:"in"` Processed uint64 `json:"out"` } jobData.Start = job.Status.Start.Load() jobData.Read = job.Status.Read.Load() jobData.Processed = job.Status.Processed.Load() data, err := json.Marshal(jobData) if err != nil { w.WriteHeader(http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10)) w.WriteHeader(http.StatusOK) _, _ = w.Write(data) }) serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) { if len(authKeys) > 0 && !slices.Contains(authKeys, r.URL.Query().Get("k")) { w.WriteHeader(http.StatusForbidden) return } query := r.URL.Query() jobId := query.Get("jobId") job := GetJob(JobId(jobId)) if job == nil { w.WriteHeader(http.StatusNotFound) return } if !job.TryLock() { w.WriteHeader(http.StatusLocked) return } defer job.Unlock() err := job.Close() if err != nil { w.WriteHeader(http.StatusBadRequest) return } err = RemoveJob(job.Id) if err != nil { w.WriteHeader(http.StatusBadRequest) return } log.Printf("[job %s] removed", job.Id) }) s := http.Server{ ReadTimeout: 0, IdleTimeout: time.Second * 60, WriteTimeout: 0, Addr: *listenAddr, Handler: serveMux, } s.SetKeepAlivesEnabled(true) if err := s.ListenAndServe(); err != nil { panic(err) } defer s.Close() }