From 19959a6c0188b36a021f1ee3f1c24264db4672be Mon Sep 17 00:00:00 2001 From: milarin Date: Sat, 17 Dec 2022 19:50:41 +0100 Subject: [PATCH] - efficiency of toMap improved by concurrently calling the mapper function - ToMapSuccessive introduced which still uses old behavior --- internal_stuff.go | 12 ++++++++++++ map.go | 4 +--- to.go | 23 ++++++++++++++++++++++- 3 files changed, 35 insertions(+), 4 deletions(-) create mode 100644 internal_stuff.go diff --git a/internal_stuff.go b/internal_stuff.go new file mode 100644 index 0000000..9ab9010 --- /dev/null +++ b/internal_stuff.go @@ -0,0 +1,12 @@ +package channel + +import "runtime" + +type mapEntry[K comparable, V any] struct { + Key K + Value V +} + +func getDefaultRunner() Runner { + return NewLimitedRunner(runtime.NumCPU()) +} diff --git a/map.go b/map.go index 42c4e6d..50bc174 100644 --- a/map.go +++ b/map.go @@ -1,11 +1,9 @@ package channel -import "runtime" - // Map applies mapper to all I's coming from in and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible using as many threads as there are CPU cores func Map[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { - return MapWithRunner(source, NewLimitedRunner(runtime.NumCPU()), mapper) + return MapWithRunner(source, getDefaultRunner(), mapper) } // MapWithRunner behaves like Map but uses runner to spawn its routines diff --git a/to.go b/to.go index f4a65ca..a8a7d80 100644 --- a/to.go +++ b/to.go @@ -30,8 +30,29 @@ func ToList[T any](ch <-chan T) *list.List { } // ToMap returns a map containing all values read from ch. -// The map key-value pairs are determined by f +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map func ToMap[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { + return ToMapWithRunner(ch, getDefaultRunner(), f) +} + +// ToMap returns a map containing all values read from ch. +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map +func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f func(T) (K, V)) map[K]V { + map2entry := func(t T) mapEntry[K, V] { + k, v := f(t) + return mapEntry[K, V]{Key: k, Value: v} + } + + map2kv := func(e mapEntry[K, V]) (K, V) { return e.Key, e.Value } + + return ToMapSuccessive(MapWithRunner(ch, runner, map2entry), map2kv) +} + +// ToMapSuccessive returns a map containing all values read from ch. +// The map key-value pairs are determined by f +func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { m := map[K]V{} Each(ch, func(value T) { k, v := f(value)