channel/timeout.go
2022-02-09 12:10:02 +01:00

31 lines
607 B
Go

package channel
import "time"
// CloseOnTimeout returns a channel that forwards every value received from
// Merge(sources...) in the order those values arrive.
//
// The returned channel is closed when either of the following happens:
//   - the merged input channel is closed, or
//   - no value arrives on the merged input within timeout after the previous
//     value was handed to the returned channel (or, for the first value,
//     after CloseOnTimeout was called).
//
// The returned channel has the same capacity as the merged input channel.
//
// The forwarding goroutine blocks while the consumer is not reading from the
// returned channel; the idle window is not running during that time. After a
// timeout the merged input is no longer read by this function.
func CloseOnTimeout[T any](timeout time.Duration, sources ...<-chan T) <-chan T {
	input := Merge(sources...)
	output := make(chan T, cap(input))

	go func() {
		defer close(output)

		// A single timer is reused for the whole lifetime of the goroutine
		// instead of allocating a new, never-stopped timer per iteration.
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		for {
			select {
			case value, ok := <-input:
				if !ok {
					return
				}
				output <- value

				// Restart the idle window now that the value is delivered.
				// The timer may have fired while we were blocked on the send
				// above, so discard a stale tick before re-arming it.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			case <-timer.C:
				return
			}
		}
	}()

	return output
}