package main import ( "crypto/sha256" "crypto/tls" "encoding/base64" "encoding/json" "flag" encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils" "gopkg.in/yaml.v3" "io" "net/http" "net/url" "os" "slices" "strconv" "sync" "sync/atomic" "time" ) var DefaultClient = &http.Client{ Timeout: time.Second * 5, } var InsecureDefaultClient = &http.Client{ Timeout: time.Second * 5, Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, } type ServerConfig struct { URL string `yaml:"url"` Key string `yaml:"key"` InsecureSSL bool `yaml:"insecure_ssl"` lastStatus atomic.Pointer[encode_utils.StatusData] nextStatus atomic.Int64 } func (s *ServerConfig) Status() *encode_utils.StatusData { if time.Unix(s.nextStatus.Load(), 0).Compare(time.Now()) < 0 { defer func() { s.nextStatus.Store(time.Now().Add(time.Second * 30).Unix()) }() u, _ := url.Parse(s.URL + "/status?k=" + s.Key) response, err := s.Do(&http.Request{ Method: "GET", URL: u, }) if err != nil { return s.lastStatus.Load() } defer response.Body.Close() dataBuf, err := io.ReadAll(response.Body) if err != nil { return s.lastStatus.Load() } var status encode_utils.StatusData err = json.Unmarshal(dataBuf, &status) if err != nil { return s.lastStatus.Load() } s.lastStatus.Store(&status) } return s.lastStatus.Load() } func (s *ServerConfig) Do(r *http.Request) (*http.Response, error) { if s.InsecureSSL { return InsecureDefaultClient.Do(r) } else { return DefaultClient.Do(r) } } func (s *ServerConfig) Pass(r *http.Request, replaceValues ...[2]string) (*http.Response, error) { urlPath := r.URL newUrl := s.URL + urlPath.String() urlPath, _ = url.Parse(newUrl) q := urlPath.Query() for _, v := range replaceValues { q.Set(v[0], v[1]) } urlPath.RawQuery = q.Encode() return s.Do(&http.Request{ Method: r.Method, URL: urlPath, Header: r.Header, Body: r.Body, }) } func (s *ServerConfig) Redirect(w http.ResponseWriter, r *http.Request, replaceValues ...[2]string) { urlPath := r.URL newUrl := s.URL + urlPath.String() urlPath, _ = url.Parse(newUrl) q := urlPath.Query() for _, v := range replaceValues { q.Set(v[0], v[1]) } urlPath.RawQuery = q.Encode() http.Redirect(w, r, urlPath.String(), http.StatusTemporaryRedirect) } type PoolConfig struct { Servers []*ServerConfig `yaml:"servers"` } var poolConfig PoolConfig var jobMutex sync.Mutex type Job struct { Id string `json:"id"` Config encode_utils.JobConfig `json:"config"` ServerId string `json:"server_id"` Server *ServerConfig `json:"server"` } var jobs []*Job func main() { configPath := flag.String("config", "pool.yml", "Configuration path for server pool") listenAddr := flag.String("listen", "0.0.0.0:8484", "Listen address for the server") flag.Parse() fData, err := os.ReadFile(*configPath) if err != nil { panic(err) } err = yaml.Unmarshal(fData, &poolConfig) if err != nil { panic(err) } serveMux := http.NewServeMux() serveMux.HandleFunc("/create", func(w http.ResponseWriter, r *http.Request) { for { for _, s := range poolConfig.Servers { if job := func(s *ServerConfig) *Job { response, err := s.Pass(r, [2]string{"k", s.Key}) if err != nil { return nil } defer response.Body.Close() defer io.ReadAll(response.Body) if response.StatusCode == http.StatusOK { var job struct { Id string `json:"id"` Config encode_utils.JobConfig `json:"config"` } dataBuf, err := io.ReadAll(response.Body) if err != nil { return nil } err = json.Unmarshal(dataBuf, &job) if err != nil { return nil } jobId := sha256.Sum256(append([]byte(job.Id), s.URL...)) return &Job{ Id: base64.RawURLEncoding.EncodeToString(jobId[:]), Config: job.Config, ServerId: job.Id, Server: s, } } return nil }(s); job != nil { jobMutex.Lock() defer jobMutex.Unlock() jobs = append(jobs, job) data, err := json.Marshal(job) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", strconv.FormatUint(uint64(len(data)), 10)) w.WriteHeader(http.StatusOK) _, _ = w.Write(data) return } } // Check every few seconds for any space for new jobs // TODO: do this globally aware + pool via channels time.Sleep(time.Second * 30) } }) serveMux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobId := query.Get("jobId") jobMutex.Lock() defer jobMutex.Unlock() if i := slices.IndexFunc(jobs, func(job *Job) bool { return job.Id == jobId }); i == -1 { w.WriteHeader(http.StatusNotFound) return } else { job := jobs[i] job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId}) } }) serveMux.HandleFunc("/startURL", func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobId := query.Get("jobId") jobMutex.Lock() defer jobMutex.Unlock() if i := slices.IndexFunc(jobs, func(job *Job) bool { return job.Id == jobId }); i == -1 { w.WriteHeader(http.StatusNotFound) return } else { job := jobs[i] job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId}) } }) serveMux.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobId := query.Get("jobId") jobMutex.Lock() defer jobMutex.Unlock() if i := slices.IndexFunc(jobs, func(job *Job) bool { return job.Id == jobId }); i == -1 { w.WriteHeader(http.StatusNotFound) return } else { job := jobs[i] job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId}) } }) serveMux.HandleFunc("/remove", func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobId := query.Get("jobId") jobMutex.Lock() defer jobMutex.Unlock() if i := slices.IndexFunc(jobs, func(job *Job) bool { return job.Id == jobId }); i == -1 { w.WriteHeader(http.StatusNotFound) return } else { jobs = slices.Delete(jobs, i, i+1) job := jobs[i] job.Server.Redirect(w, r, [2]string{"k", job.Server.Key}, [2]string{"jobId", job.ServerId}) } }) s := http.Server{ ReadTimeout: time.Second * 10, IdleTimeout: time.Second * 60, WriteTimeout: 0, Addr: *listenAddr, Handler: serveMux, } s.SetKeepAlivesEnabled(true) if err := s.ListenAndServe(); err != nil { panic(err) } defer s.Close() }