Persistence Plugin
The persistence plugin is a foundational side-effect plugin that stores pipeline data without changing the stream. It is designed to be minimal and composable: you bring the provider dependency you want, and the plugin handles the rest.
What it does
- Observes stream elements and persists them
- Returns the original element unchanged
- Selects the appropriate persistence provider at runtime
Module layout
The plugin is split into two parts:
- Plugin library: plugins/foundational/persistence
- Service host module: e.g. examples/.../persistence-svc
The host module exists so the annotation processor can generate typed transport adapters and client steps in a concrete module that knows your domain types.
Required dependencies
The service host module should depend on:
- common (domain types and mappers)
- plugins/foundational/persistence
- One or more persistence providers (reactive or blocking)
Provider selection
Providers implement PersistenceProvider<T> and declare whether they support the current execution context. The persistence plugin will:
- Find a provider that supports the item type
- Ensure it matches the current runtime context (reactive vs blocking)
- Persist the entity and return the original item
This lets you plug in multiple backends without changing the plugin code.
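The selection steps above can be sketched in plain Java. This is a minimal illustration, not the plugin's actual code: the PersistenceProvider interface shown here is a simplified stand-in, and the method names (supports, supportsContext), the ExecutionContext enum, and the InMemoryProvider class are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ProviderSelectionSketch {

    /** Hypothetical execution contexts; the real framework may model this differently. */
    enum ExecutionContext { REACTIVE, BLOCKING }

    /** Hypothetical, simplified stand-in for the plugin's PersistenceProvider<T>. */
    interface PersistenceProvider<T> {
        boolean supports(Object item);                  // can this provider store the item type?
        boolean supportsContext(ExecutionContext ctx);  // does it fit the current runtime context?
        void persist(T item);
    }

    /** In-memory provider used only for illustration. */
    static final class InMemoryProvider<T> implements PersistenceProvider<T> {
        private final Class<T> type;
        private final ExecutionContext context;
        final List<T> stored = new ArrayList<>();

        InMemoryProvider(Class<T> type, ExecutionContext context) {
            this.type = type;
            this.context = context;
        }

        @Override public boolean supports(Object item) { return type.isInstance(item); }
        @Override public boolean supportsContext(ExecutionContext ctx) { return context == ctx; }
        @Override public void persist(T item) { stored.add(item); }
    }

    /** Returns the first provider that supports both the item type and the context. */
    static Optional<PersistenceProvider<?>> select(
            List<PersistenceProvider<?>> providers, Object item, ExecutionContext ctx) {
        return providers.stream()
                .filter(p -> p.supports(item) && p.supportsContext(ctx))
                .findFirst();
    }

    /** Persists the item as a side effect and returns the original item unchanged. */
    @SuppressWarnings("unchecked")
    static <T> T persistAndReturn(
            List<PersistenceProvider<?>> providers, T item, ExecutionContext ctx) {
        PersistenceProvider<T> provider = (PersistenceProvider<T>) select(providers, item, ctx)
                .orElseThrow(() -> new IllegalStateException(
                        "No provider for " + item.getClass().getName() + " in " + ctx));
        provider.persist(item);
        return item;
    }
}
```

Because the caller only ever sees the original item come back, adding a second backend in this sketch means adding another provider to the list, with no change to the selection logic.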
To lock a specific provider (recommended for production), set persistence.provider.class to the provider's fully qualified class name. The persistence manager will fail fast if the configured provider cannot be found or does not support the current execution context. For build-time validation, pass -Apersistence.provider.class=<fqcn> to the annotation processor.
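The fail-fast behaviour of a locked provider might look roughly like the following sketch. It assumes a simplified Provider contract and two illustrative provider classes (JdbcLikeProvider, ReactiveOnlyProvider), none of which come from the plugin. The configured class name would be the value of persistence.provider.class, and only a blocking context is modelled.

```java
import java.util.List;

public class LockedProviderSketch {

    /** Hypothetical, simplified provider contract (not the plugin's real interface). */
    interface Provider {
        boolean supportsBlockingContext();
    }

    /** Illustrative provider that works in a blocking context. */
    static final class JdbcLikeProvider implements Provider {
        @Override public boolean supportsBlockingContext() { return true; }
    }

    /** Illustrative provider that does not work in a blocking context. */
    static final class ReactiveOnlyProvider implements Provider {
        @Override public boolean supportsBlockingContext() { return false; }
    }

    /**
     * Resolves the provider whose class name equals the configured value and
     * fails fast when it is missing or unsuitable for a blocking context.
     */
    static Provider resolveLocked(List<Provider> available, String configuredClassName) {
        Provider match = available.stream()
                .filter(p -> p.getClass().getName().equals(configuredClassName))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Configured provider not found: " + configuredClassName));
        if (!match.supportsBlockingContext()) {
            throw new IllegalStateException(
                    "Configured provider does not support the current context: "
                            + configuredClassName);
        }
        return match;
    }
}
```

Throwing at resolution time, rather than on the first persisted item, is what makes a misconfigured provider visible at startup in this sketch.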
Parallelism guidance
Persistence providers can declare ordering and thread-safety hints. When a provider declares none, the framework assumes RELAXED ordering and SAFE thread safety and emits warnings. With pipeline.parallelism=AUTO, the framework runs providers that advise strict ordering sequentially (with a warning). With pipeline.parallelism=PARALLEL, it allows parallel execution and warns that the ordering advice is overridden.
If your workload depends on ordering (for example, sequence numbers or cross-step dependencies), keep the pipeline in SEQUENTIAL or AUTO.
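The rules above can be summarised as a small decision function. This is a sketch under stated assumptions rather than the framework's implementation: the Ordering.STRICT constant name, the Decision type, and the warning texts are hypothetical, thread-safety hints are left out for brevity, and treating AUTO with relaxed ordering as parallel is an assumption the text does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelismSketch {

    /** Values of pipeline.parallelism named in the text. */
    enum Parallelism { SEQUENTIAL, AUTO, PARALLEL }

    /** RELAXED is named in the text; STRICT is an assumed constant name. */
    enum Ordering { STRICT, RELAXED }

    /** Hypothetical result type: whether to run in parallel, plus emitted warnings. */
    static final class Decision {
        final boolean parallel;
        final List<String> warnings;

        Decision(boolean parallel, List<String> warnings) {
            this.parallel = parallel;
            this.warnings = warnings;
        }
    }

    /** declaredHint may be null, meaning the provider declared no ordering hint. */
    static Decision decide(Parallelism configured, Ordering declaredHint) {
        List<String> warnings = new ArrayList<>();
        Ordering ordering = declaredHint;
        if (ordering == null) {
            ordering = Ordering.RELAXED;
            warnings.add("No ordering hint declared; assuming RELAXED");
        }
        if (configured == Parallelism.SEQUENTIAL) {
            return new Decision(false, warnings);
        }
        if (configured == Parallelism.AUTO) {
            if (ordering == Ordering.STRICT) {
                warnings.add("Provider advises strict ordering; running sequentially");
                return new Decision(false, warnings);
            }
            return new Decision(true, warnings); // assumption: AUTO parallelises relaxed providers
        }
        // PARALLEL: always run in parallel, warning when ordering advice is overridden.
        if (ordering == Ordering.STRICT) {
            warnings.add("Ordering advice overridden by pipeline.parallelism=PARALLEL");
        }
        return new Decision(true, warnings);
    }
}
```

In this sketch, only PARALLEL can run a strict-ordering provider concurrently, which is why SEQUENTIAL or AUTO is the safer choice for order-sensitive workloads.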
Runtime note
Reactive persistence requires a Mutiny session or transaction. The plugin ensures this by running persistence calls inside a transaction boundary (via the persistence manager). If you add custom persistence providers, keep the reactive session/transaction requirement in mind.
Configuring the aspect
Enable the persistence aspect in your pipeline config and point it at the plugin implementation class:
aspects:
  persistence:
    enabled: true
    scope: "GLOBAL"
    position: "AFTER_STEP"
    config:
      pluginImplementationClass: "org.pipelineframework.plugin.persistence.PersistenceService"

The framework expands this into side-effect steps that observe the stream after each step.
AFTER_STEP observes the output of a step, which is the same boundary as the next step's BEFORE_STEP. A single position therefore captures every boundary except one end of the pipeline: AFTER_STEP misses the first input boundary, and BEFORE_STEP misses the final output boundary. Use two aspects, one per position, if you need both ends.
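To make the boundary arithmetic concrete, the sketch below numbers the boundaries of a pipeline with n steps from 0 (pipeline input) to n (final output) and lists which ones each position observes. The numbering scheme and the observedBoundaries helper are illustrative assumptions, not part of the framework.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundaryCoverageSketch {

    /** The two aspect positions named in the text. */
    enum Position { BEFORE_STEP, AFTER_STEP }

    /**
     * Boundaries are indexed 0..stepCount: 0 is the pipeline input,
     * stepCount is the final output. Step i reads boundary i and writes i + 1.
     */
    static List<Integer> observedBoundaries(int stepCount, Position position) {
        List<Integer> observed = new ArrayList<>();
        for (int step = 0; step < stepCount; step++) {
            // BEFORE_STEP sees the step's input, AFTER_STEP sees its output.
            observed.add(position == Position.BEFORE_STEP ? step : step + 1);
        }
        return observed;
    }
}
```

For a three-step pipeline, AFTER_STEP yields boundaries 1, 2, 3 and BEFORE_STEP yields 0, 1, 2, so each position covers three of the four boundaries and only the pair together covers all of them.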