139 lines
3.3 KiB
Go
139 lines
3.3 KiB
Go
package channels
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"code.wmdillon.com/GoApi/apicontext"
|
|
)
|
|
|
|
var (
|
|
ErrChannelClosed = errors.New("channel is closed (returned !ok)")
|
|
)
|
|
|
|
type ChannelPipelineWorkFunction[T any] func(T) T
|
|
|
|
func ProcessChannelThroughFunction[T any](ctx context.Context, c <-chan T, workCallback ChannelPipelineWorkFunction[T]) <-chan T {
|
|
results := make(chan T, cap(c))
|
|
go func() {
|
|
defer close(results)
|
|
for apicontext.ContextIsNotDone(ctx) {
|
|
if t, err := TrySelectFromChannel(ctx, c); err != nil {
|
|
return
|
|
} else if err := TryAddToChannel(ctx, results, workCallback(t)); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return results
|
|
}
|
|
|
|
// could be problematic in certain situations because it launches one goroutine per incoming channel.
|
|
// this shouldn't typically be a problem with a small number of channels, or when the channels producers
|
|
// aren't very busy, but it's worth noting.
|
|
func MergeChannelsWithContext[T any](ctx context.Context, bufferSize int, channels ...<-chan T) <-chan T {
|
|
results := make(chan T, bufferSize)
|
|
go func() {
|
|
defer close(results)
|
|
var wg sync.WaitGroup
|
|
for _, c := range channels {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for apicontext.ContextIsNotDone(ctx) {
|
|
if t, err := TrySelectFromChannel(ctx, c); err != nil {
|
|
return
|
|
} else if err := TryAddToChannel(ctx, results, t); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}()
|
|
return results
|
|
}
|
|
|
|
func MergeChannels[T any](bufferSize int, channels ...<-chan T) <-chan T {
|
|
return MergeChannelsWithContext(context.Background(), bufferSize, channels...)
|
|
}
|
|
|
|
func TryAddToChannel[T any](ctx context.Context, c chan<- T, t T) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return context.DeadlineExceeded
|
|
case c <- t:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func TrySelectFromChannel[T any](ctx context.Context, c <-chan T) (T, error) {
|
|
var t T
|
|
var ok bool
|
|
select {
|
|
case <-ctx.Done():
|
|
return t, context.DeadlineExceeded
|
|
case t, ok = <-c:
|
|
if !ok {
|
|
return t, ErrChannelClosed
|
|
} else {
|
|
return t, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func ChannelToSliceWithContext[T any](ctx context.Context, c <-chan T) []T {
|
|
results := make([]T, 0, cap(c))
|
|
loop:
|
|
for apicontext.ContextIsNotDone(ctx) {
|
|
if t, err := TrySelectFromChannel(ctx, c); err != nil {
|
|
break loop
|
|
} else {
|
|
results = append(results, t)
|
|
}
|
|
}
|
|
return results
|
|
}
|
|
|
|
func ChannelToSlice[T any](c <-chan T) []T {
|
|
return ChannelToSliceWithContext(context.Background(), c)
|
|
}
|
|
|
|
func SliceToChannelWithContext[T any](ctx context.Context, s []T, bufferSize int) <-chan T {
|
|
results := make(chan T, bufferSize)
|
|
go func() {
|
|
defer close(results)
|
|
for _, t := range s {
|
|
if err := TryAddToChannel(ctx, results, t); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return results
|
|
}
|
|
|
|
func SliceToChannel[T any](s []T, bufferSize int) <-chan T {
|
|
return SliceToChannelWithContext(context.Background(), s, bufferSize)
|
|
}
|
|
|
|
func CopyChannelWithContext[T any](ctx context.Context, c <-chan T) (a, b <-chan T) {
|
|
left, right := make(chan T, cap(c)), make(chan T, cap(c))
|
|
go func() {
|
|
defer func() {
|
|
close(left)
|
|
close(right)
|
|
}()
|
|
for apicontext.ContextIsNotDone(ctx) {
|
|
if t, err := TrySelectFromChannel(ctx, c); err != nil {
|
|
return
|
|
} else if err := TryAddToChannel(ctx, left, t); err != nil {
|
|
return
|
|
} else if err := TryAddToChannel(ctx, right, t); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return left, right
|
|
}
|