Skip to main content

Built-in Pipeline Behaviors

MediatorK ships six ready-to-use pipeline behaviors. Drop them into MediatorFactory.create — no boilerplate required.


LoggingPipelineBehavior

Logs each request as it enters and exits the pipeline. Accepts any (String) -> Unit logger so it works on every platform.

KMP / common (println)

val mediator = MediatorFactory.create(
registrars = listOf(AppRegistrar()),
pipelineBehaviors = listOf(
LoggingPipelineBehavior(logger = ::println),
),
)

Output:

→ GetUserQuery
← GetUserQuery

JVM — SLF4J

import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("Mediator")

LoggingPipelineBehavior(logger = log::info)

Android — Logcat

LoggingPipelineBehavior(logger = { msg -> Log.d("Mediator", msg) })

JS / browser — console

LoggingPipelineBehavior(logger = { msg -> console.log(msg) })

Log the result too

LoggingPipelineBehavior(logger = ::println, logResult = true)
// ← GetUserQuery result=User(id=1, name=Alice)

Parameters

ParameterTypeDefaultDescription
logger(String) -> Unit::printlnFunction that receives each log line
logResultBooleanfalseAppend the result to the exit log line
orderInt-100Low number = outermost — logs before any other behavior

Source

LoggingPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate

/**
* A [PipelineBehavior] that logs each request as it enters and exits the pipeline,
* including the result on exit.
*
* Accepts any `(String) -> Unit` logger so the same behavior works on every platform:
* pass `::println` for KMP/Native, `Log.d` on Android, an SLF4J logger on JVM,
* or `console::log` on JS/browser.
*
* @param logger function that receives each log line.
* @param order position in the behavior chain. Defaults to `-100` so logging is outermost by default.
*/
class LoggingPipelineBehavior(
val logger: (String) -> Unit = ::println,
override val order: Int = -100,
) : PipelineBehavior {

override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult {
val name = request::class.simpleName ?: "UnknownRequest"
logger("→ $name")
val result = next(request)
logger("← $name result=$result")
return result
}
}

TimeoutPipelineBehavior

Cancels the downstream pipeline if it does not complete within the given deadline. Throws TimeoutCancellationException (a CancellationException) when the deadline is exceeded.

val mediator = MediatorFactory.create(
registrars = listOf(AppRegistrar()),
pipelineBehaviors = listOf(
TimeoutPipelineBehavior(timeoutMillis = 5_000),
),
)

Parameters

ParameterTypeDefaultDescription
timeoutMillisLongMaximum allowed duration per dispatch. Must be > 0
orderInt0Position in the behavior chain

Source

TimeoutPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.milliseconds

/**
* A [PipelineBehavior] that cancels the downstream pipeline if it does not complete
* within [timeoutMillis] milliseconds.
*
* Throws [TimeoutCancellationException] (a [CancellationException]) when the deadline
* is exceeded. Pair with [RetryPipelineBehavior] (at a lower [order] value) if you
* want to retry timed-out requests.
*
* @param timeoutMillis maximum allowed duration in milliseconds. Must be > 0.
* @param order position in the behavior chain. Defaults to `0`.
*/
class TimeoutPipelineBehavior(
val timeoutMillis: Long,
override val order: Int = 0,
) : PipelineBehavior {

init {
require(timeoutMillis > 0) { "timeoutMillis must be > 0, was $timeoutMillis" }
}

override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult = withTimeout(timeoutMillis.milliseconds) { next(request) }
}

RequestCounterPipelineBehavior

Counts how many times each request type has passed through the pipeline.

val counter = RequestCounterPipelineBehavior()

val mediator = MediatorFactory.create(
registrars = listOf(AppRegistrar()),
pipelineBehaviors = listOf(counter),
)

mediator.send(GetUserQuery(id = 1))
mediator.send(GetUserQuery(id = 2))
mediator.send(CreateOrderCommand(...))

counter.countFor(GetUserQuery::class) // 2
counter.countFor(CreateOrderCommand::class) // 1
counter.snapshot() // {"GetUserQuery" to 2, "CreateOrderCommand" to 1}

API

