Code a Step
This guide shows how to implement a pipeline step service and the supporting mappers.
1) Pick the Service Interface
Choose the reactive interface that matches your data flow:
ReactiveService<I, O>: one input → one outputReactiveStreamingService<I, O>: one input → stream of outputsReactiveStreamingClientService<I, O>: stream of inputs → one output
2) Implement the Service
Annotate the class with @PipelineStep so build-time generation can produce adapters.
@PipelineStep(
inputType = PaymentRecord.class,
outputType = PaymentStatus.class,
stepType = StepOneToOne.class,
inboundMapper = PaymentRecordMapper.class,
outboundMapper = PaymentStatusMapper.class
)
@ApplicationScoped
public class ProcessPaymentService implements ReactiveService<PaymentRecord, PaymentStatus> {
@Override
public Uni<PaymentStatus> process(PaymentRecord paymentRecord) {
return Uni.createFrom().item(/* processed payment status */);
}
}3) Add Mappers
Create pair-based MapStruct mappers using TPF's Mapper<Domain, External> interface. Use one mapper per boundary.
Note: The Java type names you choose in your pipeline YAML (or the web UI) drive the DTO/domain fields and the generated proto mappings. See Data Types for the full list and defaults.
@Mapper(
componentModel = "jakarta",
uses = {CommonConverters.class},
unmappedTargetPolicy = ReportingPolicy.WARN
)
public interface PaymentRecordMapper extends Mapper<PaymentRecord, PaymentRecordDto> {
@Override
PaymentRecord fromExternal(PaymentRecordDto dto);
@Override
PaymentRecordDto toExternal(PaymentRecord domain);
}4) Handle Errors
Use Mutiny error handling in your reactive chain:
return processPayment(paymentRecord)
.onItem().transform(result -> createPaymentStatus(paymentRecord, result))
.onFailure().recoverWithUni(error -> Uni.createFrom().item(createErrorStatus(paymentRecord, error)));Use Item Reject Sink flows for per-item recoverable failures that should be audited and handled later:
return processBatch(batchItem)
.onFailure().recoverWithUni(error ->
rejectItem(batchItem, error)
.replaceWith(createSkippedStatus(batchItem)));rejectItem(...) and rejectStream(...) are for expected per-item business rejections that must be tracked and re-driven without failing the full execution. See Item Reject Sink for the canonical model and wiring.
5) Test in Isolation
@QuarkusTest
class ProcessPaymentServiceTest {
@Inject
ProcessPaymentService service;
@Test
void testSuccessfulPaymentProcessing() {
PaymentRecord record = createTestPaymentRecord();
Uni<PaymentStatus> result = service.process(record);
UniAssertSubscriber<PaymentStatus> subscriber =
result.subscribe().withSubscriber(UniAssertSubscriber.create());
subscriber.awaitItem();
}
}Best Practices
- Keep step logic focused on a single responsibility.
- Prefer non-blocking I/O and reactive composition.
- Choose failure handling by intent: use domain responses for expected business outcomes (for example validation/state conflicts), use Item Reject Sink (
rejectItem/rejectStream) for per-item recoverable processing failures that must be tracked for downstream handling, and use execution DLQ for systemic or unrecoverable pipeline/execution failures. - Validate input early and consistently.