Ignite/cli/encode-pool/main.go

289 lines
6.7 KiB
Go

package main
import (
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"encoding/json"
"flag"
encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils"
"gopkg.in/yaml.v3"
"io"
"net/http"
"net/url"
"os"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
)
var DefaultClient = &http.Client{
Timeout: time.Second * 5,
}
var InsecureDefaultClient = &http.Client{
Timeout: time.Second * 5,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
type ServerConfig struct {
URL string `yaml:"url"`
Key string `yaml:"key"`
InsecureSSL bool `yaml:"insecure_ssl"`
lastStatus atomic.Pointer[encode_utils.StatusData]
nextStatus atomic.Int64
}
func (s *ServerConfig) Status() *encode_utils.StatusData {
if time.Unix(s.nextStatus.Load(), 0).Compare(time.Now()) < 0 {
defer func() {
s.nextStatus.Store(time.Now().Add(time.Second * 30).Unix())
}()
u, _ := url.Parse(s.URL + "/status?k=" + s.Key)
response, err := s.Do(&http.Request{
Method: "GET",
URL: u,
})
if err != nil {
return s.lastStatus.Load()
}
defer response.Body.Close()
dataBuf, err := io.ReadAll(response.Body)
if err != nil {
return s.lastStatus.Load()
}
var status encode_utils.StatusData
err = json.Unmarshal(dataBuf, &status)
if err != nil {
return s.lastStatus.Load()
}
s.lastStatus.Store(&status)
}
return s.lastStatus.Load()
}
func (s *ServerConfig) Do(r *http.Request) (*http.Response, error) {
if s.InsecureSSL {
return InsecureDefaultClient.Do(r)
} else {
return DefaultClient.Do(r)
}
}
func (s *ServerConfig) Pass(r *http.Request, replaceValues ...[2]string) (*http.Response, error) {
urlPath := r.URL
newUrl := s.URL + urlPath.String()
urlPath, _ = url.Parse(newUrl)
q := urlPath.Query()
for _, v := range replaceValues {
q.Set(v[0], v[1])
}
urlPath.RawQuery = q.Encode()
return s.Do(&http.Request{
Method: r.Method,
URL: urlPath,
Header: r.Header,
Body: r.Body,
})
}
func (s *ServerConfig) Redirect(w http.ResponseWriter, r *http.Request, replaceValues ...[2]string) {
urlPath := r.URL
newUrl := s.URL + urlPath.String()
urlPath, _ = url.Parse(newUrl)
q := urlPath.Query()
for _, v := range replaceValues {
q.Set(v[0], v[1])
}
urlPath.RawQuery = q.Encode()
http.Redirect(w, r, urlPath.String(), http.StatusTemporaryRedirect)
}
type PoolConfig struct {
Servers []*ServerConfig `yaml:"servers"`
}
var poolConfig PoolConfig
var jobMutex sync.Mutex
type Job struct {
Id string `json:"id"`
Config encode_utils.JobConfig `json:"config"`
ServerId string `json:"server_id"`
Server *ServerConfig `json:"server"`
}
var jobs []*Job
func main() {
configPath := flag.String("config", "pool.yml", "Configuration path for server pool")
listenAddr := flag.String("listen", "0.0.0.0:8484", "Listen address for the server")
flag.Parse()
fData, err := os.ReadFile(*configPath)
if err != nil {
panic(err)
}
err = yaml.Unmarshal(fData, &poolConfig)
if err != nil {
panic(err)
}
serveMux := http.NewServeMux()
serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) {
for {
for _, s := range poolConfig.Servers {
if job := func(s *ServerConfig) *Job {
response, err := s.Pass(r, [2]string{"k", s.Key})
if err != nil {
return nil
}
defer response.Body.Close()
defer io.ReadAll(response.Body)
if response.StatusCode == http.StatusOK {
var job struct {
Id string `json:"id"`
Config encode_utils.JobConfig `json:"config"`
}
dataBuf, err := io.ReadAll(response.Body)
if err != nil {
return nil
}
err = json.Unmarshal(dataBuf, &job)
if err != nil {
return nil
}
jobId := sha256.Sum256(append([]byte(job.Id), s.URL...))
return &Job{
Id: base64.RawURLEncoding.EncodeToString(jobId[:]),
Config: job.Config,
ServerId: job.Id,
Server: s,
}
}
return nil
}(s); job != nil {
jobMutex.Lock()
defer jobMutex.Unlock()
jobs = append(jobs, job)
data, err := json.Marshal(job)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
return
}
}
// Check every few seconds for any space for new jobs
// TODO: do this globally aware + pool via channels
time.Sleep(time.Second * 30)
}
})
serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
jobId := query.Get("jobId")
jobMutex.Lock()
defer jobMutex.Unlock()
if i := slices.IndexFunc(jobs, func(job *Job) bool {
return job.Id == jobId
}); i == -1 {
w.WriteHeader(http.StatusNotFound)
return
} else {
job := jobs[i]
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
}
})
serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
jobId := query.Get("jobId")
jobMutex.Lock()
defer jobMutex.Unlock()
if i := slices.IndexFunc(jobs, func(job *Job) bool {
return job.Id == jobId
}); i == -1 {
w.WriteHeader(http.StatusNotFound)
return
} else {
job := jobs[i]
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
}
})
serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
jobId := query.Get("jobId")
jobMutex.Lock()
defer jobMutex.Unlock()
if i := slices.IndexFunc(jobs, func(job *Job) bool {
return job.Id == jobId
}); i == -1 {
w.WriteHeader(http.StatusNotFound)
return
} else {
job := jobs[i]
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
}
})
serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
jobId := query.Get("jobId")
jobMutex.Lock()
defer jobMutex.Unlock()
if i := slices.IndexFunc(jobs, func(job *Job) bool {
return job.Id == jobId
}); i == -1 {
w.WriteHeader(http.StatusNotFound)
return
} else {
jobs = slices.Delete(jobs, i, i+1)
job := jobs[i]
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
}
})
s := http.Server{
ReadTimeout: time.Second * 10,
IdleTimeout: time.Second * 60,
WriteTimeout: 0,
Addr: *listenAddr,
Handler: serveMux,
}
s.SetKeepAlivesEnabled(true)
if err := s.ListenAndServe(); err != nil {
panic(err)
}
defer s.Close()
}