In today’s data-driven world, effectively processing and analyzing log data is crucial for maintaining system health, security, and performance. Logstash, a powerful open-source data processing pipeline, serves as a central hub for collecting, transforming, and forwarding log data to various destinations.
Logstash operates through three core components:
- Inputs: Collect data from multiple sources
- Filters: Transform and enrich the data
- Outputs: Ship the processed data to desired destinations
This guide focuses on practical aspects of using Logstash for log processing, filtering, and field extraction, with particular emphasis on common data processing tasks and integration with Elasticsearch.
Understanding Logstash Pipeline Architecture
Basic Pipeline Structure
Logstash implements a straightforward yet powerful pipeline architecture that processes data through three distinct stages:
- Input → Filter → Output flow
- Data enters through input plugins
- Undergoes transformation via filters
- Exits through output plugins
The event-driven processing model ensures efficient handling of data streams, with each event processed independently through the pipeline stages.
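The same three-stage structure appears directly in every configuration file. A minimal sketch, using the stdin and stdout plugins purely for illustration:
input {
  stdin { }                                  # read events typed on the console
}
filter {
  mutate {
    add_field => { "stage" => "filtered" }   # illustrative transformation
  }
}
output {
  stdout {
    codec => rubydebug                       # print the processed event structure
  }
}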
Key Architectural Components
Logstash’s architecture relies on several crucial components:
- Worker Threads: Handle parallel processing of events
- Queue Management: Buffer events between pipeline stages
- Processing Stages: Coordinate data flow and transformations
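These components are tuned in logstash.yml rather than in the pipeline definition itself. A minimal sketch with illustrative values (not recommendations):
# logstash.yml (illustrative values)
pipeline.workers: 4        # worker threads running filters and outputs in parallel
pipeline.batch.size: 125   # events each worker pulls from the queue per batch
queue.type: persisted      # buffer events on disk between pipeline stages
queue.max_bytes: 1gb       # upper bound on the on-disk queue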
Working with Input Plugins
Common Input Plugin Types
Logstash offers various input plugins to collect data from different sources:
- File Input
input {
  file {
    path => "/var/log/*.log"
    type => "system_logs"
    start_position => "beginning"
  }
}
- Beats Integration
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
- Network-based Inputs
input {
  tcp {
    port => 12345
    type => "network_data"
  }
}
Configuration Examples
Multiple input sources can be configured simultaneously:
input {
  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
  beats {
    port => 5044
    type => "beats"
  }
  syslog {
    port => 514
    type => "syslog"
  }
}
Mastering Data Processing with Filters
Essential Filter Plugins
Grok Pattern Matching
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
This Grok filter parses unstructured log data into structured fields using pattern matching. %{COMBINEDAPACHELOG} is a built-in pattern that extracts common Apache log fields such as client IP, request method, URI path, response code, bytes transferred, referrer, and user agent. When applied, it transforms a single text line into multiple queryable fields that can be analyzed independently.
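As a rough illustration (using the classic, non-ECS field names; recent Logstash versions may emit ECS-style names instead), a hypothetical combined-format line would be broken out as follows:
# Hypothetical input line:
#   203.0.113.7 - - [10/May/2023:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0"
# Selected fields produced by %{COMBINEDAPACHELOG}:
#   clientip => 203.0.113.7
#   verb     => GET
#   request  => /index.html
#   response => 200
#   bytes    => 2326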
Date Processing
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
}
The date filter converts string timestamp fields into Logstash’s internal datetime format. In this example, it processes a timestamp field formatted like “10/May/2023:13:55:36 -0700”, parses it according to the specified pattern, and stores the result in the @timestamp field. This enables proper time-based indexing and querying in Elasticsearch, allowing for chronological analysis of events.
Mutate Operations
filter {
  mutate {
    add_field => { "environment" => "production" }
    remove_field => [ "unwanted_field" ]
    convert => { "response" => "integer" }
  }
}
The mutate filter modifies fields in the event. This configuration:
- Creates a new field, environment, with the value “production”, providing context for all processed events
- Removes the unwanted_field field, reducing event size and discarding unnecessary data
- Converts the response field from string to integer, enabling numeric operations such as averaging and comparison
Practical Filter Configurations
Apache Log Processing Example:
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
      source => "clientip"
    }
  }
}
This configuration processes Apache logs through multiple filters within a conditional block:
- The if condition ensures these filters only process events whose type field equals “apache”
- The grok filter extracts structured fields from the Apache log format
- The date filter parses the timestamp field and converts it to Logstash’s datetime format
- The geoip filter enriches the event by adding geographical information (country, city, coordinates) based on the client IP address, enabling location-based analytics and visualizations
Advanced Filtering Techniques
Conditional Processing:
filter {
  if [status] =~ /^5\d\d$/ {
    mutate {
      add_tag => ["error"]
    }
  }
}
This configuration demonstrates conditional logic in filters:
- The regular expression ^5\d\d$ matches any HTTP status code in the 500 range (server errors)
- When a 5xx status code is detected, the event is tagged with “error”
- Tags make it easy to search for error events and can trigger special handling in outputs, as shown in the sketch after this list
- This approach enables automated error detection and alerting based on status codes
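A sketch of how such a tag might drive routing in the output stage (the index names here are illustrative):
output {
  if "error" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"      # hypothetical dedicated error index
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}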
Multi-filter Chains:
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
  mutate {
    convert => { "bytes" => "integer" }
    add_field => { "processed_by" => "logstash_processor_1" }
    remove_field => [ "message" ]
  }
  if [response] == "404" {
    aggregate {
      task_id => "%{clientip}"
      code => "
        map['not_found_count'] ||= 0
        map['not_found_count'] += 1
        map['paths'] ||= []
        map['paths'] << event.get('request')
        event.set('not_found_count', map['not_found_count'])
      "
      push_map_as_event_on_timeout => true
      timeout => 120
      timeout_tags => ['not_found_report']
    }
  }
}
This example shows how multiple filters can be chained together to process events sequentially:
- First, grok extracts structured fields from the log message, parsing Apache logs into fields like clientip, response, request, etc.
- Then, date normalizes timestamp information, converting the timestamp string to a proper datetime object
- Next, mutate transforms the data by:
- Converting the “bytes” field to integer type for proper numeric processing
- Adding a new field to track which processor handled the event
- Removing the original raw message to save storage space
- Finally, a conditional aggregate filter applies to 404 status codes, collecting statistics about missing resources by:
- Tracking how many 404 errors come from each client IP
- Building a list of requested paths that returned 404 errors
- Creating summary events after 120 seconds of inactivity
Filters are executed in the order specified, with each filter potentially using fields created by previous filters. This sequential processing allows for complex data transformation pipelines.
Configuring Outputs
Elasticsearch Output Configuration
Basic setup:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
This configuration establishes a connection between Logstash and Elasticsearch:
- hosts specifies the Elasticsearch server address (localhost:9200 in this case)
- index defines a date-based index pattern that creates a new index daily, facilitating efficient data retention and searching
- template_overwrite ensures that Logstash’s index template always replaces any existing template with the same name, maintaining consistent mappings
Output Best Practices
Error Handling
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    retry_on_conflict => 5
    action => "index"
    timeout => 120
  }
}
This configuration implements robust error handling for Elasticsearch output:
- retry_on_conflict attempts to resolve document update conflicts up to 5 times before failing
- action explicitly sets the operation to “index” (alternatives include create, update, and delete)
- timeout extends the connection timeout to 120 seconds, preventing premature failures during network issues or when Elasticsearch is under heavy load
- Together, these settings reduce the risk of data loss during temporary Elasticsearch unavailability; the logstash.yml sketch below adds a further safety net
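For example, the dead letter queue can be enabled in logstash.yml so that events the Elasticsearch output rejects (such as mapping errors) are captured instead of dropped; a sketch with an assumed path:
# logstash.yml (illustrative)
dead_letter_queue.enable: true                   # keep events the Elasticsearch output rejects
path.dead_letter_queue: "/var/lib/logstash/dlq"  # assumed storage location for rejected events
Captured events can later be replayed with the dead_letter_queue input plugin.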
Real-World Processing Examples
Web Server Log Analysis
Complete pipeline configuration:
input {
  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
  useragent {
    source => "agent"
    target => "user_agent"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "apache-logs-%{+YYYY.MM.dd}"
  }
}
This end-to-end pipeline demonstrates a complete Apache log processing workflow:
- The input section monitors the Apache access log file
- The filter section:
- Parses raw logs into structured fields using the grok filter
- Normalizes timestamps with the date filter
- Enriches events with geographical data based on client IPs
- Extracts detailed user agent information (browser, OS, device) into a nested structure
- The output section sends processed data to Elasticsearch with daily indices
This pipeline enables comprehensive web traffic analysis, including visitor locations, browser statistics, and temporal access patterns.
Application Metrics Processing
Metrics collection setup:
input {
  beats {
    port => 5044
    type => "metrics"
  }
}
filter {
  if [type] == "metrics" {
    json {
      source => "message"
    }
    mutate {
      add_field => {
        "environment" => "production"
        "app_version" => "1.2.3"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "metrics-%{+YYYY.MM}"
  }
}
This pipeline processes application metrics data:
- The input collects metrics from Beats agents (such as Metricbeat) on port 5044
- The filter section:
- Uses a conditional to process only events with type “metrics”
- Parses JSON-formatted metric data with the json filter
- Adds contextual information (environment and app version) with the mutate filter
- The output sends data to monthly Elasticsearch indices for efficient long-term metrics storage
This configuration enables application performance monitoring with rich metadata for filtering and aggregation, while using monthly indices to balance search performance with index management complexity.
Performance Optimization and Best Practices
Configuration Optimization
Worker Threads
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
This configuration tunes Logstash’s concurrency settings:
- pipeline.workers: 2 sets the number of parallel worker threads to 2, balancing throughput with resource consumption
- pipeline.batch.size: 125 processes up to 125 events per batch, balancing memory usage against processing efficiency
- pipeline.batch.delay: 50 waits up to 50 ms for additional events before dispatching an undersized batch to the workers
These settings should be adjusted based on server resources and event volume: more workers benefit multi-core systems, while batch size affects memory usage and processing latency.
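When multiple pipelines run on one Logstash instance, the same values can also be set per pipeline in pipelines.yml; a sketch with assumed pipeline IDs and config paths:
# pipelines.yml (illustrative)
- pipeline.id: apache_logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 4
  pipeline.batch.size: 250
- pipeline.id: metrics
  path.config: "/etc/logstash/conf.d/metrics.conf"
  pipeline.workers: 1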
Code Examples Appendix
Input Configurations
input {
  file {
    path => "/var/log/*.log"
    exclude => "*.gz"
    sincedb_path => "/var/lib/logstash/sincedb"
    start_position => "beginning"
  }
}
This file input configuration demonstrates advanced file monitoring options:
- path uses wildcards to monitor all .log files in /var/log
- exclude prevents processing of compressed (.gz) files
- sincedb_path specifies where Logstash tracks file positions between restarts
- start_position tells Logstash to read files from the beginning when they are first discovered
These settings create a reliable log ingestion process that doesn’t miss data after restarts and focuses only on relevant log files.
Filter Patterns
filter {
  grok {
    patterns_dir => ["/path/to/patterns"]
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
  }
}
This grok filter configuration shows how to use custom pattern files:
- patterns_dir points to a directory containing custom grok pattern definitions
- The match pattern extracts three fields: timestamp using the built-in ISO8601 pattern, level using the standard log level pattern (INFO, ERROR, etc.), and message capturing the remaining content
Custom pattern directories are useful for organization-specific log formats, enabling reusable pattern definitions across multiple pipelines.
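A pattern file is simply a text file of names followed by regular expressions; a hypothetical example:
# /path/to/patterns/custom_app (hypothetical pattern file)
ORDER_ID ORD-[0-9]{8}
SESSION_ID [A-Fa-f0-9]{32}
Once the directory is referenced via patterns_dir, these names can be used in match expressions exactly like built-in patterns, for example %{ORDER_ID:order_id}.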
Complete Pipeline Examples
Full Log Processing Pipeline
input {
  file {
    path => "/var/log/application/*.log"
    type => "application_logs"
  }
}
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{WORD:service}\] %{LOGLEVEL:level}: %{GREEDYDATA:message}" }
  }
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  mutate {
    remove_field => [ "timestamp" ]
    add_field => { "environment" => "production" }
  }
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "application-logs-%{+YYYY.MM.dd}"
    manage_template => true
    template_overwrite => true
  }
  if "error" in [tags] {
    email {
      to => "alerts@example.com"
      subject => "Application Error Detected"
      body => "Error message: %{message}"
    }
  }
}
This comprehensive pipeline demonstrates enterprise-grade log processing:
- Input stage collects application logs from specified directory
- Filter stage:
- Parses log structure with grok, extracting timestamp, service name, log level, and message
- Normalizes timestamps to standard Elasticsearch format
- Removes the original timestamp field to avoid duplication
- Adds environment context
- Tags error messages for special handling
- Output stage:
- Sends all logs to Elasticsearch with daily indices
- Additionally routes error events to an email notification system
This dual-output approach enables both comprehensive log archiving and real-time alerting, creating an effective monitoring system.
Metrics Collection Pipeline
input {
  beats {
    port => 5044
    type => "system_metrics"
  }
}
filter {
  if [type] == "system_metrics" {
    json {
      source => "message"
    }
    mutate {
      convert => {
        "cpu_usage" => "float"
        "memory_used" => "integer"
      }
    }
    date {
      match => [ "collected_at", "UNIX_MS" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "metrics-%{+YYYY.MM}"
  }
}
This metrics processing pipeline handles system performance data:
- Input stage receives metrics data from Beats agents
- Filter stage:
- Processes only system metrics events
- Parses the JSON-formatted metrics data
- Converts numeric fields to appropriate data types (float for CPU, integer for memory)
- Transforms Unix epoch timestamps to standard format
- Output stage stores metrics in monthly Elasticsearch indices
The type conversion is particularly important for metrics data, as it ensures that Elasticsearch creates appropriate field mappings for numeric analysis, enabling mathematical operations like averages, percentiles, and trend analysis.
Conclusion
Logstash provides a robust platform for log processing and analysis, with its flexible pipeline architecture and rich plugin ecosystem. Key takeaways include:
- Use appropriate input plugins for your data sources
- Leverage filters for effective data transformation
- Configure outputs with proper error handling
- Optimize performance through careful configuration