map syntax improved

This commit is contained in:
Tordarus 2022-02-10 08:07:03 +01:00
parent 07be46c4fe
commit a89b395510
2 changed files with 41 additions and 4 deletions

8
map.go
View File

@ -7,8 +7,8 @@ func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
}
// MapParallelWithRunner behaves like MapParallel but uses runner to spawn its routines
func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) (out <-chan O) {
outputChannel := make(chan O, cap(source))
func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
out := make(chan O, cap(source))
outchannels := make(chan chan O, cap(source))
// start routine for each incoming value
@ -34,9 +34,9 @@ func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func
out <- outputValue
}
}
}(outputChannel, outchannels)
}(out, outchannels)
return outputChannel
return out
}
// Map applies mapper to all I's coming from in and sends their return values to out while conserving input order.

View File

@ -18,3 +18,40 @@ func Flush[T any](sources ...<-chan T) {
for range Merge(sources...) {
}
}
// Each consumes all values and calls f for each of them.
// It blocks until all sources are closed
func Each[T any](source <-chan T, f func(T)) {
for value := range source {
f(value)
}
}
// Tee returns 2 channels which both receive all values from source.
// Its basically a copy function for channels
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
outs := TeeMany(source, 2)
return outs[0], outs[1]
}
// TeeMany returns a given amount of channels which all receive all values from source.
// Its basically a copy function for channels
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
outs := make([]chan T, amount)
go func() {
defer func() {
for _, out := range outs {
close(out)
}
}()
for value := range source {
for _, out := range outs {
out <- value
}
}
}()
return (interface{}(outs)).([]<-chan T)
}