Vector Aggregator — Transform and Route
Parse, enrich, and route logs from Kafka to storage backends with Vector Aggregator
The Problem
Raw logs from Kafka need enrichment before indexing. You may want to add metadata, filter noisy events, route errors to a separate index, or transform fields. The aggregator handles this centrally — changes apply to all logs without touching individual agents.
Vector Aggregator Configuration
# infrastructure/vector/vector-aggregator.toml
# SOURCES: Read from Kafka topic
[sources.kafka]
type = "kafka"
bootstrap_servers = "kafka:9092"
topics = ["app-logs"]
group_id = "vector-aggregator"
decoding.codec = "json"
# TRANSFORMS: Enrich before indexing
[transforms.enrich]
type = "remap"
inputs = ["kafka"]
source = '''
if !exists(.timestamp) {
  .timestamp = now()
}
.aggregator_processed_at = now()
.pipeline = "kafka-to-elasticsearch"
'''
# SINKS: Ship to Elasticsearch
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["enrich"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "app-logs-%Y-%m-%d"
encoding.except_fields = ["source_type"]
[sinks.elasticsearch.buffer]
type = "disk"
max_size = 268435456 # 256MB
when_full = "block"
Architecture Role
Vector Agent → Kafka → [Vector Aggregator] → Elasticsearch
                              ↓
                      Enrich, transform,
                        filter, route
The aggregator is the central processing point. Agents are lightweight collectors. The aggregator does the heavy lifting.
Transform: Enrichment
source = '''
if !exists(.timestamp) {
  .timestamp = now()
}
.aggregator_processed_at = now()
.pipeline = "kafka-to-elasticsearch"
'''
- Ensures every event has a timestamp
- Adds processing metadata for debugging the pipeline itself
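The remap source is ordinary VRL, so further enrichment is just more program. A hypothetical extension (the field names here are illustrative, not part of the pipeline above):

```toml
[transforms.enrich_more]
type = "remap"
inputs = ["enrich"]
source = '''
# Tag every event with a static deployment label
.environment = "production"
# Drop an internal field we don't want indexed
if exists(.internal_debug_id) {
  del(.internal_debug_id)
}
'''
```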
Elasticsearch Sink
Daily Indices
bulk.index = "app-logs-%Y-%m-%d"
Creates indices like app-logs-2026-04-11. Daily indices enable:
- Easy retention (delete old indices)
- Smaller index sizes for faster queries
- ILM (Index Lifecycle Management) policies
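The date pattern is a strftime template, so index granularity is adjustable. For lower-volume streams, coarser indices keep shard counts down; an illustrative variant:

```toml
# Weekly instead of daily indices (illustrative)
bulk.index = "app-logs-%Y.w%V"
```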
Disk Buffer
[sinks.elasticsearch.buffer]
type = "disk"
max_size = 268435456
when_full = "block"
If Elasticsearch is slow or temporarily down, Vector buffers events to disk. When the buffer fills, when_full = "block" applies backpressure upstream instead of dropping events — no logs lost.
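For stronger delivery guarantees, Vector also supports end-to-end acknowledgements: enabled on the sink, they make the Kafka source commit consumer offsets only after Elasticsearch has accepted the batch. A sketch of the optional addition to the sink above:

```toml
[sinks.elasticsearch]
# ...existing sink options from above...
# Commit Kafka offsets only after Elasticsearch acknowledges the batch
acknowledgements.enabled = true
```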
Advanced Transforms
Filter Noisy Events
[transforms.filter_health]
type = "filter"
inputs = ["enrich"]
condition = '.path != "/health" && .path != "/ready" && .path != "/metrics"'
Health check logs are noise — filter them out before indexing.
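When filtering alone isn't enough, a sample transform can thin a high-volume stream. This sketch keeps roughly 1 in 10 of the events that survive the filter (the rate and wiring are illustrative):

```toml
[transforms.sample_noisy]
type = "sample"
inputs = ["filter_health"]
rate = 10 # keep ~1 in 10 events
```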
Route Errors to Separate Index
[transforms.route_by_level]
type = "route"
inputs = ["enrich"]
route.errors = '.level == "error" || .level == "critical"'
reroute_unmatched = true # send non-matching events to the _unmatched output
[sinks.es_errors]
type = "elasticsearch"
inputs = ["route_by_level.errors"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "app-errors-%Y-%m-%d"
[sinks.es_all]
type = "elasticsearch"
inputs = ["route_by_level._unmatched"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "app-logs-%Y-%m-%d"
Errors go to app-errors-* (fast to search, separate alerts). Normal logs go to app-logs-*.
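Note that as written, filter_health and route_by_level both read from enrich, so routed events bypass the health-check filter. To apply both, chain the transforms instead; a sketch using the names from the examples above:

```toml
[transforms.route_by_level]
type = "route"
# Read from the filter rather than directly from enrich,
# so routed events are also stripped of health-check noise
inputs = ["filter_health"]
route.errors = '.level == "error" || .level == "critical"'
```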
Running the Aggregator
# infrastructure/docker-compose.vector.yml
services:
  vector-aggregator:
    image: timberio/vector:0.35.0-alpine
    container_name: vector-aggregator
    volumes:
      - ./vector/vector-aggregator.toml:/etc/vector/vector.toml:ro
      - vector-agg-data:/var/lib/vector
    depends_on:
      - kafka
volumes:
  vector-agg-data:
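To inspect the aggregator once it is running, Vector's internal API can be enabled in vector-aggregator.toml; vector top then shows live per-component throughput. A minimal sketch (the bind address is an assumption):

```toml
[api]
enabled = true
address = "0.0.0.0:8686"
```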
Next Step
In the next lesson, we set up Elasticsearch and Kibana — the storage and visualization layer for our log pipeline.