This guide provides optimization techniques, performance tips, and proven patterns for developing efficient and effective rules and filters in UTMStack v11.
Developer Reference : Apply these practices to create maintainable, performant, and reliable security detection logic.
Rule Development Best Practices
1. Start Simple and Iterate
Begin with basic conditions that identify the threat
Test with known-good and known-bad examples
Add complexity incrementally
Validate each addition before proceeding
Document reasoning for complex logic
Example : Progressive Rule Development
# Step 1: Basic detection
where : actionResult == "failure"
# Step 2: Add context
where : actionResult == "failure" && action == "login"
# Step 3: Add threshold
where : actionResult == "failure" && action == "login"
afterEvents :
- count : 5
within : now-1h
# Step 4: Refine with additional conditions
where : has(origin.ip) && actionResult == "failure" && action == "login" && !(origin.user.startsWith("test_"))
afterEvents :
- count : 5
within : now-1h
2. Be Specific with Data Types
Only include relevant data types
Avoid using too many data types in one rule
Create separate rules for different data sources when logic differs
# ❌ Too broad
dataTypes :
- windows
- linux
- macos
- apache
- nginx
# ✅ Specific
dataTypes :
- windows
- linux
3. Handle Missing Data Gracefully
Always check field existence with has() or exists()
Use safe() with sensible defaults
Test with incomplete events
Consider what happens when fields are missing
# ❌ Unsafe - will fail if field missing
where : origin.port > 1024
# ✅ Safe with default
where : safe(origin.port, 0) > 1024
# ✅ Check existence first
where : has(origin.port) && origin.port > 1024
Limit Data Scope
Narrow dataTypes selection
Use specific index patterns
Reduce time windows when possible
Efficient Expressions
Put cheapest checks first
Avoid complex string operations
Cache repeated calculations
Smart Correlation
Limit afterEvents searches
Use reasonable count thresholds
Set appropriate time windows
Proper Deduplication
Include key identifying fields
Avoid time-based fields
Balance between noise and visibility
5. Implement Effective Deduplication
# ✅ Good deduplication - identifies unique threats
deduplicateBy :
- adversary.ip
- target.ip
- attack.type
# ❌ Poor deduplication - too specific, won't dedupe enough
deduplicateBy :
- adversary.ip
- target.ip
- deviceTime
- log.message
# ❌ Poor deduplication - too broad, might miss threats
deduplicateBy :
- adversary.ip
6. Write Clear Descriptions
Include in Description :
What the rule detects
Why it’s important
What action should be taken
Known limitations or false positive scenarios
Related rules or techniques
Add References :
MITRE ATT&CK technique IDs
CVE numbers
Security advisories
Internal documentation
description : |
Detects multiple failed SSH authentication attempts from the same source IP
within a short time window, indicating a potential brute force attack.
This rule triggers when:
- 10+ failed login attempts occur within 1 hour
- Attempts come from the same source IP
- Target is an SSH service (port 22)
Known false positives:
- Users with forgotten passwords
- Automated monitoring tools
- Load balancers with health checks
Recommended action:
- Investigate source IP reputation
- Check for successful logins from same IP
- Consider implementing rate limiting
references :
- https://attack.mitre.org/techniques/T1110/001/
- https://www.cisecurity.org/controls/v8/
Filter Development Best Practices
1. Standardize Field Names
Use UTMStack Standard Fields :
origin.ip, origin.port, origin.user, origin.host
target.ip, target.port, target.user, target.host
deviceTime - Event timestamp
action - Normalized action (get, post, login, etc.)
actionResult - Result (success, failure, denied, accepted)
protocol - Network protocol
severity - Event severity
Benefits :
Consistent queries across data sources
Rules work with multiple data types
Dashboards work universally
Easier correlation
2. Parsing Strategy
# ✅ Good: Sequential, logical flow
steps :
# 1. Parse raw format
- json :
source : raw
# 2. Extract specific fields
- grok :
patterns :
- fieldName : origin.ip
pattern : '{{.ipv4}}'
source : log.message
# 3. Normalize and enrich
- rename :
from : [ log.src_ip ]
to : origin.ip
# 4. Type conversion
- cast :
fields : [ origin.port ]
to : int
# 5. Enrichment
- dynamic :
plugin : com.utmstack.geolocation
params :
source : origin.ip
destination : origin.geolocation
# 6. Cleanup
- delete :
fields : [ raw , log.message ]
3. Optimize Grok Patterns
Pattern Design :
Use specific patterns over generic ones
Order patterns from most to least specific
Test patterns with real data
Avoid greedy patterns when possible
Use built-in patterns
# ❌ Too greedy - captures everything
- fieldName : message
pattern : '{{.greedy}}'
# ✅ Specific patterns for each field
- fieldName : timestamp
pattern : '\[{{.data}}\]'
- fieldName : severity
pattern : '{{.word}}'
- fieldName : message
pattern : '{{.greedy}}' # Only for final field
4. Conditional Processing
# ✅ Use where clauses to optimize
steps :
# Only parse JSON if it exists
- json :
source : raw
where : raw.contains("{")
# Only add geolocation if IP exists
- dynamic :
plugin : com.utmstack.geolocation
params :
source : origin.ip
destination : origin.geolocation
where : exists(origin.ip) && origin.ip != "127.0.0.1"
# Only cast if field exists
- cast :
fields : [ statusCode ]
to : int
where : exists(statusCode)
5. Remove Unnecessary Fields Early
# ✅ Delete unnecessary fields as soon as possible
steps :
- json :
source : raw
# Delete raw immediately after parsing
- delete :
fields : [ raw ]
# Extract needed fields
- grok :
patterns : [ ... ]
source : log.message
# Delete intermediate field
- delete :
fields : [ log.message ]
where : exists(origin.ip)
6. Handle Edge Cases
Common Edge Cases :
Missing fields
Malformed data
Empty strings
Null values
Unexpected formats
Special characters
Encoding issues
# ✅ Robust parsing
steps :
# Parse with fallback
- json :
source : raw
where : raw != "" && raw.contains("{")
# Trim whitespace
- trim :
function : space
fields : [ origin.user , target.host ]
where : exists(origin.user)
# Provide defaults
- add :
function : string
params :
key : actionResult
value : "unknown"
where : !exists(actionResult)
Testing Strategies
Rule Testing
Unit Testing
Test individual conditions with sample events # Test events that should trigger
# Test events that should not trigger
# Test edge cases
Integration Testing
Deploy to development environment
Monitor for alerts
Verify alert content
Check performance impact
Load Testing
Test with realistic event volumes
Monitor CPU and memory usage
Check for bottlenecks
Verify deduplication works
False Positive Testing
Run against historical data
Identify false positives
Refine conditions
Add exclusions
Filter Testing
Sample Data Testing
Test with representative samples
Valid formatted data
Malformed data
Edge cases
Field Validation
Verify output fields
Check field names match standards
Verify data types are correct
Ensure required fields exist
Performance Testing
Measure processing time
Test with various event sizes
Monitor resource usage
Optimize slow steps
Rule Optimization Checklist
Filter Optimization Checklist
# Add temporary debugging
- add :
function : string
params :
key : debug.filterStart
value : '{{.timestamp}}'
# ... filter steps ...
- add :
function : string
params :
key : debug.filterEnd
value : '{{.timestamp}}'
Common Patterns and Anti-Patterns
Rules: Good Patterns ✅
# Pattern: Threshold-based detection
where : has(origin.ip)
afterEvents :
- indexPattern : v11-log-*
with :
- field : origin.ip.keyword
operator : filter_term
value : '{{origin.ip}}'
- field : actionResult.keyword
operator : filter_term
value : 'failure'
within : now-1h
count : 10
# Pattern: Time-based anomaly
where : |
has(deviceTime) &&
(time.getHours(deviceTime) < 6 || time.getHours(deviceTime) > 22) &&
actionResult == "success"
# Pattern: Geographic anomaly
where : |
has(origin.geolocation.country) &&
origin.geolocation.country in ["RU", "CN", "KP", "IR"] &&
action == "admin_login"
Rules: Anti-Patterns ❌
# ❌ No field existence check
where : origin.port > 1024
# ❌ Too broad data types
dataTypes :
- "*"
# ❌ Inefficient afterEvents
afterEvents :
- indexPattern : v11-log-*
within : now-30d # Too long
count : 1 # Too low
# ❌ Poor deduplication
deduplicateBy :
- timestamp # Changes every time
Filters: Good Patterns ✅
# Pattern: Progressive parsing
steps :
- json :
source : raw
- grok :
patterns : [ ... ]
source : log.message
where : exists(log.message)
- cast :
fields : [ ... ]
to : int
where : exists(field)
# Pattern: Conditional enrichment
- dynamic :
plugin : com.utmstack.geolocation
params :
source : origin.ip
destination : origin.geolocation
where : |
exists(origin.ip) &&
!origin.ip.startsWith("10.") &&
!origin.ip.startsWith("192.168.")
Filters: Anti-Patterns ❌
# ❌ No conditional checks
- cast :
fields : [ nonexistent_field ]
to : int
# ❌ Keeping unnecessary fields
- json :
source : raw
# raw is never deleted
# ❌ Inefficient grok patterns
- grok :
patterns :
- fieldName : everything
pattern : '{{.greedy}}'
Documentation Standards
Rule Documentation Template
- id : [ ID ]
dataTypes : [ ... ]
name : [ Clear , descriptive name ]
impact :
confidentiality : [ 0-5 ]
integrity : [ 0-5 ]
availability : [ 0-5 ]
category : [ Category ]
technique : [ Technique with MITRE ID ]
adversary : [ origin|target ]
references :
- [ MITRE ATT&CK URL ]
- [ Related CVE or advisory ]
description : |
[What it detects]
Triggers when:
- [Condition 1]
- [Condition 2]
Known false positives:
- [Scenario 1]
- [Scenario 2]
Recommended actions:
- [Action 1]
- [Action 2]
where : [ ... ]
afterEvents : [ ... ]
deduplicateBy : [ ... ]
Filter Documentation Template
pipeline :
- dataTypes :
- [ data_type ]
# Purpose: [What this filter does]
# Input format: [Expected raw format]
# Output fields: [Standard fields created]
# Dependencies: [Required plugins]
steps :
# Step 1: [Purpose]
- [ step_type ]:
[ params ]
# Step 2: [Purpose]
- [ step_type ]:
[ params ]