Data Transformation in Elastic Stack

In this article we will look at the different options available to transform or enrich the source data before it is indexed in Elasticsearch.

Test Environment

Fedora 37 workstation
Elastic Stack

Most organisations follow a fairly standard process when implementing the Elastic Stack. The primary purpose of Elasticsearch is to index data so that it can later be searched and analysed using Kibana. But before the source data reaches Elasticsearch, you may want to transform or enrich it. What options do we have to do this, and at which layers can the transformation be carried out? Let's look into this in further detail.

If you prefer watching a video, here is the YouTube video covering the same step-by-step procedure outlined below.

Procedure

Data Transformation @ Filebeat

Most of us use Filebeat as a log shipper because it is a lightweight solution for forwarding and centralizing logs and files. But Filebeat can also be used as a data transformation tool with the help of its processors. There are different types of processors available: for example, we can decode JSON fields from the source data, convert fields to specific data types, run a script (i.e. JavaScript) to transform a field value, and truncate a field value. Below is a snippet of the processors section that we can use in the filebeat.yml configuration file to carry out this transformation.

But is it worth carrying out this data transformation at the Filebeat layer? Yes, we can, but it depends on the size of the source data you are applying the transformation to.

If your JSON documents are not large, you can use Filebeat to carry out the transformation before sending the data to Elasticsearch for indexing. This lets you remove the additional Logstash layer, saving the resources of one server or service.

But if your JSON documents are huge, it is better to carry out the transformation at another layer rather than in Filebeat. The reason is that we usually run Filebeat on the same server as our production services, and if Filebeat consumes most of the CPU and memory for data transformation, it can cause resource congestion and impact those other services.

...
#================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
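  # Decode the JSON string in the "message" field into an object under "data"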
  - decode_json_fields:
      fields: ["message"]
      process_array: false
      max_depth: 1
      target: data
      overwrite_keys: false
      add_error_key: true
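  # Copy @timestamp into "index_date" as a string and cast the numeric fields to integer/float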
  - convert:
      fields:
        - {from: "data.@timestamp", to: "index_date", type: "string"}
        - {from: "data.body_bytes_sent", to: "data.body_bytes_sent", type: "integer"}
        - {from: "data.bytes_sent", to: "data.bytes_sent", type: "integer"}
        - {from: "data.request_length", to: "data.request_length", type: "integer"}
        - {from: "data.request_time", to: "data.request_time", type: "float"}
      ignore_missing: true
      fail_on_error: false
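  # Use a JavaScript function to replace the dashes in "index_date" with dots (e.g. 2023-01-31 -> 2023.01.31)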
  - script:
      lang: javascript
      source: >
        var value = "";
        function process(event) {
            value = event.Get("index_date");
            value = value.replace(/-/g, ".");
            event.Put('index_date', value);
        }
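  # Keep only the first 10 characters of "index_date" (the yyyy.MM.dd date part)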
  - truncate_fields:
      fields:
        - index_date
      max_characters: 10
      fail_on_error: false
      ignore_missing: true
...
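
To tie this back to the idea of dropping the Logstash layer entirely, here is a minimal sketch of what the corresponding output section of filebeat.yml could look like, with the transformed index_date field used to build a daily index name. The host address, the nginx- index prefix and the template/ILM settings below are assumptions for illustration, not part of the original setup.

...
#================================== Outputs ===================================
output.elasticsearch:
  # Assumed local single-node cluster; adjust hosts to your environment
  hosts: ["http://localhost:9200"]
  # Daily index name built from the "index_date" field produced above, e.g. nginx-2023.01.31
  index: "nginx-%{[index_date]}"

# Overriding the default index name also requires a matching template name/pattern,
# and ILM needs to be disabled for the custom index name to take effect
setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.ilm.enabled: false
...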

Data Transformation @ Logstash

This is the most preferred method, and the one most organizations follow, to carry out data transformation and enrichment before the data is sent to Elasticsearch for indexing. Here is a snippet of my logstash.conf file that carries out the same data transformation, but now at the Logstash layer. In Logstash we use filters to transform and enrich the data.

The benefit we get from using Logstash as an additional layer between Filebeat and Elasticsearch is that we reduce the resource congestion on the Filebeat host, which is usually a production server, and carry out the same transformation at the Logstash layer instead. The drawback is that we now have to run and maintain an additional layer just to carry out the data transformation for us.

After the snippet below, let's look at the third option we have for carrying out the data transformation.

...
filter {
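  # Parse the JSON string in "message" into the [data] object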
  json {
    source => "message"
    target => "data"
  }
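  # Cast the numeric fields to integer/float and create "index_date" from [data][@timestamp]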
  mutate {
    convert => {
        "[data][body_bytes_sent]" => "integer"
        "[data][bytes_sent]" => "integer"
        "[data][request_length]" => "integer"
        "[data][request_time]" => "float"
    }
    add_field => {
        "index_date" => "%{[data][@timestamp]}"
    }
    convert => {
        "index_date" => "string"
    }
  }
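  # Trim "index_date" to the first 10 characters (the date) and replace dashes with dots, e.g. 2023-01-31 -> 2023.01.31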
  ruby {
    code => "
    value = event.get('index_date');
    value = value[0..9];
    value = value.gsub('-', '.');
    event.set('index_date', value);
    "
  }
}
...
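
For completeness, here is a similar minimal sketch of an output section for logstash.conf that sends the transformed events to Elasticsearch and uses the index_date field created above for the index name. The host address and the nginx- index prefix are assumptions for illustration.

...
output {
  elasticsearch {
    # Assumed local single-node cluster; adjust hosts to your environment
    hosts => ["http://localhost:9200"]
    # Daily index name built from the "index_date" field, e.g. nginx-2023.01.31
    index => "nginx-%{index_date}"
  }
}
...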

Data Transformation @ Elasticsearch

Now that we have seen how data transformation can be carried out at the Filebeat and Logstash layers, along with some benefits and drawbacks, let's look at the third option: using Elasticsearch itself. Elasticsearch provides a feature called ingest pipelines, which transform the data as per the pipeline definition before it is indexed. Here is a snippet of a pipeline configuration that you can load into Elasticsearch to carry out the same data transformation.

Once the pipeline is loaded into the Elasticsearch cluster, you can forward the data from Filebeat directly to the ingest pipeline so that it is transformed before it is indexed.

The benefit here is that there is no need for an additional Logstash layer between Filebeat and Elasticsearch, and we also avoid overloading the production-grade server running Filebeat, which now does nothing but ship log or event data. The trade-off is that the transformation work now runs on the Elasticsearch nodes themselves, so you might notice some slowness in your search and analysis; it should be fairly nominal if you can afford it.

...
{
  "description" : "Logging Pipeline",
  "processors" : [
    {
      "json" : {
        "field" : "message",
        "target_field" : "data"
      }
    },
    {
      "convert" : {
        "field" : "data.@timestamp",
        "target_field": "index_date",
        "type": "string"
      }
    },
    {
      "convert" : {
        "field" : "data.body_bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.request_length",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.request_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "gsub": {
        "field": "index_date",
        "pattern": "-",
        "replacement": "."
      }
    },
    {
      "script": {
        "description": "truncate index_date",
        "lang": "painless",
        "source": "ctx['index_date'] = ctx['index_date'].substring(0,10);"
      }
    }
  ]
}
...
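
As a rough sketch of how this can be wired together, assuming the pipeline definition above is saved as pipeline.json, the pipeline is named logging-pipeline and the cluster is reachable on localhost:9200 (all assumptions for illustration), it can be loaded with the ingest API:

...
# Load the pipeline definition into the cluster (the name and file name are assumptions)
curl -X PUT "http://localhost:9200/_ingest/pipeline/logging-pipeline" \
     -H "Content-Type: application/json" \
     -d @pipeline.json
...

On the Filebeat side, the output.elasticsearch section accepts a pipeline option, so every event is routed through the ingest pipeline before it is indexed:

...
output.elasticsearch:
  hosts: ["http://localhost:9200"]
  # Route every event through the ingest pipeline defined above
  pipeline: "logging-pipeline"
...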

Hope you enjoyed reading this article. Thank you.