package channel import ( "time" ) // GroupByTime groups all incoming values from source using the grouper function // and sends them to the returned channel after the given amount of time. // This can useful for summing or averaging values from a channel on a fixed interval func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G { out := make(chan G, cap(source)) go func() { defer close(out) ticker := time.NewTicker(duration) defer ticker.Stop() current := *new(G) changed := false for { select { case value, ok := <-source: if !ok { if changed { out <- current } return } current = grouper(current, value) changed = true case <-ticker.C: group := current out <- group current = *new(G) changed = false } } }() return out } // GroupByAmount groups all incoming values from source using the grouper function // and sends them to the returned channel after the given amount of values were grouped. // This can useful for summing or averaging values from a channel for a given amount of values func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G { out := make(chan G, cap(source)) go func() { defer close(out) current := *new(G) currentAmount := 0 for value := range source { currentAmount++ current = grouper(current, value) if currentAmount%amount == 0 { group := current out <- group current = *new(G) } } }() return out } // GroupByValue groups all incoming values from source using the grouper function // and sends them to the returned channel after valueFunc returns true for a given value. 
// This can be useful for summing or averaging values from a channel up to a delimiting value.
// The value for which valueFunc returns true is passed to grouper before the group is sent,
// and the accumulator restarts from the zero value of G afterwards. Values still pending
// when source is closed are not sent. The returned channel has the same capacity as source
// and is closed once source is drained.
func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G {
	out := make(chan G, cap(source))

	go func() {
		defer close(out)

		var acc G
		for {
			item, open := <-source
			if !open {
				return
			}

			// Fold first, then ask whether this value terminates the group.
			acc = grouper(acc, item)
			if !valueFunc(item) {
				continue
			}

			out <- acc
			var zero G
			acc = zero
		}
	}()

	return out
}