Implemented encode server pool
Some checks are pending
continuous-integration/drone/push Build is running
Some checks are pending
continuous-integration/drone/push Build is running
This commit is contained in:
parent
241ee4164e
commit
f2290a17b4
270
cli/encode-pool/main.go
Normal file
270
cli/encode-pool/main.go
Normal file
|
@ -0,0 +1,270 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var DefaultClient = &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
|
||||
var InsecureDefaultClient = &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
}
|
||||
|
||||
type ServerConfig struct {
|
||||
URL string `yaml:"url"`
|
||||
Key string `yaml:"key"`
|
||||
InsecureSSL bool `yaml:"insecure_ssl"`
|
||||
}
|
||||
|
||||
func (s *ServerConfig) Pass(r *http.Request, replaceValues ...[2]string) (*http.Response, error) {
|
||||
urlPath := r.URL
|
||||
|
||||
newUrl := s.URL + urlPath.String()
|
||||
urlPath, _ = url.Parse(newUrl)
|
||||
q := urlPath.Query()
|
||||
|
||||
for _, v := range replaceValues {
|
||||
q.Set(v[0], v[1])
|
||||
}
|
||||
urlPath.RawQuery = q.Encode()
|
||||
|
||||
if s.InsecureSSL {
|
||||
return InsecureDefaultClient.Do(&http.Request{
|
||||
Method: r.Method,
|
||||
URL: urlPath,
|
||||
Header: r.Header,
|
||||
Body: r.Body,
|
||||
})
|
||||
} else {
|
||||
return DefaultClient.Do(&http.Request{
|
||||
Method: r.Method,
|
||||
URL: urlPath,
|
||||
Header: r.Header,
|
||||
Body: r.Body,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ServerConfig) Redirect(w http.ResponseWriter, r *http.Request, replaceValues ...[2]string) {
|
||||
urlPath := r.URL
|
||||
|
||||
newUrl := s.URL + urlPath.String()
|
||||
urlPath, _ = url.Parse(newUrl)
|
||||
q := urlPath.Query()
|
||||
|
||||
for _, v := range replaceValues {
|
||||
q.Set(v[0], v[1])
|
||||
}
|
||||
urlPath.RawQuery = q.Encode()
|
||||
|
||||
http.Redirect(w, r, urlPath.String(), http.StatusTemporaryRedirect)
|
||||
}
|
||||
|
||||
type PoolConfig struct {
|
||||
Servers []*ServerConfig `yaml:"servers"`
|
||||
}
|
||||
|
||||
var poolConfig PoolConfig
|
||||
|
||||
var jobMutex sync.Mutex
|
||||
|
||||
type Job struct {
|
||||
Id string `json:"id"`
|
||||
Config encode_utils.JobConfig `json:"config"`
|
||||
ServerId string `json:"server_id"`
|
||||
Server *ServerConfig `json:"server"`
|
||||
}
|
||||
|
||||
var jobs []*Job
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "pool.yml", "Configuration path for server pool")
|
||||
listenAddr := flag.String("listen", "0.0.0.0:8484", "Listen address for the server")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
fData, err := os.ReadFile(*configPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = yaml.Unmarshal(fData, &poolConfig)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
|
||||
serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) {
|
||||
for {
|
||||
for _, s := range poolConfig.Servers {
|
||||
if job := func(s *ServerConfig) *Job {
|
||||
response, err := s.Pass(r, [2]string{"k", s.Key})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer response.Body.Close()
|
||||
defer io.ReadAll(response.Body)
|
||||
if response.StatusCode == http.StatusOK {
|
||||
var job struct {
|
||||
Id string `json:"id"`
|
||||
Config encode_utils.JobConfig `json:"config"`
|
||||
}
|
||||
dataBuf, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
err = json.Unmarshal(dataBuf, &job)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
jobId := sha256.Sum256(append([]byte(job.Id), s.URL...))
|
||||
return &Job{
|
||||
Id: base64.RawURLEncoding.EncodeToString(jobId[:]),
|
||||
Config: job.Config,
|
||||
ServerId: job.Id,
|
||||
Server: s,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(s); job != nil {
|
||||
jobMutex.Lock()
|
||||
defer jobMutex.Unlock()
|
||||
jobs = append(jobs, job)
|
||||
data, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write(data)
|
||||
return
|
||||
}
|
||||
}
|
||||
// Check every few seconds for any space for new jobs
|
||||
// TODO: do this globally aware + pool via channels
|
||||
time.Sleep(time.Second * 30)
|
||||
}
|
||||
})
|
||||
|
||||
serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
|
||||
jobId := query.Get("jobId")
|
||||
|
||||
jobMutex.Lock()
|
||||
defer jobMutex.Unlock()
|
||||
if i := slices.IndexFunc(jobs, func(job *Job) bool {
|
||||
return job.Id == jobId
|
||||
}); i == -1 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else {
|
||||
job := jobs[i]
|
||||
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
|
||||
}
|
||||
})
|
||||
|
||||
serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
|
||||
jobId := query.Get("jobId")
|
||||
|
||||
jobMutex.Lock()
|
||||
defer jobMutex.Unlock()
|
||||
if i := slices.IndexFunc(jobs, func(job *Job) bool {
|
||||
return job.Id == jobId
|
||||
}); i == -1 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else {
|
||||
job := jobs[i]
|
||||
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
|
||||
}
|
||||
})
|
||||
|
||||
serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
|
||||
jobId := query.Get("jobId")
|
||||
|
||||
jobMutex.Lock()
|
||||
defer jobMutex.Unlock()
|
||||
if i := slices.IndexFunc(jobs, func(job *Job) bool {
|
||||
return job.Id == jobId
|
||||
}); i == -1 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else {
|
||||
job := jobs[i]
|
||||
job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
|
||||
}
|
||||
})
|
||||
|
||||
serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
|
||||
jobId := query.Get("jobId")
|
||||
|
||||
jobMutex.Lock()
|
||||
defer jobMutex.Unlock()
|
||||
if i := slices.IndexFunc(jobs, func(job *Job) bool {
|
||||
return job.Id == jobId
|
||||
}); i == -1 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else {
|
||||
job := jobs[i]
|
||||
response, err := job.Server.Pass(r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId})
|
||||
defer response.Body.Close()
|
||||
defer io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if response.StatusCode == http.StatusOK {
|
||||
jobs = slices.Delete(jobs, i, i+1)
|
||||
}
|
||||
maps.Copy(w.Header(), response.Header)
|
||||
w.WriteHeader(response.StatusCode)
|
||||
_, _ = io.Copy(w, response.Body)
|
||||
}
|
||||
})
|
||||
|
||||
s := http.Server{
|
||||
ReadTimeout: time.Second * 10,
|
||||
IdleTimeout: time.Second * 60,
|
||||
WriteTimeout: 0,
|
||||
Addr: *listenAddr,
|
||||
Handler: serveMux,
|
||||
}
|
||||
s.SetKeepAlivesEnabled(true)
|
||||
|
||||
if err := s.ListenAndServe(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer s.Close()
|
||||
}
|
|
@ -78,7 +78,7 @@ func CreateJob(cfg encode_utils.JobConfig) (*Job, error) {
|
|||
Config: cfg,
|
||||
Logger: utilities.DefaultLogger(),
|
||||
}
|
||||
var idBuf [32]byte
|
||||
var idBuf [16]byte
|
||||
_, err := io.ReadFull(rand.Reader, idBuf[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Reference in a new issue