package channel // MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible using as many threads as there are CPU cores func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { return MapPreserveOrderWithRunner(source, getDefaultRunner(), mapper) } // MapPreserveOrderWithRunner behaves like MapPreserveOrder but uses runner to spawn its routines func MapPreserveOrderWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) outchannels := make(chan chan O, cap(source)) // start routine for each incoming value go func(in <-chan I, outchannels chan chan O) { defer close(outchannels) for inputValue := range in { inputValue := inputValue outCh := make(chan O) outchannels <- outCh runner.Run(func() { defer close(outCh) outCh <- mapper(inputValue) }) } }(source, outchannels) // gather all results in incoming order go func(out chan<- O, outchannels chan chan O) { defer close(out) for ch := range outchannels { for outputValue := range ch { out <- outputValue } } }(out, outchannels) return out } // Map applies mapper to all I's coming from source and sends their return values to out. // All mappings will be done as concurrently as possible using as many threads as there are CPU cores func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { return MapWithRunner(source, getDefaultRunner(), mapper) } // MapWithRunner behaves like Map but uses runner to spawn its routines func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) go func() { defer close(out) for value := range source { value := value runner.Run(func() { out <- mapper(value) }) } }() return out } // MapSuccessive applies mapper to all I's coming from source and sends their return values to out while preserving input order. // All mappings will be done successively in a single thread func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) go func() { defer close(out) for value := range source { out <- mapper(value) } }() return out }