#!/usr/local/bin/newlisp

; mapreduce - requires newLISP v.10.x - L.M. March 2007
;
; v.1.22 tested w/o changes on 10.3.3 (updated doc and utput)
; v.1.23 tested on 10.7.4 (updated newlisp executable path)
;
; MapReduce - example for distributed computing using a map-reduce style
; algorithm implemented in newLISP using the function 'bayes-train'
; for word counting and 'net-eval' for mapping the word counting task
; onto different nodes in a cluster. 'bayes-train' is normally used in
; conjunction with 'bayes-query' for calculating Bayesian probabilities
; of token sequences to occur in multiple data sets, but it can also be 
; used as a fast token counting function for individual data sets.
;
; See also from Google Labs: 
;   http://labs.google.com/papers/mapreduce.html
;   http://216.239.37.132/papers/mapreduce-osdi04.pdf
;
; In map-reduce terminology the documents analyzed in the program below 
; correspond to the 'keys' and the dictionary extracted for each document
; to the 'value'. The reduce step consists on reducing the multiple word 
; dictionaries to one dictionary with consolidated word counts.
;
; The result of the calulations will be in a file Totals.lsp in the
; working directory where this program was started. A directory called:
; "mrdemo" should be present in the working directory of the server
; node(s). This directory will contain node word counts for each
; node. The reducing master node will load the dictionaries from here
; via HTTP.
;
; For a demo all node processes and the master/reducer process can
; be run on the same CPU. This still cuts the total time in half
; compared to sequential processing, because of better CPU usage in
; a multi processing operating system.
;
; See also http://newlisp.org/CodePatterns.html#distributed for more
; details on configuring server nodes.

; The documents to aquire

