Sunday, October 7, 2018

A primer on monitoring distributed systems

Monitoring Distributed Systems

As written in an older post, there are multiple reasons for monitoring a computing system: collecting performance metrics to analyze the system's scalability, and storing logs to enable a retrospective analysis of failures, or even to prevent them and ensure high availability. We refer to Chapter 6 of Google's "Site Reliability Engineering" book for further information.

Collecting logs and metrics of distributed systems requires large data storage. As is known, there exist two ways to scale a system: vertically, by adding more resources (e.g., CPU and RAM), and horizontally, by adding more instances of the application server. Vertical scaling is typical of traditional transactional databases, i.e., those that respect the ACID properties: i) atomicity, ii) consistency, iii) isolation, and iv) durability.

While reads can easily be scaled by load balancing across multiple federated instances, write operations must preserve those strict consistency properties on the data, which implies a loss of performance as the number of queries grows, since even in presence of sharding the integrity constraints will in practice lock multiple shards. Those schema constraints can be relaxed, but this implies maintaining the schema and its integrity at the application level, with a higher burden and risk on the developer side.

NoSQL Databases

NoSQL databases relax some of those ACID properties in order to achieve horizontal scaling; it is only by relaxing some of those constraints that higher availability can be reached. The CAP theorem, where CAP stands for consistency, availability and partition tolerance, explains this trade-off: at most 2 of the 3 properties can be satisfied at the same time. Intuitively, every distributed system has to deal with network and node failures. Partition tolerance is the ability of a system to continue working (i.e., offering its service) in spite of communication failures between any of its subsystems: for instance, the storage service is kept up even if two of the nodes can't communicate any more, such as after a service crashed on one of the two, or worse, in presence of transient failures that imply packet loss.

A system with only availability and consistency (CA) is consistent and available as long as the data is not partitioned across nodes, since once the data is distributed a communication failure would cause an inconsistency and the need to recover it, consequently making the system unavailable. Therefore CA systems are only possible for classic transactional databases, where scaling is typically not done horizontally (by adding network nodes) but rather by increasing resources locally (as long as possible). This also explains why partition tolerance is always present in NoSQL databases. Consequently, one can either achieve availability and partition tolerance (thus giving up strict consistency) or, vice versa, give up availability to ensure consistency. CP systems guarantee consistency across nodes, but become unavailable while recovering from a failure on a partition. AP systems, vice versa, remain online even when certain nodes experience failures while synchronizing partitions, but there is no guarantee that all nodes hold consistent data between two consecutive sync operations. Most of today's systems employ the concept of eventual consistency, where partitions (e.g., replicas) are updated asynchronously (i.e., upon writes or updates on shards).

As reported on Wikipedia, there have been various ways of classifying NoSQL databases, namely:

  • Document databases (e.g., MongoDB, Couchbase, CouchDB, ElasticSearch)
  • Graph databases (e.g. Neo4j)
  • Key-value stores (e.g., Redis, Apache Ignite, DynamoDB)
  • Columnar databases (e.g., HBase, Cassandra)

Based on the type of collected logs and metrics, different solutions should be used. If you want to get a handle on the power of such systems, I suggest watching the video posted here. Common stacks are the Elastic/ELK stack (ElasticSearch, Logstash and Kibana) and the TICK stack (Telegraf, InfluxDB, Chronograf, and Kapacitor), which respectively address text-based data (e.g., logs) and time series data (e.g., time-stamped metrics). A very common alternative to TICK is the Graphite time series database with the Grafana dashboarding tool. For a comparison of Kibana and Grafana see here. So the main difference between those stacks lies in the use of either a document-oriented database or a time series database. The post here explains further the difference between collecting logs and metrics. Comparisons of time series databases for reporting purposes are presented here, here and here. Based on the type of data you wish to collect there exist plenty of daemons ready to use, such as collectd, fluentd and Telegraf; see here for a list. If you feel like trying any of them out, there are plenty of Docker images out there to let you start a whole stack in a matter of seconds.


Analysing Logs - ElasticSearch and Kibana

Setup

This is the pipeline we are going to set up in this section:
I report here a reference to easily install ElasticSearch and Kibana.

  1. Pull the elasticsearch image
    docker pull docker.elastic.co/elasticsearch/elasticsearch:6.4.2
  2. Pull the Kibana image
    Since the latest tag is not supported when pulling Elastic images, we explicitly specify the latest version, as mentioned here.
    docker pull docker.elastic.co/kibana/kibana:6.4.2
  3. Create a bridged network to interconnect the services (check this docker guide for an idea)
    docker network create esearchnet
    the available networks can be shown with
    docker network ls
  4. Run a single-node Elasticsearch instance, as shown here
    docker run -p 9200:9200 -p 9300:9300 \
    --name esearch --net esearchnet \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:6.4.2
    Now the elasticsearch container exposes a REST interface on port 9200, which can be used to interact with the data. A health check to verify that the service is running correctly can be done with a GET at http://127.0.0.1:9200/_cat/health?v
  5. Run Kibana as a simple container attached to the same bridged network (to simplify connecting to the already running elasticsearch container)
    docker run -d --name kibana --net esearchnet -p 5601:5601 docker.elastic.co/kibana/kibana:6.4.2
    Make sure Kibana has started correctly by inspecting its log
    docker logs --tail 500 <dockercontainerID>
    You can use docker ps to retrieve the container id for kibana
  6. Kibana can now be accessed at port 5601, for instance to inspect its status run curl http://localhost:5601/ (which does not return much per se, but a response means it is reachable) or access it directly from the browser. For the latter you might have received a message like the following:





    This means that Kibana can't find a running Elasticsearch cluster. In this case, please restart Kibana and override the hostname of the elasticsearch service that Kibana is going to use (like here), as:
    docker run -d --name kibana --net esearchnet -p 5601:5601 -e ELASTICSEARCH_URL=http://esearch:9200 docker.elastic.co/kibana/kibana:6.4.2
    where esearch is the container name for the elasticsearch docker.
    Check again the logs to make sure Kibana runs correctly:
    docker logs --tail 500 <kibanacontainerid>
    You should have something like the following:
    ...
    {"type":"log","@timestamp":"2018-10-07T17:37:15Z","tags":["status","plugin:security@6.4.2","info"],"pid":1,"state":"green","message":"Status changed from yellow to green - Ready","prevState":"yellow","prevMsg":"Waiting for Elasticsearch"}
    {"type":"log","@timestamp":"2018-10-07T17:37:16Z","tags":["license","info","xpack"],"pid":1,"message":"Imported license information from Elasticsearch for the [monitoring] cluster: mode: basic | status: active"}
    {"type":"log","@timestamp":"2018-10-07T17:37:28Z","tags":["info","http","server","listening"],"pid":1,"message":"Server running at http://0:5601"}


Yeey! You are done setting up ElasticSearch and Kibana for what is needed here.
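
As a quick sanity check (a sketch: it assumes both containers are running with the port mappings above, and that your Kibana version exposes its status API at /api/status), you can query both services from the host:

    # ElasticSearch cluster health (a green or yellow status is fine on a single node)
    curl 'http://127.0.0.1:9200/_cat/health?v'

    # Kibana status endpoint (returns a JSON document describing the Kibana state)
    curl 'http://127.0.0.1:5601/api/status'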

Introduction to ElasticSearch

Elasticsearch (hereafter abbreviated as ES) is a near real-time distributed search engine, based on the Apache Lucene project and thus meant for text data. While it is developed in Java, client APIs are provided for all major programming languages along with a standard REST interface, which we will be using in this section. Specifically, the main goal of this section is to introduce ElasticSearch and show how to interact with it, namely to ingest log data, process it and report a summary on Kibana. A complete overview of ElasticSearch is beyond the scope of this post; I therefore refer to this O'Reilly handbook and to a later post for an overview of its main functionalities.
ES provides full-text search capabilities over schema-free JSON documents. By default, each field in a document is indexed and consequently made searchable.

Accordingly, an ES instance contains multiple indexes, which in turn contain multiple types (i.e., tables in an RDBMS), each holding a bunch of documents, where each document is, as mentioned, a set of fields. Indexes are split into shards, which can be replicated and distributed across multiple nodes. A node is an instance of elasticsearch, be it running on a physical or a virtual computing environment. An ES cluster is specified by defining a common cluster.name property (in config/elasticsearch.yml) in each node's configuration. An ES cluster has a master-slave topology: the elected master keeps track of changes to the indexes and to the available nodes, while actual document modification and search are carried out by the individual slave nodes owning the shards, since every node knows where a sought document is located.

An index is therefore only a logical namespace grouping a bunch of shards, with each shard being a group of documents managed for a certain index by a specific cluster node. This allows ES to migrate shards to less loaded cluster nodes in order to keep the cluster balanced. Namely, primary shards contain the documents of the index (their number therefore defines the index capacity), while replica shards are mere copies used for failover and to speed up read requests. Upon failure of the master node, a new master is elected. Upon failure of a primary shard, one of its replica shards is promoted to primary for the index. The index becomes unusable only when a primary shard and all of its replicas are no longer available (i.e., data loss). An index can be created as follows:

PUT /index/
{
    "settings" : {
        "number_of_shards" : 5,
        "number_of_replicas" : 1
    }
}

Unless specified otherwise, the default number of primary shards is 5. This number constrains the resources available to the index. A number of replicas set to 1 means that we have one copy of each primary shard, thus 5 replica shards. Since it does not make sense to keep a replica shard on the same ES node as its primary, the replica shards will remain unassigned as long as there are not enough ES nodes to host them. ES nodes can be dynamically added to and removed from the cluster. Consequently, the number of replica shards can also be updated at run time to follow changes in demand (i.e., search requests).

PUT /index/_settings
{
    "number_of_replicas" : 3
}
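
To see where the shards of an index end up (and whether replica shards are left unassigned on a single-node cluster, as described above), the _cat API can be queried; a small sketch, assuming ES listening on localhost:9200:

    # lists every shard with its index, whether it is a primary (p) or a replica (r),
    # its state (e.g. STARTED or UNASSIGNED) and the node it is allocated to
    curl 'http://localhost:9200/_cat/shards?v'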

A document can be created with a PUT at /index/type/documentID:

PUT /index/type/documentID
{
    "field" : "value"
}

Alternatively, the documentID can be generated automatically at document creation; in this case POST is used instead of PUT. Also, a version number is associated with each document and incremented upon modifications and deletions. A document can be deleted using the DELETE method at /index/type/documentID.
A document can be retrieved with a GET at the REST URI /index/type/documentID. This is, however, not an actual search, since we already know the exact document ID. We can search for the value of a specific field in the document type by using the _search endpoint, i.e., by calling a GET at /index/type/_search?q=field_name:field_value. This returns the documents having the given value for the specified field. Alternatively, a search pattern can be provided in the request body, as below:

GET /index/type/_search
{
    "query" : {
        "match" : {
            "field_name" : "field_value"
        }
    }
}
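
As a sketch of how these operations look over the REST interface with curl (the index, type and field names are the same placeholders used above; ES 6.x requires the Content-Type header):

    # index a document with an explicit ID (the index is created on the fly if missing) ...
    curl -XPUT 'http://localhost:9200/index/type/1' \
      -H 'Content-Type: application/json' \
      -d '{ "field_name" : "field_value" }'

    # ... retrieve it by ID ...
    curl -XGET 'http://localhost:9200/index/type/1'

    # ... and search by field value (query-string form of the _search endpoint)
    curl -XGET 'http://localhost:9200/index/type/_search?q=field_name:field_value'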

Also, aggregations can be used on document fields to extract some analytics (an example is sketched after this list):
  • bucketing - to group by a specific criteria and perform a computation within each bucket (e.g. group by a certain field); a bucket is then a collection of documents meeting a certain criteria.
  • metric - to compute certain metrics over one or more fields of documents in a bucket;
  • matrix - to perform aggregations over multiple fields 
  • pipeline - to nest multiple aggregations into some sort of computing pipeline
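
As an example of the first two kinds, the following request (a sketch with the same placeholder field names as above; field_name is assumed to be a not-analyzed/keyword field, as required for terms aggregations) buckets documents by field_name and computes the average of the numeric field_name_2 within each bucket:

GET /index/type/_search
{
    "size" : 0,
    "aggs" : {
        "by_field" : {
            "terms" : { "field" : "field_name" },
            "aggs" : {
                "avg_value" : { "avg" : { "field" : "field_name_2" } }
            }
        }
    }
}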
More complicated queries can be performed by adding filtering predicates and full-text search:

GET /index/type/_search
{
    "query" : {
        "bool" : {
            "filter" : {
                "range" : {
                    "field_name_2" : { "gt" : 10 }
                }
            },
            "must" : {
                "match" : {
                    "field_name" : "value1 value2"
                }
            }
        }
    }
}

Specifically, the "value1" and "value2" will be looked up independently in the field_name field and a relevance score will be computed so that documents having both will be ranked at a higher position. The alternative "match_phrase" instead of "match" would instead looking up for both "value1 value2" in the text so that no unrelated documents are returned. Behind the curtain, ES builds an inverted index to map all words (or tokens) encountered in each document to the documents they appeared in.
ES treats the index as immutable structure, both to prevent concurrency issues on updating the index, and to allow for greater compression when saving the index to disk. This, however, means that the whole index has to be rebuilt once new documents are added, which is a great limitation. To mitigate this issue, ES creates multiple inverted indexes named per-segment indexes, each related to a different point in time or commit point, so that when a search is performed, all built inverted indexes are queried chronologically so that aggregations can be calculated simply by adding up the result for each index, as a sort of epoch delta. It follows that the limit to real-time search is the period by which segments are being commited to disk. This issue is avoided by committing first the lighter and faster in-memory cache and only later on asynchronously persisting it to disk. By default, the shard can access new data from his cache every second, which makes ES a near real-time search engine. This can be overwritten by specifying the "refresh_interval" property in the index settings (See here).
An inverted index on words is what in natural-language processing is named bag-of-word document model (BOG) and it is the basis representation for similarity measures such as the term frequency-inverse document frequency (TF-IDF). Naturally, before counting the appearance of words in documents, words are normalized, such as by formatting them to lowercase, removing stop words (e.g., articles), and grouped based on synonims or common root (stemming). ES provides multiple analyzers for this purpose. By default the standard analyzer is applied to every string field, unless a specific mapping is provided for the document type. The mapping forces a datatype for each of the document fields and consequently instruct ES on how to process them.
ES initially supported only the datatypes boolean, string, float, double, date, byte, short, integer, long which were specialized to time in more specific ones, such as IP and Geo datatypes. See the official documentation here for a complete and up to date list.
A mapping can be retrieved with a GET at /index/_mapping/type and can only be updated for new fields or for search-related settings using the PUT method at /index/_mapping/type. So the mapping for existing fields in an index can not be changed. Intuitively, the mapping is most useful for string fields where we need to specify how the text will be processed. In detail, an "index" property is specified to control how the string is indexed: "analyzed" (default) i.e. indexed as full text after being processed, "not_analyzed" i.e. index as specified without any processing, "no" i.e. do not index it at all. Also, an "analyzer" property is specified to determine which analyzer is to be used for indexing and querying. In the latest versions, the datatype string has been specialized further into text and keyword, with keyword that is already implicitly indicating index:"not_analyzed".
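
As a sketch of how this looks in recent versions (ES 6.x, with a made-up index name, the single type doc, and made-up field names), a mapping distinguishing an analyzed text field from a raw keyword field can be provided at index creation:

PUT /logs
{
    "mappings" : {
        "doc" : {
            "properties" : {
                "message" : { "type" : "text", "analyzer" : "standard" },
                "level"   : { "type" : "keyword" },
                "created" : { "type" : "date" }
            }
        }
    }
}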

This is not the end of an overview of ES, but rather just the beginning!

Analyzing data in ElasticSearch using Kibana

As we have shown, ElasticSearch exposes a REST interface to manage data. This is however cumbersome when we want to perform exploratory analyses on the stored data. Kibana comes into play by providing a more abstract interface to interact with data stored in ES, namely allowing developers to create and visualize queries over a web-based graphical user interface (GUI), which can be updated in real time and also shared across the team.

In the "Dev Tools" section Kibana provides a console to query ES. As a first example let's retrieve the cluster status:



Let's now try to load and analyze some data. A typical dataset used in ES tutorials is a preprocessed version of the complete works of Shakespeare.

For its analysis we will do the following:

  1. Create an index and a mapping for the fields, namely to disable preprocessing of the string ones (keyword). For this operation, we can either use curl or the Dev Tools console on Kibana (a sketch of such a request is given after this list);


    Create an index and mapping for the shakespeare data

  2. Download the datasets and load them on ES

    curl -H 'Content-Type: application/x-ndjson' \
    -XPOST 'localhost:9200/shakespeare/doc/_bulk?pretty' \
    --data-binary @shakespeare_6.0.json


    this is the file structure, as shown by head -n 4:
    {"index":{"_index":"shakespeare","_id":0}}
    {"type":"act","line_id":1,"play_name":"Henry IV", "speech_number":"","line_number":"","speaker":"","text_entry":"ACT I"}
    {"index":{"_index":"shakespeare","_id":1}}
    {"type":"scene","line_id":2,"play_name":"Henry IV","speech_number":"","line_number":"","speaker":"","text_entry":"SCENE I. London. The palace."}
  3. Create an index pattern to capture the ingested documents in Kibana

    Creating an index pattern for the shakespeare ES index

    The index pattern is created and documents were found
  4. Let's see what we have for "Romeo and Juliet"

    Documents for Romeo and Juliet
  5. Create an example chart to show the number of documents of type:line per speaker in "Romeo and Juliet", selecting the shakespeare index pattern

    Frequency distribution of speakers in Romeo and Juliet

    Here we basically created a bucket for each speaker and a metric based on the count of documents. This shows the frequency distribution of documents per speaker in Romeo and Juliet. Save the visualization by clicking on save in the top right. You can also set a refresh period, should this data ever change (not the case for our manually ingested data).

  6. Go to the dashboard section, create a dashboard and add the panel you just created.
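
The mapping request referenced in step 1 is not reproduced above; a minimal sketch for ES 6.x, using the field names visible in the dataset excerpt of step 2 and treating the categorical string fields (type, play_name, speaker, etc.) as keyword, i.e. not analyzed, might look as follows:

PUT /shakespeare
{
    "mappings" : {
        "doc" : {
            "properties" : {
                "type"          : { "type" : "keyword" },
                "play_name"     : { "type" : "keyword" },
                "speaker"       : { "type" : "keyword" },
                "line_id"       : { "type" : "integer" },
                "speech_number" : { "type" : "keyword" },
                "line_number"   : { "type" : "keyword" },
                "text_entry"    : { "type" : "text" }
            }
        }
    }
}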

Introduction to Logstash & Beats

Logstash is a tool to collect and process logs from local or remote machines, i.e., it acts as a log processing tool that can both retrieve and process collected log streams before they are stored in ES. Logstash can be installed using your OS package manager, or with Docker if you prefer (i.e., "docker pull docker.elastic.co/logstash/logstash:6.4.2"). Logstash is a complete ETL framework for streaming log data. Beats offer a more lightweight alternative to Logstash: they track changes on a number of data sources and send those events either to Logstash (for processing) or directly to elasticsearch. As before, they can be installed as an OS package or using Docker (i.e., "docker pull docker.elastic.co/beats/filebeat:6.4.2"). Beats can either output collected logs to ES or send them to Logstash for further processing (by default listening on port 5044). A comparison of Logstash and Beats is reported here. Filebeat is the Beat for tracking log files and ensures at-least-once delivery of log entries, i.e., with no data loss, by using an acknowledgement and retry mechanism. The state of each tracked file (i.e., its last sent line) is kept to ensure changes are timely detected and propagated. Filebeat can be configured by modifying the filebeat.yml file (see example). The first part is the so-called prospector (or input in the newest versions), which is responsible for tracking files and performing basic processing. The output section defines the destination of the log stream, be it ES or Logstash for further processing (a sketch of the latter is given below).
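
For completeness, pointing Filebeat at Logstash instead of ES only changes the output section of filebeat.yml; a minimal sketch, assuming a Logstash instance with the beats input listening on its default port 5044:

    # only one output section may be enabled at a time
    output.logstash:
      hosts: ["localhost:5044"]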


Let's try to send some data to ES using filebeats:

  • Running filebeats using docker (trickier, since the container environment is very limited)
    1. Download the docker image for filebeats
      docker pull docker.elastic.co/beats/filebeat:6.4.2
    2. Create an empty filebeat.yml configuration file on your host
      nano /path/to/config_file/filebeat.yml
    3. Run the filebeats docker container

      docker run -it --name fbeat --net esearchnet \
      -v /path/to/config_file/filebeat.yml:/usr/share/filebeat/filebeat.yml \
      --entrypoint bash \
      docker.elastic.co/beats/filebeat:6.4.2

      this way we have interactive access to the container and can modify the filebeat configuration file at will, since otherwise the file system is mounted as read-only; any modification to the file on the host will be directly reflected in the one mounted inside the container;
    4. Define the filebeat.yml configuration file for filebeats as documented here and shown here

      filebeat.inputs:
      - type: log
        enabled: true
        paths:
        - '/var/log/*/*.log'
      output.elasticsearch:
        hosts: ["localhost:9200"]

      this tracks all logs from the container's /var/log folder and sets the target to the ES container at esearch:9200 (which should be reachable since we set --net esearchnet when starting it).
    5. Start filebeats
      ./filebeat -c filebeat.yml -e
  • Running filebeats directly on your host OS (easier)

    1. Download Filebeat as a tar archive and extract it; on my Mac this just created a folder
      curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.4.2-darwin-x86_64.tar.gz

      tar xzvf filebeat-6.4.2-darwin-x86_64.tar.gz
    2. Define a filebeat config
      filebeat.inputs:
      - type: log
        enabled: true
        #include_lines: ['^ERR', '^WARN']
        paths:
        - /var/log/system.log
        - /var/log/wifi.log
      output.elasticsearch:
        hosts: ["localhost:9200"]
      this tracks the system and WiFi log files. I initially tracked only ERR and WARN messages, but being a bit impatient I then commented this out and ingested everything into ES.
    3. Start filebeats using the config file
      ./filebeat -c filebeat.yml -e

      you can see below the log of the filebeats agent, confirming the tracking of the files and the connection to ES.

      FileBeats Agent log
      Let's now check whether anything new was inserted in ES. By default (unless set explicitly in the configuration file, see documentation here) an index named filebeat-<version>-<date> is used, so at the time of writing we should expect something like "filebeat-6.4.2-2018.10.14". Let's use the REST interface to query ES.

      The newly created filebeats index
      After going to "Management">"Index Patterns">"Create Index" we can create an index pattern of kind "filebeat-*" to capture all ingested data. Also, I used @timestamp as time filter. As you can see our system log ended up in Kibana and is correctly indexed.

      The ingested log data in Kibana/ES

      Feel free to play around with other input and output agents. This should however give the idea of how the log collection chain works.
Finally, have a look at the default dashboards shipped with FileBeats. You can see an example in the video below:



Analysing Metrics - InfluxDB and Grafana

Setup

This is the pipeline we are going to set up in this section:

I report here a reference to easily install InfluxDB and Grafana. I like using Docker to speed up the installation of packages and to remove them easily afterwards.


1. Install InfluxDB
  • Bare-metal installation as a service
    https://docs.influxdata.com/influxdb/v0.9/introduction/installation/
  • Lazy docker installation
    The influxdb Dockerfile is located here. A container can be created as follows:
    INFLUXDB_VOLUME="/Users/andreamonacchi/Downloads/monitoring_tests/influxdb_volume"
    docker run -p 8086:8086 -v $INFLUXDB_VOLUME:/var/lib/influxdb influxdb

2. Install Grafana
  • Bare-metal installation as a service
    https://grafana.com/grafana/download
  • Docker installation
    docker run \
      -d \
      -p 3000:3000 \
      --name=grafana \
      -e "GF_SERVER_ROOT_URL=http://grafana.server.name" \
      -e "GF_SECURITY_ADMIN_PASSWORD=secret" \
      grafana/grafana
If you ran this correctly on Docker, you should see both containers running:

Unless configured otherwise, Grafana will be available at port 3000 (http://localhost:3000). Log in with the "admin" / "secret" credentials you provided at startup.


Once logged in, you should see something like this:

Analysing data from InfluxDB

      As seen, InfluxDB exposes the port 8086 to allow interaction with the DB. For instance:

      curl http://localhost:8086/query --data-urlencode "q=CREATE DATABASE metrics"

      which creates a new database called metrics and returns {"results":[{"statement_id":0}]}.
      Another possibility is to run a command line client, shipped with the influxdb server and therefore already available in our docker container. This can be run as follows:



      As visible, our database was correctly created. Let's push some example data in the database, as shown in the influxdb documentation.
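
      The documentation example is not copied here; as a sketch, points can also be pushed through the same HTTP interface using the line protocol (measurement name, comma-separated tags, then fields and an optional timestamp). Assuming the NOAA water sample schema used below (measurement h2o_temperature with a location tag and a degrees field):

      curl -i -XPOST 'http://localhost:8086/write?db=metrics' \
        --data-binary 'h2o_temperature,location=coyote_creek degrees=65.4'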


      We now need to make sure Grafana can read data from InfluxDB.

      Let's first add it as data source:



      If you notice, we used the host IP address rather than localhost. This is because Grafana runs in a different container than InfluxDB, so localhost inside the Grafana container would not reach it. Consequently, Grafana can only reach the InfluxDB container through the host (e.g., using the host IP, if static) or through Docker networking (e.g., --link, or a shared user-defined network, as sketched below). This is an advanced Docker topic whose explanation is out of the scope of this post (you could also perform all of these steps without Docker).
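
      A sketch of the shared-network alternative (the network and container names here are made up; the InfluxDB data source URL in Grafana could then simply be http://influxdb:8086):

      # create a user-defined bridge network and attach both containers to it
      docker network create metricsnet
      docker run -d --name influxdb --net metricsnet -p 8086:8086 \
        -v $INFLUXDB_VOLUME:/var/lib/influxdb influxdb
      docker run -d --name grafana --net metricsnet -p 3000:3000 grafana/grafana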

      Let's now see if we have data in this data source, by creating a new dashboard:


      Mind that in the top right it is written "Last 6 hours". This is the time interval for which data will be queried and displayed. Since our example data are older than that, let's click on "Last 5 years" to have the widest range possible.

      Let's then add a graph to show the h2o_temperature table as a time series:


      The tool shows the query being composed and sent to the server, and the received response.
      As visible (you can check an epoch converter), the time series starts in August and ends in September 2018. 


Setting up collectd

      Collectd is a daemon designed to collect system and application metrics. Here are instructions for its download and setup. On my mac I just typed:
      brew install collectd

      I then checked the daemon configuration file, as in this example, available for unix-like systems at /usr/local/etc/collectd.conf. The file is already present after installation with default values.

      I uncommented the preamble, the plugin autoload and the CPU plugin. Since the default file is very long and a total mess of commented lines, I initially had a few errors loading the plugins, so I simply decided to start from the example and change only minor things:

      Hostname "localhost"
      FQDNLookup false
      BaseDir "/usr/local/var/lib/collectd"
      PIDFile "/usr/local/var/run/collectd.pid"
      TypesDB "/usr/local/Cellar/collectd/5.8.0_2/share/collectd/types.db"

      AutoLoadPlugin true

      Interval 1

      MaxReadInterval 86400
      Timeout 2
      ReadThreads 5
      WriteThreads 5

      <Plugin cpu>
       ReportByCpu true
       ReportByState true
       ValuesPercentage true
      </Plugin>

      <Plugin load>
       ReportRelative true
      </Plugin>

      <Plugin memory>
       ValuesAbsolute true
       ValuesPercentage false
      </Plugin>

      <Plugin "network">
        Server "localhost" "25826"
      </Plugin>

      The idea is to use "AutoLoadPlugin true" to automatically load all plugins mentioned later on, namely the CPU, load, memory and network ones. I then opened the collectd log at its end (i.e., with tail -f /usr/local/var/log/collectd.log) and restarted the daemon on my Mac (i.e., with sudo brew services restart collectd), which wrote:

      Exiting normally.
      collectd: Stopping 5 read threads.
      collectd: Stopping 5 write threads.
      plugin_load: plugin "cpu" successfully loaded.
      plugin_load: plugin "load" successfully loaded.
      plugin_load: plugin "memory" successfully loaded.
      plugin_load: plugin "network" successfully loaded.
      cpu plugin: Found 8 processors.
      Initialization complete, entering read-loop.

      As mentioned, collectd will send via UDP the values collected by those plugins to the Influxdb connector listening on port 25826. Mind that, however, our influxdb listener runs inside a docker container. We therefore need to add the 25826 port to those forwarded, and re-start the container. To this end, you can add -p 25826:25826/udp to your docker run command for influxdb (see above). 

      We also need to configure InfluxDB to listen on port 25826 and read incoming data in the collectd format. Let's then restart the container to forward 25826 (as a UDP port; I spent 30 minutes trying to understand why my data were not arriving) and in interactive mode to skip the entrypoint (which would directly run the influxd daemon), so that we can make those changes to the files:

      docker run -it -p 8086:8086 -p 25826:25826/udp \
      -v $INFLUXDB_VOLUME:/var/lib/influxdb --entrypoint bash influxdb

      Now, another problem: there is no common file editor in this minimal docker container.
      The required modification can be done by appending an entry for collectd to the influxdb configuration file. An example is provided here and here:
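
      As a sketch, the relevant section of /etc/influxdb/influxdb.conf should look roughly like the following (the database name and the types.db path match the values used later in this post; the remaining keys are left at their defaults):

      [[collectd]]
        enabled = true
        bind-address = ":25826"
        database = "collectd_metrics"
        typesdb = "/usr/share/collectd/types.db"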


      We then copy the collectd type file, as this would not be otherwise available inside the container:
      1. create a folder inside the influxdb container
        root@98e6e2e6b05f:/# mkdir /usr/share/collectd/
      2. copy the type file in the folder just created
        docker cp /usr/local/Cellar/collectd/5.8.0_2/share/collectd/types.db 98e6e2e6b05f:/usr/share/collectd/types.db
      3. The type file is now available
        root@98e6e2e6b05f:/# ls /usr/share/collectd/
        types.db
      We now need to restart influxdb.
      Let's also check for incoming log entries. We can firstly open a bash session by running a docker exec -it c99a15189381 bash, where c99a15189381 is the id of the running container.  A tail -f /var/log/influxdb/influxd.log will open the log file at its end and will wait for updates.
      We can finally restart influxdb, by running influxd -config /etc/influxdb/influxdb.conf.
      The log will append something like the following, showing that the plugins are correctly sending data to InfluxDB:

      2018-09-29T20:05:10.092937Z info Opened service {"log_id": "0AqjnlyW000", "service": "subscriber"}
      2018-09-29T20:05:10.093466Z info Starting monitor service {"log_id": "0AqjnlyW000", "service": "monitor"}
      2018-09-29T20:05:10.094688Z info Registered diagnostics client {"log_id": "0AqjnlyW000", "service": "monitor", "name": "build"}
      2018-09-29T20:05:10.095207Z info Registered diagnostics client {"log_id": "0AqjnlyW000", "service": "monitor", "name": "runtime"}
      2018-09-29T20:05:10.095832Z info Registered diagnostics client {"log_id": "0AqjnlyW000", "service": "monitor", "name": "network"}
      2018-09-29T20:05:10.096502Z info Registered diagnostics client {"log_id": "0AqjnlyW000", "service": "monitor", "name": "system"}
      2018-09-29T20:05:10.097546Z info Starting precreation service {"log_id": "0AqjnlyW000", "service": "shard-precreation", "check_interval": "10m", "advance_period": "30m"}
      2018-09-29T20:05:10.098361Z info Starting snapshot service {"log_id": "0AqjnlyW000", "service": "snapshot"}
      2018-09-29T20:05:10.099413Z info Starting continuous query service {"log_id": "0AqjnlyW000", "service": "continuous_querier"}
      2018-09-29T20:05:10.100837Z info Starting HTTP service {"log_id": "0AqjnlyW000", "service": "httpd", "authentication": false}
      2018-09-29T20:05:10.100964Z info opened HTTP access log {"log_id": "0AqjnlyW000", "service": "httpd", "path": "stderr"}
      2018-09-29T20:05:10.099867Z info Storing statistics {"log_id": "0AqjnlyW000", "service": "monitor", "db_instance": "_internal", "db_rp": "monitor", "interval": "10s"}
      2018-09-29T20:05:10.101958Z info Listening on HTTP {"log_id": "0AqjnlyW000", "service": "httpd", "addr": "[::]:8086", "https": false}
      2018-09-29T20:05:10.102082Z info Starting retention policy enforcement service {"log_id": "0AqjnlyW000", "service": "retention", "check_interval": "30m"}
      2018-09-29T20:05:10.102116Z info Starting collectd service {"log_id": "0AqjnlyW000", "service": "collectd"}
      2018-09-29T20:05:10.102199Z info Loading types from file {"log_id": "0AqjnlyW000", "service": "collectd", "path": "/usr/share/collectd/types.db"}
      2018-09-29T20:05:10.108573Z info Listening on UDP {"log_id": "0AqjnlyW000", "service": "collectd", "addr": "[::]:25826"}
      2018-09-29T20:05:10.111804Z info Listening for signals {"log_id": "0AqjnlyW000"}
      2018-09-29T20:05:10.114864Z info Sending usage statistics to usage.influxdata.com {"log_id": "0AqjnlyW000"}

      We can now use influx CLI to check if any data actually arrived:

      > use collectd_metrics
      Using database collectd_metrics
      > show measurements
      name: measurements
      name
      ----
      cpu_value
      load_longterm
      load_midterm
      load_shortterm
      memory_value

      Yeey! We have a new database and measurements (aka tables) for the plugins we enabled on collectd.

      It is also possible to check the metadata in influxdb, to see the status of the collectd connector:
      > use _internal
      Using database _internal
      > select * from collectd;
      name: collectd
      time                batchesTx batchesTxFail bind   bytesRx droppedPointsInvalid hostname     pointsParseFail pointsRx pointsTx readFail
      ----                --------- ------------- ----   ------- -------------------- --------     --------------- -------- -------- --------
      1538188750000000000 0         0             :25826 0       0                    1a01eefffc74 0               0        0        0
      1538188760000000000 0         0             :25826 0       0                    1a01eefffc74 0               0        0        0
      ...
      1538255600000000000 12        0             :25826 198314  0                    c7e451341eac 0               4786     4694     0
      1538255610000000000 13        0             :25826 212863  0                    c7e451341eac 0               5130     5097     0
      1538255620000000000 14        0             :25826 230100  0                    c7e451341eac 0               5540     5475     0

      More detailed information on the status of the connectors can be retrieved using "stats":

      > show stats
      ...
      name: collectd
      tags: bind=:25826
      batchesTx batchesTxFail bytesRx droppedPointsInvalid pointsParseFail pointsRx pointsTx readFail
      --------- ------------- ------- -------------------- --------------- -------- -------- --------
      12        0             199631  0                    0               4815     4694     0

      Make sure to have a look at this blog post for a complete overview of stats.

      Let's now try to visualize some of our data; for instance, CPU utilization has this format:
      > select * from cpu_value limit 100;
      name: cpu_value
      time                host      instance type    type_instance value
      ----                ----      -------- ----    ------------- -----
      1538255486864770000 localhost 7        percent system        4.62962962962963
      1538255486864771000 localhost 7        percent idle          93.51851851851852
      1538255486864771000 localhost 7        percent nice          0
      1538255487861407000 localhost 0        percent user          9
      1538255487861424000 localhost 0        percent system        35
      1538255487861463000 localhost 0        percent nice          0
      ....

      Let's see if we can see anything of this in Grafana now:


      Our data are there and visible in Grafana. Please refer to the official Grafana documentation for an overview of available panels. 

      The setup of collectd, InfluxDB and Grafana ends here. I hope to discuss the dashboarding functionalities of Grafana in a later post, and to better compare it with Kibana and their respective purposes.

Summary

In this post, we recalled Google's Site Reliability Engineering approach to operating large-scale distributed systems. We identified monitoring as a fundamental aspect of the process, and consequently the need for scalable DBMSes to collect logs and metrics. We then discussed the scalability limits of traditional transactional databases, given their consistency constraints, and introduced the CAP theorem to classify modern NoSQL technologies. From there, we followed two branches: i) the setup of the Filebeat + ElasticSearch + Kibana stack to collect and analyze logs, and ii) the setup of the collectd + InfluxDB + Grafana stack to collect and analyze metrics.

While this only scratches the surface of the world of daemons and formats for the collection of system metrics and logs, I hope it can help anybody get started with this topic.

Have fun!
Andrea
