This post is only about the logging setup itself: instead of collecting logs with Filebeat, the logs are sent to a dedicated port opened in Logstash using the TCP / UDP inputs.
Prerequisites:
You'll need a working Elasticsearch cluster with Logstash and Kibana.
Start by making sure the log data you want to structure is parsed correctly.
MikroTik logs are a bit tricky: the data shown in the router's interface is already enriched with date and time, whereas a message sent by remote logging to Logstash will look like this:
firewall,info forward: in:lan out:wan, src-mac aa:bb:cc:dd:ee:ff, proto UDP, 172.31.100.154:57061->109.164.113.231:443, len 76
You can test these messages in the Grok Debugger and create your own filters and mappings. The following is my example, which might not fit your needs.
Here are some custom patterns I wrote for the pattern matching (these go into the patterns_dir referenced by the pipeline below):
MIKROTIK_DATE \b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\b\/(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
MIKROTIK_TIME (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
MIKROTIK_TOPIC firewall|wireless|info|ipsec|interface|error|dhcp|system|account|critical
MIKROTIK_FAILED_PROPOSAL failed\ to\ get\ valid\ proposal|failed\ to\ pre\-process\ ph1\ packet|phase1\ negotiation\ failed|no\ suitable\ proposal\ found
MIKROTIK_PEER_NOT_COMPLIANT Unity\ mode\ config\ request\ but\ the\ peer\ did\ not\ declare\ itself\ as\ \ unity\ compliant
MIKROTIK_PACKET_RETRANSMISSION packet\ is\ retransmitted
MIKROTIK_TRAFFIC_FLOW traffic\ flow\ target\ removed
MIKROTIK_ACQUIRED_IP assigned|acquired
MIKROTIK_RELEASED_IP released|releasing address|deassigned
MIKROTIK_DISCO_REASON extensive\ data\ loss|group\ key\ exchange\ timeout|received\ deauth:\ unspecified\ \(1\)|received\ disassoc:\ sending station\ leaving\ \(8\)|ok|received\ deauth:\ sending\ station\ leaving\ \(3\)
MIKROTIK_WIFI_STATE reassociating
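Grok patterns are ordinary regular expressions, so you can sanity-check them outside of Logstash before wiring them into a pipeline. A minimal sketch in Python, copying two of the patterns above verbatim (the sample strings are made up for illustration):

```python
import re

# Verbatim copies of two custom patterns from above.
MIKROTIK_DATE = r"\b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\b\/(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])"
MIKROTIK_TIME = r"(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)"

assert re.fullmatch(MIKROTIK_DATE, "jan/15")            # valid month/day fragment
assert re.fullmatch(MIKROTIK_DATE, "jan/32") is None    # day out of range
assert re.fullmatch(MIKROTIK_TIME, "23:59:59")          # valid wall-clock time
assert re.fullmatch(MIKROTIK_TIME, "25:00:00") is None  # hour out of range
```

If a pattern misbehaves, it's much faster to iterate on it here or in the Grok Debugger than by restarting the pipeline.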
Now that we have the patterns, let's create the pipeline and define the fields to populate.
[archy@elk01 ~]$ cat /etc/logstash/conf.d/mikrotik-log.conf
# Input is the TCP/UDP port specified; the MikroTik config will be shown later.
input {
  tcp {
    port => 5514
    tags => ["mikrotik-log"]
  }
  udp {
    port => 5514
    tags => ["mikrotik-log"]
  }
}
# the tag mikrotik-log is added by the input
filter {
  if "mikrotik-log" in [tags] {
    grok {
      id => "mikrotik-log-pipeline"
      patterns_dir => "/etc/logstash/custom-patterns/"
      tag_on_failure => "_grokparsefailure_mikrotik_log"
      match => [
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:item} %{DATA:action} by %{DATA:user}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) user %{DATA:user} %{GREEDYDATA:action} from %{IP:host} via %{DATA:method}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user %{DATA:user} from %{IP:host} via %{DATA:method}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user: %{DATA:user}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} \(Identity Protection\): %{IP:local_address}%{GREEDYDATA}%{IP:remote_address}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]-%{IP:remote_address}\[%{GREEDYDATA}\] spi:%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} address for %{IP:remote_address}\[%{GREEDYDATA}\]$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]<=>%{IP:remote_address}\[%{GREEDYDATA}\] spi=%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} %{IP:released_ip} $",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{IP:remote_address} %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_PEER_NOT_COMPLIANT:action}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) the %{MIKROTIK_PACKET_RETRANSMISSION:action} by %{IP:remote_address}\[%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:interface} link %{GREEDYDATA:link_state}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_TRAFFIC_FLOW:action} by %{DATA:user}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} to %{DATA:mac_address}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_RELEASED_IP:action} %{IP:released_ip} from %{DATA:mac_address} $",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, signal strength %{INT:signal_strength}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, %{MIKROTIK_DISCO_REASON:disconnect_reason}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{MIKROTIK_WIFI_STATE:wifi_state} $",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol}, %{IP:local_address}:%{INT:src_port}->%{IP:remote_address}:%{INT:dst_port}, len %{INT:length}%{GREEDYDATA}$",
        "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol} \(%{GREEDYDATA}\), %{IP:local_address}->%{IP:remote_address}, len %{INT:length}%{GREEDYDATA}$"
      ]
    }
    if "_grokparsefailure_mikrotik_log" not in [tags] {
      mutate {
        remove_field => ["message"]
      }
    }
  }
}
# output to all elasticsearch hosts
output {
  if "mikrotik-log" in [tags] {
    elasticsearch {
      id => "mikrotik-log-output"
      hosts => ["http://elk01.archyslife.lan:11920", "http://elk02.archyslife.lan:11920", "http://elk03.archyslife.lan:11920", "http://elk04.archyslife.lan:11920", "http://elk05.archyslife.lan:11920"]
      index => "mikrotik-log-%{+YYYY.MM.ww}"
    }
  }
}
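As a rough sanity check before deploying, the firewall match can be approximated with a plain Python regex: grok's %{DATA}, %{IP} and %{INT} are just regex shorthands. The pattern below is a simplified stand-in, not the exact grok expansion:

```python
import re

# Simplified stand-in for the firewall grok pattern in the pipeline above.
FIREWALL = re.compile(
    r"(?P<topic1>[^,]+),(?P<topic2>[^ ]+) (?P<chain>[^:]+): "
    r"in:(?P<in_interface>\S+) out:(?P<out_interface>\S+), "
    r"src-mac (?P<mac_address>\S+), proto (?P<protocol>\S+), "
    r"(?P<local_address>[\d.]+):(?P<src_port>\d+)->"
    r"(?P<remote_address>[\d.]+):(?P<dst_port>\d+), len (?P<length>\d+)"
)

# The sample message from the beginning of the post.
msg = ("firewall,info forward: in:lan out:wan, src-mac aa:bb:cc:dd:ee:ff, "
       "proto UDP, 172.31.100.154:57061->109.164.113.231:443, len 76")

fields = FIREWALL.match(msg).groupdict()
assert fields["chain"] == "forward"
assert fields["protocol"] == "UDP"
assert fields["dst_port"] == "443"
```

If the stand-in fails on a real message, the grok pattern will almost certainly fail too, so this is a cheap way to catch field-order surprises early.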
Next up, add the following lines to your pipelines.yml:
- pipeline.id: mikrotik-log
  path.config: "/etc/logstash/conf.d/mikrotik-log.conf"
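For reference, the %{+YYYY.MM.ww} sprintf pattern in the elasticsearch output rolls over to a new index every week. A rough Python equivalent of the generated name (note this sketch uses ISO week numbers via %V, which can differ slightly from the Joda-Time 'ww' week-of-year that Logstash uses, and the timestamp is a made-up example):

```python
from datetime import datetime, timezone

# Hypothetical event timestamp; Logstash uses the event's @timestamp.
ts = datetime(2024, 3, 15, tzinfo=timezone.utc)

# %V is the ISO week number, an approximation of Joda's 'ww'.
index_name = ts.strftime("mikrotik-log-%Y.%m.%V")
print(index_name)  # mikrotik-log-2024.03.11
```

Weekly indices keep shard counts low for a low-volume source like a single router; use %{+YYYY.MM.dd} instead if you prefer daily rollover.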
With Logstash configured and ready to receive logs, it's time to write the index template for the index we are using. This is the template I defined; again, it might differ from what you need.
This configuration has to be done through the Kibana interface using the Dev Tools console, or with curl using '-XPUT' against the Elasticsearch API.
PUT _template/mikrotik-log
{
  "index_patterns" : [ "mikrotik-log-*" ],
  "settings" : {
    "index" : {
      "codec" : "best_compression",
      "refresh_interval" : "5s",
      "number_of_shards" : "1",
      "number_of_replicas" : "1"
    }
  },
  "mappings" : {
    "doc" : {
      "numeric_detection" : true,
      "dynamic_templates" : [
        {
          "string_fields" : {
            "mapping" : { "type" : "keyword" },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        }
      ],
      "properties" : {
        "@version" : { "type" : "keyword" },
        "@timestamp" : { "type" : "date" },
        "date" : { "type" : "keyword" },
        "time" : { "type" : "keyword" },
        "topic1" : { "type" : "keyword" },
        "topic2" : { "type" : "keyword" },
        "topic3" : { "type" : "keyword" },
        "item" : { "type" : "text" },
        "action" : { "type" : "text" },
        "user" : { "type" : "keyword" },
        "host" : { "type" : "keyword" },
        "method" : { "type" : "keyword" },
        "local_address" : { "type" : "ip" },
        "remote_address" : { "type" : "ip" },
        "acquired_ip" : { "type" : "ip" },
        "released_ip" : { "type" : "ip" },
        "address_pool" : { "type" : "keyword" },
        "mac_address" : { "type" : "keyword" },
        "interface" : { "type" : "keyword" },
        "link_state" : { "type" : "keyword" },
        "signal_strength" : { "type" : "byte" },
        "disconnect_reason" : { "type" : "text" },
        "chain" : { "type" : "keyword" },
        "in_interface" : { "type" : "keyword" },
        "out_interface" : { "type" : "keyword" },
        "protocol" : { "type" : "keyword" },
        "src_port" : { "type" : "keyword" },
        "dst_port" : { "type" : "keyword" },
        "length" : { "type" : "short" },
        "ap_ssid" : { "type" : "keyword" },
        "wifi_state" : { "type" : "keyword" }
      }
    }
  },
  "aliases" : { }
}
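When pasting a template this size into Dev Tools or curl, a stray comma is the most common cause of a 400, so it can help to run the body through a JSON parser first. A minimal sketch, using a trimmed-down version of the body purely for illustration:

```python
import json

# Trimmed-down stand-in for the template body above.
body = """
{
  "index_patterns": ["mikrotik-log-*"],
  "settings": {"index": {"number_of_shards": "1", "number_of_replicas": "1"}},
  "mappings": {"doc": {"properties": {"remote_address": {"type": "ip"}}}}
}
"""

parsed = json.loads(body)  # raises json.JSONDecodeError on any syntax error
assert parsed["index_patterns"] == ["mikrotik-log-*"]
```

This only catches syntax problems; semantic issues (such as unsupported mapping parameters on newer Elasticsearch versions) are still reported by the API itself.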
Elasticsearch is now set up and will index each field accordingly. Next, configure the MikroTik router to send its data to Logstash; this is done most easily through the CLI:
[archy@MikroTik] > system logging action add name=logstash target=remote remote=172.31.0.12 remote-port=5514 src-address=172.31.0.254 bsd-syslog=no syslog-time-format=bsd-syslog syslog-facility=daemon syslog-severity=auto
[archy@MikroTik] > system logging edit 0 action
logstash
[archy@MikroTik] > system logging edit 1 action
logstash
[archy@MikroTik] > system logging edit 2 action
logstash
[archy@MikroTik] > system logging edit 3 action
logstash
If you haven't done this yet, enable logging on your drop rules. The IDs will most likely be different, so adjust them before running:
[archy@MikroTik] > ip firewall filter edit 8 log
yes
[archy@MikroTik] > ip firewall filter edit 65 log
yes
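Independently of the router, you can verify the Logstash input end to end by pushing a sample message at the port yourself. A quick sketch that assumes Logstash is listening on 127.0.0.1:5514; adjust host and port to your setup:

```python
import socket

# Sample message in the same shape the router would send.
msg = (b"firewall,info forward: in:lan out:wan, src-mac aa:bb:cc:dd:ee:ff, "
       b"proto UDP, 172.31.100.154:57061->109.164.113.231:443, len 76")

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
    # UDP is fire-and-forget: sendto succeeds even if nothing is listening,
    # so check Kibana (or the logstash logs) afterwards to confirm receipt.
    sent = sock.sendto(msg, ("127.0.0.1", 5514))
    assert sent == len(msg)
finally:
    sock.close()
```

If the event shows up in the index with its fields populated, the pipeline and template are working and any remaining problems are on the router side.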
Your router will now send its syslog to Logstash, where it will be processed and stored in Elasticsearch. To make the data visible, however, Kibana needs an index pattern. You can create one from the web interface as follows:
--> Click on 'Management'
--> Click on 'Index Patterns' in the Kibana section
--> Click on the 'Create Index Pattern' button in the top left corner and type in the index name. In my case, it's 'mikrotik-log*'
--> Click on 'Next Step', select the Time Filter field name and click on 'Create Index pattern'
Your data is now indexed in Elasticsearch and visible in Kibana.
If any events are tagged with _grokparsefailure, you'll need to check your message against the patterns in the Grok Debugger, which can be found in the Dev Tools.
Feel free to comment and / or suggest a topic.
Hello, I tried to use your config in my lab, unfortunately without good results.
When I try to add the index template, I'm receiving error 400:
{
"error" : {
"root_cause" : [
{
"type" : "mapper_parsing_exception",
"reason" : "Root mapping definition has unsupported parameters: [doc : {numeric_detection=true, dynamic_templates=[{string_fields={mapping={type=keyword}, match_mapping_type=string, match=*}}], properties={date={type=keyword}, server_name={type=keyword}, @timestamp={type=date}, method={type=keyword}, @version={type=keyword}, time={type=keyword}, http_return_code={type=keyword}, ClientIP={type=keyword}, http-version={type=text}, request_uri={type=text}, username={type=keyword}}}]"
}
],
"type" : "mapper_parsing_exception",
"reason" : "Failed to parse mapping [_doc]: Root mapping definition has unsupported parameters: [doc : {numeric_detection=true, dynamic_templates=[{string_fields={mapping={type=keyword}, match_mapping_type=string, match=*}}], properties={date={type=keyword}, server_name={type=keyword}, @timestamp={type=date}, method={type=keyword}, @version={type=keyword}, time={type=keyword}, http_return_code={type=keyword}, ClientIP={type=keyword}, http-version={type=text}, request_uri={type=text}, username={type=keyword}}}]",
"caused_by" : {
"type" : "mapper_parsing_exception",
"reason" : "Root mapping definition has unsupported parameters: [doc : {numeric_detection=true, dynamic_templates=[{string_fields={mapping={type=keyword}, match_mapping_type=string, match=*}}], properties={date={type=keyword}, server_name={type=keyword}, @timestamp={type=date}, method={type=keyword}, @version={type=keyword}, time={type=keyword}, http_return_code={type=keyword}, ClientIP={type=keyword}, http-version={type=text}, request_uri={type=text}, username={type=keyword}}}]"
}
},
"status" : 400
}
I found that in Elasticsearch 7, using mapping types is no longer possible.
Is there any chance of a workaround for this problem?
It's most likely the 'doc:' that's causing issues here.
I've upgraded my installation from 6->7, but this is the template I currently use in my cluster: https://pastebin.com/raw/y5URbbsq
I have the same error as Michal, and your link at https://pastebin.com/raw/y5URbbsq is not found.
Thank you
Yes, that's because the pastebin was limited in time.
Here's one that should not expire:
https://pastebin.com/zQBpX8MX
Thank you Archy
Hello Adrian,
Trying to follow your documentation, but I can't create an index pattern. Using ELK 7.12.
Any suggestion ?
Thank you
Andre
Hi Andre,
make sure the router can reach Logstash on the specified port, Logstash can reach the Elasticsearch API on the nodes, and the data is properly pushed into the index.
Also note that with 7.12, creating index patterns is done under 'Management' --> 'Stack Management' --> 'Kibana' --> 'Index Patterns'.
I recreated the template on Kibana 8.1:
{
"template": {
"settings": {
"index": {
"codec": "best_compression",
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_content"
}
}
},
"refresh_interval": "5s",
"number_of_shards": "1",
"number_of_replicas": "1"
}
},
"mappings": {
"dynamic": "true",
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"date_detection": false,
"numeric_detection": true,
"properties": {
"@timestamp": {
"type": "date"
},
"acquired_ip": {
"type": "ip"
},
"action": {
"type": "text"
},
"address_pool": {
"type": "keyword"
},
"ap_ssid": {
"type": "keyword"
},
"chain": {
"type": "keyword"
},
"date": {
"type": "keyword"
},
"disconnect_reason": {
"type": "text"
},
"dst_port": {
"type": "keyword"
},
"host": {
"type": "keyword"
},
"in_interface": {
"type": "keyword"
},
"interface": {
"type": "keyword"
},
"item": {
"type": "text"
},
"length": {
"type": "short"
},
"link_state": {
"type": "keyword"
},
"local_address": {
"type": "ip"
},
"mac_address": {
"type": "keyword"
},
"method": {
"type": "keyword"
},
"out_interface": {
"type": "keyword"
},
"protocol": {
"type": "keyword"
},
"released_ip": {
"type": "ip"
},
"remote_address": {
"type": "ip"
},
"signal_strength": {
"type": "byte"
},
"src_port": {
"type": "keyword"
},
"time": {
"type": "keyword"
},
"topic1": {
"type": "keyword"
},
"topic2": {
"type": "keyword"
},
"topic3": {
"type": "keyword"
},
"user": {
"type": "keyword"
},
"wifi_state": {
"type": "keyword"
}
}
},
"aliases": {}
}
}
Hello, I successfully created the index pattern, but when I go to the Discover tab it shows "No results match your search criteria". Can anyone help me with that problem? Thanks