fixed Map function to wait until all values were mapped before closing channel

This commit is contained in:
milarin 2023-03-06 13:03:10 +01:00
parent ac5678bde6
commit 2db4225251

8
map.go
View File

@ -1,5 +1,7 @@
package channel package channel
import "sync"
// MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. // MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order.
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores // All mappings will be done as concurrently as possible using as many threads as there are CPU cores
func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
@ -51,12 +53,18 @@ func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <
go func() { go func() {
defer close(out) defer close(out)
wg := &sync.WaitGroup{}
for value := range source { for value := range source {
value := value value := value
wg.Add(1)
runner.Run(func() { runner.Run(func() {
defer wg.Done()
out <- mapper(value) out <- mapper(value)
}) })
} }
wg.Wait()
}() }()
return out return out