Ignite/cli/encode-server/job.go

124 lines
2.4 KiB
Go

package main
import (
"crypto/rand"
"encoding/base64"
"errors"
encode_utils "git.gammaspectra.live/S.O.N.G/Ignite/cli/encode-utils"
"git.gammaspectra.live/S.O.N.G/Ignite/encoder"
"git.gammaspectra.live/S.O.N.G/Ignite/utilities"
"io"
"maps"
"slices"
"sync"
"sync/atomic"
)
var JobsMutex sync.Mutex
var jobs []*Job
type JobId string
type Job struct {
Id JobId `json:"id" yaml:"id"`
Config encode_utils.JobConfig `json:"config" yaml:"config"`
Logger utilities.Logger `json:"-" yaml:"-"`
Encoder encoder.Encoder `json:"-" yaml:"-"`
sync.Mutex `json:"-" yaml:"-"`
Status struct {
Start atomic.Uint64
Read atomic.Uint64
Processed atomic.Uint64
} `json:"-" yaml:"-"`
}
func (j *Job) Init(w io.Writer) error {
if j.Encoder != nil {
return errors.New("already initialized")
}
// Do not modify original settings
settings := maps.Clone(j.Config.Encoder.Settings)
var err error
if e := GetEncoder(j.Config.Encoder.Name); e == nil {
return errors.New("encoder not found")
} else {
j.Encoder, err = e.New(w, j.Config.Properties, settings, j.Logger)
}
if err != nil {
return err
}
if j.Encoder == nil {
return errors.New("encoder not supported")
}
return nil
}
func (j *Job) Close() error {
if j.Encoder != nil {
j.Encoder.Close()
j.Encoder = nil
}
return nil
}
// CreateJob Creates a Job. JobsMutex must be held
func CreateJob(cfg encode_utils.JobConfig) (*Job, error) {
job := &Job{
Config: cfg,
Logger: utilities.DefaultLogger(),
}
var idBuf [16]byte
_, err := io.ReadFull(rand.Reader, idBuf[:])
if err != nil {
return nil, err
}
job.Id = JobId(base64.RawURLEncoding.EncodeToString(idBuf[:]))
if slices.ContainsFunc(jobs, func(j *Job) bool {
return j.Id == job.Id
}) {
return nil, errors.New("job already exists")
}
jobs = append(jobs, job)
return job, nil
}
// GetJob Removes a Job.
func GetJob(jobId JobId) *Job {
JobsMutex.Lock()
defer JobsMutex.Unlock()
if i := slices.IndexFunc(jobs, func(j *Job) bool {
return j.Id == jobId
}); i != -1 {
return jobs[i]
}
return nil
}
// RemoveJob Removes a Job
func RemoveJob(jobId JobId) error {
JobsMutex.Lock()
defer JobsMutex.Unlock()
if i := slices.IndexFunc(jobs, func(j *Job) bool {
return j.Id == jobId
}); i == -1 {
return errors.New("job does not exist")
} else {
err := jobs[i].Close()
if err != nil {
return err
}
jobs = slices.Delete(jobs, i, i+1)
return nil
}
}