package channel // MapParallel applies mapper to all I's coming from in and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { return MapParallelWithRunner(source, NewUnlimitedRunner(), mapper) } // MapParallelWithRunner behaves like MapParallel but uses runner to spawn its routines func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) outchannels := make(chan chan O, cap(source)) // start routine for each incoming value go func(in <-chan I, outchannels chan chan O) { defer close(outchannels) for inputValue := range in { inputValue := inputValue outCh := make(chan O) outchannels <- outCh runner.Run(func() { defer close(outCh) outCh <- mapper(inputValue) }) } }(source, outchannels) // gather all results in incoming order go func(out chan<- O, outchannels chan chan O) { defer close(out) for ch := range outchannels { for outputValue := range ch { out <- outputValue } } }(out, outchannels) return out } // Map applies mapper to all I's coming from in and sends their return values to out while preserving input order. // All mappings will be done successively func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) go func() { defer close(out) for value := range source { out <- mapper(value) } }() return out }