Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1322,4 +1322,109 @@ public static FuncCallHttpStep post(

return http(name).POST().endpoint(endpoint, auth).body(body);
}

/**
* Extracts and deserializes the workflow input data into the specified type from a workflow
* context.
*
* <p>This utility method provides type-safe access to the workflow's initial input.
*
* <p>Use this method when you have access to the {@link WorkflowContextData} and need to retrieve
* the original input that was provided when the workflow instance was started.
*
* <p><b>Usage Example:</b>
*
* <pre>{@code
* inputFrom((object, WorkflowContextData workflowContext) -> {
* OrderRequest order = input(workflowContext, OrderRequest.class);
* return new Input(order);
* });
* }</pre>
*
* @param <T> the type to deserialize the input into
* @param context the workflow context containing instance data and input
* @param inputClass the class object representing the target type for deserialization
* @return the deserialized workflow input object of type T
*/
public static <T> T input(WorkflowContextData context, Class<T> inputClass) {
return context
.instanceData()
.input()
.as(inputClass)
.orElseThrow(
() ->
new IllegalStateException(
"Workflow input is missing or cannot be deserialized into type "
+ inputClass.getName()
+ " when calling FuncDSL.input(WorkflowContextData, Class<T>)."));
}

/**
* Extracts and deserializes the task input data into the specified type from a task context.
*
* <p>This utility method provides type-safe access to a task's input.
*
* <p>Use this method when you have access to the {@link TaskContextData} and need to retrieve the
* input provided to that task.
*
* <p><b>Usage Example:</b>
*
* <pre>{@code
* inputFrom((Object obj, TaskContextData taskContextData) -> {
* OrderRequest order = input(taskContextData, OrderRequest.class);
* return order;
* });
* }</pre>
*
* @param <T> the type to deserialize the input into
* @param taskContextData the task context from which to retrieve the task input
* @param inputClass the class object representing the target type for deserialization
* @return the deserialized task input object of type T
*/
public static <T> T input(TaskContextData taskContextData, Class<T> inputClass) {
return taskContextData
.input()
.as(inputClass)
.orElseThrow(
() ->
new IllegalStateException(
"Workflow input is missing or cannot be deserialized into type "
+ inputClass.getName()
+ " when calling FuncDSL.input(TaskContextData, Class<T>)."));
}

/**
* Extracts and deserializes the output data from a task into the specified type.
*
* <p>This utility method provides type-safe access to a task's output.
*
* <p>Use this method when you need to access the result/output produced by a task execution. This
* is particularly useful in subsequent tasks that need to process or transform the output of a
* previous task in the workflow.
*
* <p><b>Usage Example:</b>
*
* <pre>{@code
* .exportAs((object, workflowContext, taskContextData) -> {
* Long output = output(taskContextData, Long.class);
* return output * 2;
* })
* }</pre>
*
* @param <T> the type to deserialize the task output into
* @param taskContextData the task context containing the output data
* @param outputClass the class object representing the target type for deserialization
* @return the deserialized task output object of type T
*/
public static <T> T output(TaskContextData taskContextData, Class<T> outputClass) {
return taskContextData
.output()
.as(outputClass)
.orElseThrow(
() ->
new IllegalStateException(
"Task output is missing or cannot be deserialized into type "
+ outputClass.getName()
+ " when calling FuncDSL.output(TaskContextData, Class<T>)."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverless.workflow.impl.executors.func;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.input;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.output;

import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Test;

public class FuncDSLDataFlowTransformationHelpersTest {

@Test
void test_input_with_inputFrom() {

SoftAssertions softly = new SoftAssertions();

Workflow workflow =
FuncWorkflowBuilder.workflow("reviewSubmissionWithModel")
.tasks(
function(
"add5",
(Long input) -> {
softly.assertThat(input).isEqualTo(10L);
return input + 5;
},
Long.class),
function("returnEnriched", (Long enrichedValue) -> enrichedValue, Long.class)
.inputFrom(
(object, workflowContext) -> {
softly.assertThat(object).isEqualTo(15L);
Long input = input(workflowContext, Long.class);
softly.assertThat(input).isEqualTo(10L);
return object + input;
},
Long.class))
.build();

try (WorkflowApplication app = WorkflowApplication.builder().build()) {
WorkflowDefinition def = app.workflowDefinition(workflow);
WorkflowModel model = def.instance(10L).start().join();
Number number = model.asNumber().orElseThrow();
softly.assertThat(number.longValue()).isEqualTo(25L);
}

softly.assertAll();
}

@Test
void test_input_with_outputAs() {

SoftAssertions softly = new SoftAssertions();

Workflow workflow =
FuncWorkflowBuilder.workflow("enrichOutputWithModelTest")
.tasks(
function(
"add5",
(Long input) -> {
softly.assertThat(input).isEqualTo(10L);
return input + 5;
},
Long.class)
.outputAs(
(object, workflowContext, taskContextData) -> {
softly.assertThat(object).isEqualTo(15L);
Long input = input(workflowContext, Long.class);
softly.assertThat(input).isEqualTo(10L);
return input + object;
},
Long.class))
.build();

try (WorkflowApplication app = WorkflowApplication.builder().build()) {
WorkflowDefinition def = app.workflowDefinition(workflow);

WorkflowModel model = def.instance(10L).start().join();
Number number = model.asNumber().orElseThrow();

softly.assertThat(number.longValue()).isEqualTo(25L);
}

softly.assertAll();
}

@Test
void test_output_with_exportAs() {

SoftAssertions softly = new SoftAssertions();

Workflow workflow =
FuncWorkflowBuilder.workflow("enrichOutputWithInputTest")
.tasks(
function(
"add5",
(Long input) -> {
softly.assertThat(input).isEqualTo(10L);
return input + 5;
},
Long.class)
.exportAs(
(object, workflowContext, taskContextData) -> {
Long taskOutput = output(taskContextData, Long.class);
softly.assertThat(taskOutput).isEqualTo(15L);
return taskOutput * 2;
}))
.build();

try (WorkflowApplication app = WorkflowApplication.builder().build()) {
WorkflowDefinition def = app.workflowDefinition(workflow);
WorkflowModel model = def.instance(10L).start().join();
Number number = model.asNumber().orElseThrow();
softly.assertThat(number.longValue()).isEqualTo(15L);
}

softly.assertAll();
}

@Test
void test_input_with_inputFrom_fluent_way() {
SoftAssertions softly = new SoftAssertions();

Workflow workflow =
FuncWorkflowBuilder.workflow("enrichOutputWithInputTest")
.tasks(
function("sumFive", (Long input) -> input + 5, Long.class)
.inputFrom(
(object, workflowContext, taskContextData) ->
input(taskContextData, Long.class) * 2))
.build();

try (WorkflowApplication app = WorkflowApplication.builder().build()) {
WorkflowDefinition def = app.workflowDefinition(workflow);
WorkflowModel model = def.instance(10L).start().join();
Number number = model.asNumber().orElseThrow();

softly.assertThat(number.longValue()).isEqualTo(25L);
}

softly.assertAll();
}
}