channel/utils.go
2022-02-10 08:07:03 +01:00

58 lines
1.2 KiB
Go

package channel
// determineBufferSize returns the average capacity of the given channels,
// rounded down by integer division. It returns 0 for an empty slice.
func determineBufferSize[T any](channels []<-chan T) int {
	count := len(channels)
	if count < 1 {
		return 0
	}

	// Sum up the capacities first, then divide once.
	total := 0
	for i := 0; i < count; i++ {
		total += cap(channels[i])
	}
	return total / count
}
// Flush reads every value from the merged sources and throws it away.
// It returns only after the merged stream produced by Merge has ended.
func Flush[T any](sources ...<-chan T) {
	merged := Merge(sources...)
	for range merged {
		// Intentionally empty: values are dropped as soon as they arrive.
	}
}
// Each receives every value from source and passes it to f, in order.
// It blocks until source is closed and drained.
func Each[T any](source <-chan T, f func(T)) {
	for {
		value, open := <-source
		if !open {
			// source is closed and has no values left.
			return
		}
		f(value)
	}
}
// Tee splits source into two channels, obtained from TeeMany with an amount of 2.
// It's basically a copy function for channels.
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
	const copies = 2
	pair := TeeMany(source, copies)
	first, second := pair[0], pair[1]
	return first, second
}
// TeeMany returns amount channels which each receive every value from source.
// It's basically a copy function for channels.
//
// Each output channel is created with the same capacity as source. A single
// goroutine forwards values to the outputs in slice order, so a consumer that
// stops reading from one output eventually stalls all of them. All outputs
// are closed once source is closed and drained. amount must not be negative,
// otherwise make panics.
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
	// Keep the bidirectional ends for the forwarding goroutine. Every channel
	// must be allocated: a nil channel blocks forever on send and panics on close.
	outs := make([]chan T, amount)
	// []chan T can neither be converted nor type-asserted to []<-chan T, so
	// the receive-only view is built element by element.
	readOnly := make([]<-chan T, amount)
	for i := range outs {
		outs[i] = make(chan T, cap(source))
		readOnly[i] = outs[i]
	}

	go func() {
		// Signal the end of the stream to every consumer once source is done.
		defer func() {
			for _, out := range outs {
				close(out)
			}
		}()
		for value := range source {
			for _, out := range outs {
				out <- value
			}
		}
	}()

	return readOnly
}