Modius - Techblog

  • Ansible
  • Docker
  • DevOps
  • Gastautor werden
  • Newsletter abonnieren
  • Über Mich
  • Kontakt

Elasticsearch – Laden von Daten mit Python

Veröffentlicht am 20. März 2019 von Christian Piazzi Hinterlasse ein Kommentar , Aktualisiert am 19. März 2019
Geschätzte Lesezeit: 59 Sekunden

Bereits in den den letzten Artikel habe ich mich mit dem Installieren von Elasticsearch und Kibana beschäftigt. Heute will ich mir mal anschauen, wie man Daten in Elasticsearch importieren kann. Ziel ist es das ganze mit Python zu realisieren.

Als Datensatz nehme ich Sensordaten von luftdaten.info. Hier habe ich mir aus dem Archiv die Daten für den SDS011 un den DHT22 Sensor heruntergeladen und entpackt. Das sind in Summe ungefähr 11 GB.

Implementieren des Python import Scripts

Wir starten damit, eine paar Libraries zu importieren und ein paar Variablen für die Ordner mit den CSV Dateien festzulegen.

1
2
3
4
5
6
7
8
9
import csv
import sys
import os
from elasticsearch import Elasticsearch
 
currentDirectory = os.path.dirname(os.path.realpath(__file__))
 
dht22 = currentDirectory + '/dht22/'
sds011 = currentDirectory + '/sds011/'

Im nächsten Schritt wird die Klasse zum Importieren der Daten implementiert. Ich habe das ganze extra in eine eigene Klasse gepackt, um das ganze später auch in größeren Scripten verwenden zu können.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ElasticSearchImporter(object):
  def importToDb(self, directory, fileName, indexDbName, indexType="default"):
    csv.field_size_limit(sys.maxsize)
    es = Elasticsearch(['http://192.168.10.107:9200/'])
    if not es.ping():
      raise ValueError("Connection failed")
    headers = []
    index = 0
 
    f = open(directory + fileName, 'rt')
    reader = csv.reader(f)
 
    try:
      for row in reader:
        try:
          if (index == 0):
            headers = row
          else:
            obj = {}
            for i, val in enumerate(row):
              obj[headers[i]] = val
 
            es.index(index=indexDbName, doc_type=indexType, body=obj)
 
        except Exception as e:
          print(index)
          print(e)
 
        index = index + 1
    except:
      print('Loading Error')
 
    if not f.closed:
      f.close()

Im wesentlichen definiere ich in der Klassen den Elasticsearch Server und iteriere über die CSV Datei um diese nach Elasticsearch zu übergeben.

Ganz am Ende rufe ich dann die Funktion der Klasse auf und übergebe die entsprechenden Parameter für die CSV Dateien und die indexDB auf Elastiscsearch Seite.

1
2
3
importer = ElasticSearchImporter()
importer.importToDb(dht22,"2019-01_dht22.csv", "dht22", indexType="default")
importer.importToDb(sds011,"2019-01_sds011.csv", "sds011", indexType="default")

Hier nun noch mal den kompletten Quellcode zum Kopieren.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import csv
import sys
import os
from elasticsearch import Elasticsearch
 
currentDirectory = os.path.dirname(os.path.realpath(__file__))
 
dht22 = currentDirectory + '/dht22/'
sds011 = currentDirectory + '/sds011/'
 
class ElasticSearchImporter(object):
  def importToDb(self, directory, fileName, indexDbName, indexType="default"):
    csv.field_size_limit(sys.maxsize)
    es = Elasticsearch(['http://192.168.10.107:9200/'])
    if not es.ping():
      raise ValueError("Connection failed")
    headers = []
    index = 0
 
    f = open(directory + fileName, 'rt')
    reader = csv.reader(f)
 
    try:
      for row in reader:
        try:
          if (index == 0):
            headers = row
          else:
            obj = {}
            for i, val in enumerate(row):
              obj[headers[i]] = val
 
            es.index(index=indexDbName, doc_type=indexType, body=obj)
 
        except Exception as e:
          print(index)
          print(e)
 
        index = index + 1
    except:
      print('Loading Error')
 
    if not f.closed:
      f.close()
 
 
importer = ElasticSearchImporter()
importer.importToDb(dht22,"2019-01_dht22.csv", "dht22", indexType="default")
importer.importToDb(sds011,"2019-01_sds011.csv", "sds011", indexType="default")

Prüfen des Uploads zu Elasticsearch

Ob die Daten in Elasticsearch angekommen sind, kann mit einer Anfrage via curl geprüft werden.

1
2
3
4
5
6
7
8
curl -XGET 192.168.10.107:9200/_cat/indices?v
health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
red    open   .kibana_1                    aeBnd403TWuFYIRtsyE5Lg   1   0
red    open   dht22                        _OuocHXxRsO-BgA03zGQ4Q   5   1
red    open   kibana_sample_data_logs      4W_9kGB_Qfq7Gzbe3PSJvQ   1   0
red    open   kibana_sample_data_flights   472QoOYpSKSplvrNBbASVw   1   0
red    open   sds011                       kIycXUzBTf2GlqxRIAkvvg   5   1
red    open   kibana_sample_data_ecommerce Gq61wDHEQYC-Sxqm-vn0jg   1   0

Wenn alles geklappt hat, dann haben wir hier jetzt Einträge für dht22 und sds011.

Kategorie: Big Data Tags: data load, Elasticsearch, python

Über Christian Piazzi

Ich blogge hier über alles, was mir so in meinem ITler Altag über den Weg läuft =)
Man findet mich privat bei Google+ und Twitter

Schreibe einen Kommentar Antworten abbrechen

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Kategorien

  • Linux
  • Internet, Bloggen & Co
  • Programmierung
  • Sicherheit
  • Netzwerk & Co
  • Mikrokontroller
  • Windows

Neueste Kommentare

  • Prometheus Installation unter Ubuntu - Modius - Techblog bei Prometheus Installation unter CentOS
  • Rainer bei Docker Container – anzeigen, starten, stoppen und löschen
  • Rainer Wohlfarth bei Docker Container – anzeigen, starten, stoppen und löschen
  • Rainer Wohlfarth bei Docker Container – anzeigen, starten, stoppen und löschen
  • Rainer Wohlfarth bei Docker Container – anzeigen, starten, stoppen und löschen

Werbung

Archive

Kontakt, Datenschutz und Impressum

  • Kontakt
  • Datenschutz
  • Impressum

Schlagwörter

Anleitung Ansible Apache Apple App Store Automatisierung Blogparade C++ Centos centos 7 CentOS7 Container Datenbank DevOps Docker Dr. Racket Dr. Scheme funktional Gastartikel Google HowTo httpd Icinga2 Icinga 2 Installation itsm Linux Minecraft Monitoring mooc MySQL owncloud PHP Plugin Programmierung python Raspberry Pi Schritt für Schritt Server Sicherheit Tutorial Ubuntu Update Windows Wordpress

Copyright © 2025 · Outreach Pro on Genesis Framework · WordPress · Anmelden