package channels import ( "context" "errors" "sync" "code.wmdillon.com/GoApi/apicontext" ) var ( ErrChannelClosed = errors.New("channel is closed (returned !ok)") ) type ChannelPipelineWorkFunction[T any] func(T) T func ProcessChannelThroughFunction[T any](ctx context.Context, c <-chan T, workCallback ChannelPipelineWorkFunction[T]) <-chan T { results := make(chan T, cap(c)) go func() { defer close(results) for apicontext.ContextIsNotDone(ctx) { if t, err := TrySelectFromChannel(ctx, c); err != nil { return } else if err := TryAddToChannel(ctx, results, workCallback(t)); err != nil { return } } }() return results } // could be problematic in certain situations because it launches one goroutine per incoming channel. // this shouldn't typically be a problem with a small number of channels, or when the channels producers // aren't very busy, but it's worth noting. func MergeChannelsWithContext[T any](ctx context.Context, bufferSize int, channels ...<-chan T) <-chan T { results := make(chan T, bufferSize) go func() { defer close(results) var wg sync.WaitGroup for _, c := range channels { wg.Add(1) go func() { defer wg.Done() for apicontext.ContextIsNotDone(ctx) { if t, err := TrySelectFromChannel(ctx, c); err != nil { return } else if err := TryAddToChannel(ctx, results, t); err != nil { return } } }() } wg.Wait() }() return results } func MergeChannels[T any](bufferSize int, channels ...<-chan T) <-chan T { return MergeChannelsWithContext(context.Background(), bufferSize, channels...) } func TryAddToChannel[T any](ctx context.Context, c chan<- T, t T) error { select { case <-ctx.Done(): return context.DeadlineExceeded case c <- t: return nil } } func TrySelectFromChannel[T any](ctx context.Context, c <-chan T) (T, error) { var t T var ok bool select { case <-ctx.Done(): return t, context.DeadlineExceeded case t, ok = <-c: if !ok { return t, ErrChannelClosed } else { return t, nil } } } func ChannelToSliceWithContext[T any](ctx context.Context, c <-chan T) []T { results := make([]T, 0, cap(c)) loop: for apicontext.ContextIsNotDone(ctx) { if t, err := TrySelectFromChannel(ctx, c); err != nil { break loop } else { results = append(results, t) } } return results } func ChannelToSlice[T any](c <-chan T) []T { return ChannelToSliceWithContext(context.Background(), c) } func SliceToChannelWithContext[T any](ctx context.Context, s []T, bufferSize int) <-chan T { results := make(chan T, bufferSize) go func() { defer close(results) for _, t := range s { if err := TryAddToChannel(ctx, results, t); err != nil { return } } }() return results } func SliceToChannel[T any](s []T, bufferSize int) <-chan T { return SliceToChannelWithContext(context.Background(), s, bufferSize) } func CopyChannelWithContext[T any](ctx context.Context, c <-chan T) (a, b <-chan T) { left, right := make(chan T, cap(c)), make(chan T, cap(c)) go func() { defer func() { close(left) close(right) }() for apicontext.ContextIsNotDone(ctx) { if t, err := TrySelectFromChannel(ctx, c); err != nil { return } else if err := TryAddToChannel(ctx, left, t); err != nil { return } else if err := TryAddToChannel(ctx, right, t); err != nil { return } } }() return left, right }