Support consuming N messages from channel in one command#600
Support consuming N messages from channel in one command#600pradhankukiran wants to merge 5 commits into
Conversation
|
thanks for contribution. Can you add test to make sure it works? |
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (88.49%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #600 +/- ##
==========================================
+ Coverage 64.74% 65.13% +0.39%
==========================================
Files 64 64
Lines 6595 6695 +100
==========================================
+ Hits 4270 4361 +91
- Misses 1997 2002 +5
- Partials 328 332 +4
🚀 New features to boost your workflow:
|
5ee0e69 to
6a37310
Compare
6a37310 to
1bd86db
Compare
|
@longquanzheng added integration tests covering all 4 consumption modes (ExactN, OneToAll, ZeroToAll, AtMostOnly) across both Temporal and Cadence backends, with and without ContinueAsNew. Also fixed a bug where AtLeast=0, AtMost=1 would panic on an empty channel, Retrieve() doesn't handle empty channels, so the code now routes through the safe RetrieveUpToN path when atLeast == 0. |
| commandRequest iwfidl.CommandRequest, | ||
| completedTimerCommands map[int]service.InternalTimerStatus, | ||
| completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject, | ||
| completedInterStateChannelMultiCmds map[int][]*iwfidl.EncodedObject, |
There was a problem hiding this comment.
do you think we can just change completedInterStateChannelCommands to be map[int][]*iwfidl.EncodedObject?
Or that's a breaking change that will break the determinism?
There was a problem hiding this comment.
Changing completedInterStateChannelCommands to map[int][]*EncodedObject would break continue-as-new compatibility. Existing snapshots have object values, not arrays, so old waiting workflows may fail unmarshal/replay. Separate CompletedInterStateChannelMultiCmds keeps old snapshot shape intact.
|
I am adding a follow-up commit. Plain is to keep |
yes, please open a separate PR in idl repo and merge in that first. |
| return fmt.Errorf("InterStateChannelCommand atLeast cannot be negative") | ||
| } | ||
| if cmd.HasAtMost() && cmd.GetAtMost() < 0 { | ||
| return fmt.Errorf("InterStateChannelCommand atMost cannot be negative") | ||
| } | ||
| if cmd.HasAtLeast() && cmd.HasAtMost() && cmd.GetAtMost() < cmd.GetAtLeast() { | ||
| return fmt.Errorf("InterStateChannelCommand atMost cannot be less than atLeast") |
There was a problem hiding this comment.
returning plain error here will cause workflow task running in a loop. we can return application erorr to fail the workflow directly.
| @@ -160,6 +160,9 @@ type ( | |||
| CompletedTimerCommands map[int]InternalTimerStatus `json:"completedTimerCommands"` | |||
| CompletedSignalCommands map[int]*iwfidl.EncodedObject `json:"completedSignalCommands"` | |||
| CompletedInterStateChannelCommands map[int]*iwfidl.EncodedObject `json:"completedInterStateChannelCommands"` | |||
There was a problem hiding this comment.
can you leave a comment here? are we deprecating that or it's still being used?
| stateExecutionLocal = startResponse.GetUpsertStateLocals() | ||
| } | ||
|
|
||
| if globalVersioner.IsAfterVersionOfChannelConsumeN() { |
There was a problem hiding this comment.
can you explain a bit more why this is necssary here?
There was a problem hiding this comment.
I still don't understand why we have to do this. can you elaborate? maybe with an example?
thanks
|
|
||
| if received { | ||
| completedInterStateChannelCmds[idx] = interStateChannel.Retrieve(cmd.ChannelName) | ||
| for { |
There was a problem hiding this comment.
I don't think we need a for loop? or maybe I missed something(sorry btw I could be wrong, haven't touched the code for a while)
| if len(values) > 0 { | ||
| completedInterStateChannelCmds[idx] = values[0] | ||
| } else { | ||
| completedInterStateChannelMultiCmds[idx] = values |
There was a problem hiding this comment.
I feel like it's better not to always backfill the legacy field for single cmd? this means activity input will have duplicate data which cause the history size grow faster.
Since SDK need to change to support this anyway, we should change the SDK to only use the new field only.
For compatibility of old sdk, it will not send the mult-command, server will still need to check if it's single command, then fallback to the old code path and use the single command completed fields.
|
@longquanzheng Opened the IDL PR here: indeedeng/iwf-idl#91 |
| // Another command thread may consume from the same channel after this | ||
| // thread's Await condition is true. Re-check during retrieval; if data |
There was a problem hiding this comment.
may consume from the same channel after this thread's Await condition is true
I don't think that's possible in Cadence/Temporal's threading model(cooperative instead of preemtive).
Are you sure about this? Maybe you can write a test to confirm that behaviro for me.
Basically, once a thread(goroutine) got return from Await, it will continue to run until next blocking operation. There will not be any interruption between that.
|
Good point. Removed the retry loop and the comment. Since workflow goroutines are cooperative, once Await returns this thread runs the synchronous retrieval without another command thread interleaving first, so a single consume path is enough. |
There was a problem hiding this comment.
we don't usually have test like this
oh , you also need to do it for iwf-sdk.yaml https://github.com/indeedeng/iwf-idl/blob/main/iwf-sdk.yaml |
Description
Implements server-side support for
AtLeast/AtMostfields onInterStateChannelCommand(fields already exist in IDL but were unused). Enables three consumption modes:AtLeast=N, AtMost=N— wait for exactly N, consume exactly NAtLeast=1(no AtMost) — wait for at least 1, consume all availableAtLeast=0(no AtMost) — don't wait, consume all availableMessages are consumed atomically. Gated behind global version 11 for backward compat with existing workflows.
Note: The
Valuesfield onInterStateChannelResultwas added manually to the generated model. A corresponding update toiwf-idl/iwf.yamlis needed somake idl-code-genproduces the same result.Checklist
Related Issue
Closes #301