package channel func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T { out := make(chan T, cap(source)) go func() { defer close(out) for value := range source { if filter(value) { out <- value } } }() return out } func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter) } func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T { type FilteredValue[T any] struct { Value T Filter bool } mappedValues := MapPreserveOrderWithRunner(source, runner, func(value T) FilteredValue[T] { return FilteredValue[T]{Value: value, Filter: filter(value)} }) filteredValues := FilterSuccessive(mappedValues, func(filteredValue FilteredValue[T]) bool { return filteredValue.Filter }) return MapSuccessive(filteredValues, func(filteredValue FilteredValue[T]) T { return filteredValue.Value }) }