In this section, you will learn about how streams work, how to define them and how to use them. Not all available actions and I/O are listed here. You can see the full list in this section of the documentation.
Concepts
Writing streams
Mirabelle ships with a complete, extensible DSL to define streams. The DSL is heavily inspired by Riemann.
Events are represented as an immutable map. An event has standard fields. All fields are optional.
:host
: the event source. It can be an hostname for example.:service
: What is measured. http_requests_duration_seconds
for example.:state
: A string representing the event state. By convention, ok
, warning
, critical
or expired
are often used.:metric
: A number associated to the event (the value of what is measured).:time
: The event time in second, as a timestamp (1619988803
for example). It could also be a float (1619988803,173413
for example), the Mirabelle/Riemann protocol supports microsecond resolution.:description
: The event description.:ttl
: The duration that the event is considered valid. See the Index documentation for more information about the index and events expiration.:tags
: A list of tags associated to the event (like ["foo" "bar"]
for example).:stream
. It can be used to specify on which stream the event should be send. By default, events are sent to all streags with :default
in their configurations.Steams have a name, and are composed by actions. Let’s define a simple stream named :log
which will log all events it receives:
(streams
(stream {:name :log :default true}
(info)))
The streams
action is the top level one, and will wrap all defined steams. Then, the stream
action will define a stream. The action tapes a map as parameter which indicates the stream name in the :name
key.
The :default
key indicates that events arriving to the Mirabelle TCP server should be sent to this stream (multiple streams can have :default
set, in that case events will be forwarded by default to all of these streams).
The info
action will simply log all events flowing throught it.
Let’s now define another stream:
(streams
(stream {:name :log :default true}
(info))
(stream {:name :http_requests_duration}
(where [:= :service "http_requests_duration_seconds"]
(info)
(over 1.5
(with :state "critical"
(error))))))
In this second example, we still have our first stream named :log
. We also have another stream, a bit more complex, named :http_requests_duration
.
This second stream will first keep only events with services equal to “http_requests_duration_seconds” using the where
action.
Then, it will log (using info
) the events. In another branch, over
is used
tp keep only events with :metric
greater than 1.5
(we can imagine that
we want to alert if an http request takes longer than 1.5 seconds).
Finally, the event :state
is set to “critical” using the with
action, and finally the event is logged as error using error
(we could in a real setup send
an alert to an alerting system like Pagerduty for example).
streams can have multiple branches. It’s not an issue at all, modifying events in multiple branches, streams, or threads will never produce side effects, it’s completely safe.
Now that we know how to write streams, let’s use them.
The Mirabelle DSL should first be compiled to an EDN datastructure before being used by Mirabelle. Let’s take the previous example stream and put it in a file:
(streams
(stream {:name :http_requests_duration}
(where [:= :service "http_requests_duration_seconds"]
(info)
(over 1.5
(with :state "critical"
(error))))))
You then need to compile this file using this command:
java -jar mirabelle.jar compile <source-directory-containing-your-stream> <destination-directory>
For example, let’s say you have put the previous stream in a file named stream.clj
in the /tmp/streams
directory.
If ou launch java -jar mirabelle.jar compile /tmp/streams /tmp/compiled
, your file will be compiled and a new stream.clj
file will be created in the destination directory (which is /tmp/compiled
here).
Let’s do that.
java -jar mirabelle.jar compile /tmp/streams /tmp/compiled
The resulting file in /tmp/compiled/stream.clj
should be:
{:http_requests_duration
{:actions
{:action :sdo,
:children
({:action :where,
:params [[:= :service "http_requests_duration_seconds"]],
:children
({:action :info}
{:action :over,
:params [1.5],
:children
({:action :with,
:children ({:action :error}),
:params [{:state "critical"}]})})})}}}
The Mirabelle DSL was compiled to an EDN representation. You can easily map what you have defined in the DSL (stream name, actions, branches…) to the generated EDN datastructure.
You are now ready to use your stream.
Let’s launch Mirabelle, with the /tmp/compiled
directory referenced into the configuration on the :stream section
(as explained in the configuration documentation).
How to launch Mirabelle is explained in this section.
Once Mirabelle started, you can send events to it. For that, you can check the integration documentation section for the available clients (Riemann clients are fully compatible with Mirabelle). In this example, I will use the Riemann C client which provides a CLI and is available in many Linux package managers.
riemann-client send --metric-f 1 --service "http_requests_duration_seconds" --host=my-host
If I send the previous event, I should see in Mirabelle logs:
{"@timestamp":"2021-05-01T22:48:58.786+02:00","@version":"1","message":"#riemann.codec.Event{:host \"my-host\", :service \"http_requests_duration_seconds\", :state nil, :description nil, :metric 1.0, :tags nil, :time 1.619902138786E9, :ttl nil, :x-client \"riemann-c-client\"}","logger_name":"mirabelle.action","thread_name":"defaultEventExecutorGroup-2-8","level":"INFO","level_value":20000}
My event was indeed logging by the info
action in my stream. Let’s send an event with the metric greater than our threshold:
riemann-client send --metric-f 2 --service "http_requests_duration_seconds" --host=my-host
You will see in the Mirabelle logs:
{"@timestamp":"2021-05-01T22:50:57.960+02:00","@version":"1","message":"#riemann.codec.Event{:host \"my-host\", :service \"http_requests_duration_seconds\", :state nil, :description nil, :metric 2.0, :tags nil, :time 1.61990225796E9, :ttl nil, :x-client \"riemann-c-client\"}","logger_name":"mirabelle.action","thread_name":"defaultEventExecutorGroup-2-2","level":"INFO","level_value":20000}
{"@timestamp":"2021-05-01T22:50:57.961+02:00","@version":"1","message":"#riemann.codec.Event{:host \"my-host\", :service \"http_requests_duration_seconds\", :state \"critical\", :description nil, :metric 2.0, :tags nil, :time 1.61990225796E9, :ttl nil, :x-client \"riemann-c-client\"}","logger_name":"mirabelle.action","thread_name":"defaultEventExecutorGroup-2-2","level":"ERROR","level_value":40000}
The event is logged twice: one time by our info
action, and the second time by error
(you can see the level
key in the log). In the second log, the :state
was set to “critical”. Our threshold works !
More examples are available at the bottom on this page, and available actions are listed in the Actions and I/O reference section of the documentation.
Streams can also be created dynamically using the API.
Mirabelle supports hot reload on a SIGHUP. On a reload, only streams which had their configurations modified will be reloaded. Streams created using the API will be unchanged.
Profiles and readers
Mirabelle generates its configuration using the Aero Clojure library.
You can set the PROFILE
environment variable in order to use Aero profiles:
(streams
(stream {:name :foo :default true}
(where [:and
[:= :service "disk-used"]
[:> :metric #profile {:preprod 70
:prod 60
:default 90}]]
(error))))
In this example, we log events as error if they have :service
equal to “disk-used” and if the :metric
field is greater than a threshold.
This threshold will not be the same depending of the PROFILE
value. By defaut (if PROFILE
is not set) the value will be 90
. If PROFILE
is set to preprod
, the threshold will be 70
, and 60
for prod
.
You can also use other Aero build-in readers described in the Aero readme, like #env
to read an environment variable, #join
and #envf
to build strings based on multiple nevironments variables…
Include
It’s possible to include a configuration file in another one. Let’s take this file named for example log-service.clj
:
(where [:= :service #mirabelle/var :my-service]
(info))
You can then use this file using include
in a Mirabelle stream:
(streams
(stream {:name :foo :default true}
(include "log-service.clj" {:variables {:my-service "disk-used"}})
(include "log-service.clj" {:variables {:my-service "ram-used"}})))
The #mirabelle/var
reader allows you to read a variable passed to the include
action (here, the variable is named :my-service
).
You can also override the default Mirabelle profile (passed as an environment variable) by passing the ;profile
key to the include
options:
(include "log-service.clj" {:variables {:my-service "disk-used"}
:profile :dev})
The include
action allows you to create parameterizable configuration snippets.
In the previous example, we are only logging events, which is not very useful. What if we want to interact with other systems, like sending alerts to a service like Pagerduty, or forward all events a timeserie database like InfluxDB ?
Outputs should be defined in the :outputs
section fo the configuration file. All natively available outputs are described in the Actions and I/O reference section of the documentation.
For example, Mirabelle supports sending alerts to Pagerduty. Let’s configure a Pagerduty client and use it in a stream.
First, modify your configuration file to create an new :pagerduty-client
output:
{:outputs {:pagerduty-client {:config {:service-key "pagerduty-service-key"
:source-key :service
:summary-keys [:host :service :state]
:dedup-keys [:host :service]}
:type :pagerduty}}}
You can now use this output named :pagerduty-client
in a stream by using the output!
action:
(streams
(stream {:name :pagerduty-example}
(output! :pagerduty-client)))
If this event is set to Mirabelle:
riemann-client send --metric-d 100 --service "http_requests_duration_seconds" --state "critical" --host=myhost --attribute=environment=prod
You should see in Pagerduty a new triggered alert named myhost - http_requests_duration_seconds - critical
containing all the informations about your event.
You can check the I/O documentation to have details about how the Pagerduty output can be used (to resolve alert automatically for example).
A special output is async-queue
. You could define an async queue in the :outputs
configuration key:
{:outputs {:thread-pool {:type :async-queue
:config {}}}}
Here are the parameters you can set in the async queue :config
map:
:core-pool-size
: the ThreadPoolExecutor core pool size, default to 1.:max-pool-size
: the ThreadPoolExecutor max pool size, default to 8.:keep-alive-time
: the time threads stays alive when unused: default to 5000 (milliseconds).:queue-size
: the event queue size, default to 10000.You can check the javadoc of the ThreadPoolExecutor
class to know more about these parameters.
This will create a new async queue named :thread-pool
. When you use an async queue in a stream, events will be pushed into the queue and downstream actions will be executed into a dedicated threadpool:
;; we reference the async queue from the config as name
(async-queue! :thread-pool
(some-blocking-action))
You can use async queues to avoid blocking the main Mirabelle threads and achieve better performance.
In Mirabelle, all streams use the events’ time as a wall clock. All side effects (without exception), like flushing windows, will be triggered based on events’ time.
It means the same events, in the same order, will always produce the same result. It’s easy for users to write unit tests on streams and to reason about streams thanks to this feature.
It also allows you to use Mirabelle for a lot of use cases:
Some use cases are explained in this section of the documentation.
This section shows more advanced use cases for streams. Not all actions are described here, the list of all actions is available here.
The first way of filtering events is to use the where
action. For example, (where [:= :service "foo"]
will keep all events with service “foo”.
A lot of predicates can be used in where
:
:pos?
: is the value positive ? [:pos? :metric]
:neg?
: is the value negative ? [:neg? :metric]
:zero?
: is the value equal to zero ? [:zero? :metric]
:>
: is the value greater than a threshold ? [:> :metric 10]
:>=
: is the value greater or equal than a threshold ? [:>= :metric 10]
:<
: is the value lower than a threshold ? [:< :metric 10]
:<=
: is the value lower or equal than a threshold ? [:<= :metric 10]
:=
: is the value equal to the parameter ? [:= :metric 10]
:always-true
: this condition will always be true [:always-true]
:contains
: does the value contain the parameter ? [:contains :tags "foo"]
:absent
: does the value not contain the parameter ? [:absent :tags "foo"]
:regex
: is the value matching the regex ? [:regex :service "foo.*"]
:nil?
: is the value nil ? [:nil? :host]
:not-nil?
: is the value not nil ? [:not-nil? :host]
:not=
: is the value not equal to the parameter ? [:not= :service "bar"]
You can combine then with :or
or :and
. For example, [:and [:= :service "foo"] [:> :metric 10]]
will keep all events with :service
“foo” and :metric
greater than 10.
The split
action is a more powerful where
:
(split
[:> :metric 10] (debug)
[:> :metric 5] (info)
(error))
In this example, debug
will be called if the metric is greater than 10, if not info
is called if the metric is greater than 5, and by default error
is called if nothing matches (the default stream is optional, the event is discarded if not set).
the over
and under
streams can also be used to filter events with :metric
over or under a threshold: (over 3)
, (under 4)
.
You can also filter all events with :state
“critical” using (critical)
, filter events with :state
“warning” using (warning)
, and expired events using (expired)
(not-expired
also exists to do the opposite).
Some streams can also be used to only let pass events if a condition is true for a given period of time.
For example, the above-dt
stream will only let events pass if all events received have their :metric
fields above a threshold for a certain duration:
(above-dt {:metric 1 :duration 60}
(error)))
In this example, above-dt
will let events pass (to log them as error) only if it receives events with :metric
greater than 1 during more than 6O seconds.
The streams below-dt
, between-dt
, outside-dt
, critical-dt
also work that way. They are useful to avoid alerting on spikes for examples.
The tagged-all
stream is also available to keep only events containing one tag or a set of tags: (tagged-all "foo")
or (tagged-all ["foo" "bar"])
.
A lot of actions allow you to modify events. The first one, with
, allows you to set a field (or multiple fields) to some specific values:
(with :state "critical"
(info))
(with [:nested :key] "critical"
(info))
(with {:state "critical" :ttl 60}
(info))
The default
action is similar to with
but it only accepts one value, and will only set the value if the value is not already defined in the event:
(default :ttl 60
(info))
;; nested keys
(default [:nested :key] 60
(info))
The sdissoc
action takes a field or a list of fields and will remove them from the event. For example, (sdissoc :host)
or (sdissoc [:host :service])
.
You can use rename-keys
to rename some events keys:
(rename-keys {:host :service
:environment :env})
In this example, the :host
key will be renamed :service
and the :environment
key is renamed :env
. Existing values will be overrided.
If you want to keep only some keys from an event (and so remove all the others), you can use keep-keys
:
(keep-keys [:host :service :time :metric :description :environment])
Some actions can modify the :metric
field. increment
and decrement
will add +1 or -1 to it, and you can use scale
to multiply it with a value: (scale 1000)
for example.
You can also add tags to events, for example:
;; add the "foo" tag to events
(tag "foo")
;; add the "foo" and "bar" tags to events
(tag ["foo" "bar"])
The percentiles
action allows you to compute percentiles (quantiles) on events:
(percentiles {:percentiles [0.5 0.75 0.99]
:duration 10
:nb-significant-digits 3}
This example will compute the 0.5, 0.75 and 0.99 quantiles every 10 seconds. It also supports the :delay
parameter (to tolerate events arriving late), :highest-trackable-value
and :lowest-discernible-value
to bound results.
The implementation uses the HdrHistogram library.
You can compute the rate of incoming events (by counting them) using the rate
action:
(rate {:duration 20})
This action will send the rate of events downstream every 20 seconds. You can also add a :delay
parameter to tolerate late events.
The :sum
stream can be used to sum events :metrics
field for a period of time:
(sum {:duration 20})
It also supports a :delay
parameter to tolerate late events.
Use mean
to compute the mean of events metrics over time:
(mean {:duration 10}
(info))
It also supports a :delay
parameter to tolerate late events.
The changed
action can be used to detect state transitions.
(changed {:field :state :init "ok"}
(error))
In this example, events will only be passed downstream to the error
action if the :state
value is updated, the default value being ok
. For example:
{:state "ok"} ;; filtered
{:state "critical"} ;; passed downstream
{:state "critical"} ;; filtered
{:state "critical"} ;; filtered
{:state "ok"} ;; passed downstream
You can also use the stable
action to filter flapping states for example:
(stable 60 :state
(info))
In this example, events wll be forwarded to the child action (info
) only if the :state
key is stable (has the same value) for all events during 60 seconds.
You have three available window types in Mirabelle. Like some actions in Mirabelle, time windows will send a list of events downstream instead of an individual event. It means you must be careful about which action you will use downstream. It must be actions supporting list of events.
The first one, fixed-time-window
, will buffer all events during a defined duration and then flush them downstream. For example, (fixed-time-window {:duration 60})
will create windows of 60 seconds. You can pass a :delay
option to fixed-time-window
to tolerate late events and avoid losing them (for example: (fixed-time-window {:duration 60 :delay 30})
).
The fixed-event-window
action will create windows not based on time, but based on the number of events the action receives. For example, (fixed-event-window {:size 10})
will wait until 10 events make it to the buffer, and then pass the entire window downstream.
The moving-event-window
action works like fixed-event-window
but will pass events downstream for every event received. For example, (moving-event-window {:size 10})
will in that case always send the last 10 events downstream.
The moving-time-window
action will return for each event all events from the last duration
seconds, for example: (moving-time-window {:duration 60})
.
The ssort
action can be used to
Let’s take for example (ssort {:duration 10 :field :time})
. For this input:
{:time 1} {:time 10} {:time 4} {:time 9} {:time 13} {:time 31}
The output would be:
{:time 1} {:time 4} {:time 9} {:time 10} {:time 13}
You can pass a :delay
option to ssort
in order to not send events downstream immediately. For example (ssort {:duration 10 :field :time :delay 10})
will send events downstream 10 seconds after their windows are closed.
This action can be very useful to tolerate late events in streams. You could for example use ssort
in front of another action in order to wait for late events.
Several actions can be executed on list of events (produced by windows for example).
Let’s take this example which creates windows of 10 events and forwards them to multiple streams:
(fixed-event-window {:size 10}
(coll-max
(info))
(coll-min
(info))
(coll-sum
(info))
(coll-quotient
(info))
(coll-mean
(info))
(coll-rate
(info))
(coll-count
(info))
(coll-top 10
(info))
(coll-bottom 10
(info))
(coll-where [:= :service "foo"]
(info))
(coll-sort :time
(info)))
coll-max
will forward downstream the event with the biggest :metric
field, coll-min
will forward the event with the smallest :metric
.
coll-sum
will sum all events’ :metric
together.
coll-quotient
will divide the first event’s :metric
by the value of the next events.
coll-top
and coll-bottom
returns the events with the top biggest (or top smallest) :metric
field (for example, coll-top
would return the 10 events with the biggest :metric
).
coll-increase
should receive a list of events representing an always-increasing counter and will compute the increase between the oldest and the latest event.
coll-mean
will compute the mean based on the event’s :metric
fields. coll-rate
computes the rate of events (the sum of all :metrics
divided by the time range, based on the most ancient and most recent events), and coll-count
will return a new event with :metric
being the number of events in the window.
The three previous streams use the latest event from the list of events to build the new event.
The coll-percentiles
action can also be used to compute percentiles on a list of events:
(fixed-time-window {:duration 60}
(coll-percentiles [0.5 0.75 0.98 0.99]
(info)))
In this example, we generate 60-seconds time windows and pass them to the coll-percentiles
action. The action takes the wanted quantiles as parameter.
The coll-percentiles
action will produce for each quantile an event with the :quantile
key set to the quantile value, and the :metric
field set to the value computed from the list of events for this quantile. The quantiles 0
and 1
can also be used to get the smallest and biggest event respectively.
coll-where
will keep only events in the list matching the provided condition (see the filtering events section.
coll-sort
will sort events in the list based on the field passed as parameter.
If needed, for instance if your action doesn’t support event lists, you can use flatten
:
(moving-event-window {:size 10}
(sflatten
(info)))
here, the events produced by moving-event-window
will be sent one by one to info
.
You will often need to split streams for a given field. For example, imagine you want to count the number of events emitted every 60 seconds by host. If you write this stream:
(fixed-time-window {:duration 60}
(coll-count
(info)))
You will have the number of events for all hosts.
But if you write:
(by {:fields [:host]}
(fixed-time-window {:duration 60}
(coll-count
(info))))
The by
action takes a map containing a :fields
key as parameter and will generate a new instance of the downstream actions for each unique values associated to the fields. Here for example, the computation will be done in isolation for each different :host
.
You can also pass the :gc-interval
and :fork-ttl
keys to the by
action.
This will enable garbage collection, of children actions, executed every
:gc-interval
(in seconds) and which will remove actions which didn’t
receive events since :fork-ttl
seconds.
These two streams can help you perform computation on events from multiple sources.
(where [:= :service "http_requests_duration_seconds"]
(coalesce {:duration 10 :fields [:host :environment]}
(coll-percentiles [0.5 0.75 0.98 0.99]
(info))))
coalesce
will return periodically (here, every 10 seconds) the latest event for each, in this example, :host
and environment
. For example, if you have 20 unique host/environment combinations push events regularly, coalesce will emit a list of 20 events (the latest for each combination).
Expired events are not emitted, so if a host stops pushing, its event will not be emitted once the event is expired.
project
works like coalesce but instead of keeping the latest event based on some fields, you should provide where
clauses:
(project [[:= :service "enqueues"]
[:= :service "dequeues"]]
(coll-quotient
(with :service "enqueues per dequeue"
(info))))
In this example, we pass to project two where
clauses, for example to divide the :metric
from the first event with the other one.
You can use the throttle
action to let only some events pass at most every dt seconds. You can for example use it to avoid sending too many alerts to an external system:
(throttle {:count 3 :duration 60}
(error))
In this example, throttle will only forward 3 events to the error
action every 60 seconds.
The dtt
action can be used to convert a counter (which could always increase) to a rate:
(ddt
(info))
If this stream is fed with {:metric 1 :time 1}
and then {:metric 10 :time 4}
(these events could represent the count of requests to a HTTP server for example), the event {:metric (/ 9 3) :time 4}
will be produced. This event is computed by doing (10-1)/3
for the :metric
field, and the :time
field is the time of the latest event.
The result is the rate of requests during this time period.
We already saw in the Action on lists of events: max, min, count, percentiles, rate… section the coll-max
and coll-min
streams. You have two other ways to compute maximum and minimum values without having to build a list of events.
top
and bottom
can be used to get the maximum or minimum events from the last seconds. For example, (top {:duration 10})
will compute the maximum event for windows of 10 seconds. You can also configure these streams with a :delay
value to tolerate late events ((top {:duration 10 :delay 10})
for example).
smax
and smin
will send for each event they receive the maximum or minimum event. The value is never resetted.
If you need to create a value from others keys values, use sformat
:
(sformat \"%s-foo-%s\" :format-test [:host :service])
;; nested keys
(sformat \"%s-foo-%s\" [:nested :key] [[:host :name] :service])
It’s using Clojure https://clojuredocs.org/clojure.core/format function under the hood to associate to a key (that can be nested) the result of the string formatting using the values extracted from the list of keys passed as last parameter.
Sometimes, streams can generate errors. For example, a downstream service can be down, or you could try to do an invalid action on an event. Let’s take an example:
(exception-stream
(with :metric "invalid!"
(increment))
(error))
In this example, we associate a string to the event :metric
field and then we try to increment it. This will produce an exception.
exception-stream
is an action which will catch all exceptions generated on its first child, generate an event from its exception, and pass it to its second child (error
here). You can use it to log errors, or forward them to another system.
The error event generated by exception-stream looks like this:
{:time 1.620331421405E9,
:service "mirabelle-exception"
:state "error"
:metric 1
:tags ["exception" "java.lang.ClassCastException"]
:exception e
:base-event {...}
:description "java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Number..."
The event :time
is the time of the event which generated the exception. The :service
will always be mirabelle-exception
, the :state
“error” and the :metric
1.
The event will have as tag “exception” and the exception class name. The :exception
key will contain the JVM Exception
instance, :base-event
the full event which triggered the exception, and finally :description
will contain a string representation of the exception (including the stacktrace).
You can extract the base event with the extract
action, for example:
(exception-stream
(with :metric "invalid!"
(increment))
(sdo
(error)
(extract :base-event
(info))))
If your event has the the mirabelle/discard
tag, the event will be ignored by every action triggering side effects (logging, I/O, publish!).
You can reinject events from a stream to itself, or to another stream (a dynamic stream for example).
By default, events are reinjected into the current stream. You can specify the name of the targeted stream if needed:
;; send events into the current stream
(reinject!)
;; send events to another stream
(reinject! :custom-dynamic-stream)
Be careful about infinite loops while using reinject!
.