package channel

import "sync"

// Each drains source and invokes f once for every received value.
// It delegates to EachWithRunner using the runner returned by
// getDefaultRunner, and therefore blocks until source is closed and every
// dispatched call to f has finished.
func Each[T any](source <-chan T, f func(T)) {
	runner := getDefaultRunner()
	EachWithRunner(source, runner, f)
}

// EachWithRunner drains source and invokes f once for every received value,
// handing each invocation to runner.Run instead of calling f directly.
// How the calls are scheduled (and whether they overlap) is up to the runner.
// It blocks until source is closed and every dispatched call to f has
// returned.
func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) {
	var pending sync.WaitGroup
	for item := range source {
		// Per-iteration copy so the closure does not observe later values
		// when built with a toolchain that shares the loop variable.
		item := item
		// Register the task before handing it over, so Wait cannot return
		// while a task is still outstanding.
		pending.Add(1)
		task := func() {
			defer pending.Done()
			f(item)
		}
		runner.Run(task)
	}
	pending.Wait()
}

// EachSuccessive drains source and calls f for every received value, one
// after another on the calling goroutine, in the order values are received.
// It blocks until source is closed.
func EachSuccessive[T any](source <-chan T, f func(T)) {
	for {
		item, open := <-source
		if !open {
			return
		}
		f(item)
	}
}