// Source: channel/map.go
package channel
import "sync"
// MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order.
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
return MapPreserveOrderWithRunner(source, getDefaultRunner(), mapper)
2022-02-09 12:10:02 +01:00
}
// MapPreserveOrderWithRunner behaves like MapPreserveOrder but uses runner to spawn its routines
func MapPreserveOrderWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
2022-02-10 08:07:03 +01:00
out := make(chan O, cap(source))
2022-02-09 12:10:02 +01:00
outchannels := make(chan chan O, cap(source))
// start routine for each incoming value
go func(in <-chan I, outchannels chan chan O) {
defer close(outchannels)
for inputValue := range in {
inputValue := inputValue
outCh := make(chan O)
outchannels <- outCh
runner.Run(func() {
defer close(outCh)
outCh <- mapper(inputValue)
})
}
}(source, outchannels)
// gather all results in incoming order
go func(out chan<- O, outchannels chan chan O) {
defer close(out)
for ch := range outchannels {
for outputValue := range ch {
out <- outputValue
}
}
2022-02-10 08:07:03 +01:00
}(out, outchannels)
2022-02-09 12:10:02 +01:00
2022-02-10 08:07:03 +01:00
return out
2022-02-09 12:10:02 +01:00
}
// Map applies mapper to all I's coming from source and sends their return values to out.
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O {
return MapWithRunner(source, getDefaultRunner(), mapper)
}
// MapWithRunner behaves like Map but uses runner to spawn the routines that
// execute mapper.
//
// Each value read from source is handed to runner, and every routine writes
// its result straight to the returned channel. The returned channel has the
// same capacity as source and is closed once source is closed and all
// started mappings have delivered their result.
func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
	out := make(chan O, cap(source))

	go func() {
		defer close(out)

		var pending sync.WaitGroup

		// dispatch schedules one mapping; taking the item as a parameter
		// gives every scheduled closure its own copy of the value.
		dispatch := func(item I) {
			runner.Run(func() {
				defer pending.Done()
				out <- mapper(item)
			})
		}

		for item := range source {
			pending.Add(1)
			dispatch(item)
		}

		// Keep out open until every scheduled mapping has sent its result.
		pending.Wait()
	}()

	return out
}
// MapSuccessive applies mapper to every value received from source and sends
// the results to the returned channel in input order.
//
// All mappings are executed one after another inside a single goroutine.
// The returned channel has the same capacity as source and is closed once
// source has been closed and drained.
func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O {
	out := make(chan O, cap(source))

	go func() {
		defer close(out)
		for value := range source {
			out <- mapper(value)
		}
	}()

	return out
}