35 lines
755 B
Go
35 lines
755 B
Go
|
package channel
|
||
|
|
||
|
import "sync"
|
||
|
|
||
|
// Each consumes all values and calls f for each of them.
|
||
|
// It blocks until source is closed
|
||
|
func Each[T any](source <-chan T, f func(T)) {
|
||
|
EachWithRunner(source, getDefaultRunner(), f)
|
||
|
}
|
||
|
|
||
|
// Each consumes all values and calls f for each of them.
|
||
|
// It blocks until source is closed
|
||
|
func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) {
|
||
|
wg := &sync.WaitGroup{}
|
||
|
|
||
|
for value := range source {
|
||
|
value := value
|
||
|
wg.Add(1)
|
||
|
runner.Run(func() {
|
||
|
defer wg.Done()
|
||
|
f(value)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
wg.Wait()
|
||
|
}
|
||
|
|
||
|
// EachSuccessive consumes all values and calls f for each of them.
|
||
|
// It blocks until source is closed
|
||
|
func EachSuccessive[T any](source <-chan T, f func(T)) {
|
||
|
for value := range source {
|
||
|
f(value)
|
||
|
}
|
||
|
}
|