Workflow
Purpose
A Workflow is a configured pipeline of processor instances that moves data from a Source through optional processing stages to a Sink. Every Project requires at least one Workflow to process anything.
A Workflow contains exactly one Input Processor, any number of Flow Processors, and any number of Output Processors. Processors are arranged on a canvas and connected by links that define the data flow direction.
Key concepts
- Processor Instance — a concrete placement of an Asset on the Workflow canvas. A Workflow references Assets and creates instances of them with specific configuration for this workflow.
- Resources — a Workflow can declare dependencies on Formats, Services, and Resources that it uses. This makes the dependency explicit and allows inheritance from a parent Project or Workflow.
- Inheritance — like other Assets, Workflows support inheritance. A child Workflow can override scheduler weight, timeout values, and alarming settings while inheriting everything else from its parent.
This Asset can be used by:
| Asset type | Link |
|---|---|
| Projects | Project |
Configuration
Name & Description
- Name: Name of the Asset. Spaces are not allowed in the name.
- Description: Enter a description.
Inheritance chain of this Asset — If this Asset extends another, the inheritance chain is shown here. Click to navigate to any parent Asset in the chain.
Asset Usage: If the Asset is used by other Assets, the Asset Usage box shows how many times this Asset is used and which parts reference it; otherwise the box is not shown. Click to expand the list, then click an entry to navigate to it.
Required Roles
If you are deploying to a Cluster whose Reactive Engine Nodes have specific Roles configured, you can restrict this Asset to run only on Nodes with matching Roles.
To apply the restriction, enter the names of the Required Roles here. Otherwise, leave the field empty to match all Nodes (no restriction).
Resources
Declares explicit dependencies on Formats, Services, and Resources that are not automatically deployed.
Most Formats, Services, and Resources are automatically deployed with a Workflow when they are referenced by any Asset or processor within that Workflow. However, items that are only accessed programmatically — for example, a data dictionary used exclusively in a JavaScript or Python script — must be added here explicitly so that they are included in the deployment.
If a Resource is already deployed as part of another Workflow in the same deployment, it does not need to be added again here.
| Allowed type | Purpose |
|---|---|
| Format | Data format definitions used by processors to parse or serialise messages |
| Service | Service functions (e.g., database queries) callable from processors |
| Resource | Environment variables, shared constants, or other shared resources |
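To illustrate why an explicit declaration can be needed, the sketch below shows a data dictionary Resource that is used only inside a script. Because no processor references it directly, it would not be deployed automatically and must be listed under Resources. This is a hypothetical sketch: the `getResource` accessor, the `SensorDictionary` name, and the dictionary shape are invented placeholders, not the engine's real API.

```javascript
// Hypothetical sketch: a data dictionary Resource accessed only
// programmatically from a script. "SensorDictionary" and getResource()
// are placeholders; the real engine API may differ.
// Because nothing else references the dictionary, it must be declared
// in the Workflow's Resources list or it will not be deployed.
function lookupUnit(getResource, fieldName) {
  // The Resource content is assumed to be a JSON object mapping
  // field names to metadata such as { unit: "..." }.
  const dictionary = JSON.parse(getResource("SensorDictionary"));
  return dictionary[fieldName]?.unit ?? null;
}
```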
Workflow Settings
- Scheduler weight: Relative priority of this Workflow compared to other Workflows on the same engine. Higher values receive more processing time. Set to 0 (default) for equal weighting.
- Restart timeout in case of failures [sec]: Number of seconds the engine waits before attempting to restart a failed Workflow instance. If 0, no automatic restart occurs.
- Watchdog timeout [sec]: Number of seconds after which a Workflow that has not made progress is considered stalled. A stalled Workflow triggers the configured alarming. If 0, the watchdog is disabled.
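The engine's exact scheduling algorithm is not documented here, but the effect of the scheduler weight can be pictured as a proportional-share model. The sketch below is illustrative only; in particular, treating the default weight of 0 as an equal share of 1 is an assumption, not documented behavior.

```javascript
// Illustrative model of scheduler weights: each Workflow's share of
// processing time is proportional to its weight. The real scheduler
// may behave differently; this only conveys "higher weight = more time".
function schedulerShares(workflows) {
  // Assumption: weight 0 (the default, "equal weighting") counts as 1.
  const effective = workflows.map(w => (w.weight === 0 ? 1 : w.weight));
  const total = effective.reduce((a, b) => a + b, 0);
  return workflows.map((w, i) => ({ name: w.name, share: effective[i] / total }));
}
```

Under this model, a Workflow with weight 10 running next to a default-weight Workflow would receive roughly ten times the processing time.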
Alarming
Configure notifications for Workflow lifecycle events. Each event type uses the Alarm Center to route alerts.
| Event | When it fires |
|---|---|
| When a stream is rolled back | A stream failed and was rolled back to its last committed state |
| When a stream shall be retried | A stream retry has been triggered |
| When a stream has warnings or failures | A stream completed with non-fatal issues |
| When a stream is committed | A stream completed successfully |
Example: IoT Sensor Processing Workflow
This example builds on the TCP Source and Stream Input Processor to create a complete sensor data ingestion pipeline.
Goal
Receive temperature and humidity readings from IoT sensors over TCP, filter out anomalous readings, and write the cleaned records to a log file.
Assets used
| Asset | Role in workflow |
|---|---|
| Source TCP SensorTcpSource | Listens on acme.sensor.host:9000 |
| Stream Input Processor SensorInput | Reads the TCP stream as newline-delimited JSON |
| JavaScript Flow Processor FilterReadings | Filters readings by temperature range |
| Stream Output Processor SensorOutput | Writes cleaned records to a file |
| Generic Format SensorFormat | Defines the structure of a sensor reading |
| File Sink SensorLogSink | Output file sensor-log.json |
Workflow canvas
Workflow settings
| Setting | Value | Reason |
|---|---|---|
| Scheduler weight | 10 | Give this workflow higher priority than batch workflows |
| Restart timeout | 60 | Retry failed streams within 1 minute |
| Watchdog timeout | 300 | Alert if no progress for 5 minutes |
Alarming configuration
| Event | Action |
|---|---|
| Stream rolled back | Alert: "Sensor stream rollback — check sensor connectivity" |
| Stream retry | Log only |
| Warnings or failures | Alert: "Sensor readings contain anomalies" |
| Stream committed | Log only |
What happens at runtime
- SensorTcpSource binds to 0.0.0.0:9000 and accepts incoming TCP connections from sensors
- SensorInput reads the raw byte stream and uses SensorFormat to split it into newline-delimited JSON records
- FilterReadings (JavaScript Processor) evaluates each reading — records outside a configurable temperature range (e.g., below -40°C or above 85°C) are silently dropped
- SensorOutput writes the remaining records to sensor-log.json using SensorLogSink
- If a stream fails, the engine rolls back and retries automatically. If retries are exhausted, the configured alarms fire.
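The filtering step can be sketched as plain JavaScript. This is not the engine's processor API: the `temperature` field name is an assumption about SensorFormat, and the range bounds stand in for the configurable values mentioned above.

```javascript
// Sketch of the FilterReadings logic: parse newline-delimited JSON and
// silently drop readings outside the plausible temperature range.
// Field name "temperature" and the bounds are assumptions for illustration.
const MIN_TEMP_C = -40;
const MAX_TEMP_C = 85;

function isPlausible(reading) {
  return typeof reading.temperature === "number" &&
         reading.temperature >= MIN_TEMP_C &&
         reading.temperature <= MAX_TEMP_C;
}

function filterReadings(ndjson) {
  return ndjson
    .split("\n")
    .filter(line => line.trim().length > 0) // skip blank lines
    .map(line => JSON.parse(line))
    .filter(isPlausible);
}
```

For example, a chunk containing a 21.5 °C reading and a 999 °C reading would yield only the first record.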
See Also
- Project — Workflows are contained within Projects
- Stream Input Processor — the required entry point of every Workflow
- Deployment — how Workflows are deployed to an engine
- Alarm Center — how alarming events are routed and handled