Click here to Skip to main content
15,887,027 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
The output from the values iterator in the reduce method shows all points as zeros. Is there any flaw in the reduce method?

import java.io.IOException;
import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

@SuppressWarnings("deprecation")
public class KMEANS {
	public static String OUT = "OUT";
	public static String IN = "IN";
	public static String CENTROID_FILE_NAME = "/centroid.txt";
	public static String OUTPUT_FILE_NAME = "/part-00000";
	public static String DATA_FILE_NAME = "/data.txt";
	public static String JOB_NAME = "KMeans";
	public static String SPLITTER = "\t| ";
	// Current centroids. Repopulated from the DistributedCache file in
	// Map.configure() at the start of every iteration.
	public static List<Point> mCenters = new ArrayList<Point>();

	/**
	 * Mapper: assigns each input point to its nearest centroid.
	 *
	 * NOTE(review): "nearest" is measured on the z coordinate only, so this is
	 * effectively 1-D clustering on z — confirm that is intended; a true 3-D
	 * k-means would use the Euclidean distance over (x, y, z).
	 */
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, DoubleWritable, Point> {
		@Override
		public void configure(JobConf job) {
			try {
				// Read the centroid file shipped via the DistributedCache and
				// (re)populate the shared centroid list.
				Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
				if (cacheFiles != null && cacheFiles.length > 0) {
					mCenters.clear();
					BufferedReader cacheReader = new BufferedReader(
							new FileReader(cacheFiles[0].toString()));
					try {
						String line;
						while ((line = cacheReader.readLine()) != null) {
							// Each line starts with "x,y,z"; anything after a
							// tab or space (e.g. reducer value output) is ignored.
							String[] columns = line.split(SPLITTER);
							String[] coords = columns[0].split(",");
							mCenters.add(new Point(
									Double.parseDouble(coords[0]),
									Double.parseDouble(coords[1]),
									Double.parseDouble(coords[2])));
						}
					} finally {
						cacheReader.close();
					}
				}
			} catch (IOException e) {
				System.err.println("Exception reading DistributedCache: " + e);
			}
		}

		/**
		 * Emits (nearest centroid's z, point) for every "x,y,z" input line.
		 */
		public void map(LongWritable key, Text value,
				OutputCollector<DoubleWritable, Point> output,
				Reporter reporter) throws IOException {
			String[] coords = value.toString().split(",");
			Point point = new Point(Double.parseDouble(coords[0]),
					Double.parseDouble(coords[1]),
					Double.parseDouble(coords[2]));
			if (mCenters.isEmpty()) {
				// No centroids were loaded; emitting anything would be wrong
				// (the original code threw IndexOutOfBoundsException here).
				return;
			}
			// Find the centroid whose z is closest to this point's z.
			Point nearestCenter = mCenters.get(0);
			double bestDistance = Double.MAX_VALUE;
			for (Point c : mCenters) {
				double distance = Math.abs(c.z - point.z);
				if (distance < bestDistance) {
					nearestCenter = c;
					bestDistance = distance;
				}
			}
			// Key by the chosen centroid's z so all members of one cluster
			// arrive at the same reduce call.
			output.collect(new DoubleWritable(nearestCenter.z),
					new Point(point));
		}
	}

	/**
	 * Reducer: averages the points of one cluster to produce its new centroid.
	 * Emits (new centroid, space-joined member points).
	 */
	public static class Reduce extends MapReduceBase implements
			Reducer<DoubleWritable, Point, Point, Text> {

		public void reduce(DoubleWritable key, Iterator<Point> values,
				OutputCollector<Point, Text> output, Reporter reporter)
				throws IOException {
			double sumX = 0, sumY = 0, sumZ = 0;
			int count = 0;
			StringBuilder points = new StringBuilder();
			while (values.hasNext()) {
				// Copy the value: Hadoop reuses the same Writable instance for
				// every element of the iterator. (If every point arrives here
				// as zeros, the cause is the Point class itself: it must
				// implement Writable and actually restore its fields in
				// readFields(), otherwise deserialization yields defaults.)
				Point d = new Point(values.next());
				points.append(' ').append(d.toString());
				sumX += d.x;
				sumY += d.y;
				sumZ += d.z;
				++count;
			}
			// count >= 1 here: Hadoop only invokes reduce() for keys that
			// have at least one value, so the division is safe.
			Point newCenter = new Point(sumX / count, sumY / count,
					sumZ / count);
			output.collect(newCenter, new Text(points.toString()));
		}
	}

	public static void main(String[] args) throws Exception {
		run();
	}

