Thursday, 21 May 2020

Kafka to ELK message manipulation

Kafka to Elk Message Manipulation

Kafka to Elk Message Manipulation

2020-05-21T05:58:11+01:00



Introduction

We will be creating a message flow starting from Kafka and ending in Kibana. Flow will be like below:

console app (to send json message) -> kafka -> logstash -> elasticSearch -> kibana

We will be using ruby filter to manipulate the message as well and docker to setup the environment.

Setup

mkdir kafka-elk
cd kafka-elk
wget http://apache.mirror.anlx.net/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar xvf kafka_2.13-2.5.0.tgz

Note: We need kafka binaries to get the script that can send json message via console to kafka.

Populate docker-compose.yml file with following contents

version: '2'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    container_name: zookeeper
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - elastic

  kafka:
    image: 'bitnami/kafka:2.5.0'
    container_name: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
    networks:
      - elastic

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
    container_name: elasticsearch
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - cluster.initial_master_nodes=elasticsearch
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elastic_data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.2
    container_name: kibana
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    networks:
      - elastic

  logstash:
    image: docker.elastic.co/logstash/logstash:7.7.0
    container_name: logstash
    ports:
      - 5000:5000
    volumes:
    - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    - ./logstash.yml:/usr/share/logstash/config/logstash.yml
    - ./manipulate_msg.rb:/etc/logstash/manipulate_msg.rb
    networks:
      - elastic

networks:
  elastic:
    driver: bridge

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
  elastic_data:
    driver: local

create logstash.conf file

cat <<EOF > logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    client_id => "transform-text"
    group_id => "transform-text"
    consumer_threads => 3
    topics => ["transform-text"]

    # Following multiline json codec may not work on all the
    # possible multiline json records.
    # codec => multiline {
    #  pattern => "^\{"
    #  negate => true
    #  what => previous
    # }

    # use json record with no newline in between.
    codec => json
    tags => ["transformed-text", "kafka_source"]
    type => "kafka-test-messages"
  }

  # to test the logstah via telnet
  # e.g. cat some.json | nc localhost 5000
  tcp {
    port => 5000
    type => syslog
    codec => multiline {
      pattern => "^\{$"
      negate => true
      what => previous
    }
  }

  # to test the logstah via telnet
  # e.g. cat some.json | nc localhost 5000
  udp {
    port => 5000
    type => syslog
    codec => multiline {
      pattern => "^\{$"
      negate => true
      what => previous
    }
  }

}

