Support consuming N messages from channel in one command #600

Closed
pradhankukiran wants to merge 5 commits into indeedeng:main from pradhankukiran:feature/issue-301-channel-consume-n-messages
Conversation

@pradhankukiran

Description

Implements server-side support for AtLeast/AtMost fields on InterStateChannelCommand (fields already exist in IDL but were unused). Enables three consumption modes:

  • Exact N: AtLeast=N, AtMost=N — wait for exactly N, consume exactly N
  • OneToAll: AtLeast=1 (no AtMost) — wait for at least 1, consume all available
  • ZeroToAll: AtLeast=0 (no AtMost) — don't wait, consume all available

Messages are consumed atomically. The feature is gated behind global version 11 for backward compatibility with existing workflows.
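
The three modes can be read as one rule over the pair (AtLeast, AtMost). The following is a minimal sketch in Go, not the code in this PR: consumeCount is a hypothetical helper, the bounds are plain *int values rather than iwfidl types, and the defaults used when a bound is unset are assumptions.

```go
package main

import "fmt"

// consumeCount is a hypothetical helper (not the PR's code). Given how many
// messages are in the channel and the optional bounds (nil means unset), it
// reports whether the command can complete now and how many to consume.
func consumeCount(available int, atLeast, atMost *int) (ready bool, n int) {
	if atLeast == nil && atMost == nil {
		// Assumed legacy behavior: wait for one message, consume one.
		if available >= 1 {
			return true, 1
		}
		return false, 0
	}
	need := 1 // assumed default when only atMost is set
	if atLeast != nil {
		need = *atLeast
	}
	if available < need {
		return false, 0 // keep waiting
	}
	n = available // no upper bound: take everything available
	if atMost != nil && *atMost < n {
		n = *atMost
	}
	return true, n
}

func main() {
	p := func(v int) *int { return &v }
	fmt.Println(consumeCount(5, p(3), p(3))) // Exact N: true 3
	fmt.Println(consumeCount(5, p(1), nil))  // OneToAll: true 5
	fmt.Println(consumeCount(0, p(0), nil))  // ZeroToAll: true 0
}
```

With AtLeast=3 and only two messages present, the helper returns false and the command keeps waiting.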

Note: The Values field on InterStateChannelResult was added manually to the generated model. A corresponding update to iwf-idl/iwf.yaml is needed so make idl-code-gen produces the same result.

Checklist

  • Code compiles correctly
  • Tests for the changes have been added
  • All tests passing
  • This PR change is backwards-compatible
  • This PR CONTAINS a (planned) breaking change (it is not backwards compatible)

Related Issue

Closes #301

@longquanzheng

Contributor

Thanks for the contribution. Can you add tests to make sure it works?

@codecov

codecov Bot commented Mar 18, 2026

Codecov Report

❌ Patch coverage is 88.49558% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.13%. Comparing base (0c659fb) to head (747f4d9).

Files with missing lines                   Patch %   Lines
service/interpreter/workflowImpl.go        79.62%    5 Missing and 6 partials ⚠️
service/interpreter/deciderTriggerer.go    88.88%    1 Missing and 1 partial ⚠️

❌ Your patch check has failed because the patch coverage (88.49%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

@@            Coverage Diff             @@
##             main     #600      +/-   ##
==========================================
+ Coverage   64.74%   65.13%   +0.39%     
==========================================
  Files          64       64              
  Lines        6595     6695     +100     
==========================================
+ Hits         4270     4361      +91     
- Misses       1997     2002       +5     
- Partials      328      332       +4     
Files with missing lines                   Coverage Δ
service/interfaces.go                      85.71% <ø> (ø)
service/interpreter/InternalChannel.go     81.35% <100.00%> (-12.93%) ⬇️
service/interpreter/activityImpl.go        65.20% <100.00%> (+0.78%) ⬆️
service/interpreter/continueAsNewer.go     87.16% <100.00%> (+0.08%) ⬆️
service/interpreter/globalVersioner.go     89.47% <100.00%> (+0.58%) ⬆️
service/interpreter/deciderTriggerer.go    92.30% <88.88%> (+5.82%) ⬆️
service/interpreter/workflowImpl.go        86.31% <79.62%> (-0.51%) ⬇️

... and 1 file with indirect coverage changes

@pradhankukiran pradhankukiran force-pushed the feature/issue-301-channel-consume-n-messages branch from 5ee0e69 to 6a37310 Compare March 25, 2026 10:52
@pradhankukiran pradhankukiran force-pushed the feature/issue-301-channel-consume-n-messages branch from 6a37310 to 1bd86db Compare March 28, 2026 12:16
@pradhankukiran

Author

@longquanzheng added integration tests covering all 4 consumption modes (ExactN, OneToAll, ZeroToAll, AtMostOnly) across both Temporal and Cadence backends, with and without ContinueAsNew.

Also fixed a bug where AtLeast=0, AtMost=1 would panic on an empty channel. Retrieve() doesn't handle empty channels, so the code now routes through the safe RetrieveUpToN path when atLeast == 0.
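
The empty-channel panic can be illustrated with a minimal sketch. The channel type below is a simplified stand-in for the interpreter's internal channel, and both method bodies are assumptions; only the names Retrieve and RetrieveUpToN come from the comment above.

```go
package main

import "fmt"

// channel is a simplified stand-in: a FIFO queue of messages per channel name.
type channel struct {
	received map[string][]string
}

// Retrieve pops one message. It assumes the caller already checked that the
// channel is non-empty; on an empty channel the index expression panics.
func (c *channel) Retrieve(name string) string {
	l := c.received[name]
	v := l[0]
	c.received[name] = l[1:]
	return v
}

// RetrieveUpToN pops at most n messages and returns an empty slice when the
// channel has nothing, so it is safe for commands with atLeast == 0.
func (c *channel) RetrieveUpToN(name string, n int) []string {
	l := c.received[name]
	if n > len(l) {
		n = len(l)
	}
	if n < 0 {
		n = 0
	}
	out := append([]string(nil), l[:n]...)
	c.received[name] = l[n:]
	return out
}

func main() {
	c := &channel{received: map[string][]string{"ch": {"a", "b"}}}
	fmt.Println(len(c.RetrieveUpToN("empty", 1))) // 0, no panic
	fmt.Println(c.RetrieveUpToN("ch", 5))         // [a b]
}
```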

commandRequest iwfidl.CommandRequest,
completedTimerCommands map[int]service.InternalTimerStatus,
completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
completedInterStateChannelMultiCmds map[int][]*iwfidl.EncodedObject,

@longquanzheng longquanzheng May 28, 2026

Contributor

Do you think we can just change completedInterStateChannelCommands to map[int][]*iwfidl.EncodedObject?
Or is that a breaking change that would break determinism?

Author

Changing completedInterStateChannelCommands to map[int][]*EncodedObject would break continue-as-new compatibility. Existing snapshots hold object values, not arrays, so old waiting workflows may fail to unmarshal or replay. A separate CompletedInterStateChannelMultiCmds field keeps the old snapshot shape intact.
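
A small sketch of the snapshot concern, assuming the snapshot is JSON-encoded. The encodedObject type is a simplified stand-in for iwfidl.EncodedObject, the struct names are hypothetical, and the JSON tag on the new field is an assumption; the tag on the existing field is taken from service/interfaces.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodedObject is a simplified stand-in for iwfidl.EncodedObject.
type encodedObject struct {
	Encoding string `json:"encoding"`
	Data     string `json:"data"`
}

// oldShape mirrors the existing field: one object per command index.
type oldShape struct {
	Completed map[int]*encodedObject `json:"completedInterStateChannelCommands"`
}

// changedShape is the rejected idea: same JSON key, but array values.
type changedShape struct {
	Completed map[int][]*encodedObject `json:"completedInterStateChannelCommands"`
}

// extendedShape keeps the old field and adds a separate one (tag assumed).
type extendedShape struct {
	Completed      map[int]*encodedObject   `json:"completedInterStateChannelCommands"`
	CompletedMulti map[int][]*encodedObject `json:"completedInterStateChannelMultiCmds"`
}

// oldSnapshot is what a workflow started before the change would carry.
const oldSnapshot = `{"completedInterStateChannelCommands":{"0":{"encoding":"json","data":"hello"}}}`

func decode(v interface{}) error {
	return json.Unmarshal([]byte(oldSnapshot), v)
}

func main() {
	var a oldShape
	var b changedShape
	var c extendedShape
	fmt.Println("old shape:", decode(&a))
	fmt.Println("changed shape:", decode(&b)) // non-nil: object where array expected
	fmt.Println("extended shape:", decode(&c))
}
```

Decoding the old snapshot into the changed shape returns an error, while the extended shape reads it unchanged and simply leaves the new field empty.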

@pradhankukiran

Author

I am adding a follow-up commit. The plan is to keep CompletedInterStateChannelCommands as-is for continue-as-new compatibility, add InterStateChannelResult.values through IDL/codegen, reject invalid atLeast/atMost values, and add focused unit coverage for the new channel/decider paths.
I’ll update generated Go from the IDL locally, but the canonical iwf-idl change may need a separate PR/update in the IDL repo.

@longquanzheng

Contributor

I am adding a follow-up commit. The plan is to keep CompletedInterStateChannelCommands as-is for continue-as-new compatibility, add InterStateChannelResult.values through IDL/codegen, reject invalid atLeast/atMost values, and add focused unit coverage for the new channel/decider paths. I’ll update generated Go from the IDL locally, but the canonical iwf-idl change may need a separate PR/update in the IDL repo.

Yes, please open a separate PR in the IDL repo and merge that first.

Comment thread service/interpreter/workflowImpl.go Outdated
Comment on lines +1187 to +1193
return fmt.Errorf("InterStateChannelCommand atLeast cannot be negative")
}
if cmd.HasAtMost() && cmd.GetAtMost() < 0 {
return fmt.Errorf("InterStateChannelCommand atMost cannot be negative")
}
if cmd.HasAtLeast() && cmd.HasAtMost() && cmd.GetAtMost() < cmd.GetAtLeast() {
return fmt.Errorf("InterStateChannelCommand atMost cannot be less than atLeast")

Contributor

Returning a plain error here will cause the workflow task to run in a loop. We can return an application error to fail the workflow directly.
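
One way to read this suggestion is to give validation failures a distinct error type that the caller can recognize and convert into a workflow failure. The sketch below uses only the standard library: invalidCommandError and validateBounds are hypothetical names, and the type is a placeholder for whatever application-error constructor the interpreter already uses, not an SDK API.

```go
package main

import (
	"errors"
	"fmt"
)

// invalidCommandError is a hypothetical marker type standing in for an
// application error that should fail the workflow instead of being retried.
type invalidCommandError struct{ msg string }

func (e *invalidCommandError) Error() string { return e.msg }

// validateBounds applies the three checks from the diff above to optional
// bounds (nil means the field is not set).
func validateBounds(atLeast, atMost *int) error {
	if atLeast != nil && *atLeast < 0 {
		return &invalidCommandError{"InterStateChannelCommand atLeast cannot be negative"}
	}
	if atMost != nil && *atMost < 0 {
		return &invalidCommandError{"InterStateChannelCommand atMost cannot be negative"}
	}
	if atLeast != nil && atMost != nil && *atMost < *atLeast {
		return &invalidCommandError{"InterStateChannelCommand atMost cannot be less than atLeast"}
	}
	return nil
}

func main() {
	lo, hi := 3, 1
	err := validateBounds(&lo, &hi)
	var bad *invalidCommandError
	// The caller can detect the marker type and fail the workflow directly.
	fmt.Println(errors.As(err, &bad), err)
}
```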

Comment thread service/interfaces.go
@@ -160,6 +160,9 @@ type (
CompletedTimerCommands map[int]InternalTimerStatus `json:"completedTimerCommands"`
CompletedSignalCommands map[int]*iwfidl.EncodedObject `json:"completedSignalCommands"`
CompletedInterStateChannelCommands map[int]*iwfidl.EncodedObject `json:"completedInterStateChannelCommands"`

Contributor

Can you leave a comment here? Are we deprecating this field, or is it still being used?

stateExecutionLocal = startResponse.GetUpsertStateLocals()
}

if globalVersioner.IsAfterVersionOfChannelConsumeN() {

Contributor

Can you explain a bit more why this is necessary here?

Contributor

I still don't understand why we have to do this. Can you elaborate, maybe with an example?
Thanks.

Comment thread service/interpreter/workflowImpl.go Outdated

if received {
completedInterStateChannelCmds[idx] = interStateChannel.Retrieve(cmd.ChannelName)
for {

Contributor

I don't think we need a for loop? Or maybe I missed something (sorry, I could be wrong; I haven't touched the code for a while).

Comment thread service/interpreter/workflowImpl.go Outdated
Comment on lines +813 to +816
if len(values) > 0 {
completedInterStateChannelCmds[idx] = values[0]
} else {
completedInterStateChannelMultiCmds[idx] = values

Contributor

I feel it's better not to always backfill the legacy field for a single command. Doing so means the activity input carries duplicate data, which makes the history size grow faster.

Since the SDK needs to change to support this anyway, we should change the SDK to use only the new field.

For compatibility with old SDKs, which will not send the multi-command, the server still needs to check whether it's a single command and, if so, fall back to the old code path and use the single-command completed fields.
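
The fallback described above can be sketched as writing each command's result to exactly one of the two fields. This is an illustration under assumptions, not the PR's code: the completed type, the record method, and the usesBounds flag (taken to mean that the command set atLeast or atMost) are hypothetical, and strings stand in for encoded objects.

```go
package main

import "fmt"

// completed holds results keyed by command index.
type completed struct {
	single map[int]string   // legacy field: one value per command
	multi  map[int][]string // new field: all values consumed by one command
}

// record stores a result in exactly one field, so nothing is duplicated.
// Commands from old SDKs never set bounds and take the legacy path.
func (c *completed) record(idx int, usesBounds bool, values []string) {
	if usesBounds {
		c.multi[idx] = values
		return
	}
	if len(values) > 0 {
		c.single[idx] = values[0]
	}
}

func main() {
	c := &completed{single: map[int]string{}, multi: map[int][]string{}}
	c.record(0, false, []string{"a"})     // legacy single-message command
	c.record(1, true, []string{"b", "c"}) // command with atLeast/atMost
	fmt.Println(len(c.single), len(c.multi))
}
```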

@pradhankukiran

Author

@longquanzheng Opened the IDL PR here: indeedeng/iwf-idl#91

Comment thread service/interpreter/workflowImpl.go Outdated
Comment on lines +793 to +794
// Another command thread may consume from the same channel after this
// thread's Await condition is true. Re-check during retrieval; if data

Contributor

may consume from the same channel after this thread's Await condition is true

I don't think that's possible in Cadence/Temporal's threading model (cooperative rather than preemptive).

Are you sure about this? Maybe you can write a test to confirm that behavior for me.

Basically, once a thread (goroutine) returns from Await, it continues to run until its next blocking operation. Nothing can interrupt it in between.

@pradhankukiran

Author

Good point. Removed the retry loop and the comment. Since workflow goroutines are cooperative, once Await returns this thread runs the synchronous retrieval without another command thread interleaving first, so a single consume path is enough.

Contributor

We don't usually have tests like this.

@longquanzheng

Contributor

@longquanzheng Opened the IDL PR here: indeedeng/iwf-idl#91

Oh, you also need to do it for iwf-sdk.yaml: https://github.com/indeedeng/iwf-idl/blob/main/iwf-sdk.yaml

@pradhankukiran pradhankukiran deleted the feature/issue-301-channel-consume-n-messages branch June 13, 2026 12:07
Development

Successfully merging this pull request may close these issues.

Support get (up to) N or all messages from channel in one command

2 participants