Kafka to ELK Message Manipulation
2020-05-21T05:58:11+01:00
Introduction
We will be creating a message flow that starts from Kafka and ends in Kibana. The flow looks like this:
console app (to send a JSON message) -> Kafka -> Logstash -> Elasticsearch -> Kibana
We will be using a Logstash Ruby filter to manipulate the message, and Docker to set up the environment.
Setup
mkdir kafka-elk
cd kafka-elk
wget http://apache.mirror.anlx.net/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar xvf kafka_2.13-2.5.0.tgz
Note: We need the Kafka binaries for the console script (kafka-console-producer.sh) that can send a JSON message to Kafka.
Populate a docker-compose.yml file with the following contents:
version: '2'
services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    container_name: zookeeper
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - elastic
  kafka:
    image: 'bitnami/kafka:2.5.0'
    container_name: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
    networks:
      - elastic
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
    container_name: elasticsearch
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - cluster.initial_master_nodes=elasticsearch
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elastic_data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.2
    container_name: kibana
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    networks:
      - elastic
  logstash:
    image: docker.elastic.co/logstash/logstash:7.7.0
    container_name: logstash
    ports:
      - 5000:5000
      - 5000:5000/udp  # the udp input in logstash.conf also listens on 5000
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./manipulate_msg.rb:/etc/logstash/manipulate_msg.rb
    networks:
      - elastic
networks:
  elastic:
    driver: bridge
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
  elastic_data:
    driver: local
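Optionally, sanity-check the file before moving on; docker-compose will flag YAML or indentation mistakes:
docker-compose config -q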
Create the logstash.conf file:
cat <<'EOF' > logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    client_id => "transform-text"
    group_id => "transform-text"
    consumer_threads => 3
    topics => ["transform-text"]
    # The following multiline json codec may not work on all
    # possible multiline json records.
    # codec => multiline {
    #   pattern => "^\{"
    #   negate => true
    #   what => previous
    # }
    # Use json records with no newlines in between.
    codec => json
    tags => ["transformed-text", "kafka_source"]
    type => "kafka-test-messages"
  }
  # To test logstash over tcp,
  # e.g. cat some.json | nc localhost 5000
  tcp {
    port => 5000
    type => syslog
    codec => multiline {
      pattern => "^\{$"
      negate => true
      what => previous
    }
  }
  # To test logstash over udp,
  # e.g. cat some.json | nc -u localhost 5000
  udp {
    port => 5000
    type => syslog
    codec => multiline {
      pattern => "^\{$"
      negate => true
      what => previous
    }
  }
}
filter {
  ruby {
    path => "/etc/logstash/manipulate_msg.rb"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
EOF
Create a logstash.yml file.
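The original contents were not included here; a minimal sketch that works with the official Logstash image (an assumption, not the author's original file; http.host lets the Logstash HTTP API bind on all container interfaces, and the monitoring line is optional):
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]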
Create manipulate_msg.rb with the following contents:
# Logstash Ruby filter script; filter() is called once per event.
def filter(event)
  # Get the message line sent by kafka or any other source like syslog.
  # It is not used further here, but is available for richer manipulation.
  message = event.get("message")
  # Add an extra field/value pair; it will appear as its own field in Kibana.
  event.set("newField", "newValue")
  return [event]
end
Note: We manipulate the message coming from Kafka by adding an extra field/value pair in the Logstash Ruby filter; the new field will be visible in Kibana.
Note: Kafka is not pushing messages to Logstash. It is Logstash that pulls messages from Kafka (acting as a Kafka consumer).
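Once the stack is running (see the docker-compose command further below), you can confirm this from the Kafka side with the consumer-groups tool shipped in the binaries we downloaded; the group name matches the group_id set in logstash.conf:
kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group transform-text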
Create a sample.json file with the following contents:
{
  "menu": {
    "id": "file with space",
    "value": "File",
    "popup": {
      "menuitem": [
        {"value": "New", "onclick": "CreateNewDoc()"},
        {"value": "Open", "onclick": "OpenDoc()"},
        {"value": "Close", "onclick": "CloseDoc()"}
      ]
    }
  }
}
Note: We need to put the above JSON input on a single line: kafka-console-producer.sh sends each line of its input as a separate Kafka message, so a pretty-printed record would arrive as fragments that the json codec cannot parse. You can use the multiline codec in the logstash.conf input plugin to ingest multiline JSON, but then Kibana will show the record as a single string.
The multiline codec just stitches the lines back into one long string of characters, so the record's keys/fields will not be recognized in Kibana as separate fields; you would need to extend the Ruby filter to parse that string if you want the keys/fields to be individually searchable.
To keep things simple, we will convert our JSON record to a single line and use codec => json in logstash.conf's kafka input plugin.
Flatten the JSON record into a single line.
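One way to do this, assuming jq is installed (the output name flat_sample.json matches the producer command used later):
jq -c . sample.json > flat_sample.json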
Start the whole setup.
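From the directory containing docker-compose.yml (-d runs everything in the background):
docker-compose up -d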
Note: You can bring the whole setup down by running docker-compose down -v (-v also removes the named volumes).
Set up Kibana
Wait a minute or so for the above setup to come up fully, then open the Kibana URL: http://localhost:5601/. You need to create an index pattern with logstash-* as the index pattern (under Management). Before you can create the index pattern, though, you need to send some data to Elasticsearch; that can be done by running the kafka-console-producer.sh command below. Once you have sent the JSON, you should be able to create the index pattern. Now click on "Discover" to view the sent data. Try sending more data.
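If the logstash-* index pattern cannot be created yet, you can check whether Elasticsearch has received any documents; for example:
curl 'http://localhost:9200/_cat/indices?v'
A logstash-YYYY.MM.dd index should appear in the listing once Logstash has shipped at least one event.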
Send JSON data via Kafka
kafka_2.13-2.5.0/bin/kafka-console-producer.sh --topic "transform-text" --bootstrap-server localhost:29092 < flat_sample.json
Note: The Kafka input plugin is using the json codec.
Note: You should see the JSON data appear in Kibana.
Note: You can watch data being ingested by Logstash by tailing its logs: docker logs -f logstash
Send JSON data via syslog port 5000
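Based on the example comment in logstash.conf, a command like the following sends the original multi-line sample.json straight to the tcp input:
cat sample.json | nc localhost 5000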
Note: The tcp input plugin listens on port 5000, which can also be used by syslog.
Note: The above command sends data straight to Logstash (skipping Kafka).
Note: The tcp input plugin uses the multiline codec.
Please note the difference in how the two records sent above are represented in Kibana, in order to understand the difference between the multiline and json codecs.