sp.process() (mongosh method)

sp.process()

New in version 7.0: Creates an ephemeral Stream Processor on the current Stream Processing Instance.

This method is supported in Atlas Stream Processing Instances.

The sp.process() method has the following syntax:

sp.process(
  [
    <pipeline>
  ]
)

sp.process() takes these fields:

Field | Type | Necessity | Description
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data.

sp.process() creates an ephemeral, unnamed stream processor on the current stream processing instance and immediately initializes it. This stream processor persists only while it runs. If you terminate an ephemeral stream processor, you must create it again to use it.
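Because a terminated ephemeral stream processor must be created again, one option is to keep the pipeline in a shell variable and pass it to sp.process() each time. The following is a minimal sketch rather than an official pattern. The variable name ephemeralPipeline is hypothetical, and the sketch assumes that connections named sample_stream_solar and mongodb1 exist in the connection registry, as in the example on this page. The typeof guard is there only so the snippet can load outside mongosh, where sp is not defined.

```javascript
// Hypothetical variable name: holds the pipeline so it can be passed to
// sp.process() again after the ephemeral processor has been terminated.
const ephemeralPipeline = [
  // Read from the sample solar stream (assumed to be a configured connection).
  { $source: { connectionName: "sample_stream_solar" } },
  // Write results to solar_db.solar_coll over the mongodb1 connection.
  {
    $merge: {
      into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }
    }
  }
];

// sp is defined only in mongosh connected to a stream processing instance.
if (typeof sp !== "undefined") {
  sp.process(ephemeralPipeline);
}
```

Running the last statement again with the same variable creates a new ephemeral processor with the same pipeline.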

The user running sp.process() must have the atlasAdmin role.

The following example creates an ephemeral stream processor that ingests data from the sample_stream_solar connection. The processor excludes all documents where the value of the device_id field is device_8 and passes the rest to a tumbling window with a 10-second duration. Each window groups the documents it receives by device_id, then returns the maximum temperature and the maximum, minimum, average, and approximate median wattage for each group. The stream processor then merges these records into solar_db.solar_coll over the mongodb1 connection.

sp.process(
  [
    {
      $source: {
        connectionName: 'sample_stream_solar',
        timeField: {
          $dateFromString: {
            dateString: '$timestamp'
          }
        }
      }
    },
    {
      $match: {
        $expr: {
          $ne: [
            "$device_id",
            "device_8"
          ]
        }
      }
    },
    {
      $tumblingWindow: {
        interval: {
          size: Int32(10),
          unit: "second"
        },
        "pipeline": [
          {
            $group: {
              "_id": { "device_id": "$device_id" },
              "max_temp": { $max: "$obs.temp" },
              "max_watts": { $max: "$obs.watts" },
              "min_watts": { $min: "$obs.watts" },
              "avg_watts": { $avg: "$obs.watts" },
              "median_watts": {
                $median: {
                  input: "$obs.watts",
                  method: "approximate"
                }
              }
            }
          }
        ]
      }
    },
    {
      $merge: {
        into: {
          connectionName: "mongodb1",
          db: "solar_db",
          coll: "solar_coll"
        },
        on: ["_id"]
      }
    }
  ]
)
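To vary the excluded device or the window length without retyping the whole pipeline, the stages can be produced by a small helper. This is a sketch under stated assumptions, not part of mongosh: buildSolarPipeline and solarPipeline are hypothetical names, the $group stage is trimmed to a single statistic for brevity, and the fallback to a plain number when Int32 is unavailable exists only so the helper can be exercised outside mongosh.

```javascript
// Hypothetical helper (not a mongosh API): returns the example pipeline with
// the excluded device ID and the window length in seconds as parameters.
function buildSolarPipeline(excludedDeviceId, windowSeconds) {
  // Int32 is a mongosh global; use the plain number where it is not defined.
  const size = typeof Int32 === "function" ? Int32(windowSeconds) : windowSeconds;
  return [
    {
      $source: {
        connectionName: "sample_stream_solar",
        timeField: { $dateFromString: { dateString: "$timestamp" } }
      }
    },
    // Drop documents from the excluded device.
    { $match: { $expr: { $ne: ["$device_id", excludedDeviceId] } } },
    {
      $tumblingWindow: {
        interval: { size: size, unit: "second" },
        pipeline: [
          // Trimmed to one statistic; the full example computes several.
          {
            $group: {
              _id: { device_id: "$device_id" },
              avg_watts: { $avg: "$obs.watts" }
            }
          }
        ]
      }
    },
    {
      $merge: {
        into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
        on: ["_id"]
      }
    }
  ];
}

// Same parameters as the example above: exclude device_8, 10-second windows.
const solarPipeline = buildSolarPipeline("device_8", 10);

// sp is defined only in mongosh connected to a stream processing instance.
if (typeof sp !== "undefined") {
  sp.process(solarPipeline);
}
```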
