Stream
The Stream class is the underlying class of the stream object. The stream instance represents the stream which is being processed at the time and is automatically available for access in any script-based Processor, e.g. the Javascript Processor. It provides a number of functions which can - and sometimes have to - be invoked to control stream processing. It is used to emit messages to output ports, request retries, rollbacks, and log messages, access the stream's metadata, and more.
Properties
id
readonlyid:string
The ID of the stream is represented as a v4 UUID string. It is unique for each stream and automatically generated by layline.io. The ID is used to identify the stream in the system. It is used in the Audit Trail and in the logs.
Example
const STREAM_ID = stream.id;
name
readonlyname:string
Returns the name of a stream.
For file-based processing, the stream name is the name of the file being processed. For other sources, you usually explicitly set the stream name in the respective Source Asset.
For example if your data stems from a Service Source Asset, then the stream name is defined in the configuration of that Asset like so:
${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss} which will result in DeviceX-2022-10-10-21-45-33.
If your data source is a file, then this would be the file name, e.g. my_file_name.csv.
Same as getName.
Example
const STREAM_NAME = stream.name;
originalName
readonlyoriginalName:string
Returns the original name of a stream including the configured prefix and suffix.
Same as getOriginalName.
Example
const ORIGINAL_STREAM_NAME = stream.originalName;
path
readonlypath:string
Returns the path of a stream.
Example
const STREAM_PATH = stream.path;
prefix
readonlyprefix:string
Get the detected prefix of a stream.
Same as getPrefix.
Example
const PREFIX = stream.prefix;
suffix
readonlysuffix:string
Get the detected suffix of a stream.
Same as getSuffix.
Example
const SUFFIX = stream.suffix;
Methods
emit()
emit(
message,outputPort):void
Emit a message to the specified output port. Once emitted, the context of the message is lost. If you need to emit a message to another output port, you need to clone the original message first, and then emit the clone to that other output port.
Parameters
message
outputPort
Returns
void
void
Example
OUTPUT_PORT = processor.getOutputPort('Output'); // Get the output port by the name given in the UI
stream.emit(message, OUTPUT_PORT); // Emits the message to the output port
getId()
getId():
string
Each Stream in layline.io has an ID which uniquely identifies a stream. Use this method to retrieve this unique ID.
Returns
string
Unique Stream ID
Example
const STREAM_ID = stream.getId();
getMetadata()
getMetadata():
Message
Retrieves metadata information from a stream. Returns the information in form of a Message. Depending on the type of stream, the message contains differing information. For example a file-based stream will return other data than a Kafka stream.
Returns
- Returns a message which contains the stream-type specific data.
Stream-type specific message return content:
File Stream:
| Property | Type | Description |
|---|---|---|
| Path | System.String | Directory path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |
| FolderSetup | System.String | Name of the directory path related folder configuration |
FTP Stream:
| Property | Type | Description |
|---|---|---|
| Path | System.String | Directory path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |
| FolderSetup | System.String | Name of the directory path related folder configuration |
HTTP Stream:
| Property | Type | Description |
|---|---|---|
| BindAddress | System.String | IP-Address |
| BindPort | System.Integer | IP-Port number |
Kafka (Exclusive Partition Stream):
| Property | Type | Description |
|---|---|---|
| GroupId | System.String | Consumer group id |
| Topics | System.String[] | Array of topics |
Kafka (Standard Stream):
| Property | Type | Description |
|---|---|---|
| GroupId | System.String | Consumer group id |
| Topic | System.String | Topic name of the exclusive partition |
| Partition | System.Integer | Partition number |
AWS S3 Service Source Stream:
| Property | Type | Description |
|---|---|---|
| Path | System.String | S3 path |
| Size | System.Long | S3 object size |
| StorageClass | System.String | S3 storage class |
| LastModified | System.DateTime | Last modified date and time |
AWS SQS Source Stream:
| Property | Type | Description |
|---|---|---|
| QueueUrl | System.String |
OneDrive Stream:
| Property | Type | Description |
|---|---|---|
| Path | System.String | OneDrive path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |
| FolderSetup | System.String | Name of the path related folder configuration |
SharePoint Stream:
| Property | Type | Description |
|---|---|---|
| Path | System.String | SharePoint path from which the file was read |
| Size | System.Long | File size in bytes |
| LastModified | System.DateTime | Last modified date and time |
| FolderSetup | System.String | Name of the path related folder configuration |
Serial Source Stream:
| Property | Type | Description |
|---|---|---|
| Port | System.String | Port name |
| BaudRate | System.Integer | Baud rate |
| DataBits | System.Integer | |
| StopBits | System.String | |
| Parity | System.String | |
| FlowControl | System.String |
Secondary Stream:
| Property | Type | Description |
|---|---|---|
| ParentStreamName | System.String | Name of the originating stream (parent) |
| ParentStreamId | System.String | Id of the originating stream (parent) |
Service Source Stream:
| Property | Type | Description |
|---|---|---|
| Service | System.String | Service name |
TCP Source Stream:
| Property | Type | Description |
|---|---|---|
| LocalAddress | System.String | |
| LocalPort | System.Integer | |
| RemoteAddress | System.String | |
| RemotePort | System.Integer |
WebSocket Source Stream:
| Property | Type | Description |
|---|---|---|
| LocalAddress | System.String | |
| LocalPort | System.Integer | |
| RemoteAddress | System.String | |
| RemotePort | System.Integer |
Example
// Get the metadata for the stream (a Kafka stream in our example):
const msgMetadata = stream.getMetadata();
// Result:
// msgMetadata.data = {
// GroupId: "MyConsumerGroup",
// Topic: "mytopic",
// Partition: "0"
// }
}
getName()
getName():
string
Returns the name of a stream.
For file-based processing, the stream name is the name of the file being processed excluding the configured prefix and suffix. For other sources, you usually explicitly set the stream name in the respective Source Asset.
For example if your data stems from a Service Source Asset, then the stream name is defined in the configuration of that Asset like so:
${msg:IoT.StreamName}-${date:yyyy-MM-dd-HH-mm-ss} which will result in DeviceX-2022-10-10-21-45-33.
If your data source is a file, then this would be the file name, e.g. my_file_name.csv.
Same as Stream.name.
Returns
string
Stream name, e.g. my_file_name.csv
Example
const STREAM_NAME = stream.getName();
getOriginalName()
getOriginalName():
string
Returns the original name of a stream including the configured prefix and suffix.
Same as Stream.originalName.
Returns
string
Stream name, e.g. my_file_name.csv
Example
const ORIGINAL_STREAM_NAME = stream.getOriginalName();
getPath()
getPath():
string
Returns the path of a stream.
Returns
string
Stream path
Example
const STREAM_PATH = stream.getPath();
getPrefix()
getPrefix():
string
Get the detected prefix of a stream.
Same as Stream.prefix.
Returns
string
Prefix of the stream
Example
const PREFIX = stream.getPrefix();
getSuffix()
getSuffix():
string
Get the detected suffix of a stream.
Same as Stream.suffix.
Returns
string
Prefix of the stream
Example
const SUFFIX = stream.getSuffix();
logError()
logError(
msg):void
Logs a message with Severity.ERROR to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.
Parameters
msg
string
Information you want to log.
Returns
void
Example
stream.logError('Ran into the following problem: ' + problem);
logFatal()
logFatal(
msg):void
Logs a message with Severity.FATAL to the stream log. You can view this both via the Audit Trail in the UI and output in the process terminal output.
Parameters
msg
string
Information you want to log.
Returns
void
Example
stream.logFatal('Ran into the following problem: ' + problem);