From 892814aa5cf0e3d92804e2263b778eb907ad5b52 Mon Sep 17 00:00:00 2001 From: Timon Ringwald Date: Mon, 25 Apr 2022 15:37:00 +0200 Subject: [PATCH] use max channel capacity instead of average --- map.go | 4 ++-- utils.go | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/map.go b/map.go index d111cc2..5b40f16 100644 --- a/map.go +++ b/map.go @@ -1,6 +1,6 @@ package channel -// MapParallel applies mapper to all I's coming from in and sends their return values to out while conserving input order. +// MapParallel applies mapper to all I's coming from in and sends their return values to out while preserving input order. // All mappings will be done as concurrently as possible func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { return MapParallelWithRunner(source, NewUnlimitedRunner(), mapper) @@ -39,7 +39,7 @@ func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func return out } -// Map applies mapper to all I's coming from in and sends their return values to out while conserving input order. +// Map applies mapper to all I's coming from in and sends their return values to out while preserving input order. // All mappings will be done successively func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { out := make(chan O, cap(source)) diff --git a/utils.go b/utils.go index 85117e3..bea7182 100644 --- a/utils.go +++ b/utils.go @@ -5,11 +5,13 @@ func determineBufferSize[T any](channels []<-chan T) int { return 0 } - bufSize := 0 + maxBufSize := 0 for _, ch := range channels { - bufSize += cap(ch) + if cap(ch) > maxBufSize { + maxBufSize = cap(ch) + } } - return bufSize / len(channels) + return maxBufSize } // Flush consumes all values and discards them immediately. @@ -20,7 +22,7 @@ func Flush[T any](sources ...<-chan T) { } // Each consumes all values and calls f for each of them. -// It blocks until all sources are closed +// It blocks until source is closed func Each[T any](source <-chan T, f func(T)) { for value := range source { f(value) @@ -28,14 +30,14 @@ func Each[T any](source <-chan T, f func(T)) { } // Tee returns 2 channels which both receive all values from source. -// Its basically a copy function for channels +// It's basically a copy function for channels func Tee[T any](source <-chan T) (<-chan T, <-chan T) { outs := TeeMany(source, 2) return outs[0], outs[1] } // TeeMany returns a given amount of channels which all receive all values from source. -// Its basically a copy function for channels +// It's basically a copy function for channels func TeeMany[T any](source <-chan T, amount int) []<-chan T { outs := make([]chan T, amount)