Apache Spark เก็บข้อมูลเข้า Elasticsearch

Apache Spark เป็นระบบประมวลแบบ cluster ซึ่งมี API ให้เราใช้ได้หลายภาษาอยู่เหมือนกัน ขึ้นอยู่กับความถนัดของเราเลย ไม่ว่าจะเป็น JAVA, Scala, Python, R และมีการปรับปรุงให้รองรับ Graph processing ด้วย

Apache Spark มีส่วนประกอบให้เราได้เลือกใช้คือ
1. Spark SQL สำหรับ SQL และ structured data
2. MLlib สำหรับ Machine Learning
3. GraphX สำหรับ Graph processing
4. Spark Streaming

สำหรับบทความนี้จะใช้ Spark Streaming ในการเตรียมข้อมูลที่จะเก็บใน Elasticsearch และตัวอย่างยอดนิยม นับจำนวนคำ (WordCount) ทั้ง Apache Spark และ Elasticsearch ติดตั้งอยู่ที่เครื่องผมเองนะครับ (localhost)

เตรียมพร้อมด้วยการสร้าง SparkContext

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

อ่านข้อมูลจากไฟล์ data.txt

text_file = sc.textFile('data.txt')

จากนั้นก็มาจัดการข้อมูลที่ได้จากไฟล์ data.txt

# map each line to its words
words = text_file.flatMap(lambda line: line.split())

# emit value:1 for each key:word
word_map = words.map(lambda word: (word, 1))

# add up word counts by key:word
word_counts = word_map.reduceByKey(lambda a, b: a+b)

# เตรียมข้อมูลสำหรับ Elasticsearch
es_input = word_counts.map(lambda word: ('key',
    {"word": word[0], "count": word[1]}))

เก็บข้อมูลเข้าไปที่ Elasticsearch

 es_input.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "data/word_counts" })

รวมแต่ละส่วนของโค้ดไว้ด้วยกันซะหน่อย

#! -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

text_file = sc.textFile('data.txt')

words = text_file.flatMap(lambda line: line.split())
word_map = words.map(lambda word: (word, 1))
word_counts = word_map.reduceByKey(lambda a, b: a+b)
es_input = word_counts.map(lambda word: ('key',
    {"word": word[0], "count": word[1]}))

es_input.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "data/word_counts" })

พอเตรียมโค้ดเสร็จก็ถึงเวลารันคำสั่งครับ แต่ก่อนอื่นต้องดาวน์โหลด Elasticsearch Hadoop ก่อนนะครับ

./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.jar word_counts.py

ขอให้สนุกกับ Apache Spark นะครับ ^^

แหล่งข้อมูล:
– http://spark.apache.org/docs/latest/streaming-programming-guide.html