Implémenter un serveur Fluentd en Golang

le jeudi 12 novembre 2020

Bearstech | Dernière mise à jour : jeudi 12 novembre 2020

Le protocole Fluent permet la gestion des logs applicatifs. Dans cet article, on vous explique comment coder un serveur Fluentd en Golang, avec de vrais bouts de msgpack à l'intérieur.

Notre prochain webinar

Fluent est le protocole du CNCF pour faire balader ses logs et il a reçu des patchs et des tests de la part de grands (et petits) éditeurs.

Pour implémenter un protocole réseau, il faut des spécifications, et si on implémente le serveur, il est nécessaire d'avoir une implémentation de référence pour le client (et inversement, un serveur pour tester le client). Fluent fournit 2 serveurs, et plein de clients.

Fluent n'a pas de jolie RFC, mais une page wiki avec suffisamment d'informations.

Fluent utilise le format messagePack, qui lui, a une jolie page de spécification.

MessagePack

MessagePack, msgpack pour les gens pressés, se définit comme JSON en binaire, ce qui est un peu court comme description. MessagePack est un format de sérialisation sans description : il est auto descriptif. Concrètement, ça veut dire que le document contient les clefs et les valeurs.

Cette approche est moins concise que les formats à description (IDL) comme Thrift ou Protobuf, mais dispense de coordonner le client et le serveur avant de démarrer la discussion.

Msgpack est linéaire, donc il n'utilise aucune astuce pour éviter les répétitions, ce qui simplifie grandement sa lecture. Il permet même un traitement par flot : on n'est pas obligé de tout construire en mémoire avant de pouvoir traiter l'objet.

Le format de base est simple et de bon goût : chaque objet a un type, défini par son premier octet. Les 4 premiers bits de l'octet sont réservés au type, l'autre moitié peut être du contenu. Avec une approche radine, les entiers, par exemple, peuvent être encodés sur 1 à 9 octets.

Pour les types composés, les arrays et les map, on commence par annoncer la taille puis on empile les objets. Techniquement, les collections ne sont pas typées, et une map est une suite de clef/valeur. Il existe un type pour stocker du binaire, et la possibilité de créer ses propres types.

Fluent

Fluent grappille de la place en utilisant des tuples plutôt que des maps, des listes ordonnées d'objets, sans les étiqueter. Cet usage est courant, la bibliothèque golang permet de désérialiser une array en struct. Fluent peut aussi envoyer des nil comme pulsation cardiaque et rappeler que la connexion est active même si il n'y a rien à dire. Les messages sont donc des arrays commençant systématiquement par une string, qui peut être une commande (HELO, PING, PONG) ou l'étiquette d'un évènement.

Fluent est un protocole connecté utilisant TCP (et UDP pour faire du ping parce que pourquoi pas). Il est prévu dans les specs de pouvoir utiliser du TLS, et même avec un certificat côté client, pour authentifier.

Comme le protocole est connecté, on peut débuter la connexion par de l'authentification et ensuite envoyer les messages. Les salamalecs sont normalisés : HELO/PING/PONG, mais non implémentés dans les libs de base. Fluent doit estimer que c'est à l'ambassadeur fluentbit de s'en occuper, pas aux clients.

Client                     Serveur
  |                          |
  +------------------------->|
  |       HELO               |
  |<-------------------------+
  |       PING               |
  +------------------------->|
  |       PONG               |
  |<-------------------------+
  |       Message 1          |
  +------------------------->|
  |       Ack 1              |
  |<-------------------------+
  |       Message 2          |
  +------------------------->|
  |       Ack 2              |
  |<-------------------------+
  …

La négociation (à l'initiative du serveur) HELO/PING/PONG est optionnelle, tout comme les ack de message si le client ne les réclame pas.

Fluentd permet d'envoyer les messages un par un (mode message) ou par lot (mode suivi, suivi empaqueté, suivi empaqueté compressé) pour un même tag. Les messages sont horodatés, soit avec un int précis à la seconde, soit avec type défini précis à la nanoseconde.

le mode suivi utilise une array d'évènement, le mode empaqueté un gros binaire avec tous les messages à la queue leu leu encodé en msgpack, et il est possible de compresser en gzip ce gros binaire.

Dans les options du message, il est possible de préciser le nombre d'évènements (en binaire, il faut le lire pour le connaître et c'est encore pire quand c'est compressé). Il est aussi possible de préciser un chunk (les 128 premiers octets d'un UUID en base64) et à ce moment-là, le serveur est tenu de répondre un ack pour indiquer que les données sont bien arrivées, et qu'il n'est pas nécessaire de retenter l'envoi du message.

Implémentation d'un serveur en golang

Illustration fluent golang

Fluent a été créé en Ruby, et même si fluentbit est tout en C, le fantôme de Ruby continue de hanter le code.

