From Sequential Scripts to Concurrent AI Pipelines in Go — Part 4

Moinuddin M Masud

Part 4 — Channels: Removing the Lock Entirely

Part of the series: Production-Grade Concurrent AI Systems in Go

Full code for this post: github.com/madmmas/go-concurrent-ai-systems/tree/part-04
Diff from Part 3: compare/part-03...part-04
Run it: go run ./cmd/news-processor inside arc-1-foundations/part-04-channels


Part 3 left us with correct, fast, race-free code. A mutex protects the results slice, and as long as you lock only the append and nothing more, the performance holds up.

"As long as you lock only the append" is doing some work in that sentence, though. We proved in Part 3 that getting the lock boundary wrong turns a 3.4-second pipeline into a 32-second one — silently, with no error, no warning, just a number that's ten times worse than it should be. That's a sharp edge sitting in otherwise-correct-looking code.

Go offers a different design that avoids the edge entirely, not by being more careful with locks, but by not having a shared variable to lock in the first place.


The Idea

Instead of every worker reaching into the same slice, each worker sends its result somewhere. One single goroutine receives those results and is the only thing that ever touches the slice.

Worker 1 ─┐
Worker 2 ─┼──→ channel ──→ single collector ──→ results slice
Worker 3 ─┘

If only one goroutine ever writes to results, there's nothing to race. Not "nothing racing because we're careful" — nothing racing because the structure of the code makes a race impossible regardless of how careful anyone is later. That's a meaningfully stronger guarantee than a mutex gives you, because a mutex only works if every single access remembers to acquire it. Forget one mu.Lock() anywhere in a large codebase and the race is back. A channel with one consumer doesn't have that failure mode.


Building It

A channel in Go is a typed pipe. You send values into one end, and receive them from the other.

resultsCh := make(chan model.AIResult)

Workers send into it instead of appending to a slice:

go func(a model.Article) {
    defer wg.Done()
    result := p.processArticle(a)
    resultsCh <- result // send — no lock, no shared variable
}(article)

And a single goroutine — the collector — receives:

results := make([]model.AIResult, 0, len(articles))
for result := range resultsCh {
    results = append(results, result)
}

Only the collector goroutine ever runs that append. Every worker only ever sends. There's exactly one writer to results in the entire program.
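
If you want to try the send and receive mechanics in isolation, here is a minimal sketch. It is not taken from the series repository; roundTrip and the string payload are names invented for illustration. One detail it makes visible: make(chan T) with no capacity argument creates an unbuffered channel, so a send blocks until a receiver is ready to take the value.

```go
package main

import "fmt"

// roundTrip (hypothetical helper) sends v through an unbuffered channel from
// a second goroutine and returns what the calling goroutine receives.
func roundTrip(v string) string {
	ch := make(chan string) // no capacity argument: unbuffered

	go func() {
		ch <- v // blocks until the receive below is ready
	}()

	return <-ch // blocks until the goroutine above sends
}

func main() {
	fmt.Println(roundTrip("summary for article 1"))
}
```

The sender and the receiver meet at the channel, and neither touches a variable the other can see. That blocking behavior of unbuffered sends is what makes the ordering of wg.Wait() and the collector loop matter in ProcessAll.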


The Part That's Easy to Get Wrong

Here's the full ProcessAll, and the detail worth paying close attention to:

func (p *ChannelProcessor) ProcessAll(articles []model.Article) ([]model.AIResult, time.Duration) {
    start := time.Now()

    resultsCh := make(chan model.AIResult)
    var wg sync.WaitGroup

    for _, article := range articles {
        wg.Add(1)
        go func(a model.Article) {
            defer wg.Done()
            result := p.processArticle(a)
            resultsCh <- result
        }(article)
    }

    // Closer: waits for every worker in the background, then closes the
    // channel so the range loop below can end.
    go func() {
        wg.Wait()
        close(resultsCh)
    }()

    results := make([]model.AIResult, 0, len(articles))
    for result := range resultsCh {
        results = append(results, result)
    }

    return results, time.Since(start)
}

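That small anonymous goroutine wrapping wg.Wait() and close(resultsCh) is not decoration; it's load-bearing. range over a channel keeps pulling values until the channel is both closed and drained. If nothing ever closes resultsCh, the for result := range resultsCh loop blocks forever, waiting for a value that will never arrive. In a program this small, the Go runtime may notice that every goroutine is blocked and abort with "fatal error: all goroutines are asleep - deadlock!". In a real service, where other goroutines (an HTTP server, a ticker) are still running, that safety net doesn't apply and the call simply hangs. No panic, no error, just a request or process that never finishes, which in a real deployment looks identical to "still working" until someone notices the CPU graph flatlined an hour ago.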
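
The "closed and drained" rule can be seen on its own with a small sketch. It uses a buffered channel, unlike the pipeline above, purely so that the sends and the close can all happen before any receive; drain is a name made up for this example.

```go
package main

import "fmt"

// drain (hypothetical helper) buffers every value, closes the channel, and
// only then ranges over it. The loop still receives every buffered value
// before it ends.
func drain(values []int) []int {
	ch := make(chan int, len(values)) // buffered so the sends below don't block
	for _, v := range values {
		ch <- v
	}
	close(ch) // no more sends allowed; pending values remain receivable

	var got []int
	for v := range ch { // ends once the channel is closed AND empty
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain([]int{1, 2, 3})) // prints [1 2 3]
}
```

Closing does not discard anything already sent. It only tells receivers that nothing more is coming, which is the signal the range loop needs in order to stop.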

The wg.Wait() and close(resultsCh) pair also has to run in a separate goroutine, specifically because wg.Wait() blocks. If you called wg.Wait() directly in ProcessAll before the collector loop starts, you'd deadlock: wg.Wait() is waiting for workers to finish, but the unbuffered channel send resultsCh <- result inside each worker is waiting for the collector to receive, and the collector hasn't started yet because wg.Wait() hasn't returned. Everyone is waiting on everyone else. Moving wg.Wait() into its own goroutine breaks that cycle: it can sit there waiting in the background while the goroutine that called ProcessAll gets on with receiving from the channel.
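
To watch the two orderings side by side without hanging your terminal, here is a sketch under some assumptions: integers stand in for articles, squaring stands in for the LLM call, and collect and finishes are hypothetical helpers, not code from the repository. The broken ordering is run behind a timeout so the program can report that it never completes.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// collect starts one goroutine per input and gathers the squares.
// With waitInline set, it calls wg.Wait() before receiving, which is the
// broken ordering: every worker blocks on its send, so Wait never returns.
func collect(inputs []int, waitInline bool) []int {
	ch := make(chan int) // unbuffered, like resultsCh
	var wg sync.WaitGroup

	for _, n := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			ch <- n * n // stand-in for processArticle
		}(n)
	}

	if waitInline {
		wg.Wait() // blocks forever when inputs is non-empty
		close(ch)
	} else {
		go func() {
			wg.Wait()
			close(ch)
		}()
	}

	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

// finishes reports whether collect returns within the timeout. In the broken
// case the blocked goroutines are simply abandoned, which is fine for a demo.
func finishes(waitInline bool, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		collect([]int{1, 2, 3}, waitInline)
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	fmt.Println("wg.Wait() in its own goroutine finishes:", finishes(false, time.Second))
	fmt.Println("wg.Wait() inline finishes:", finishes(true, 200*time.Millisecond))
}
```

The first line should report true and the second false. The only difference between the two runs is which goroutine is stuck in wg.Wait() while the workers are trying to send.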


Does It Hold Up?

Run it:

[3] Summarization started (676ms)
...
[3] Summarization completed
[3] Sentiment Analysis started (1.331s)
...
Collected result for article 3
Collected result for article 2
Collected result for article 5
Collected result for article 4
Collected result for article 1

Processed 5 in 3.715s

Article 3 — the one with the shortest simulated latency — finishes and gets collected first, exactly as you'd expect from genuinely concurrent work. Five articles complete in roughly the time of the single slowest one, same as the mutex version.

With the race detector:

go run -race ./cmd/news-processor
Processed 5 in 3.638s

No warnings. Effectively the same speed as Part 3's correctly locked mutex version, with one fewer thing to get wrong. There's no lock-scope decision to make here, because there isn't a lock to scope.


Mutex or Channel — When Do You Reach for Which?

It's tempting after this comparison to conclude channels are simply better and mutexes are obsolete. That's not quite right. The Go standard library itself leans on mutexes constantly (sync.Map and much of net/http's internals, for example), and so does plenty of production code at companies running Go at scale.

The honest distinction is about shape. A mutex fits naturally when you have one piece of shared state — a counter, a cache, a map — that several goroutines need to read or modify directly. A channel fits naturally when you have a flow: producers generating values, consumers processing them, data moving through stages. Our results-collection problem turned out to fit the channel shape well, because that's genuinely what it is — workers producing results, one consumer collecting them.

If you instead needed, say, a shared in-memory cache of already-summarized articles that any worker could check before calling the LLM, a mutex-protected map is probably the more natural fit than trying to route everything through a channel. Neither tool is strictly better. The question is which shape matches the problem.
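
As a rough illustration of that cache shape, here is a minimal sketch. Everything in it is assumed for the example: summaryCache, its methods, the string keys, and the fake "summary of ..." value standing in for an LLM call. None of it comes from the series repository.

```go
package main

import (
	"fmt"
	"sync"
)

// summaryCache is a hypothetical cache of summaries keyed by article ID.
// The mutex guards the map and nothing else.
type summaryCache struct {
	mu sync.Mutex
	m  map[string]string
}

func newSummaryCache() *summaryCache {
	return &summaryCache{m: make(map[string]string)}
}

// Get returns the cached summary for id, if there is one.
func (c *summaryCache) Get(id string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	s, ok := c.m[id]
	return s, ok
}

// Put stores a summary for id.
func (c *summaryCache) Put(id, summary string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[id] = summary
}

func main() {
	cache := newSummaryCache()
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("article-%d", i%3)
			if _, ok := cache.Get(id); !ok {
				// Stand-in for the slow LLM call; it runs outside the lock.
				cache.Put(id, "summary of "+id)
			}
		}(i)
	}

	wg.Wait()
	fmt.Println(cache.Get("article-1")) // prints: summary of article-1 true
}
```

The lock is held only around the map access, which is the lesson from Part 3 applied again. Two workers can still both miss and both compute the same summary; for a cache that is wasted work, not a race.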


What's Next

We now have a clean, race-free, channel-based pipeline, and it still has a structural limit we haven't addressed: one goroutine per article. At ten or even a hundred articles that's invisible. At a hundred thousand, you're asking the Go runtime to schedule a hundred thousand goroutines at once, and asking whatever AI provider you're calling to absorb a hundred thousand concurrent requests. No real provider will tolerate that; it will rate-limit you into the ground first.

In Part 5, we fix that by decoupling concurrency from article count entirely, using a fixed-size worker pool. You'll get to choose exactly how many goroutines run, independent of how many articles show up.

See you in Part 5.


This is Part 4 of the series "Production-Grade Concurrent AI Systems in Go." Read Part 3 — Race Conditions and Mutexes or continue to Part 5 — Worker Pools and Bounded Concurrency.