95 lines
2.2 KiB
Go
95 lines
2.2 KiB
Go
package channel
|
|
|
|
import (
|
|
"time"
|
|
)
|
|
|
|
// GroupByTime groups all incoming values from source using the grouper function
|
|
// and sends them to the returned channel after the given amount of time.
|
|
// This can useful for summing or averaging values from a channel on a fixed interval
|
|
func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G {
|
|
out := make(chan G, cap(source))
|
|
|
|
go func() {
|
|
defer close(out)
|
|
|
|
ticker := time.NewTicker(duration)
|
|
defer ticker.Stop()
|
|
|
|
current := *new(G)
|
|
changed := false
|
|
for {
|
|
select {
|
|
case value, ok := <-source:
|
|
if !ok {
|
|
if changed {
|
|
out <- current
|
|
}
|
|
return
|
|
}
|
|
current = grouper(current, value)
|
|
changed = true
|
|
case <-ticker.C:
|
|
group := current
|
|
out <- group
|
|
current = *new(G)
|
|
changed = false
|
|
}
|
|
}
|
|
}()
|
|
|
|
return out
|
|
}
|
|
|
|
// GroupByAmount groups all incoming values from source using the grouper function
|
|
// and sends them to the returned channel after the given amount of values were grouped.
|
|
// This can useful for summing or averaging values from a channel for a given amount of values
|
|
func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G {
|
|
out := make(chan G, cap(source))
|
|
|
|
go func() {
|
|
defer close(out)
|
|
|
|
current := *new(G)
|
|
currentAmount := 0
|
|
for value := range source {
|
|
currentAmount++
|
|
current = grouper(current, value)
|
|
|
|
if currentAmount%amount == 0 {
|
|
group := current
|
|
out <- group
|
|
current = *new(G)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return out
|
|
}
|
|
|
|
// GroupByValue groups all incoming values from source using the grouper function
|
|
// and sends them to the returned channel after valueFunc returns true for a given value.
|
|
// This can useful for summing or averaging values from a channel for a given amount of values
|
|
func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G {
|
|
out := make(chan G, cap(source))
|
|
|
|
go func() {
|
|
defer close(out)
|
|
|
|
current := *new(G)
|
|
currentAmount := 0
|
|
for value := range source {
|
|
currentAmount++
|
|
current = grouper(current, value)
|
|
|
|
if valueFunc(value) {
|
|
group := current
|
|
out <- group
|
|
current = *new(G)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return out
|
|
}
|