Golang, langage compilé à typage fort, à la différence des langages à typage faible, déteste l'introspection même s’il est correctement outillé pour le faire. Msgpack est conçu pour définir des objets aux typages complexes, avec des collections pour construire des messages sans trop de limites. En golang, cette approche requiert de travailler avec des interface{}, la lib standard reflect et des cast en vérifiant que ça passe. Le code produit va être pénible à lire, fragile et peu performant. Clairement, c'est une approche à fuir.

Heureusement, la lib github.com/vmihailenco/msgpack/v5 permet de parler le msgpack sans introspection. La gestion du serveur TCP est triviale, on va créer une goroutine par connexion.

func ListenAndServe(address string) error {
    listener, _ := net.Listen("tcp", address) // on écoute en TCP
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept() // Une nouvelle connexion
        if err != nil {
            return err
        }
        go handler(conn) // On gère ça dans une goroutine
    }
    return nil
}

La lib msgpack gère un io.Reader et va se charger de gérer les buffers puis de décoder le flot au fur et à mesure.

func handler(conn net.Conn) {
    defer conn.Close() // éteindre la lumière en quittant la pièce
    decoder := msgpack.NewDecoder(conn) // On commence à lire le flot de msgpack
    for { // jusqu'à la fin des temps
        err := oneMessage(decoder) // traiter un message
        if err != nil {
            log.Println(err)
            return // Ragnarok
        }
    }
}

On va maintenant pouvoir lire le flot de messages.

L'astuce est d'utiliser l'en-tête d'un object msgpack pour prendre des décisions plutôt que d'utiliser l'introspection sur une interface{}.

func oneMessage(decoder *msgpack.Decoder) error {
    code, err := decoder.PeekCode() // Type de l'objet
    if err != nil {
        return err
    }
    if code == msgpcode.Nil { // le type est nil, ce n'est qu'un battement de ❤️
        fmt.Println("Hearthbeat")
        return nil
    }
    if !msgpcode.IsFixedArray(code) { // le client ne doit envoyer que des arrays
        return errors.New("Not an array")
    }
    l, err := decoder.DecodeArrayLen() // taille de l'array
    if err != nil {
        return err
    }
    if l == 0 {
        return errors.New("Empty array")
    }
    if l > 10 {
        return errors.New("Flood")
    }
    type_, err := decoder.DecodeString() // le premier élément d'un message DOIT être une string
    if err != nil {
        return err
    }
    switch type_ {
    case "PING":
        doPing()
    default:
        return doMessage(type_, decoder, l)
    }
    return nil
}

Les clients officiels ne savent pas envoyer des lots ou gérer l'authentification, on va donc pouvoir commencer l'implémentation simplement.

func doMessage(tag string, decoder *msgpack.Decoder, l int) error {
    if l < 2 {
        return errors.New("Too short")
    }
    firstCode, err := decoder.PeekCode()
    if err != nil {
        return err
    }
    switch {
    case msgpcode.IsFixedArray(firstCode):
    // mode suivi
    case msgpcode.IsBin(firstCode):
    // mode empaqueté
    case firstCode == msgpcode.Uint32: // horodatage précis à la seconde
        if l > 4 {
            return fmt.Errorf("Message too large: %d", l)
        }
        ts, err := decoder.DecodeUint32() // horodatage
        if err != nil {
            return err
        }
        record, err := decoder.DecodeMap() // le log est une map
        if err != nil {
            return err
        }
        if l == 4 { // il est possible de passer des options
            option, err := decoder.DecodeMap()
            if err != nil {
                return err
            }
            return s.doEvent(tag, ts, record, option)
        }
        return doEvent(tag, ts, record, nil) // traitement de l'évènement
    default:
        return fmt.Errorf("Bad code %v", firstCode)
    }
    return nil
}

Le choix de msgpack facilite l'implémentation du client, par contre, côté serveur, ça oblige à vérifier plein de trucs, pour ne pas se faire surprendre et finir en panic ou en bouffant toute la RAM.

En implémentant de manière naïve la partie serveur, on constate :

  • Un protocole en msgpack à base de tuples plutôt que de map reste concis.
  • Il est possible de traiter les messages sans les lire entièrement.
  • Faire de l'inception avec du msgpack dans du msgpack permet de compresser le contenu sans toucher les métas qui permettent de prendre des décisions de routage.
  • Bien utilisés, les types et la liberté des maps permettent de faire évoluer le protocole sans le casser.

Le code, qui a plus une vocation pédagogique que de finir sur une prod, est disponible : https://github.com/factorysh/fluent-server

Service Audit de Performance web

Bearstech vous propose ses services Audit de Performance web

Découvrir ce service

Partager cet article

Flux RSS

flux rss

Partager cet article :