diff --git a/map.go b/map.go index 250c10e..d111cc2 100644 --- a/map.go +++ b/map.go @@ -7,8 +7,8 @@ func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { } // MapParallelWithRunner behaves like MapParallel but uses runner to spawn its routines -func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) (out <-chan O) { - outputChannel := make(chan O, cap(source)) +func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) outchannels := make(chan chan O, cap(source)) // start routine for each incoming value @@ -34,9 +34,9 @@ func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func out <- outputValue } } - }(outputChannel, outchannels) + }(out, outchannels) - return outputChannel + return out } // Map applies mapper to all I's coming from in and sends their return values to out while conserving input order. diff --git a/utils.go b/utils.go index 59e4784..85117e3 100644 --- a/utils.go +++ b/utils.go @@ -18,3 +18,40 @@ func Flush[T any](sources ...<-chan T) { for range Merge(sources...) { } } + +// Each consumes all values and calls f for each of them. +// It blocks until all sources are closed +func Each[T any](source <-chan T, f func(T)) { + for value := range source { + f(value) + } +} + +// Tee returns 2 channels which both receive all values from source. +// Its basically a copy function for channels +func Tee[T any](source <-chan T) (<-chan T, <-chan T) { + outs := TeeMany(source, 2) + return outs[0], outs[1] +} + +// TeeMany returns a given amount of channels which all receive all values from source. +// Its basically a copy function for channels +func TeeMany[T any](source <-chan T, amount int) []<-chan T { + outs := make([]chan T, amount) + + go func() { + defer func() { + for _, out := range outs { + close(out) + } + }() + + for value := range source { + for _, out := range outs { + out <- value + } + } + }() + + return (interface{}(outs)).([]<-chan T) +}