channel/each.go

35 lines
755 B
Go
Raw Permalink Normal View History

2023-03-04 11:24:42 +01:00
package channel
import "sync"
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func Each[T any](source <-chan T, f func(T)) {
EachWithRunner(source, getDefaultRunner(), f)
}
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) {
wg := &sync.WaitGroup{}
for value := range source {
value := value
wg.Add(1)
runner.Run(func() {
defer wg.Done()
f(value)
})
}
wg.Wait()
}
// EachSuccessive consumes all values and calls f for each of them.
// It blocks until source is closed
func EachSuccessive[T any](source <-chan T, f func(T)) {
for value := range source {
f(value)
}
}