diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java
index ff2db37e..8b2f9aa0 100644
--- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java
+++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java
@@ -1322,4 +1322,109 @@ public static FuncCallHttpStep post(
return http(name).POST().endpoint(endpoint, auth).body(body);
}
+
+ /**
+ * Extracts and deserializes the workflow input data into the specified type from a workflow
+ * context.
+ *
+ *
This utility method provides type-safe access to the workflow's initial input.
+ *
+ *
Use this method when you have access to the {@link WorkflowContextData} and need to retrieve
+ * the original input that was provided when the workflow instance was started.
+ *
+ *
Usage Example:
+ *
+ *
{@code
+ * inputFrom((object, WorkflowContextData workflowContext) -> {
+ * OrderRequest order = input(workflowContext, OrderRequest.class);
+ * return new Input(order);
+ * });
+ * }
+ *
+ * @param the type to deserialize the input into
+ * @param context the workflow context containing instance data and input
+ * @param inputClass the class object representing the target type for deserialization
+ * @return the deserialized workflow input object of type T
+ */
+ public static T input(WorkflowContextData context, Class inputClass) {
+ return context
+ .instanceData()
+ .input()
+ .as(inputClass)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Workflow input is missing or cannot be deserialized into type "
+ + inputClass.getName()
+ + " when calling FuncDSL.input(WorkflowContextData, Class)."));
+ }
+
+ /**
+ * Extracts and deserializes the task input data into the specified type from a task context.
+ *
+ * This utility method provides type-safe access to a task's input.
+ *
+ *
Use this method when you have access to the {@link TaskContextData} and need to retrieve the
+ * input provided to that task.
+ *
+ *
Usage Example:
+ *
+ *
{@code
+ * inputFrom((Object obj, TaskContextData taskContextData) -> {
+ * OrderRequest order = input(taskContextData, OrderRequest.class);
+ * return order;
+ * });
+ * }
+ *
+ * @param the type to deserialize the input into
+ * @param taskContextData the task context from which to retrieve the task input
+ * @param inputClass the class object representing the target type for deserialization
+ * @return the deserialized task input object of type T
+ */
+ public static T input(TaskContextData taskContextData, Class inputClass) {
+ return taskContextData
+ .input()
+ .as(inputClass)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Workflow input is missing or cannot be deserialized into type "
+ + inputClass.getName()
+ + " when calling FuncDSL.input(TaskContextData, Class)."));
+ }
+
+ /**
+ * Extracts and deserializes the output data from a task into the specified type.
+ *
+ * This utility method provides type-safe access to a task's output.
+ *
+ *
Use this method when you need to access the result/output produced by a task execution. This
+ * is particularly useful in subsequent tasks that need to process or transform the output of a
+ * previous task in the workflow.
+ *
+ *
Usage Example:
+ *
+ *
{@code
+ * .exportAs((object, workflowContext, taskContextData) -> {
+ * Long output = output(taskContextData, Long.class);
+ * return output * 2;
+ * })
+ * }
+ *
+ * @param the type to deserialize the task output into
+ * @param taskContextData the task context containing the output data
+ * @param outputClass the class object representing the target type for deserialization
+ * @return the deserialized task output object of type T
+ */
+ public static T output(TaskContextData taskContextData, Class outputClass) {
+ return taskContextData
+ .output()
+ .as(outputClass)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Task output is missing or cannot be deserialized into type "
+ + outputClass.getName()
+ + " when calling FuncDSL.output(TaskContextData, Class)."));
+ }
}
diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java
new file mode 100644
index 00000000..b1a8236b
--- /dev/null
+++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverless.workflow.impl.executors.func;
+
+import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
+import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.input;
+import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.output;
+
+import io.serverlessworkflow.api.types.Workflow;
+import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+
+public class FuncDSLDataFlowTransformationHelpersTest {
+
+ @Test
+ void test_input_with_inputFrom() {
+
+ SoftAssertions softly = new SoftAssertions();
+
+ Workflow workflow =
+ FuncWorkflowBuilder.workflow("reviewSubmissionWithModel")
+ .tasks(
+ function(
+ "add5",
+ (Long input) -> {
+ softly.assertThat(input).isEqualTo(10L);
+ return input + 5;
+ },
+ Long.class),
+ function("returnEnriched", (Long enrichedValue) -> enrichedValue, Long.class)
+ .inputFrom(
+ (object, workflowContext) -> {
+ softly.assertThat(object).isEqualTo(15L);
+ Long input = input(workflowContext, Long.class);
+ softly.assertThat(input).isEqualTo(10L);
+ return object + input;
+ },
+ Long.class))
+ .build();
+
+ try (WorkflowApplication app = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def = app.workflowDefinition(workflow);
+ WorkflowModel model = def.instance(10L).start().join();
+ Number number = model.asNumber().orElseThrow();
+ softly.assertThat(number.longValue()).isEqualTo(25L);
+ }
+
+ softly.assertAll();
+ }
+
+ @Test
+ void test_input_with_outputAs() {
+
+ SoftAssertions softly = new SoftAssertions();
+
+ Workflow workflow =
+ FuncWorkflowBuilder.workflow("enrichOutputWithModelTest")
+ .tasks(
+ function(
+ "add5",
+ (Long input) -> {
+ softly.assertThat(input).isEqualTo(10L);
+ return input + 5;
+ },
+ Long.class)
+ .outputAs(
+ (object, workflowContext, taskContextData) -> {
+ softly.assertThat(object).isEqualTo(15L);
+ Long input = input(workflowContext, Long.class);
+ softly.assertThat(input).isEqualTo(10L);
+ return input + object;
+ },
+ Long.class))
+ .build();
+
+ try (WorkflowApplication app = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def = app.workflowDefinition(workflow);
+
+ WorkflowModel model = def.instance(10L).start().join();
+ Number number = model.asNumber().orElseThrow();
+
+ softly.assertThat(number.longValue()).isEqualTo(25L);
+ }
+
+ softly.assertAll();
+ }
+
+ @Test
+ void test_output_with_exportAs() {
+
+ SoftAssertions softly = new SoftAssertions();
+
+ Workflow workflow =
+ FuncWorkflowBuilder.workflow("enrichOutputWithInputTest")
+ .tasks(
+ function(
+ "add5",
+ (Long input) -> {
+ softly.assertThat(input).isEqualTo(10L);
+ return input + 5;
+ },
+ Long.class)
+ .exportAs(
+ (object, workflowContext, taskContextData) -> {
+ Long taskOutput = output(taskContextData, Long.class);
+ softly.assertThat(taskOutput).isEqualTo(15L);
+ return taskOutput * 2;
+ }))
+ .build();
+
+ try (WorkflowApplication app = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def = app.workflowDefinition(workflow);
+ WorkflowModel model = def.instance(10L).start().join();
+ Number number = model.asNumber().orElseThrow();
+ softly.assertThat(number.longValue()).isEqualTo(15L);
+ }
+
+ softly.assertAll();
+ }
+
+ @Test
+ void test_input_with_inputFrom_fluent_way() {
+ SoftAssertions softly = new SoftAssertions();
+
+ Workflow workflow =
+ FuncWorkflowBuilder.workflow("enrichOutputWithInputTest")
+ .tasks(
+ function("sumFive", (Long input) -> input + 5, Long.class)
+ .inputFrom(
+ (object, workflowContext, taskContextData) ->
+ input(taskContextData, Long.class) * 2))
+ .build();
+
+ try (WorkflowApplication app = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def = app.workflowDefinition(workflow);
+ WorkflowModel model = def.instance(10L).start().join();
+ Number number = model.asNumber().orElseThrow();
+
+ softly.assertThat(number.longValue()).isEqualTo(25L);
+ }
+
+ softly.assertAll();
+ }
+}