channel/utils.go

67 lines
1.5 KiB
Go
Raw Normal View History

2022-02-09 12:10:02 +01:00
package channel
// determineBufferSize returns the largest capacity among the given channels.
// It returns 0 when channels is empty or when none of the channels is
// buffered (a nil channel also has a capacity of 0).
func determineBufferSize[T any](channels []<-chan T) int {
	maxBufSize := 0
	for _, ch := range channels {
		// Ranging over an empty or nil slice is a no-op, so no separate
		// emptiness guard is needed.
		if c := cap(ch); c > maxBufSize {
			maxBufSize = c
		}
	}
	return maxBufSize
}
2022-02-09 12:17:54 +01:00
2022-02-09 12:19:19 +01:00
// Flush consumes all values from the given sources and discards them immediately.
// It blocks until all sources are closed.
2022-02-09 12:17:54 +01:00
func Flush[T any](sources ...<-chan T) {
	// Combine every source into a single stream, then drain it.
	merged := Merge(sources...)
	for range merged {
		// Values are intentionally dropped.
	}
}
2022-02-10 08:07:03 +01:00
// Each consumes all values from source and calls f for each of them.
// It blocks until source is closed.
2022-02-10 08:07:03 +01:00
func Each[T any](source <-chan T, f func(T)) {
	for {
		item, open := <-source
		if !open {
			// source has been closed and fully drained.
			return
		}
		f(item)
	}
}
// Tee returns two channels which both receive all values from source.
// It is effectively a copy function for channels.
2022-02-10 08:07:03 +01:00
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
	// Delegate to TeeMany with a fan-out of two and unpack the pair.
	pair := TeeMany(source, 2)
	first, second := pair[0], pair[1]
	return first, second
}
// TeeMany returns amount channels which each receive every value read from
// source. It is effectively a copy function for channels.
//
// Every output channel is created with the same capacity as source. A single
// goroutine forwards each value to the outputs one after another, so an
// output whose buffer is full stalls delivery to all of them; callers must
// drain every returned channel. All outputs are closed once source has been
// closed and drained.
//
// A negative amount is treated as 0 instead of panicking. With no outputs,
// source is still consumed and its values are discarded.
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
	if amount < 0 {
		amount = 0
	}

	// Keep the bidirectional handles for the forwarding goroutine and hand
	// only receive-only views back to the caller.
	outputs := make([]chan T, amount)
	readOnlyOutputs := make([]<-chan T, amount)
	for i := range outputs {
		out := make(chan T, cap(source))
		outputs[i] = out
		readOnlyOutputs[i] = out
	}

	go func() {
		// Closing happens in a defer so the outputs are released on every
		// exit path of the forwarding loop.
		defer func() {
			for _, out := range outputs {
				close(out)
			}
		}()

		for value := range source {
			for _, out := range outputs {
				out <- value
			}
		}
	}()

	return readOnlyOutputs
}