Item Reject Sink
Item Reject Sink is a step-level business outcome channel for recoverable, per-item failures. It is not the same as execution failure handling.
Use it when a step can continue processing after rejecting specific items (for example, malformed records in a batch) and you still need auditability and re-drive.
When To Use It
Choose failure handling by intent:
- Use domain responses when the outcome is expected and can be represented directly in business response types.
- Use Item Reject Sink when individual items must be rejected, tracked, and re-driven while the execution continues.
- Use execution DLQ only for terminal orchestration failures in queue-async mode.
Step APIs
Step contracts expose:
rejectItem(...)for single-item rejection.rejectStream(...)for stream-level rejection with sample and count metadata.
Example (StepOneToOne style):
@Override
public Uni<PaymentStatus> process(PaymentRecord paymentRecord) {
return processPayment(paymentRecord)
.onItem().transform(result -> PaymentStatus.approved(result))
.onFailure().recoverWithUni(error ->
rejectItem(paymentRecord, error)
.replaceWith(PaymentStatus.rejected("REJECTED_FOR_REDRIVE")));
}Example (StepManyToOne style):
@Override
public Uni<BatchResult> process(Multi<PaymentRecord> input) {
return aggregate(input)
.onFailure().recoverWithUni(error ->
rejectStream(List.of(), 0L, error)
.replaceWith(BatchResult.completedWithRejects()));
}What Gets Published
Default reject envelopes are metadata-only:
- execution/correlation/idempotency metadata when available,
- step identity,
- retry/attempt metadata,
- error class/message,
- timestamp,
- deterministic fingerprint.
Payload inclusion is opt-in:
pipeline.item-reject.include-payload=trueImplication:
- With the default
include-payload=false, sink entries are designed for audit/triage, not automatic replay payload reconstruction. - If you need automated re-drive from reject messages, that replay path is application-owned and must be implemented explicitly.
Provider Model
Configure with pipeline.item-reject.*:
provider:log,memory,sqsstrict-startuppublish-failure-policy- provider-specific settings (
memory-capacity,sqs.queue-url,sqs.region,sqs.endpoint-override)
Production guard:
- If effective step config enables
recoverOnFailure=true, - and selected sink is non-durable (
logormemory), - startup fails in production launch mode.
This protects recover-and-continue flows from silent loss of reject events.
Relationship To Execution DLQ
Item Reject Sink and execution DLQ serve different scopes:
- Item Reject Sink: item-level, expected recover-and-continue path.
- Execution DLQ: execution-level, terminal failure path.
For operational triage and queue-async crash recovery, see Error Handling and Recovery.