filter {
  ruby {
    path => "/etc/logstash/manipulate_msg.rb"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
EOF

create logstash.yml with following contents

http.host: "0.0.0.0"
config.support_escapes: true

create manipulate_msg.rb with following contents.

def filter(event)
  # get the message line sent by kafka or any other source like syslog
  message = event.get("message")
  event.set("newField", "newValue")
  return [event]
end

Note: We are manipulating the message coming from kafka by adding an extra field/value pair to the message in logstash ruby filter which will be visible in kibana.

Note Kafka is not pushing messages to logstash. Its the logstash that pulling messages from kafka (acting as kafka consumer).

Create a sample.json file with following contents

{
  "menu": {
    "id": "file with space",
    "value": "File",
    "popup": {
      "menuitem": [
        {"value": "New", "onclick": "CreateNewDoc()"},
        {"value": "Open", "onclick": "OpenDoc()"},
        {"value": "Close", "onclick": "CloseDoc()"}
      ]
    }
  }
}

Note: We need to make above json input to be in one-line, as logstash cannot ingest multiline json record. You can use multiline codec in logstash.conf input plugin (to ingest multiline json) but then kibana will show that record as a single string and json record’s field/keys will not be shown as individual fields in kibana.

Multiline codec will make json record just a series of characters (a long string) and record’s keys/fields will not be recoginzed in kibana as separate fields. You might need to make changes to ruby filter so that kibana can show record’s key/fields as individual searchable fields.

To keep things simple, we will be converting our json in a flat structure and use codec json in logstash.conf’s kafka input plugin.

flatten the json record.

cat sample.json |  perl -wp -e 's/\n+//g' > flat_sample.json

Start the whole setup

docker-compose up -d

Note: You can bring down whole setup by running this command docker-compose down -v

Setup kibana

Wait for a minute for above setup to come up fully, and open Kibana URL: http://localhost:5601/ . You need to create an index pattern with logstash-* as index patten (inside management). But before you can create index-pattern you need to send some data to elastic search. That can be sent via running below mentioned kafka-console-producer.sh command. Once you have sent the json , you should be able to create index pattern. Now click on “discover” to view the sent data. Try sending more data.

send json data via kafka

kafka_2.13-2.5.0/bin/kafka-console-producer.sh --topic "transform-text" --bootstrap-server localhost:29092 < flat_sample.json

Note: Kafka input plugin is using json codec

Note: You should see json data visible in Kibana.

Note You can see data being ingested by logstash by viewing logstash logs docker logs -f logstash

send json data via syslog port 5000

 cat flat_sample.json | nc localhost 5000

Note: tcp input plugin is using port 5000 which is can be used by syslog as well.

Note: above command is sending data straight to logstash (skipping kafka)

Note: tcp input plugin is using multiline codec.

Please note the difference in reprsentation of two records sent above in kibana in order to understand the differnce between multiline and json codec.

Saturday, 14 March 2020

Blogger Go Cli

Blogger Go Cli

Blogger Go Cli

2020-03-14T21:16:00Z



Introduction

We will be creating a commandLine utility in Go to upload a post in Google blogger.

Account Preparation

  • You need to have a google account (gmail) (Paid account NOT required).
  • Once you have a google account, Login to https://console.developers.google.com/apis/credentials and create a new Project here.
  • Click on Dashboard on left hand column, and click on `+ ENABLE APIS AND SERVICES`.
    • Type in blogger in search bar, and select Blogger API v3 and then click ENABLE.`
  • Click on OAuth consent screen on left hand column, and select User Type to External and click CREATE.
    • On next screen type in Application Name as Blogger CLI (It can be any string).
    • Click on Add Scope and select both the Blogger API v3 options and click ADD and click Save.
  • Click on Credentials on the left hand side and you will be presented with following two options of creating credentials for Blogger API.
    • API Keys
    • OAuth 2.0 Client IDs
    • Service Accounts
    • Click on `+ CREATE CREDENTAILSon the top and selectOAuth Client ID` from drop down menu.
    • Select Application Type to other.
    • Type in Name to CLI Utility (it can be any string) and click create.
    • You will see an entry with name CLI Utility under OAuth 2.0 Client IDs.
    • Download the credential files by clicking on a down arrow button.
    • This will be a json file and we need it to authenticate it with google OAuth2.0 services.
  • Login to blogger site https://www.blogger.com
    • Sign-in to blogger.
    • Enter a display name you like.
    • Create a NEW BLOG and give it a name you like in Ttile
    • Type in address. It has to be a unique e.g somethingrandom23455.blogspot.com and click create blog
    • On next screen, look at the address bar of browser, it will be like https://www.blogger.com/blogger.g?blogID=3342324243435#allposts
    • Note down blog ID (a string after blogID=, excluding #allposts) in above URL. We need this ID to put in Go script.

Go development environment creation

I am using following Go version.

$ go version
go version go1.13.8 darwin/amd64

Setup following Go development directory structure.

mkdir -p BloggerCli/go-cli
touch BloggerCli/go-cli/createAndActivateGoWorkSpace.sh

Populate BloggerCli/go-cli/createAndActivateGoWorkSpace.sh with following contents

#! /bin/bash
# current directory is workspace
WORKSPACE=$(pwd)
cd "$WORKSPACE"
mkdir -p src bin pkg
export GOPATH=$WORKSPACE
export GOBIN=$WORKSPACE/bin

Create directory structure

cd BloggerCli/go-cli
source ./createAndActivateGoWorkSpace.sh
mkdir src/blogger

Install required libraries.

go get -v google.golang.org/api/blogger/v3
go get -v golang.org/x/oauth2/google

Copy credential file.

Copy above downloaded credential json file in BloggerCli/go-cli/src/blogger folder and rename it to OAuth2.0_secret.json.

Note: You can rename it any name

Create Go file BloggerCli/go-cli/src/blogger/bloggerCli.go file with following contents.

package main

/*
https://pkg.go.dev/google.golang.org/api/blogger/v3?tab=doc
https://godoc.org/google.golang.org/api/option
https://pkg.go.dev/golang.org/x/oauth2/google?tab=doc
https://pkg.go.dev/golang.org/x/oauth2?tab=doc#Config.AuthCodeURL
*/

import (
    "context"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "strings"

    "golang.org/x/oauth2"
    "golang.org/x/oauth2/google"

    "google.golang.org/api/blogger/v3"
    "google.golang.org/api/option"
)

func checkErrorAndExit(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func printNoOfPosts(postsService *blogger.PostsService, BlogID string) {
    postListCall := postsService.List(BlogID).MaxResults(500)
    postList, err := postListCall.Do()
    checkErrorAndExit(err)
    fmt.Printf("Total posts in this blog: %d\n", len(postList.Items))
}

func getBloggerClient() *blogger.Service {
    ctx := context.Background()

    contents, err := ioutil.ReadFile("OAuth2.0_secret.json")
    checkErrorAndExit(err)

    conf, err := google.ConfigFromJSON(contents, blogger.BloggerScope)
    checkErrorAndExit(err)

    url := conf.AuthCodeURL("state")
    fmt.Println("Visit the following URL for the auth dialog:")
    fmt.Printf("\n%v\n", url)

    var code string
    fmt.Printf("\nEnter the code obtained from above here: ")
    fmt.Scanln(&code)

    token, err := conf.Exchange(oauth2.NoContext, code)
    checkErrorAndExit(err)

    bloggerService, err := blogger.NewService(ctx, option.WithTokenSource(conf.TokenSource(ctx, token)))
    checkErrorAndExit(err)

    return bloggerService
}

func checkMandatoryArguments(args []*string) {

    allArgsExists := true
    for _, val := range args {
        if len(*val) == 0 {
            allArgsExists = allArgsExists && false
        } else {
            allArgsExists = allArgsExists && true
        }
    }

    if !allArgsExists {
        flag.PrintDefaults()
        os.Exit(2)
    }
}

func createNewPost(title *string, labels *string, fileToUpload *string) *blogger.Post {

    labelsSlice := strings.Split(*labels, ",")
    contents, err := ioutil.ReadFile(*fileToUpload)
    checkErrorAndExit(err)

    newPost := blogger.Post{}

    if len(*labels) == 0 {
        newPost = blogger.Post{
            Content: string(contents),
            Title:   *title,
        }
    } else {
        newPost = blogger.Post{
            Content: string(contents),
            Labels:  labelsSlice,
            Title:   *title,
        }
    }
    return &newPost
}

func insertAPost(title *string, labels *string, fileToUpload *string, BlogID string, postsService *blogger.PostsService) {
    fmt.Println("============= Inserting a new Post ===============")

    newPost := createNewPost(title, labels, fileToUpload)

    postInsertCall := postsService.Insert(BlogID, newPost)
    post, err := postInsertCall.Do()
    checkErrorAndExit(err)
    fmt.Println("A new post added to Blog with following details")
    fmt.Printf("Title: %s\nPostID: %s\n", post.Title, post.Id)
    fmt.Println("=================================================")
    printNoOfPosts(postsService, BlogID)
}

func updateAPost(title *string, labels *string, fileToUpload *string, BlogID string, postsService *blogger.PostsService, postID string) {
    fmt.Println("============= updating a Post ===============")

    // Search post from Title if postID not specified.
    if len(postID) == 0 {
        postsSearchCall := postsService.Search(BlogID, *title)
        postsList, err := postsSearchCall.Do()
        checkErrorAndExit(err)
        if len(postsList.Items) > 1 {
            fmt.Printf("Following multiple posts found for title = %s\n", *title)
            for _, post := range postsList.Items {
                fmt.Println(post.Title)
            }
            log.Fatal("\n\nERROR: Not updating post. Title must identify a single post only. Try specifying PostID")
        } else {
            postID = postsList.Items[0].Id
        }
    }

    newPost := createNewPost(title, labels, fileToUpload)
    postsPatchCall := postsService.Patch(BlogID, postID, newPost)
    post, err := postsPatchCall.Do()
    checkErrorAndExit(err)
    fmt.Println("Following post has been updated in Blog with following details")
    fmt.Printf("Title: %s\nPostID: %s\n", post.Title, post.Id)
    fmt.Println("=================================================")
    printNoOfPosts(postsService, BlogID)
}

func main() {

    fileToUpload := flag.String("f", "", "html file to upload (mandatory)")
    title := flag.String("t", "", "Post title (mandatory)")
    blogid := flag.String("b", "", "Blod ID (mandatory)")
    labels := flag.String("l", "", "comma separated list of labels (optional)")
    modify := flag.Bool("m", false, "Modify a post contents (optional: modifies post in mentioned in -t )")
    postID := flag.String("p", "", "post ID (optional: To be specified when updating a post)")
    flag.Parse()

    if flag.NFlag() == 0 {
        flag.PrintDefaults()
        os.Exit(1)
    }

    mandatoryArgs := []*string{fileToUpload, title, blogid}
    checkMandatoryArguments(mandatoryArgs)

    BlogID := *blogid
    PostID := *postID

    bloggerService := getBloggerClient()
    postsService := bloggerService.Posts

    printNoOfPosts(postsService, BlogID)

    if !*modify {
        insertAPost(title, labels, fileToUpload, BlogID, postsService)
    } else {
        updateAPost(title, labels, fileToUpload, BlogID, postsService, PostID)
    }
}

Create a test file to be uploaded.

echo 'This is a first post' > sometext.txt

Note: You can create an HTML file as well.

Run the script

Create a New Post

$ cd BloggerCli/go-cli/src/blogger

$ go run bloggerCli.go -b 232343232322 -f sometext.txt -t "A new Post" -l "label1,label2"
Visit the URL for the auth dialog: https://accounts.google.com/o/oauth2/auth?client_id=99999999-ccc55566666iiioo999ddd.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fblogger&state=state
Enter the code obtained from above here: 9/rf7hM-sds98JnHgsdK90OiYhsss790NHdsaDs
Total posts in this blog: 51
============= Inserting a new Post ===============
A new post added to Blog with following details
Title: A new Post
PostID: 2344555555555999
=================================================
Total posts in this blog: 52

Update a Post

$ cd BloggerCli/go-cli/src/blogger

$ go run bloggerCli.go -b 232343232322 -f sometext.txt -t "Multiple Vms Vagrantfile" -m
Visit the URL for the auth dialog: https://accounts.google.com/o/oauth2/auth?client_id=99999999-ccc55566666iiioo999ddd.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fblogger&state=state
Enter the code obtained from above here: 9/rf7hM-sds98JnHgsdK90OiYhsss790NHdsaDs
Total posts in this blog: 51
============= updating a Post ===============
Following post has been updated in Blog with following details
Title: Multiple Vms Vagrantfile
PostID: 334556669992323
=================================================
Total posts in this blog: 51

Note: You need to open the URL mentioned in the above output in a browser and authorize the request. This will return a code that need to enter above.

References used

  • https://pkg.go.dev/google.golang.org/api/blogger/v3?tab=doc
  • https://godoc.org/google.golang.org/api/option
  • https://pkg.go.dev/golang.org/x/oauth2/google?tab=doc
  • https://pkg.go.dev/golang.org/x/oauth2?tab=doc#Config.AuthCodeURL