Skip to content

Commit 20df3fa

Browse files
eolivellimendonk
andauthored
Add new flow control agents: timer-source and trigger-event (#110)
Co-authored-by: Mendon Kissling <59585235+mendonk@users.noreply.github.com>
1 parent a70f6dd commit 20df3fa

File tree

6 files changed

+98
-8
lines changed

6 files changed

+98
-8
lines changed

SUMMARY.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@
100100
* [unwrap-key-value](pipeline-agents/data-transform/unwrap-key-value.md)
101101
* [Flow control](pipeline-agents/flow-control/README.md)
102102
* [dispatch](pipeline-agents/flow-control/dispatch.md)
103+
* [timer-source](pipeline-agents/flow-control/timer-source.md)
104+
* [trigger-event](pipeline-agents/flow-control/trigger-event.md)
103105
* [Custom Agents](pipeline-agents/custom-agents/README.md)
104106
* [Agent Developer Guide](pipeline-agents/agent-developer-guide/README.md)
105107
* [Agent Types](pipeline-agents/agent-developer-guide/agent-types.md)

building-applications/expression-language.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ Utility methods available under the `fn` namespace. For example, to get the curr
4949
| fromJson | Parse input as JSON. |
5050
| lowercase(input) | Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
5151
| now() | Returns the current epoch millis. |
52+
| uuid() | Returns a new UUID (unique id) as string |
53+
| random(number) | Returns a random number between zero the the max value provided as argument |
5254
| replace(input,regex,replacement) | Replaces each substring of `input` that matches the `regex` regular expression with `replacement`. See [Java's replaceAll](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/String.html#replaceAll\(java.lang.String,java.lang.String\)). |
5355
| str(input) | Converts `input` to a string.|
5456
| toString(input) | Converts `input` to a string.|

pipeline-agents/flow-control/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ layout:
1515
# Flow control agents
1616

1717

18-
* [Dispatch](dispatch.md)
18+
* [Dispatch](dispatch.md)
19+
* [Timer source](timer-source.md)
20+
* [Trigger Event](trigger-event.md)

pipeline-agents/flow-control/dispatch.md

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ This agent enables you to dispatch records to different topics, based on conditi
44

55
### Example
66

7-
Example of reading from a container, extracting the document text, splitting the text into chunks, and outputting each chunk as a message to the output topic.
7+
This is an example about how to use the `dispatch` agent to dispatch records to different topics based on the language of the text.
88

99
```yaml
1010
- name: "Detect language"
@@ -26,14 +26,13 @@ Example of reading from a container, extracting the document text, splitting the
2626
action: drop
2727
```
2828
29-
This example detect the language of the input record and then decides to send english and french records to different topics.
30-
You could then implement another pipeline to perform different computation based on the language.
31-
32-
In case the language is not recognized, the message will be dropped.
33-
In case the language is recognized but the it's neither english or french, it will continue the pipeline - in this case it will be written in the `default-topic`
29+
This example detects the language of the input record and then dispatches english and french records to different topics.
30+
You could then implement another pipeline to perform different computations based on the language.
3431
32+
If the language is not recognized, the message is dropped.
33+
If the language is recognized but it's neither english or french, the message will be written to the `default-topic`
3534

3635

3736
### Configuration
3837

39-
Checkout the full configuration properties in the [API Reference page](../../building-applications/api-reference/agents.md#dispatch).
38+
Check out the full configuration properties in the [API Reference page](../../building-applications/api-reference/agents.md#dispatch).
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# timer-source
2+
3+
This source agent periodically emits a new record (an "event") and passes it to the pipeline.
4+
5+
### Example
6+
7+
This is an example using the `timer-source` agent to emit a new record every 60 seconds.
8+
9+
```yaml
10+
pipeline:
11+
- name: "Timer"
12+
type: "timer-source"
13+
configuration:
14+
period-seconds: 60
15+
fields:
16+
- name: "key.id"
17+
expression: "fn:uuid()"
18+
- name: "value.string_payload"
19+
expression: "'constant-string-payload'"
20+
- name: "value.int_payload"
21+
expression: "42"
22+
- name: "properties.foo"
23+
expression: "'some property'"
24+
```
25+
26+
In this example the `timer-source` agent emits one record every 60 seconds.
27+
28+
## Defining the contents of the output record
29+
30+
The agent allows you to configure a set of fields that will be written to the output record.
31+
As usual you can write to the key part, the value part and the properties of the record.
32+
Use the [expression language](../../building-applications/expression-language.md) to define the fields and write the expression.
33+
34+
### Configuration
35+
36+
Checkout the full configuration properties in the [API Reference page](../../building-applications/api-reference/agents.md#timer-source).
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# trigger-event
2+
3+
This agent writes a new record (an "event") to a different topic, based on a condition.
4+
It can also optionally drop the record from the main flow of the pipeline.
5+
6+
This agent is different from the [dispatch agent](dispatch.md) as the record that is sent to the new topic is a new record, not the same record that was received.
7+
8+
### Example
9+
10+
This is an example using the `trigger-event` agent to write a new record to a different topic when a condition is met.
11+
12+
```yaml
13+
- name: "Split some text"
14+
type: "text-splitter"
15+
input: input-topic-splitter
16+
configuration:
17+
....
18+
19+
- name: "Trigger event on last chunk"
20+
type: "trigger-event"
21+
output: output-topic-chunks
22+
configuration:
23+
destination: drop-stale-chunks-topic
24+
continue-processing: true
25+
when: fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)
26+
fields:
27+
- name: "value.filename"
28+
expression: "key.filename"
29+
```
30+
31+
In this example the `text-splitter` agent splits a text into a set of chunks.
32+
When the last chunk is processed, the `trigger-event` agent writes a new record to the `drop-stale-chunks-topic` topic.
33+
The new record will have a filename field in the value part ("value.filename").
34+
35+
## Defining the contents of the output record
36+
37+
The `trigger-event` agent configures a set of fields that will be written to the output record.
38+
As usual you can write to the key part, the value part, and the properties of the record.
39+
Use the [expression language](../../building-applications/expression-language.md) to define the fields and write the expression.
40+
41+
42+
### Aborting the processing downstream
43+
44+
The `trigger-event` agent can also abort the processing of the record downstream by setting the `continue-processing` property to `false`.
45+
This is useful in cases where you have some system events that you want to write to a different topic, but you don't want to continue processing the record downstream.
46+
47+
### Configuration
48+
49+
Check out the full configuration properties in the [API Reference page](../../building-applications/api-reference/agents.md#trigger-event).

0 commit comments

Comments
 (0)