use max channel capacity instead of average

This commit is contained in:
Timon Ringwald 2022-04-25 15:37:00 +02:00
parent 55c3c7a464
commit 892814aa5c
2 changed files with 10 additions and 8 deletions

4
map.go
View File

@ -1,6 +1,6 @@
package channel package channel
// MapParallel applies mapper to all I's coming from in and sends their return values to out while conserving input order. // MapParallel applies mapper to all I's coming from in and sends their return values to out while preserving input order.
// All mappings will be done as concurrently as possible // All mappings will be done as concurrently as possible
func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
return MapParallelWithRunner(source, NewUnlimitedRunner(), mapper) return MapParallelWithRunner(source, NewUnlimitedRunner(), mapper)
@ -39,7 +39,7 @@ func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func
return out return out
} }
// Map applies mapper to all I's coming from in and sends their return values to out while conserving input order. // Map applies mapper to all I's coming from in and sends their return values to out while preserving input order.
// All mappings will be done successively // All mappings will be done successively
func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O {
out := make(chan O, cap(source)) out := make(chan O, cap(source))

View File

@ -5,11 +5,13 @@ func determineBufferSize[T any](channels []<-chan T) int {
return 0 return 0
} }
bufSize := 0 maxBufSize := 0
for _, ch := range channels { for _, ch := range channels {
bufSize += cap(ch) if cap(ch) > maxBufSize {
maxBufSize = cap(ch)
}
} }
return bufSize / len(channels) return maxBufSize
} }
// Flush consumes all values and discards them immediately. // Flush consumes all values and discards them immediately.
@ -20,7 +22,7 @@ func Flush[T any](sources ...<-chan T) {
} }
// Each consumes all values and calls f for each of them. // Each consumes all values and calls f for each of them.
// It blocks until all sources are closed // It blocks until source is closed
func Each[T any](source <-chan T, f func(T)) { func Each[T any](source <-chan T, f func(T)) {
for value := range source { for value := range source {
f(value) f(value)
@ -28,14 +30,14 @@ func Each[T any](source <-chan T, f func(T)) {
} }
// Tee returns 2 channels which both receive all values from source. // Tee returns 2 channels which both receive all values from source.
// Its basically a copy function for channels // It's basically a copy function for channels
func Tee[T any](source <-chan T) (<-chan T, <-chan T) { func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
outs := TeeMany(source, 2) outs := TeeMany(source, 2)
return outs[0], outs[1] return outs[0], outs[1]
} }
// TeeMany returns a given amount of channels which all receive all values from source. // TeeMany returns a given amount of channels which all receive all values from source.
// Its basically a copy function for channels // It's basically a copy function for channels
func TeeMany[T any](source <-chan T, amount int) []<-chan T { func TeeMany[T any](source <-chan T, amount int) []<-chan T {
outs := make([]chan T, amount) outs := make([]chan T, amount)