Add Stream.Split
This commit is contained in:
parent
7662721dca
commit
cda09dfed2
|
@ -124,6 +124,49 @@ func (s *Stream) Slice(start, end int) *Stream {
|
|||
return slice
|
||||
}
|
||||
|
||||
// Split produces several Stream across each split point, and locks the input.
|
||||
// For example, splits = [2, 5, 20] will produce 4 Stream, [0-1], [2-4], [5-19], [20-...]
|
||||
func (s *Stream) Split(splits ...int) []*Stream {
|
||||
if len(splits) == 0 {
|
||||
return []*Stream{s}
|
||||
} else if !s.Lock() {
|
||||
return nil
|
||||
}
|
||||
slice := make([]*Stream, len(splits)+1)
|
||||
for i := range slice {
|
||||
slice[i], _ = NewStream(s.Properties())
|
||||
}
|
||||
|
||||
var index int
|
||||
go func() {
|
||||
defer func() {
|
||||
//empty source channel
|
||||
for range s.channel {
|
||||
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
for _, sC := range slice {
|
||||
close(sC.channel)
|
||||
}
|
||||
}()
|
||||
|
||||
for f := range s.channel {
|
||||
for len(splits) > 0 && index >= splits[0] {
|
||||
close(slice[0].channel)
|
||||
splits = splits[1:]
|
||||
slice = slice[1:]
|
||||
}
|
||||
|
||||
slice[0].channel <- f
|
||||
|
||||
index++
|
||||
}
|
||||
}()
|
||||
|
||||
return slice
|
||||
}
|
||||
|
||||
// Sample samples frames every each Frame, and locks the input
|
||||
func (s *Stream) Sample(each int) *Stream {
|
||||
if !s.Lock() {
|
||||
|
|
Loading…
Reference in a new issue