package channel func determineBufferSize[T any](channels []<-chan T) int { if len(channels) == 0 { return 0 } bufSize := 0 for _, ch := range channels { bufSize += cap(ch) } return bufSize / len(channels) } // Flush consumes all values and discards them immediately. // It blocks until all sources are closed func Flush[T any](sources ...<-chan T) { for range Merge(sources...) { } } // Each consumes all values and calls f for each of them. // It blocks until all sources are closed func Each[T any](source <-chan T, f func(T)) { for value := range source { f(value) } } // Tee returns 2 channels which both receive all values from source. // Its basically a copy function for channels func Tee[T any](source <-chan T) (<-chan T, <-chan T) { outs := TeeMany(source, 2) return outs[0], outs[1] } // TeeMany returns a given amount of channels which all receive all values from source. // Its basically a copy function for channels func TeeMany[T any](source <-chan T, amount int) []<-chan T { outs := make([]chan T, amount) go func() { defer func() { for _, out := range outs { close(out) } }() for value := range source { for _, out := range outs { out <- value } } }() return (interface{}(outs)).([]<-chan T) }