Apache Spark เก็บข้อมูลเข้า Elasticsearch

Apache Spark เป็นระบบประมวลแบบ cluster ซึ่งมี API ให้เราใช้ได้หลายภาษาอยู่เหมือนกัน ขึ้นอยู่กับความถนัดของเราเลย ไม่ว่าจะเป็น JAVA, Scala, Python, R และมีการปรับปรุงให้รองรับ Graph processing ด้วย

Apache Spark มีส่วนประกอบให้เราได้เลือกใช้คือ
1. Spark SQL สำหรับ SQL และ structured data
2. MLlib สำหรับ Machine Learning
3. GraphX สำหรับ Graph processing
4. Spark Streaming

สำหรับบทความนี้จะใช้ Spark Streaming ในการเตรียมข้อมูลที่จะเก็บใน Elasticsearch และตัวอย่างยอดนิยม นับจำนวนคำ (WordCount) ทั้ง Apache Spark และ Elasticsearch ติดตั้งอยู่ที่เครื่องผมเองนะครับ (localhost)

เตรียมพร้อมด้วยการสร้าง SparkContext

from pyspark import SparkContext<br />
from pyspark.streaming import StreamingContext</p>
<p># Create a local StreamingContext with two working thread and batch interval of 1 second<br />
sc = SparkContext(&quot;local[2]&quot;, &quot;NetworkWordCount&quot;)<br />
ssc = StreamingContext(sc, 1)

อ่านข้อมูลจากไฟล์ data.txt

text_file = sc.textFile('data.txt')

จากนั้นก็มาจัดการข้อมูลที่ได้จากไฟล์ data.txt

# map each line to its words<br />
words = text_file.flatMap(lambda line: line.split())</p>
<p># emit value:1 for each key:word<br />
word_map = words.map(lambda word: (word, 1))</p>
<p># add up word counts by key:word<br />
word_counts = word_map.reduceByKey(lambda a, b: a+b)</p>
<p># เตรียมข้อมูลสำหรับ Elasticsearch<br />
es_input = word_counts.map(lambda word: ('key',<br />
    {&quot;word&quot;: word[0], &quot;count&quot;: word[1]}))

เก็บข้อมูลเข้าไปที่ Elasticsearch

 es_input.saveAsNewAPIHadoopFile(<br />
    path='-',<br />
    outputFormatClass=&quot;org.elasticsearch.hadoop.mr.EsOutputFormat&quot;,<br />
    keyClass=&quot;org.apache.hadoop.io.NullWritable&quot;,<br />
    valueClass=&quot;org.elasticsearch.hadoop.mr.LinkedMapWritable&quot;,<br />
    conf={ &quot;es.resource&quot; : &quot;data/word_counts&quot; })

รวมแต่ละส่วนของโค้ดไว้ด้วยกันซะหน่อย

#! -*- coding: utf-8 -*-<br />
from pyspark import SparkContext<br />
from pyspark.streaming import StreamingContext</p>
<p># Create a local StreamingContext with two working thread and batch interval of 1 second<br />
sc = SparkContext(&quot;local[2]&quot;, &quot;NetworkWordCount&quot;)<br />
ssc = StreamingContext(sc, 1)</p>
<p>text_file = sc.textFile('data.txt')</p>
<p>words = text_file.flatMap(lambda line: line.split())<br />
word_map = words.map(lambda word: (word, 1))<br />
word_counts = word_map.reduceByKey(lambda a, b: a+b)<br />
es_input = word_counts.map(lambda word: ('key',<br />
    {&quot;word&quot;: word[0], &quot;count&quot;: word[1]}))</p>
<p>es_input.saveAsNewAPIHadoopFile(<br />
    path='-',<br />
    outputFormatClass=&quot;org.elasticsearch.hadoop.mr.EsOutputFormat&quot;,<br />
    keyClass=&quot;org.apache.hadoop.io.NullWritable&quot;,<br />
    valueClass=&quot;org.elasticsearch.hadoop.mr.LinkedMapWritable&quot;,<br />
    conf={ &quot;es.resource&quot; : &quot;data/word_counts&quot; })

พอเตรียมโค้ดเสร็จก็ถึงเวลารันคำสั่งครับ แต่ก่อนอื่นต้องดาวน์โหลด Elasticsearch Hadoop ก่อนนะครับ

./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.jar word_counts.py

ขอให้สนุกกับ Apache Spark นะครับ ^^

แหล่งข้อมูล:
– http://spark.apache.org/docs/latest/streaming-programming-guide.html