Some enterprises may already be using Logstash as part of their pipelines to centralize their logs. In such cases, you have the option to deploy custom collectors to seamlessly integrate your network infrastructure with Lumu while layering Continuous Compromise Assessment.
This article provides you with an example of a typical Logstash setup to collect DNS packets: using Packetbeat to send data from your internal DNS servers to Logstash, and then using Logstash to transform and send the data to Lumu for Continuous Compromise Assessment.
Configure Packetbeat to include only the fields that will be of interest at the transformation stage in Logstash. It is also recommended to ignore traffic initiated by the DNS server itself to avoid duplicate events.
To configure it, go to the Packetbeat installation folder, e.g. C:/Program Files/Packetbeat (for Windows) or /usr/share/packetbeat (for Linux), and edit the file named packetbeat.yml.
The following is an example of settings for Lumu. Replace the interface, IPs, and ports with those corresponding to your environment.
packetbeat.interfaces.device: eth0   # the interface you want to capture traffic on
packetbeat.ignore_outgoing: true

packetbeat.protocols:
- type: dns
  ports: [53]

output.logstash:
  hosts: ["logstash-server:5044"]    # the Logstash IP and port

processors:
- drop_event:
    when:
      equals:
        client.ip: 192.168.0.11      # drop events from the DNS server itself
- include_fields:
    fields:
      - client.ip
      - dns.id
      - dns.op_code
      - dns.response_code
      - dns.question.type
      - dns.question.name
      - dns.question.class
      - dns.flags.authoritative
      - dns.flags.recursion_available
      - dns.flags.truncated_response
      - dns.flags.checking_disabled
      - dns.flags.recursion_desired
      - dns.flags.authentic_data
      - dns.answers
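After saving the changes, Packetbeat must be restarted for them to take effect. The commands below are a minimal sketch for a Linux package installation; the configuration path and service name are assumptions and may differ depending on how Packetbeat was installed in your environment.

# Validate the configuration file (path assumed for a Linux package install)
packetbeat test config -c /etc/packetbeat/packetbeat.yml

# Verify that Packetbeat can reach the configured Logstash output
packetbeat test output -c /etc/packetbeat/packetbeat.yml

# Apply the changes
sudo systemctl restart packetbeat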
The Logstash pipeline should be configured to receive the data from Packetbeat as JSON and, during the filtering phase, adjust it to fit the Lumu Custom Collection API specifications.
input {
  beats {
    type => "dns"
    port => 5044
    codec => "json"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    rename => { "@timestamp" => "timestamp" }
    add_field => {
      "id" => "%{[dns][id]}"
      "op_code" => "%{[dns][op_code]}"
      "response_code" => "%{[dns][response_code]}"
      "client_ip" => "%{[client][ip]}"
    }
  }
  mutate {
    convert => { "id" => "integer" }
  }
  if [dns] {
    ruby {
      code => '
        event.get("dns").each { |k, v|
          if k == "answers"
            a = v.map { |x|
              x["ttl"] = x["ttl"].to_i
              x
            }
            event.set(k, a)
          else
            event.set(k, v)
          end
        }
      '
    }
  }
  mutate {
    remove_field => ["tags", "@version", "type", "client", "dns"]
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://api.lumu.io/collectors/{collector_id}/dns/packets?key={lumu_client_key}"
  }
}
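For reference, after this filter the DNS attributes are promoted to the top level of each event, and the json_batch output posts a JSON array of such events in every request. The snippet below is only an illustration of the resulting shape with made-up values; the exact fields depend on what Packetbeat captured for a given query.

{
  "timestamp": "2023-05-10T14:23:51.123Z",
  "client_ip": "192.168.0.55",
  "id": 31337,
  "op_code": "QUERY",
  "response_code": "NOERROR",
  "question": { "name": "example.com", "type": "A", "class": "IN" },
  "flags": {
    "recursion_desired": true,
    "recursion_available": true,
    "authoritative": false,
    "truncated_response": false,
    "checking_disabled": false,
    "authentic_data": false
  },
  "answers": [
    { "name": "example.com", "type": "A", "class": "IN", "ttl": 300, "data": "93.184.216.34" }
  ]
}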
For Logstash versions older than v7.4
In some exceptional cases, the Packetbeat/Logstash pipeline is also used for internal pre-established purposes. Although Logstash supports multiple pipelines, Packetbeat does not support multiple outputs.
In that case, the recommended approach is to publish the Packetbeat events to an intermediary broker (such as Kafka) and then use that broker as the input for multiple Logstash pipelines, each with its own filtering and output settings.
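As a rough sketch of that approach, Packetbeat would publish to a Kafka topic and each Logstash pipeline would consume the same topic. The broker address (kafka-broker:9092) and topic name (dns-packets) below are placeholders; replace them with the values from your environment.

# packetbeat.yml – replace the Logstash output with a Kafka output
output.kafka:
  hosts: ["kafka-broker:9092"]
  topic: "dns-packets"

# Logstash pipeline – each pipeline reads the same topic with its own group_id
input {
  kafka {
    bootstrap_servers => "kafka-broker:9092"
    topics => ["dns-packets"]
    group_id => "lumu-pipeline"
    codec => "json"
  }
}

Giving each pipeline a distinct group_id ensures that every pipeline receives its own full copy of the events published to the topic.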
For more details, please refer to the vendors' documentation for the Beats Kafka output and the Logstash Kafka input plugin.
For Logstash 7.4 and later
Starting with version 7.4, Logstash introduced Pipeline-to-Pipeline Communication, which allows multiple pipelines to run within the same Logstash instance. This makes it possible to isolate the execution of each pipeline and break up its logic into smaller, dedicated configurations.
For more details, please refer to the vendor's documentation on Pipeline-to-Pipeline Communication.
In this case, the Logstash pipeline configuration files could look like this:
pipelines.yml
- pipeline.id: main
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-pipe", "lumu-pipe"] } }
- pipeline.id: lumu
  path.config: "/usr/share/logstash/pipeline/lumu.cfg"
- pipeline.id: internal
  path.config: "/usr/share/logstash/pipeline/internal.cfg"
internal.cfg

input {
  pipeline {
    address => "internal-pipe"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
lumu.cfg

input {
  pipeline {
    address => "lumu-pipe"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    rename => { "@timestamp" => "timestamp" }
    add_field => {
      "id" => "%{[dns][id]}"
      "op_code" => "%{[dns][op_code]}"
      "response_code" => "%{[dns][response_code]}"
      "client_ip" => "%{[client][ip]}"
    }
  }
  mutate {
    convert => { "id" => "integer" }
  }
  if [dns] {
    ruby {
      code => '
        event.get("dns").each { |k, v|
          if k == "answers"
            a = v.map { |x|
              x["ttl"] = x["ttl"].to_i
              x
            }
            event.set(k, a)
          else
            event.set(k, v)
          end
        }
      '
    }
  }
  mutate {
    remove_field => ["tags", "@version", "type", "client", "dns"]
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://api.lumu.io/collectors/{collector_id}/dns/packets?key={lumu_client_key}"
  }
}
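Note that Logstash only reads pipelines.yml when it is started without the -f or -e flags, so the multi-pipeline setup takes effect on a normal service start. The commands below are a sketch assuming a Linux package installation; file locations may differ in your environment.

# Check the syntax of an individual pipeline configuration before deploying it
/usr/share/logstash/bin/logstash --config.test_and_exit -f /usr/share/logstash/pipeline/lumu.cfg

# Restart the service so pipelines.yml (typically under /etc/logstash/) is picked up
sudo systemctl restart logstash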