diff --git a/filter.go b/filter.go index 873f7c1..d99059d 100644 --- a/filter.go +++ b/filter.go @@ -1,6 +1,6 @@ package channel -func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { +func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T { out := make(chan T, cap(source)) go func() { @@ -14,3 +14,26 @@ func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { return out } + +func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { + return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter) +} + +func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T { + type FilteredValue[T any] struct { + Value T + Filter bool + } + + mappedValues := MapPreserveOrderWithRunner(source, runner, func(value T) FilteredValue[T] { + return FilteredValue[T]{Value: value, Filter: filter(value)} + }) + + filteredValues := FilterSuccessive(mappedValues, func(filteredValue FilteredValue[T]) bool { + return filteredValue.Filter + }) + + return MapSuccessive(filteredValues, func(filteredValue FilteredValue[T]) T { + return filteredValue.Value + }) +}