	/**
	 * Drives the iterative k-means computation: runs one MapReduce job per
	 * iteration, feeding each job's reducer output back in as the next
	 * iteration's centroid file, and stops once every centroid's z coordinate
	 * moved by at most 0.1.
	 *
	 * @throws Exception if a job fails or HDFS I/O fails
	 */
	public static void run() throws Exception {
		IN = "IN";
		OUT = "OUT";
		String input = IN;
		String output = OUT;
		String again_input = output;
		boolean isdone = false;
		int iteration = 0;
		// Reiterate until the centroids converge.
		while (!isdone) {
			JobConf conf = new JobConf(KMEANS.class);
			// Centroids for this iteration: the seed file on the first pass,
			// the previous job's reducer output afterwards.
			String centroidPath = (iteration == 0)
					? input + CENTROID_FILE_NAME
					: again_input + OUTPUT_FILE_NAME;
			DistributedCache.addCacheFile(new Path(centroidPath).toUri(), conf);

			conf.setJobName(JOB_NAME);
			conf.setMapOutputKeyClass(DoubleWritable.class);
			conf.setMapOutputValueClass(Point.class);
			conf.setOutputKeyClass(Point.class);
			conf.setOutputValueClass(Text.class);
			conf.setMapperClass(Map.class);
			conf.setReducerClass(Reduce.class);
			conf.setInputFormat(TextInputFormat.class);
			conf.setOutputFormat(TextOutputFormat.class);

			FileInputFormat.setInputPaths(conf,
					new Path(input + DATA_FILE_NAME));
			FileOutputFormat.setOutputPath(conf, new Path(output));

			JobClient.runJob(conf);

			// New centroids: this job's reducer output.
			List<Point> centersNext = readCentroids(
					new Path(output + OUTPUT_FILE_NAME));
			// Previous centroids: whatever was fed into this iteration.
			List<Point> centersPrev = readCentroids(new Path(centroidPath));

			// Compare the two centroid sets by z. Both sides are sorted first
			// so the comparison does not depend on the order the reducers
			// happened to emit them in. (The original code sorted copies but
			// then compared the UNSORTED lists positionally — the sorted
			// lists were dead code, and a size mismatch threw
			// NoSuchElementException.)
			List<Double> prevZ = new ArrayList<Double>();
			for (Point p : centersPrev) {
				prevZ.add(Double.valueOf(p.z));
			}
			List<Double> nextZ = new ArrayList<Double>();
			for (Point p : centersNext) {
				nextZ.add(Double.valueOf(p.z));
			}
			Collections.sort(prevZ);
			Collections.sort(nextZ);

			// Converged when the sets match in size and every paired z moved
			// by at most 0.1.
			isdone = !nextZ.isEmpty() && prevZ.size() == nextZ.size();
			for (int i = 0; isdone && i < nextZ.size(); i++) {
				if (Math.abs(prevZ.get(i).doubleValue()
						- nextZ.get(i).doubleValue()) > 0.1) {
					isdone = false;
				}
			}

			++iteration;
			again_input = output;
			// Fresh output directory for the next iteration (Hadoop refuses
			// to overwrite an existing one).
			output = OUT + System.nanoTime();
		}
	}

	/**
	 * Reads centroid lines ("x,y,z" possibly followed by tab/space-separated
	 * columns) from an HDFS file into a list of Points. Always closes the
	 * stream, even on a parse or I/O failure.
	 */
	private static List<Point> readCentroids(Path file) throws IOException {
		FileSystem fs = FileSystem.get(new Configuration());
		BufferedReader br = new BufferedReader(
				new InputStreamReader(fs.open(file)));
		List<Point> centers = new ArrayList<Point>();
		try {
			String line;
			while ((line = br.readLine()) != null) {
				String[] columns = line.split(SPLITTER);
				String[] coords = columns[0].split(",");
				centers.add(new Point(Double.parseDouble(coords[0]),
						Double.parseDouble(coords[1]),
						Double.parseDouble(coords[2])));
			}
		} finally {
			br.close();
		}
		return centers;
	}
}


// This is the code for the Point class that represents a 3-D point.

Java
public class Point {
    public double x;
    public double y;
    public double z;

    public Point(double x, double y, double z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    public Point(Point a) {
        this.x = a.x;
        this.y = a.y;
        this.z = a.z;
    }

    Point get() {
        return this;
    }
    public String toString() {
        return this.x + "," + this.y + "," + this.z;
    }

}
Posted
Updated 5-Apr-14 19:29pm
v3

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900