From 2db42252516cfdf612f1bc14fa45c5c53bfe5e99 Mon Sep 17 00:00:00 2001 From: milarin Date: Mon, 6 Mar 2023 13:03:10 +0100 Subject: [PATCH] fixed Map function to wait until all values were mapped before closing channel --- map.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/map.go b/map.go index 665af6c..b993582 100644 --- a/map.go +++ b/map.go @@ -1,5 +1,7 @@ package channel +import "sync" + // MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible using as many threads as there are CPU cores func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { @@ -51,12 +53,18 @@ func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) < go func() { defer close(out) + wg := &sync.WaitGroup{} + for value := range source { value := value + wg.Add(1) runner.Run(func() { + defer wg.Done() out <- mapper(value) }) } + + wg.Wait() }() return out