Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ private void setAppId(String appId) {
this.appId = appId;
}

private boolean hasSourceAppId() {
return this.appId != null && !this.appId.isEmpty();
}

private boolean hasTargetAppId(TaskOptions options) {
return options != null && options.hasAppID();
}

@Override
public Instant getCurrentInstant() {
// TODO: Throw if instant is null
Expand Down Expand Up @@ -329,36 +337,32 @@ public <V> Task<V> callActivity(
}

// Add router information for cross-app routing
// Router always has a source app ID from EXECUTIONSTARTED event
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (options != null && options.hasAppID()) {
OrchestratorService.TaskRouter router = null;
if (hasSourceAppId() && hasTargetAppId(options)) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder()
scheduleTaskBuilder.setRouter(OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
scheduleTaskBuilder.setRouter(router);
.build());
this.logger.fine(() -> String.format(
"cross app routing detected: source=%s, target=%s",
this.appId, targetAppId));
}

// Capture for use inside lambda
final OrchestratorService.TaskRouter actionRouter = router;

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
OrchestratorService.ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setScheduleTask(scheduleTaskBuilder);
if (options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter actionRouter = OrchestratorService.TaskRouter.newBuilder()
if (hasSourceAppId() && hasTargetAppId(options)) {
actionBuilder.setRouter(OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
actionBuilder.setRouter(actionRouter);
.setTargetAppID(options.getAppID())
.build());
}
this.pendingActions.put(id, actionBuilder.build());

Expand Down Expand Up @@ -499,13 +503,40 @@ public <V> Task<V> callSubOrchestrator(
}
createSubOrchestrationActionBuilder.setInstanceId(instanceId);

// TODO: @cicoyle - add suborchestration cross app logic here when its supported
// Add router information for cross-app routing of sub-orchestrations
if (hasSourceAppId()) {
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (hasTargetAppId(options)) {
routerBuilder.setTargetAppID(options.getAppID());
this.logger.fine(() -> String.format(
"cross app sub-orchestration routing detected: source=%s, target=%s",
this.appId, options.getAppID()));
}

createSubOrchestrationActionBuilder.setRouter(routerBuilder.build());
}

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
this.pendingActions.put(id, OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCreateSubOrchestration(createSubOrchestrationActionBuilder)
.build());
.setCreateSubOrchestration(createSubOrchestrationActionBuilder);

// Set router on the OrchestratorAction for cross-app routing
if (hasSourceAppId()) {
OrchestratorService.TaskRouter.Builder actionRouterBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);
if (hasTargetAppId(options)) {
actionRouterBuilder.setTargetAppID(options.getAppID());
}
actionBuilder.setRouter(actionRouterBuilder.build());
}

this.pendingActions.put(id, actionBuilder.build());

if (!this.isReplaying) {
this.logger.fine(() -> String.format(
Expand Down Expand Up @@ -941,11 +972,20 @@ private void completeInternal(
}

int id = this.sequenceNumber++;
OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCompleteOrchestration(builder.build())
.build();
this.pendingActions.put(id, action);
.setCompleteOrchestration(builder.build());

// Add router to completion action for cross-app routing back to parent
if (hasSourceAppId()) {
actionBuilder.setRouter(
OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.build());
}

this.pendingActions.put(id, actionBuilder.build());
this.isComplete = true;
}

Expand Down Expand Up @@ -1009,7 +1049,16 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
this.setInput(executionStarted.getInput().getValue());
this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId());
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
this.setAppId(e.getRouter().getSourceAppID());
// For cross-app suborchestrations, if the router has a target, use that as our appID
// since that's where we're actually executing
if (e.hasRouter()) {
OrchestratorService.TaskRouter router = e.getRouter();
if (router.hasTargetAppID()) {
this.setAppId(router.getTargetAppID());
} else {
this.setAppId(router.getSourceAppID());
}
}

var versionName = "";
if (!StringUtils.isEmpty(this.orchestratorVersionName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,138 @@ void subOrchestration() throws TimeoutException {
}
}

@Test
void subOrchestrationWithActivity() throws TimeoutException {
final String parentOrchestratorName = "ParentOrchestrator";
final String childOrchestratorName = "ChildOrchestrator";
final String activityName = "PlusOne";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int childResult = ctx.callSubOrchestrator(childOrchestratorName, input, int.class).await();
ctx.complete(childResult);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) + 1)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 10);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(11, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationChain() throws TimeoutException {
final String orchestratorName = "ChainOrchestrator";
final String leafOrchestratorName = "LeafOrchestrator";
final String activityName = "Double";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
int input = ctx.getInput(int.class);
// Chain: parent calls child which calls leaf
int result = ctx.callSubOrchestrator(leafOrchestratorName, input, int.class).await();
// Call activity after sub-orchestration completes
result = ctx.callActivity(activityName, result, int.class).await();
ctx.complete(result);
})
.addOrchestrator(leafOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) * 2)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
// input=3 -> leaf doubles to 6 -> parent doubles to 12
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 3);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(12, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationFanOut() throws TimeoutException {
final String parentOrchestratorName = "FanOutParent";
final String childOrchestratorName = "FanOutChild";
final String activityName = "Square";
final int childCount = 5;

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
// Fan out: launch multiple sub-orchestrations in parallel
List<Task<Integer>> tasks = IntStream.range(1, childCount + 1)
.mapToObj(i -> ctx.callSubOrchestrator(childOrchestratorName, i, int.class))
.collect(Collectors.toList());

List<Integer> results = ctx.allOf(tasks).await();
int sum = results.stream().mapToInt(Integer::intValue).sum();
ctx.complete(sum);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> {
int val = ctx.getInput(int.class);
return val * val;
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 0);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
// 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 1 + 4 + 9 + 16 + 25 = 55
assertEquals(55, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationWithInstanceId() throws TimeoutException {
final String parentOrchestratorName = "ParentWithInstanceId";
final String childOrchestratorName = "ChildWithInstanceId";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
String childInstanceId = ctx.getInstanceId() + ":child";
String result = ctx.callSubOrchestrator(
childOrchestratorName, "hello", childInstanceId, String.class).await();
ctx.complete(result);
})
.addOrchestrator(childOrchestratorName, ctx -> {
String input = ctx.getInput(String.class);
ctx.complete(input + " world");
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, "test");
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals("hello world", instance.readOutputAs(String.class));
}
}

@Test
void continueAsNew() throws TimeoutException {
final String orchestratorName = "continueAsNew";
Expand Down
Loading
Loading