Custom Collector API Integration With Packetbeat and Logstash

Some enterprises may already be using Logstash as part of their pipelines to centralize their logs. In such cases, you have the option to deploy custom collectors to seamlessly integrate your network infrastructure with Lumu while layering Continuous Compromise Assessment.

The Lumu Custom Collector API allows you to send network metadata captured by a third-party platform, service, or appliance to Lumu for Continuous Compromise Assessment. It can also be used as an alternative for gaining greater visibility when the enterprise network restricts the use of Virtual Appliances. If you want more context on custom collectors, consult our documentation.

This article provides an example of a typical Logstash setup to collect DNS packets: Packetbeat sends data from your internal DNS servers to Logstash, and Logstash then transforms the data and sends it to Lumu for Continuous Compromise Assessment.

Typical Packetbeat / Logstash Setup with Lumu API

Packetbeat Configuration

Configure Packetbeat to include only the fields that will be of interest at the transformation stage in Logstash. It is also recommended to ignore traffic started by the DNS server itself to avoid traffic duplication.

To configure it, go to the Packetbeat installation folder, e.g., C:/Program Files/Packetbeat (Windows) or /usr/share/packetbeat (Linux), and edit the file named packetbeat.yml.

The following is an example of settings for Lumu. Replace the interface, IPs, and ports with those corresponding to your environment.

You can run .\packetbeat devices (Windows) or packetbeat devices (Linux) to get the number of the interface that receives your DNS traffic, then add that number to the configuration file in the parameter packetbeat.interfaces.device.
packetbeat.interfaces.device: eth0          ## the interface you want to capture traffic on
packetbeat.ignore_outgoing: true

packetbeat.protocols:
- type: dns
  ports: [53]

output.logstash:
  hosts: ["logstash-server:5044"]           ## the Logstash IP and port

processors:
- drop_event:
    when:
      equals:
        client.ip: 192.168.0.11             ## drop events from the DNS server
- include_fields:
    fields:
      - client.ip
      - dns.id
      - dns.op_code
      - dns.response_code
      - dns.question.type
      - dns.question.name
      - dns.question.class
      - dns.flags.authoritative
      - dns.flags.recursion_available
      - dns.flags.truncated_response
      - dns.flags.checking_disabled
      - dns.flags.recursion_desired
      - dns.flags.authentic_data
      - dns.answers
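
If you run Packetbeat on Windows, .\packetbeat devices lists the interfaces by number rather than by name, and packetbeat.interfaces.device takes that index instead of a name such as eth0. A minimal sketch, assuming the DNS interface was reported as index 0 (replace it with the number from your own output):

packetbeat.interfaces.device: 0             ## illustrative: the index reported by .\packetbeat devices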
For more details on the configuration settings, consult the Elastic official documentation.

Logstash Pipeline Configuration

The Logstash pipeline should be configured to receive data from Packetbeat as JSON and, during the filtering phase, reshape it to fit the Lumu Custom Collector API specifications. An illustrative example of the resulting payload is shown after the configuration below.

For more details on the Logstash configuration files, consult the Elastic official documentation.
input {
  beats {
    type => "dns"
    port => 5044
    codec => "json"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    rename => { "@timestamp" => "timestamp" }
    add_field => { "id" => "%{[dns][id]}" }
    add_field => { "op_code" => "%{[dns][op_code]}" }
    add_field => { "response_code" => "%{[dns][response_code]}" }
    add_field => { "client_ip" => "%{[client][ip]}" }
  }
  mutate {
    convert => { "id" => "integer" }
  }
  if [dns] {
    ruby {
      code => '
        event.get("dns").each { |k, v|
          if k == "answers"
            a = v.map { |x|
              x["ttl"] = x["ttl"].to_i
              x
            }
            event.set(k, a)
          else
            event.set(k, v)
          end
        }
      '
    }
  }
  mutate {
    remove_field => ["tags", "@version", "type", "client", "dns"]
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://api.lumu.io/collectors/{collector_id}/dns/packets?key={lumu_client_key}"
  }
}
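
For reference, with the json_batch format the http output posts events to Lumu as a JSON array. A single transformed record could look roughly like the one below; this is an illustrative sketch with made-up values, and the exact field set depends on the fields you chose to include in Packetbeat and on the Custom Collector API specifications.

[
  {
    "timestamp": "2023-05-10T14:37:02.228Z",
    "client_ip": "192.168.0.103",
    "id": 12345,
    "op_code": "QUERY",
    "response_code": "NOERROR",
    "question": {
      "name": "example.com",
      "type": "A",
      "class": "IN"
    },
    "flags": {
      "authoritative": false,
      "recursion_desired": true,
      "recursion_available": true,
      "truncated_response": false,
      "checking_disabled": false,
      "authentic_data": false
    },
    "answers": [
      { "name": "example.com", "type": "A", "class": "IN", "ttl": 300, "data": "93.184.216.34" }
    ]
  }
]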

Special cases

For Logstash versions older than v7.4

In some exceptional cases, the Packetbeat/Logstash pipeline is also used for pre-existing internal purposes. Although Logstash supports multiple pipelines, Packetbeat does not support multiple outputs.

In this case, the recommended approach is to use an intermediary broker (such as Kafka) to publish the Packetbeat packets and then use that broker as the input for multiple Logstash pipelines, each with its own filtering and output settings.
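
The following is a minimal sketch of that approach, assuming a Kafka broker reachable at kafka-broker:9092 and a topic named dns-packets (both names are illustrative). In packetbeat.yml, replace the output.logstash block with a Kafka output:

output.kafka:
  hosts: ["kafka-broker:9092"]              ## illustrative broker address
  topic: "dns-packets"                      ## illustrative topic name

Each Logstash pipeline then reads from that topic through the kafka input plugin. Giving each pipeline its own group_id places them in separate consumer groups, so every pipeline receives the full stream of events:

input {
  kafka {
    bootstrap_servers => "kafka-broker:9092"
    topics => ["dns-packets"]
    group_id => "lumu-pipeline"             ## use a distinct group_id per pipeline
    codec => "json"
  }
}

The filter and output sections of the Lumu pipeline would remain the same as in the single-pipeline example above.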

For more details, please refer to the vendors' documentation. Additionally, we recommend the following external articles:

  • Introducing Multiple Pipelines in Logstash
  • Support multiple outputs of the same type (as two independent Logstash clusters)

For Logstash 7.4 and later

Starting with version 7.4, Logstash supports Pipeline-to-Pipeline Communication, which allows pipelines within the same Logstash instance to send events to one another. This makes it possible to isolate the execution of pipelines and break up pipeline logic.

For more details, please refer to the vendor’s documentation:

  • Pipeline-to-Pipeline Communication - The forked path pattern

In this case, the Logstash pipeline configuration files would look like this:

The examples below assume that the ‘internal’ pipeline is a simple pass-through, while the Lumu pipeline performs the appropriate transformation into the expected Lumu format. Note that the Lumu pipeline may vary depending on the fields you decide to include in Packetbeat.

pipelines.yml

- pipeline.id: main
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-pipe", "lumu-pipe"] } }
- pipeline.id: lumu
  path.config: "/usr/share/logstash/pipeline/lumu.cfg"
- pipeline.id: internal
  path.config: "/usr/share/logstash/pipeline/internal.cfg"
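
Logstash reads pipelines.yml from its settings directory (typically /etc/logstash/ on package installs, or the config folder of the installation directory) when it is started without the -e or -f command-line options.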
internal.cfg
input {
  pipeline {
    address => "internal-pipe"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
lumu.cfg
input {
  pipeline {
    address => "lumu-pipe"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    rename => { "@timestamp" => "timestamp" }
    add_field => { "id" => "%{[dns][id]}" }
    add_field => { "op_code" => "%{[dns][op_code]}" }
    add_field => { "response_code" => "%{[dns][response_code]}" }
    add_field => { "client_ip" => "%{[client][ip]}" }
  }
  mutate {
    convert => { "id" => "integer" }
  }
  if [dns] {
    ruby {
      code => '
        event.get("dns").each { |k, v|
          if k == "answers"
            a = v.map { |x|
              x["ttl"] = x["ttl"].to_i
              x
            }
            event.set(k, a)
          else
            event.set(k, v)
          end
        }
      '
    }
  }
  mutate {
    remove_field => ["tags", "@version", "type", "client", "dns"]
  }
}

output {
  http {
    format => "json_batch"
    http_method => "post"
    url => "https://api.lumu.io/collectors/{collector_id}/dns/packets?key={lumu_client_key}"
  }
}
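
Before starting the pipelines, remember to replace {collector_id} and {lumu_client_key} in the http output URLs with the ID and key of the custom collector you created in Lumu (see the Custom Collector API Specifications article for details).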

Related Articles

        • Custom Collector API Specifications
        • Custom Collector API
        • Custom Collector API integration with Netskope Next Gen Secure Web Gateway (SWG)
        • Collect DNS Packets with Lumu VA and Packetbeat
        • Lumu Virtual Appliance Metadata Collection with Logstash