; make program compatible with older versions of newLISP
(when (< (sys-info -2) 10111)
    (constant (global 'term) name))

;(define remote true) ; retrieve doc from http://www.gutenberg.org/
(define remote false) ; retrieve docs from a local harddisk

(if remote
	(set 'docs '(
		; A Comedy of Masks - Ernest Dowson and Arthur Moore, 547KB
		"http://www.gutenberg.org/files/16703/16703.txt"  
		; The Adventures of Sherlock Holmes - Conan Doyle, 576KB
		"http://www.gutenberg.org/cache/epub/1661/pg1661.txt"
		; The tale of Beowulf - anonymous, 219KB
		"http://www.gutenberg.org/files/20431/20431-8.txt" 
		))

	; when running local copies of the text files, then place them
	; in a subdirectory 'mrdemo' of the current working directory
	(set 'docs '(
		"mrdemo/Comedy.txt"
		"mrdemo/Sherlock.txt"
		"mrdemo/Beowulf.txt"
	))
)

; The three nodes may run on different CPUs or all on the same CPU,
; either in 'newlisp -c -w <workdir>' mode running as an inetd or xinetd
; service or in 'newlisp -c -d <port> -w <workdir>' mode, where <port> 
; is a port number and <workdir> is the directory containing 'mrdemo'.
; When running inetd or xinetd nodes on the same CPU, the port numbers
; can be the same. For each request a new process will be started by
; (x)inetd. When running on different computers each node would have 
; a different IP address or hostname. 
;
; the following configuration is for 3 xinetd/inetd nodes on the same
; CPU as the master reducer node.

;(set 'nodes '(
;	("localhost" 4711)
;	("localhost" 4712)
;	("localhost" 4713)
;))

; This configuration would be 'newlisp -c -d <port> -w <workdir>' type
; proccesses started either on the same or different CPUs. Here shown
; for the same CPU. For remote machines specify different host names
; or IP numbers, the port numbers then can be identical. A directory
; 'mrdemo' should be created in <workdir>. The '-w <workdir>' spec can
; be omitted when placing 'mrdemo' in the the directory where the nodes
; are started, i.e.:
;    newlisp -c -d 10001 -w /Users/newlisp
;

(set 'pid1 (process "/usr/local/bin/newlisp -c -d 10001 -w /Users/lutz/Sites"))
(set 'pid2 (process "/usr/local/bin/newlisp -c -d 10002 -w /Users/lutz/Sites"))
(set 'pid3 (process "/usr/local/bin/newlisp -c -d 10003 -w /Users/lutz/Sites"))
(sleep 1000)
(println "started servers -> " pid1 " " pid2 " " pid3)

(set 'nodes '(
	("localhost" 10001)
	("localhost" 10002)
	("localhost" 10003)
))

; The worker node program could be loaded by a node during startup, but

; the program with the request. In most caseas the newLISP startup
; and program load time is very small compared to the node processing
; time. 

(set 'task [text]
(local (url id start text)
	(setq WORDS:total '(0))
	(set 'url "%s")
	(set 'id %d)
	(set 'start (time-of-day))
	(set 'text (lower-case (read-file url)))
	(set 'text (parse text "[^a-z]+" 0))
	(bayes-train text 'WORDS)
	(save (string "mrdemo/words-" id ".lsp") 'WORDS)

	; return a list of node id, doc, words proccessed and time spent
	(list 	(list 'node id) 
		(list 'doc (last (parse url "/")))
		(list 'words (WORDS:total 0))
		(list 'ms  (- (time-of-day) start)) 
	)
)
[/text])

; the idle procedure is called when a node has finished processing

(define (idle-proc param)
	(if param (println param)))


; map the counting task onto the nodes using 'net-eval'.
; wait a maximum of 30 seconds for all nodes to finish.

(net-eval (list
	(list (nodes 0 0) (nodes 0 1) (format task (docs 0) 0) ) 
	(list (nodes 1 0) (nodes 1 1) (format task (docs 1) 1) )
	(list (nodes 2 0) (nodes 2 1) (format task (docs 2) 2) )
	) 30000 idle-proc)

(println)
; reduce procedure to consolidate all node name spaces in to one 
; Totals name space

(define (reduce space totals)
    (dotree (w space)
        (let (prior (sym (term w) totals))
            (if (not (eval prior))
                (set prior (eval w))
                (setf ((eval prior) 0) (+ $it (first (eval w))))
            )
        )
    )
)

(dotimes (n (length nodes))
	(set 'url (string "http://" (nodes n 0) ":" (nodes n 1) "/mrdemo/words-" n ".lsp"))
	(println "reduce from " url)
	(load url)
	(reduce WORDS 'Totals)
	; zero out for next node aquired
	(dotree (w WORDS) (set w '(0)))
)

(println "Saving Totals")
(save "Totals.lsp" 'Totals)

(println "destroying processes ...")
(destroy pid1)
(destroy pid2)
(destroy pid3)

(exit)

; On a MacOS X  10.6 MacMini Intel 1.834 Ghz Core 2 Duo he following 
; output is generated when using local doc files (slightly different 
; from the remote versions).
;
; ~> ./mapreduce 
; started servers -> 28402 28403 28404
; ("localhost" 10003 ((node 2) (doc "Beowulf.txt") (words 38764) (ms 174)))
; ("localhost" 10001 ((node 0) (doc "Comedy.txt") (words 98107) (ms 321)))
; ("localhost" 10002 ((node 1) (doc "Sherlock.txt") (words 108215) (ms 326)))
;
; reduce from http://localhost:10001/mrdemo/words-0.lsp
; reduce from http://localhost:10002/mrdemo/words-1.lsp
; reduce from http://localhost:10003/mrdemo/words-2.lsp
; Saving Totals
; destroying processes ...
; ~> 

; When nodes retrieve the docouments over the Internet from
; www.gutenberg.org the whoole process takes a longer time:
;
; ~> ./mapreduce 
; started servers -> 28379 28380 28381
; ("localhost" 10001 ((node 0) (doc "16703.txt") (words 98107) (ms 6829)))
; ("localhost" 10003 ((node 2) (doc "20431-8.txt") (words 38754) (ms 8358)))
; ("localhost" 10002 ((node 1) (doc "pg1661.txt") (words 109002) (ms 11084)))
;
; reduce from http://localhost:10001/mrdemo/words-0.lsp
; reduce from http://localhost:10002/mrdemo/words-1.lsp
; reduce from http://localhost:10003/mrdemo/words-2.lsp
; Saving Totals
; destroying processes ...
; ~> 
 

; eof ;



syntax highlighting with newLISP and syntax.cgi