Earlier this year I talked a lot about behavior chains and how you might go about implementing these in Splunk. In my last post I also talked about the need to know the capabilities of your tools so that you can take full advantage of them. I wanted to do something a little different with this post. I know that many of you are using an ELK stack today for hunting or daily ops, and it’s one of the areas where I lack experience. I decided to dig into ELK and see if it was possible to surface events of interest and begin to chain them together. In this post I’m going to document what I’ve accomplished so far in the hopes that it may spur some ideas.
Before we begin, here’s an overly simplistic view of how I went about chaining behaviors in Splunk:
- Write queries for all actions that an attacker may take (regardless of the volume of results they produce).
- Schedule the queries to run routinely.
- Log all query results to a separate index.
- Query that separate index for multiple events by user/src/dest within a specified timeframe.
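The last step above is the interesting one, so here is a minimal sketch of that logic in Python. The event rows, field names, and one-hour window are all hypothetical stand-ins for whatever the summary index actually holds:

```python
from collections import defaultdict

# Hypothetical summary-index rows: one per matched behavior query.
events = [
    {"src": "192.168.56.20", "behavior": "suspicious_process", "time": 100},
    {"src": "192.168.56.20", "behavior": "explicit_logon",     "time": 160},
    {"src": "192.168.56.20", "behavior": "share_access",       "time": 300},
    {"src": "192.168.56.30", "behavior": "suspicious_process", "time": 500},
]

def chained_sources(events, window=3600, min_behaviors=2):
    """Return sources showing several distinct behaviors inside the window."""
    by_src = defaultdict(list)
    for e in events:
        by_src[e["src"]].append(e)
    hits = {}
    for src, evs in by_src.items():
        evs.sort(key=lambda e: e["time"])
        for i, start in enumerate(evs):
            # Distinct behaviors seen within `window` seconds of this event.
            behaviors = {e["behavior"] for e in evs[i:]
                         if e["time"] - start["time"] <= window}
            if len(behaviors) >= min_behaviors:
                hits[src] = behaviors
                break
    return hits

print(chained_sources(events))
```

Only the first source chains three distinct behaviors inside the window; the second never climbs above one, which is exactly the signal-vs-noise split the summary index is for.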
Getting started, I first configured Elasticsearch to consume logs sent from Winlogbeat, using the same log sources from the same virtual machines as before. Once my logs were flowing I began to experiment with Kibana and its query language. I was able to query for many of the same behaviors that I could in Splunk; the exceptions were those that correlate multiple events by time. Here is an example of what I would like to accomplish with ELK:
sourcetype=wineventlog:security EventCode=5145 Object_Type=File Share_Name=*$ (Access_Mask=0x100180 OR Access_Mask=0x80 OR Access_Mask=0x130197)
| bucket span=1s _time
| rex "(?<thingtype>(0x100180|0x80|0x130197))"
| stats values(Relative_Target_Name) AS Relative_Target_Name, values(Account_Name) AS Account_Name, values(Source_Address) AS Source_Address, dc(thingtype) AS distinct_things by ComputerName, _time
| search distinct_things=3
The above query tries to identify three logs with event ID 5145 and an access mask of either 0x100180, 0x80 or 0x130197, all generated within the same second. That combination is indicative of a file being copied to a $ share from a command prompt. Unfortunately, after banging my head against a wall for an entire weekend, I have not found a way to do this with Kibana or Elasticsearch.
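To make the same-second correlation concrete, here is the core of that Splunk query expressed in plain Python. The 5145 events are invented sample data; the bucketing mirrors `bucket span=1s _time | stats dc(thingtype)`:

```python
from collections import defaultdict

REQUIRED_MASKS = {"0x100180", "0x80", "0x130197"}

# Invented 5145 events: (computer, epoch_second, access_mask)
logs = [
    ("HOST1", 1001, "0x100180"),
    ("HOST1", 1001, "0x80"),
    ("HOST1", 1001, "0x130197"),
    ("HOST2", 1002, "0x80"),
]

# Bucket by (computer, second) and collect the distinct access masks
# seen in that one-second window.
buckets = defaultdict(set)
for computer, second, mask in logs:
    if mask in REQUIRED_MASKS:
        buckets[(computer, second)].add(mask)

# distinct_things=3: all three masks appeared within the same second.
suspects = [key for key, masks in buckets.items() if len(masks) == 3]
print(suspects)
```

Only the host that produced all three masks in one second surfaces, which is the behavior I could not reproduce with Kibana queries alone.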
Realizing that Kibana and Elasticsearch probably wouldn’t get me where I wanted to be, I decided to see what I could do with Logstash (I had been putting this off simply because I didn’t have it installed). My goal was still to be able to group events and start chaining them together, and I found that Logstash has a feature that helps with this: the ability to tag events with unique identifiers. My thought was to use this tagging method as a replacement for the separate index. By tagging events that I’m interested in, I can start grouping them by time, and I also get an easy way to dive directly into logs of interest, since they include a tag I can pivot on. To begin, I first needed to define the things I’m interested in, so I created a file with the regular expressions below, which can be used in Grok filters.
# Event Codes
PROCSTART (\s*(4688)\s*)
EXPLICITLOGIN (\s*(4648)\s*)
FILESHARE (\s*(5145)\s*)
SHAREPATH (\s*(5140)\s*)
# Access masks found in various windows security event logs
ACCESSMASK12019F (\s*(0x12019f)\s*)
ACCESSMASK1 (\s*(0x1)\s*)
# Specific expressions related to lateral movement
SUSPICIOUSPROCESSNAME (\\net\.exe|\\NET\.EXE|\\net1\.exe|\\NET1\.EXE|\\wmic|\\WMIC|\\powershell|\\POWERSHELL|\\PowerShell|\\cmd\.exe|\\CMD\.EXE|\\tasklist|\\TASKLIST|\\ping|\\PING)
SHARENAME (\\\\C$|\\\\c$|\\\\ADMIN$|\\\\admin$|\\\\IPC$|\\\\ipc$)
RELATIVETARGETNAME (srvsvc)
SYSMONCLISHARE ((CommandLine)[:]{1}.*?(c\$|C\$|admin\$|ADMIN\$))
SYSMONCLIIPV4 ((CommandLine)[:]{1}.*?(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]))
# Suspicious Executions
PSWEBREQUEST ((CommandLine)[:]{1}.*?(://))
WMICEXEC ((CommandLine)[:]{1}.*?(wmic|WMIC).*?i(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]).*?(process)\s*(call)\s*(create))
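Since Grok patterns are just regular expressions, they can be sanity-checked outside Logstash before deployment. Here is a quick Python check of the SYSMONCLIIPV4 pattern (condensed by repeating the octet group rather than spelling it out four times; the sample log lines are invented):

```python
import re

# SYSMONCLIIPV4 from the pattern file above, condensed:
# "CommandLine:" followed somewhere by a dotted-quad IPv4 address.
SYSMONCLIIPV4 = (
    r"(CommandLine):.*?(?<![0-9])"
    r"(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.]){3}"
    r"(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])(?![0-9])"
)

sample = 'CommandLine: wmic /node:"192.168.56.10" process call create "cmd.exe"'
benign = "CommandLine: ping localhost"

print(bool(re.search(SYSMONCLIIPV4, sample)))  # IP present, pattern matches
print(bool(re.search(SYSMONCLIIPV4, benign)))  # no IP, no match
```

Testing patterns this way is much faster than round-tripping a config change through Logstash to find out a regex never fires.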
Making use of these patterns in Grok filters, I simply want to match them against incoming logs:
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [type] == "wineventlog" {
    grok {
      patterns_dir => [ "/etc/logstash/patterns" ]
      match => { "message" => "%{SUSPICIOUSPROCESSNAME:SPNAME}" }
      add_tag => ["Correlation_Suspicious_Process_Start"]
    }
  }
}
filter {
  if [type] == "wineventlog" {
    grok {
      patterns_dir => [ "/etc/logstash/patterns" ]
      match => { "event_id" => "%{FILESHARE:ECODE}" }
      match => { "message" => "%{RELATIVETARGETNAME:RTN}" }
      match => { "message" => "%{ACCESSMASK12019F:AM}" }
      match => { "message" => "%{SHARENAME:SHARE}" }
      add_tag => ["Correlation_Share_Enumeration"]
    }
  }
}
filter {
  if [type] == "wineventlog" {
    grok {
      patterns_dir => [ "/etc/logstash/patterns" ]
      match => { "message" => "%{SYSMONCLIIPV4:CLI}" }
      add_tag => ["Correlation_IP_Sysmon_CLI"]
    }
  }
}
filter {
  if [type] == "wineventlog" {
    grok {
      patterns_dir => [ "/etc/logstash/patterns" ]
      match => { "message" => "%{SYSMONCLISHARE:CLISHARE}" }
      add_tag => ["Correlation_Sysmon_Dollar_Share_CLI"]
    }
  }
}
filter {
  if [type] == "wineventlog" {
    grok {
      patterns_dir => [ "/etc/logstash/patterns" ]
      match => { "message" => "%{PSWEBREQUEST:CLIWEB}" }
      add_tag => ["Correlation_HTTP_in_PS_Command"]
    }
  }
}
filter {
  if "beats_input_codec_plain_applied" in [tags] {
    mutate {
      remove_tag => ["beats_input_codec_plain_applied"]
    }
  }
}
filter {
  if "_grokparsefailure" in [tags] {
    mutate {
      remove_tag => ["_grokparsefailure"]
    }
  }
}
output {
  elasticsearch { hosts => ["192.168.56.50:9200"] }
  stdout { codec => rubydebug }
}
When a pattern is found, the log is tagged with Correlation_(meaningful name). I can then focus my attention on any logs tagged with Correlation.
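That pivot can be sketched in a few lines of Python. The documents below are hypothetical stand-ins for what Elasticsearch would return, shaped like the Logstash output above; hosts carrying several distinct Correlation_* tags float to the top:

```python
from collections import defaultdict

# Hypothetical documents, shaped like the tagged Logstash output.
docs = [
    {"host": "HOST1", "tags": ["Correlation_Suspicious_Process_Start"]},
    {"host": "HOST1", "tags": ["Correlation_Sysmon_Dollar_Share_CLI"]},
    {"host": "HOST1", "tags": ["Correlation_IP_Sysmon_CLI"]},
    {"host": "HOST2", "tags": []},
]

# Pivot on the Correlation_* tags per host.
tags_by_host = defaultdict(set)
for doc in docs:
    for tag in doc["tags"]:
        if tag.startswith("Correlation"):
            tags_by_host[doc["host"]].add(tag)

# Rank hosts by how many distinct behaviors they exhibit.
ranked = sorted(tags_by_host.items(), key=lambda kv: len(kv[1]), reverse=True)
for host, tags in ranked:
    print(host, len(tags))
```

A host with one tagged event is noise; a host accumulating several distinct tags is a chain worth pulling on.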
Using the commands from my last few blog posts as an example, we can see what the output would look like.
- net use z: \\192.168.56.10\c$ /user:admin1
- copy bad.bat z:\temp\bad.bat
- wmic /node:"192.168.56.10" /user:"pwned.com\admin1" /password:"Th1sisa-Test" process call create "cmd.exe /c c:\temp\bad.bat"
- copy z:\bad.temp .\bad.temp
And a little snippet of some logs that were captured by the above filters.
This is all still a work in progress and has not been tested anywhere other than in a lab. My main goal is to learn about other logging solutions and whether there are ways I can improve what I am doing today. My next step is to begin piping this data into Neo4j, which I think has a lot of potential for finding unknown relationships in these logs of interest. If you have any thoughts, suggestions or recommendations, I would love to hear them.
Hey Jack, great article. Just to clarify, when you say "Log all results of queries to separate index." was this done via summary index with Splunk? By the way, your blog has really helped me organize my thoughts when approaching problems, specifically the mind maps. Thanks for the great work!
Thanks for the comment, cabezon. Yes, it would be a summary index. I discussed it in more detail in this post: http://findingbad.blogspot.com/2017/02/hunting-for-chains.html
Thanks for sharing. After three months, can you give an update on where you are now? It seems that you don't need a correlation engine like Drools, Esper or the one included in X-Pack. Can you tell us more about that?
The grok feature seems similar to Splunk's EventType? https://answers.splunk.com/answers/345928/what-is-the-basic-difference-between-tags-and-even.html