Implementation of a Clustering Algorithm in MapReduce
K-Means Clustering:
· K-means clustering is a classical clustering algorithm that uses an expectation-maximization-like technique to partition a number of data points into k clusters.
· K-means clustering is commonly used for a number of classification applications. Because k-means is run on such large data sets, and because of certain characteristics of the algorithm, it is a good candidate for parallelization.
· The goal of this project was to implement a framework in Java for performing k-means clustering using Hadoop MapReduce.
· In this problem, the input is a set of n one-dimensional points, and the desired number of clusters is k = 3.
· Once the k initial centers are chosen, the Euclidean distance from every point in the set to each of the 3 centers is calculated, and the mapper emits each point keyed by its nearest center. The reducer collects all of the points belonging to a particular centroid, calculates the new centroid, and emits it.
· Termination condition: the iteration stops when the difference between each old and new centroid is less than or equal to 0.1.
Algorithm:
Step 1: Initially, centroids are selected at random from the data. In our implementation we used 3 centroids.
Step 2: The input directory contains the initial centroid file and the data file.
Step 3: In the Mapper class, the "configure" function is used to open the centroid file, read the centroids, and store them in a data structure (we used an ArrayList).
Step 4: The Mapper reads the data file and, for each point, emits the nearest centroid together with the point to the reducer.
Step 5: The Reducer collects all of this data, calculates the new corresponding centroids, and emits them.
Step 6: In the job driver, we read both the old and new centroid files and check whether the difference between every old and new centroid is less than 0.1. If so, convergence has been reached; otherwise we repeat from Step 2 with the new centroids. (A single-machine sketch of this assign/update/convergence loop is given right after these steps.)
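Before looking at the MapReduce version, here is a minimal single-machine sketch of the same assign/update/convergence loop on 1-dimensional points. The class name KMeansSketch, the hard-coded points, and the initial centroids are illustrative only and are not part of the Hadoop code below:

import java.util.*;

public class KMeansSketch {
    public static void main(String[] args) {
        // Illustrative 1-D data and k = 3 initial centroids
        double[] points = {20, 23, 19, 29, 33, 29, 43, 35, 18, 25, 27};
        double[] centers = {20.0, 30.0, 40.0};
        boolean done = false;
        while (!done) {
            double[] sum = new double[centers.length];
            int[] count = new int[centers.length];
            // "Map" step: assign every point to its nearest centroid
            for (double p : points) {
                int nearest = 0;
                for (int i = 1; i < centers.length; i++)
                    if (Math.abs(centers[i] - p) < Math.abs(centers[nearest] - p))
                        nearest = i;
                sum[nearest] += p;
                count[nearest]++;
            }
            // "Reduce" step: recompute each centroid as the mean of its points,
            // and stop once no centroid moves by more than 0.1
            done = true;
            for (int i = 0; i < centers.length; i++) {
                double next = count[i] > 0 ? sum[i] / count[i] : centers[i];
                if (Math.abs(next - centers[i]) > 0.1)
                    done = false;
                centers[i] = next;
            }
        }
        System.out.println("Converged centroids: " + Arrays.toString(centers));
    }
}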
● Samples
● For the centroid file, something like this should be fine:
20.0
30.0
40.0
● For the data file, something simple like this should work (a hand trace of the first iteration on this sample is given after the list):
20
23
19
29
33
29
43
35
18
25
27
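To make the data flow concrete, here is a hand trace (not program output) of the first iteration on the sample files above, assuming the initial centroids 20.0, 30.0 and 40.0:
Mapper: each point is emitted keyed by its nearest centroid, so 20, 23, 19, 18 and 25 go to centroid 20.0; 29, 33, 29, 35 and 27 go to 30.0; and 43 goes to 40.0. A tie such as 25 or 35 stays with the centroid listed first, because the code only switches on a strictly smaller distance.
Reducer: the new centroids are the means of those groups: 105/5 = 21.0, 153/5 = 30.6, and 43.0.
Each centroid moved by more than 0.1, so the driver runs another iteration with 21.0, 30.6 and 43.0 as the cached centroids; on that pass the assignments do not change, the centroids stay put, and the job stops.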
package ClusterPackage;
import java.io.IOException;
import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.Reducer;
@SuppressWarnings("deprecation")
public class KMeans {
    public static String OUT = "outfile";
    public static String IN = "inputlarger";
    public static String CENTROID_FILE_NAME = "/centroid.txt";
    public static String OUTPUT_FILE_NAME = "/part-00000";
    public static String DATA_FILE_NAME = "/data.txt";
    public static String JOB_NAME = "KMeans";
    public static String SPLITTER = "\t| ";
    public static List<Double> mCenters = new ArrayList<Double>();
    /*
     * In the Mapper class we override the configure function. In it we read
     * the centroid file from the Distributed Cache and store the centroids
     * in the instance variable "mCenters".
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, DoubleWritable, DoubleWritable> {
        @Override
        public void configure(JobConf job) {
            try {
                // Fetch the file from the Distributed Cache, read it and
                // store the centroids in the ArrayList
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line;
                    mCenters.clear();
                    BufferedReader cacheReader = new BufferedReader(
                            new FileReader(cacheFiles[0].toString()));
                    try {
                        // Read the file, split each line by the splitter and
                        // store the first token (the centroid) in the list
                        while ((line = cacheReader.readLine()) != null) {
                            String[] temp = line.split(SPLITTER);
                            mCenters.add(Double.parseDouble(temp[0]));
                        }
                    } finally {
                        cacheReader.close();
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }
        /*
         * The map function finds the nearest center for each point and emits
         * the point to the reducer keyed by that center.
         */
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<DoubleWritable, DoubleWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            double point = Double.parseDouble(line);
            double min1, min2 = Double.MAX_VALUE, nearest_center = mCenters.get(0);
            // Find the nearest center for the point
            for (double c : mCenters) {
                min1 = c - point;
                if (Math.abs(min1) < Math.abs(min2)) {
                    nearest_center = c;
                    min2 = min1;
                }
            }
            // Emit the nearest center and the point
            output.collect(new DoubleWritable(nearest_center),
                    new DoubleWritable(point));
        }
    }
    public static class Reduce extends MapReduceBase implements
            Reducer<DoubleWritable, DoubleWritable, DoubleWritable, Text> {
        /*
         * The reduce function collects all the points assigned to a center
         * and calculates the next center for these points.
         */
        @Override
        public void reduce(DoubleWritable key, Iterator<DoubleWritable> values,
                OutputCollector<DoubleWritable, Text> output, Reporter reporter)
                throws IOException {
            double newCenter;
            double sum = 0;
            int no_elements = 0;
            String points = "";
            while (values.hasNext()) {
                double d = values.next().get();
                points = points + " " + Double.toString(d);
                sum = sum + d;
                ++no_elements;
            }
            // We have the new center now
            newCenter = sum / no_elements;
            // Emit the new center and its points
            output.collect(new DoubleWritable(newCenter), new Text(points));
        }
    }
    public static void main(String[] args) throws Exception {
        run(args);
    }
    public static void run(String[] args) throws Exception {
        IN = args[0];
        OUT = args[1];
        String input = IN;
        String output = OUT + System.nanoTime();
        String again_input = output;
        // Reiterate until convergence
        int iteration = 0;
        boolean isdone = false;
        while (isdone == false) {
            JobConf conf = new JobConf(KMeans.class);
            if (iteration == 0) {
                // First iteration: distribute the initial centroid file
                Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            } else {
                // Later iterations: distribute the centroids produced by the
                // previous job's reducer output
                Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            }
            conf.setJobName(JOB_NAME);
            conf.setMapOutputKeyClass(DoubleWritable.class);
            conf.setMapOutputValueClass(DoubleWritable.class);
            conf.setOutputKeyClass(DoubleWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(Map.class);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(input + DATA_FILE_NAME));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf);
            // Read the new centroids from this iteration's output
            Path ofile = new Path(output + OUTPUT_FILE_NAME);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    fs.open(ofile)));
            List<Double> centers_next = new ArrayList<Double>();
            String line = br.readLine();
            while (line != null) {
                String[] sp = line.split("\t| ");
                double c = Double.parseDouble(sp[0]);
                centers_next.add(c);
                line = br.readLine();
            }
            br.close();

            // Read the old centroids from the previous iteration (or the
            // initial centroid file on the first iteration)
            String prev;
            if (iteration == 0) {
                prev = input + CENTROID_FILE_NAME;
            } else {
                prev = again_input + OUTPUT_FILE_NAME;
            }
            Path prevfile = new Path(prev);
            FileSystem fs1 = FileSystem.get(new Configuration());
            BufferedReader br1 = new BufferedReader(new InputStreamReader(
                    fs1.open(prevfile)));
            List<Double> centers_prev = new ArrayList<Double>();
            String l = br1.readLine();
            while (l != null) {
                String[] sp1 = l.split(SPLITTER);
                double d = Double.parseDouble(sp1[0]);
                centers_prev.add(d);
                l = br1.readLine();
            }
            br1.close();

            // Sort the old and the new centroids and check the convergence
            // condition
            Collections.sort(centers_next);
            Collections.sort(centers_prev);
            Iterator<Double> it = centers_prev.iterator();
            for (double d : centers_next) {
                double temp = it.next();
                if (Math.abs(temp - d) <= 0.1) {
                    isdone = true;
                } else {
                    isdone = false;
                    break;
                }
            }
            ++iteration;
            again_input = output;
            output = OUT + System.nanoTime();
        }
    }
}
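To try the job, the class would typically be packaged into a jar and launched with the standard Hadoop runner, for example hadoop jar kmeans.jar ClusterPackage.KMeans <input-dir> <output-prefix> (the jar name here is only a placeholder), where <input-dir> is an HDFS directory holding centroid.txt and data.txt as in the samples above. Note that the code is written against the old org.apache.hadoop.mapred API and the deprecated DistributedCache class, hence the @SuppressWarnings("deprecation"); it should still run on newer Hadoop releases, but the org.apache.hadoop.mapreduce API is the usual choice for new code.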
OUTPUT (from a sample run; each line shows the new centroid followed by the points assigned to it, one reducer output per iteration):
1.0 1.0
3.0 2.0 3.0 4.0
7.5 5.0 6.0 7.0 8.0 9.0 10.0
1.5 1.0 2.0
4.0 3.0 4.0 5.0
8.0 6.0 7.0 8.0 9.0 10.0
2.0 1.0 2.0 3.0
5.0 4.0 5.0 6.0
8.5 7.0 8.0 9.0 10.0
2.0 1.0 2.0 3.0
5.0 4.0 5.0 6.0
8.5 7.0 8.0 9.0 10.0
1.0 1.0
2.0 2.0
6.5 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
1.5 1.0 2.0
4.5 3.0 4.0 5.0 6.0
8.5 7.0 8.0 9.0 10.0