MemberDescription
countFor(KClass<*>)Returns the dispatch count for a specific request class
snapshot()Returns a copy of all counts as Map<String, Long>
reset()Clears all counters

Parameters

ParameterTypeDefaultDescription
orderInt0Position in the behavior chain

Source

RequestCounterPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.reflect.KClass

/**
* A [PipelineBehavior] that counts how many times each request type has passed through
* the pipeline. Counts survive across multiple `send` calls for the lifetime of this
* behavior instance.
*
*
* Thread-safe: uses a [Mutex] so concurrent `send` calls never race on the counter map.
*
* @param order position in the behavior chain. Defaults to `0`.
*/
class RequestCounterPipelineBehavior(
override val order: Int = 0,
) : PipelineBehavior {

private val mutex = Mutex()
private val counts = mutableMapOf<String, Long>()

override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult {
val key = request::class.simpleName ?: request::class.toString()
mutex.withLock { counts[key] = (counts[key] ?: 0L) + 1L }
return next(request)
}

/** Returns the number of times [requestClass] has been dispatched. */
suspend fun countFor(requestClass: KClass<*>): Long {
val key = requestClass.simpleName ?: requestClass.toString()
return mutex.withLock { counts[key] ?: 0L }
}

/** Returns a snapshot of all counts keyed by request class simple name. */
suspend fun snapshot(): Map<String, Long> = mutex.withLock { counts.toMap() }

/** Resets all counters to zero. */
suspend fun reset() = mutex.withLock { counts.clear() }
}

CachingPipelineBehavior

Caches handler results by request key for a configurable TTL. On a cache hit the handler is skipped entirely. Best suited for query requests whose results change infrequently.

CachingPipelineBehavior(
ttlMs = 30_000, // cache for 30 seconds
keyFor = { req -> req.toString() }, // default: full toString
)
APIDescription
invalidate(key)Remove a single entry
clear()Remove all entries
size()Count of cached entries

Parameters

ParameterTypeDefaultDescription
ttlMsLong60_000Time-to-live per entry in milliseconds
keyFor(Request<*>) -> StringtoString()Cache key function
orderInt0Position in the behavior chain

Source

CachingPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.time.TimeSource

/**
* A [PipelineBehavior] that caches handler results by request key for a configurable TTL.
*
* On a cache hit the handler is skipped entirely and the cached value is returned.
* On a miss the handler runs and the result is stored. Entries expire after [ttlMs]
* milliseconds — the next call after expiry re-runs the handler and refreshes the entry.
*
* Best suited for **query** requests whose results change infrequently. Commands that
* produce side-effects should not be cached.
*
* @param ttlMs time-to-live for each entry in milliseconds. Defaults to 60 000 (1 minute).
* @param keyFor function that produces a cache key from a request. Defaults to `toString()`.
* @param order position in the behavior chain. Defaults to `0`.
*/
class CachingPipelineBehavior(
val ttlMs: Long = 60_000L,
val keyFor: (Request<*>) -> String = { it.toString() },
override val order: Int = 0,
) : PipelineBehavior {

init {
require(ttlMs > 0) { "ttlMs must be > 0, was $ttlMs" }
}

private val mutex = Mutex()
private val cache = mutableMapOf<String, Entry>()

@Suppress("UNCHECKED_CAST")
override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult {
val key = keyFor(request)
mutex.withLock {
val entry = cache[key]
if (entry != null && !entry.isExpired()) return entry.value as TResult
}
val result = next(request)
mutex.withLock {
cache[key] = Entry(result, TimeSource.Monotonic.markNow(), ttlMs)
}
return result
}

/** Removes the cached entry for [key]. No-op if the key is not cached. */
suspend fun invalidate(key: String): Unit = mutex.withLock { cache.remove(key); Unit }

/** Removes all cached entries. */
suspend fun clear() = mutex.withLock { cache.clear() }

/** Returns the number of entries currently in the cache (including expired ones not yet evicted). */
suspend fun size(): Int = mutex.withLock { cache.size }

private data class Entry(val value: Any?, val mark: TimeSource.Monotonic.ValueTimeMark, val ttlMs: Long) {
fun isExpired() = mark.elapsedNow().inWholeMilliseconds >= ttlMs
}
}

