channel/map.go

58 lines
1.6 KiB
Go
Raw Normal View History

2022-02-09 12:10:02 +01:00
package channel
import "runtime"
// Map applies mapper to all I's coming from in and sends their return values to out while preserving input order.
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
func Map[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
return MapWithRunner(source, NewLimitedRunner(runtime.NumCPU()), mapper)
2022-02-09 12:10:02 +01:00
}
// MapWithRunner behaves like Map but uses runner to spawn its routines
func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
2022-02-10 08:07:03 +01:00
out := make(chan O, cap(source))
2022-02-09 12:10:02 +01:00
outchannels := make(chan chan O, cap(source))
// start routine for each incoming value
go func(in <-chan I, outchannels chan chan O) {
defer close(outchannels)
for inputValue := range in {
inputValue := inputValue
outCh := make(chan O)
outchannels <- outCh
runner.Run(func() {
defer close(outCh)
outCh <- mapper(inputValue)
})
}
}(source, outchannels)
// gather all results in incoming order
go func(out chan<- O, outchannels chan chan O) {
defer close(out)
for ch := range outchannels {
for outputValue := range ch {
out <- outputValue
}
}
2022-02-10 08:07:03 +01:00
}(out, outchannels)
2022-02-09 12:10:02 +01:00
2022-02-10 08:07:03 +01:00
return out
2022-02-09 12:10:02 +01:00
}
// MapSuccessive applies mapper to every value received from source and sends
// the results to the returned channel while preserving input order.
//
// All mappings are done successively in a single goroutine. The returned
// channel has the same capacity as source and is closed once source has been
// closed and drained.
func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O {
	out := make(chan O, cap(source))

	go func() {
		defer close(out)
		for value := range source {
			out <- mapper(value)
		}
	}()

	return out
}