diff --git a/map.go b/map.go index 665af6c..b993582 100644 --- a/map.go +++ b/map.go @@ -1,5 +1,7 @@ package channel +import "sync" + // MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible using as many threads as there are CPU cores func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { @@ -51,12 +53,18 @@ func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) < go func() { defer close(out) + wg := &sync.WaitGroup{} + for value := range source { value := value + wg.Add(1) runner.Run(func() { + defer wg.Done() out <- mapper(value) }) } + + wg.Wait() }() return out