TimingPipelineBehavior

Measures how long each request takes and reports it via a callback. Timing is always reported — even when the handler throws.

// KMP / println
TimingPipelineBehavior(onTiming = { name, ms -> println("$name took ${ms}ms") })

// Android — Firebase Performance
TimingPipelineBehavior(onTiming = { name, ms ->
FirebasePerformance.getInstance().newTrace(name).also { it.start(); it.stop() }
})

// JVM — Micrometer
TimingPipelineBehavior(onTiming = { name, ms ->
meterRegistry.timer(name).record(ms, TimeUnit.MILLISECONDS)
})

Parameters

ParameterTypeDefaultDescription
onTiming(String, Long) -> UnitCallback with (requestName, durationMs)
orderInt0Position in the behavior chain

Source

TimingPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate
import kotlin.time.TimeSource

/**
* A [PipelineBehavior] that measures how long each request takes and reports it via a callback.
*
* The callback receives the request's class name and the elapsed duration in milliseconds.
* Timing is always reported — even when the handler throws — so you get latency data for
* both successful and failed requests.
*
* @param onTiming callback invoked after every request with `(requestName, durationMs)`.
* @param order position in the behavior chain. Defaults to `0`.
*/
class TimingPipelineBehavior(
override val order: Int = 0,
val onTiming: (requestName: String, durationMs: Long) -> Unit,
) : PipelineBehavior {

override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult {
val name = request::class.simpleName ?: "UnknownRequest"
val start = TimeSource.Monotonic.markNow()
try {
return next(request)
} finally {
onTiming(name, start.elapsedNow().inWholeMilliseconds)
}
}
}

ErrorTrackingPipelineBehavior

Intercepts every unhandled exception, forwards it to a callback, then rethrows. Use this to wire crash-reporting services without touching handler code.

// Android — Firebase Crashlytics
ErrorTrackingPipelineBehavior { request, error ->
FirebaseCrashlytics.getInstance().recordException(error)
}

// KMP — Sentry
ErrorTrackingPipelineBehavior { request, error ->
Sentry.captureException(error)
}

Parameters

ParameterTypeDefaultDescription
orderIntInt.MAX_VALUEInnermost by default — fires closest to the handler
onError(Request<*>, Throwable) -> UnitCallback with the request and the exception

Source

ErrorTrackingPipelineBehavior.kt
package com.fajrbahr.mediatork.pipeline.buildin

import com.fajrbahr.mediatork.api.PipelineBehavior
import com.fajrbahr.mediatork.api.Request
import com.fajrbahr.mediatork.api.RequestContext
import com.fajrbahr.mediatork.api.RequestHandlerDelegate

/**
* A [PipelineBehavior] that intercepts every unhandled exception and forwards it to a
* callback before rethrowing.
*
* Use this to wire crash-reporting services (Firebase Crashlytics, Sentry, Bugsnag) into
* the pipeline without touching handler code. The callback receives the original request
* and the throwable — the exception is always rethrown after the callback returns.
*
* @param onError callback invoked on every unhandled exception with `(request, throwable)`.
* @param order position in the behavior chain. Defaults to `Int.MAX_VALUE` (innermost) so it
* fires closest to the handler, after retry/timeout behaviors have already given up.
*/
class ErrorTrackingPipelineBehavior(
override val order: Int = Int.MAX_VALUE,
val onError: (request: Request<*>, error: Throwable) -> Unit,
) : PipelineBehavior {

override suspend fun <TRequest : Request<TResult>, TResult> process(
requestContext: RequestContext,
next: RequestHandlerDelegate<TRequest, TResult>,
request: TRequest,
): TResult {
return try {
next(request)
} catch (e: Throwable) {
onError(request, e)
throw e
}
}
}

Registering multiple built-in behaviors

val counter = RequestCounterPipelineBehavior()

val mediator = MediatorFactory.create(
registrars = listOf(AppRegistrar()),
pipelineBehaviors = listOf(
LoggingPipelineBehavior(logger = ::println, order = -100),
TimeoutPipelineBehavior(timeoutMillis = 5_000, order = -1),
counter,
),
)

Next

Pre / Post Processors