Breadth First Search in Haskell
Haskell is a really great programming language. It is elegant, the type system is beautiful, and nowadays the compiler is quite good. I've been using functional languages off and on for more than 30 years. I studied at the University of Kent, which is the home of Miranda, a precursor to Haskell. All this is a warning: I don't use Haskell that much. The language has changed a lot since I last used it regularly, and so my code might not be optimal or idiomatic Haskell.
On some of our programs we teach the first-year students Haskell. This includes their first algorithms course where they learn such things as search trees, hash tables, and some elementary graph algorithms. Since I had to co-teach our Haskell course at rather short notice, I had to get up to speed with Haskell again. The hard part about learning a language is getting up to speed on idioms, the standard library and what cool libraries there are out there. One thing that always seems to be true is that almost every problem can be solved with monads.
There is a danger when trying to get things working in Haskell that you start googling. You find some Stack Exchange discussion where somebody says: "Your problem can be solved if you use multi-coloured Frobenius bi-applicative monad transformers. Here is a short piece of code that solves your problem, a link to a package and some academic paper, but don't forget to enable the Haskell extension that allows you to have colour coded syntax to semantic transformers." You then spend a good couple of days going down the rabbit hole of trying to understand everything. You eventually write your beautiful piece of code that nobody can understand unless they spend another two days trying to understand how everything works. Some people have trouble accepting the fact that verbose maintainable code is much better than short elegant one liners. As you can see in this post, I've made a rather simple piece of code more and more complicated and eventually ended up using monad transformers, but I have tried to stick to the standard Haskell libraries.
Some algorithms do not have natural functional implementations, and breadth first search is one such algorithm. The way it is normally described involves picking a node and enqueuing it in a queue. You then repeatedly dequeue a node, find all of its neighbours and enqueue them so that they are explored in turn, keeping track of all the nodes that you have visited to avoid cycles. If you implement the algorithm naively in pure Haskell then you will need to implement the visited set (or use the standard implementation) as some sort of search tree. Each set operation then costs \(O(\log n)\), so you incur an \(O(n\log n)\) overhead overall. You can do various things to improve the performance: use the IO monad or use some of the approaches to functional graphs. Here I have chosen to live with the overhead. Even so, there are some interesting challenges, especially when you want to list all breadth first traversals of a graph, rather than just one.
Warning: my Haskell is somewhat rusty. If you have any suggestions or improvements to the code, please contact me.
We will assume that we have a Queue and a Graph library with suitable functions (at the end of the post, I'll give a simple implementation of these modules).
import qualified Queue_Seq as Q
import qualified Graph_Simple as G

G.fromEdges :: Ord a => [(a, a)] -> G.Graph a
G.neighbours :: Ord a => G.Graph a -> a -> [a]

t1 = G.fromEdges [(1,2), (1,3), (2,4), (2,5), (3,6), (3,7)]

t1_neighbour_fun node =
    G.neighbours t1 node

Q.empty :: Q.Queue a
Q.isempty :: Q.Queue a -> Bool
Q.enqueue :: Q.Queue a -> a -> Q.Queue a
Q.enqueue_fromList :: Q.Queue a -> [a] -> Q.Queue a
Q.dequeue :: Q.Queue a -> (a, Q.Queue a)
Imperative Style Breadth First Search
So now we can translate the imperative algorithm into Haskell. The state is maintained by passing it around. First the main loop:
import qualified Data.Set as S

bfs_loop :: (G.Graph Integer) ->
            (Q.Queue Integer) -> (S.Set Integer) -> [Integer] -> [Integer]
bfs_loop g c_queue c_visited c_path
    | (Q.isempty c_queue) = c_path
    | otherwise =
        let (next_node, c_dequed) = Q.dequeue c_queue
            new_neighbours = G.neighbours g next_node
        in
          if not (next_node `S.member` c_visited)
          then
              let
                  new_visited = S.insert next_node c_visited
                  new_path = next_node : c_path
                  new_queue = Q.enqueue_fromList c_dequed new_neighbours
              in
                bfs_loop g new_queue new_visited new_path
          else
              bfs_loop g c_dequed c_visited c_path
There are probably better ways of writing the code. I tend to use lots of named expressions using let or where. This makes the code a bit more verbose, but you actually know what is going on. If our queue is empty then there is nothing to do. Otherwise, each time around the loop we dequeue an element and check if we have seen it before. If we have not seen it before, then we look at the neighbours and enqueue them.
We then wrap the code up with a driver to make it more user-friendly:
bfs g start_node =
    let initial_queue = Q.enqueue Q.empty start_node
        initial_visited = S.empty
        initial_path = []
    in
      reverse (bfs_loop g initial_queue initial_visited initial_path)
For example
> bfs g1 1
[1,3,2,5,6,4]
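For readers who want to run the idea without the Queue_Seq and Graph_Simple modules, here is a minimal self-contained sketch of the same loop. It is not the code above: the names AdjGraph, t1Adj and bfsSketch are made up for this illustration, the graph is a plain Data.Map adjacency list, and neighbours are enqueued in list order, so the visiting order can differ from what bfs prints.

```haskell
import qualified Data.Map as M
import Data.Sequence (ViewL (..), viewl, (|>))
import qualified Data.Sequence as Seq
import qualified Data.Set as S

-- Adjacency-list graph; a stand-in for G.Graph (an assumption of this sketch).
type AdjGraph = M.Map Int [Int]

-- The tree t1 from earlier, written as an adjacency list.
t1Adj :: AdjGraph
t1Adj = M.fromList [(1, [2, 3]), (2, [4, 5]), (3, [6, 7])]

-- Dequeue a node, skip it if already visited, otherwise emit it and
-- enqueue its neighbours at the back of the queue.
bfsSketch :: AdjGraph -> Int -> [Int]
bfsSketch g start = go (Seq.singleton start) S.empty
  where
    go queue visited = case viewl queue of
      EmptyL -> []
      n :< rest
        | n `S.member` visited -> go rest visited
        | otherwise ->
            n : go (foldl (|>) rest (M.findWithDefault [] n g)) (S.insert n visited)
```

Under these assumptions bfsSketch t1Adj 1 visits the tree level by level and gives [1,2,3,4,5,6,7].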
Finding all Breadth First Paths (The Level Algorithm)
I am not sure how you would modify an imperative implementation to list all breadth first paths. Essentially you want some non-determinism. When you enqueue the neighbours of a node you need to consider all possible orders in which you can enqueue them. I spent quite a lot of time looking at various libraries that support non-determinism. Surely I could do something clever with Control.Monad.Logic. I started to write a version using continuation-passing style to have a list of non-deterministic continuations of the computation. All of this was too hard for me.
A simple approach is to construct breadth first traversals level by level. If you have a partial breadth first traversal \([n_1,\ldots, n_k]\), then you find the first node \(n_i\) in the list that does not have all of its neighbours in the list, and then explore the neighbours of that node. This gives you a set of possible continuations of that path. This made me think about paths as an abstract data type. The set visited in the above code is really there to keep track of whether you are going to loop back on the path, and so it really should be part of your path datatype. This gives you code something like:
data Path a = Path [a] (S.Set a)

instance (Show a) => Show (Path a) where
    show (Path l _) = "Path = " ++ (show (reverse l))

-- The empty path.
empty_path :: Path a
empty_path = (Path [] S.empty)

-- Insert a node into the path. Note that
-- paths are stored backwards.
insert_node :: (Ord a) => Path a -> a -> Path a
insert_node (Path p_l p_s) n =
    (Path new_path new_set)
  where
    new_path = n:p_l
    new_set = S.insert n p_s

-- The most recently inserted node. Partial: fails on the empty path.
last_node :: Path a -> a
last_node (Path p_l _) = head p_l

-- Tells you if the second argument has been seen.
visited :: (Ord a) => Path a -> a -> Bool
visited (Path _ p_s) n = n `S.member` p_s

visited_set :: (Ord a) => Path a -> (S.Set a) -> Bool
visited_set (Path _ p_s) s = s `S.isSubsetOf` p_s

visited_list :: (Ord a) => Path a -> [a] -> Bool
visited_list (Path _ p_s) l = (S.fromList l) `S.isSubsetOf` p_s

-- The nodes of the path in the order they were visited.
path_list :: Path a -> [a]
path_list (Path l _) = reverse l
You really should package this up in a module.
You can then write a function that extends a path
new_paths :: (Ord a) => (a -> [a]) -> (Path a) -> [Path a]
new_paths _ (Path [] _) = []
new_paths neighbour_fun current_path =
    let nodes = path_list current_path
        all_neighbours_not_present =
            filter (\n -> not (visited_list current_path (neighbour_fun n)))
                   nodes
    in
      if null all_neighbours_not_present
      then
          []
      else
          paths_from_n neighbour_fun current_path (head all_neighbours_not_present)

-- n has to be a node which does not have all of its neighbours in the
-- path. That is checked in new_paths, not in paths_from_n.
paths_from_n :: (Ord a) => (a -> [a]) -> (Path a) -> a -> [Path a]
paths_from_n neighbour_fun current_path n =
    let new_neighbours = neighbour_fun n
        neighbours_not_in_path =
            filter (\m -> not (visited current_path m)) new_neighbours
    in
      map (insert_node current_path) neighbours_not_in_path
The argument neighbour_fun gives the neighbours of a node in the current graph. The fact that we are working with a neighbour function rather than the graph itself is not really important.
Next comes the recursion to get to the fixed point.
-- The list argument is called paths so that it does not shadow the
-- path_list function defined above.
all_new_paths_next_lvl :: (Ord a) => (a -> [a]) -> [Path a] -> [Path a]
all_new_paths_next_lvl _ [] = []
all_new_paths_next_lvl neighbour_fun paths =
    concat (map (new_paths neighbour_fun) paths)

all_new_paths :: (Ord a) => (a -> [a]) -> [Path a] -> [Path a]
all_new_paths neighbour_fun paths =
    let next_lvl = all_new_paths_next_lvl neighbour_fun paths
    in
      if null next_lvl
      then
          paths
      else
          all_new_paths neighbour_fun next_lvl
Finally the wrapper. Notice that we take our graph and construct the neighbourhood function.
bfs_level_algo g start_node =
    all_new_paths neighbour_fun [initial_path]
  where
    initial_path = insert_node empty_path start_node
    neighbour_fun node =
        G.neighbours g node
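The level algorithm can also be tried out in isolation. The following is a compact sketch of the same idea, not the code above: it stores a partial traversal as a plain list in visiting order (so membership tests are linear rather than using a Set), and the names AdjGraph, t1Adj, extendSketch and levelTraversals are invented for this illustration.

```haskell
import qualified Data.Map as M

-- Adjacency-list graph; a stand-in for the neighbour function (an assumption).
type AdjGraph = M.Map Int [Int]

t1Adj :: AdjGraph
t1Adj = M.fromList [(1, [2, 3]), (2, [4, 5]), (3, [6, 7])]

-- Find the first node on the path that still has unseen neighbours and
-- extend the path by each of those neighbours in turn.
extendSketch :: AdjGraph -> [Int] -> [[Int]]
extendSketch g path =
  case filter (not . null) (map unseen path) of
    [] -> []
    (candidates : _) -> [path ++ [m] | m <- candidates]
  where
    unseen n = filter (`notElem` path) (M.findWithDefault [] n g)

-- Keep extending every partial traversal until nothing can be extended.
levelTraversals :: AdjGraph -> Int -> [[Int]]
levelTraversals g start = go [[start]]
  where
    go paths = case concatMap (extendSketch g) paths of
      [] -> paths
      next -> go next
```

For the tree t1 this yields eight traversals: two orders for the children of 1, times two orders for the children of each of 2 and 3.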
Breadth First Search again but with Paths.
Now that we have a Path datatype we can rewrite our original implementation. This is a step towards making a non-deterministic breadth first search.
bfs_path g start_node =
    let initial_queue = Q.enqueue Q.empty start_node
        initial_path = empty_path
        neighbour_fun node =
            G.neighbours g node
    in
      (bfs_path_loop neighbour_fun initial_queue initial_path)

bfs_path_loop :: (Ord a) => (a -> [a]) -> (Q.Queue a) ->
                 (Path a) -> (Path a)
bfs_path_loop neighbour_fun c_queue c_path
    | (Q.isempty c_queue) = c_path
    | otherwise =
        let (next_node, c_dequed) = Q.dequeue c_queue
            new_neighbours = neighbour_fun next_node
        in
          if not (visited c_path next_node)
          then -- We have not visited the node yet.
              let
                  new_path = insert_node c_path next_node
                  new_queue = Q.enqueue_fromList c_dequed new_neighbours
              in
                bfs_path_loop neighbour_fun new_queue new_path
          else -- We have already visited the node, but we still have to dequeue it.
              bfs_path_loop neighbour_fun c_dequed c_path
Refactoring Breadth First Search
We are going to use lists as a monad. Lists can be used to model non-determinism. Suppose you have a function f = \v -> [v, (-1)*v]. This takes a number v and returns v and -v. The list gives you the two options. If you have a list, say [1,2], then applying f to that list should give you the list [1,-1,2,-2]. It is not hard to write the correct function, but this is built into Haskell via the list monad. For example:
Prelude> let f = \x -> [x, (-1)*x ]
Prelude> [1,2] >>= f
[1,-1,2,-2]
There are a lot of tutorials on using monads and the list monad, and I'm not going to repeat things here.
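As a single concrete reference point, the GHCi session above can be written out as a small file. This is only a sketch, and the names viaBind, viaConcatMap, viaDo and twice are made up here. It shows that >>= on lists behaves like concatMap with its arguments flipped, and that the do notation is another way of writing the same thing.

```haskell
-- The two-way choice from the text: a number and its negation.
f :: Int -> [Int]
f v = [v, negate v]

-- Three equivalent ways of applying f to every option in [1, 2].
viaBind, viaConcatMap, viaDo :: [Int]
viaBind = [1, 2] >>= f
viaConcatMap = concatMap f [1, 2]
viaDo = do
  x <- [1, 2]
  f x

-- Chaining binds explores every combination of choices.
twice :: [Int]
twice = [1, 2] >>= f >>= f
```

All three of viaBind, viaConcatMap and viaDo evaluate to [1,-1,2,-2], and twice has eight elements because each of the four options branches again.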
Before we get to non-determinism we are going to refactor our code a bit. We need a type to represent the current state of a computation.
type BFS_State a = ( (a -> [a] ) , (Q.Queue a), (Path a) )
Then next_path dequeues a node from the current_queue and constructs the next path.
next_path :: (Ord a) => BFS_State a -> BFS_State a
next_path (neighbour_fun, current_queue, current_path) =
    if Q.isempty current_queue
    then
        (neighbour_fun, current_queue, current_path) -- The fixpoint of the function.
    else
        let (next_node, c_dequed) = Q.dequeue current_queue
            new_neighbours = neighbour_fun next_node
        in
          if not (visited current_path next_node)
          then
              let
                  new_path = insert_node current_path next_node
                  new_queue = Q.enqueue_fromList c_dequed new_neighbours
              in
                (neighbour_fun, new_queue, new_path)
          else
              (neighbour_fun, c_dequed, current_path)
We can then wrap this up in an iterator. At the moment we are just doing some re-factoring of the code to make it easier to use monads for non-determinism.
bfs_next_path_loop :: (Ord a) => BFS_State a -> BFS_State a
bfs_next_path_loop state =
    let
        (new_fun, new_queue, new_path) = next_path state
    in
      if not (Q.isempty new_queue)
      then
          bfs_next_path_loop (new_fun, new_queue, new_path)
      else
          (new_fun, new_queue, new_path)

bfs_with_next_path g start_node =
    let initial_queue = Q.enqueue Q.empty start_node
        initial_path = empty_path
        neighbour_fun node =
            G.neighbours g node
        (_, _, path) =
            bfs_next_path_loop (neighbour_fun, initial_queue, initial_path)
    in
      path
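Iterating a step function until a condition holds is what the Prelude function until does, so the hand-written loop could probably be replaced by it. The sketch below shows the shape under some assumptions: it is self-contained rather than built on BFS_State, the state is a triple of a Data.Sequence queue, a visited set and a reversed path, and the names AdjGraph, SketchState, step, queueEmpty and bfsUntil are invented for this illustration. One small difference from the loop above is that until tests the condition before taking the first step.

```haskell
import qualified Data.Map as M
import Data.Sequence (Seq, ViewL (..), viewl, (|>))
import qualified Data.Sequence as Seq
import qualified Data.Set as S

type AdjGraph = M.Map Int [Int]

-- (queue, visited nodes, path so far in reverse order)
type SketchState = (Seq Int, S.Set Int, [Int])

-- One step of the search: dequeue a node and, if it is new, record it
-- and enqueue its neighbours. An empty queue is a fixed point.
step :: AdjGraph -> SketchState -> SketchState
step g st@(queue, seen, path) = case viewl queue of
  EmptyL -> st
  n :< rest
    | n `S.member` seen -> (rest, seen, path)
    | otherwise ->
        (foldl (|>) rest (M.findWithDefault [] n g), S.insert n seen, n : path)

queueEmpty :: SketchState -> Bool
queueEmpty (queue, _, _) = Seq.null queue

-- Run step until the queue is empty, then read off the path.
bfsUntil :: AdjGraph -> Int -> [Int]
bfsUntil g start = reverse path
  where
    (_, _, path) = until queueEmpty (step g) (Seq.singleton start, S.empty, [])
```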
Using the list Monad for non-determinism.
So the key idea is that we are going to write a function that takes the current state and returns all possible continuations. When you look at the neighbours of a node you have to consider all the possible permutations of enqueue operations. In Data.List there is a handy permutations function.
import Data.List

all_next_path :: (Ord a) => BFS_State a -> [BFS_State a]
all_next_path (neighbour_fun, current_queue, current_path) =
    if Q.isempty current_queue
    then
        [(neighbour_fun, current_queue, current_path)]
    else
        let (next_node, c_dequed) = Q.dequeue current_queue
            new_neighbours = neighbour_fun next_node
            all_permutations = permutations new_neighbours
        in
          if not (visited current_path next_node)
          then
              let
                  new_path = insert_node current_path next_node
                  new_queues = map (\x -> Q.enqueue_fromList c_dequed x)
                                   all_permutations
                  new_states = map (\x -> (neighbour_fun, x, new_path))
                                   new_queues
              in
                new_states
          else
              [(neighbour_fun, c_dequed, current_path)]
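To see the branching on its own, here is a small sketch of what permutations contributes. The state here is simplified to a pair of a list used as a queue and a path, and the names enqueueOrders and branch are hypothetical, introduced only for this illustration.

```haskell
import Data.List (permutations)

-- All the orders in which a list of neighbours could be enqueued.
enqueueOrders :: [Int] -> [[Int]]
enqueueOrders = permutations

-- Branch a simplified (queue, path) state on every enqueue order.
-- The queue is a plain list here, with the front of the queue first.
branch :: [Int] -> ([Int], [Int]) -> [([Int], [Int])]
branch neighbours (queue, path) =
  [(queue ++ order, path) | order <- permutations neighbours]
```

A node with k neighbours produces k! successor states, so the number of states can grow very quickly. A node with no neighbours still produces exactly one successor, because permutations [] is [[]].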
The type signature of >>= when specialised to lists is [a] -> (a -> [b]) -> [b] (a can equal b). We have that all_next_path :: (Ord a) => BFS_State a -> [BFS_State a], so this means that we can use >>=. Again we can wrap up all_next_path into a function that does the iteration. To make sure that our algorithm terminates, we stop trying to expand states where the queues are empty. This gives us some code that looks like:
-- A state is finished when its queue is empty.
state_queue_empty :: BFS_State a -> Bool
state_queue_empty (_, queue, _) = Q.isempty queue

bfs_all_paths_loop :: (Ord a) => [BFS_State a] -> [BFS_State a]
bfs_all_paths_loop state_list =
    if (null state_list) then []
    else
        let next_paths =
                state_list >>= all_next_path
            (empty_queues, non_empty_queues) =
                partition state_queue_empty next_paths
        in
          (bfs_all_paths_loop non_empty_queues) ++ empty_queues
The use of ++ might be inefficient. I don't know without some profiling, and in a lazy compiled language it is hard to know. If it is inefficient then you should probably use difference lists. I spent a long time trying to come up with a nice short equivalent formulation using the do notation. I'm not sure if there is one. If anybody finds one, then please contact me. Again you will need to wrap the loop up in a more user-friendly way.
bfs_with_all_paths g start_node =
    let
        initial_queue = Q.enqueue Q.empty start_node
        initial_path = empty_path
        neighbour_fun node =
            G.neighbours g node
        state_list =
            bfs_all_paths_loop [(neighbour_fun, initial_queue, initial_path)]
    in
      map (\(_, _, c) -> c) state_list
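One possible shape for a do-notation version is sketched below. It is not equivalent to the code above in its details: it drops the Queue and Path types in favour of a plain list used as a queue and a Set of visited nodes, it recurses depth first through the choices instead of partitioning finished and unfinished states, and the names AdjGraph, t1Adj and allBfsSketch are made up for this illustration. On graphs where a node can be reached from several parents it may list the same traversal more than once.

```haskell
import Data.List (permutations)
import qualified Data.Map as M
import qualified Data.Set as S

type AdjGraph = M.Map Int [Int]

t1Adj :: AdjGraph
t1Adj = M.fromList [(1, [2, 3]), (2, [4, 5]), (3, [6, 7])]

-- Every traversal obtained by choosing, at each newly visited node,
-- an order in which to enqueue its neighbours.
allBfsSketch :: AdjGraph -> Int -> [[Int]]
allBfsSketch g start = go [start] S.empty
  where
    go :: [Int] -> S.Set Int -> [[Int]]
    go [] _ = [[]]
    go (n : queue) seen
      | n `S.member` seen = go queue seen
      | otherwise = do
          order <- permutations (M.findWithDefault [] n g)
          rest <- go (queue ++ order) (S.insert n seen)
          return (n : rest)
```

For the tree t1 this gives eight traversals, starting with [1,2,3,4,5,6,7].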
Using the writer monad.
With our above code we are iterating until a fixed point. States with an empty queue are saved as completed paths. The above code mixes these together. Haskell's Writer monad is designed for computations where you also want to log information. We can use this to record the completed paths. Again there are lots of tutorials on the writer monad, and this post has gone on far too long. So here is the code.
The type declaration
bfs_all_paths_loop_writer :: (Ord a) =>
    [BFS_State a] -> Writer [BFS_State a] [BFS_State a]
gives you a Writer monad whose recording type is the list [BFS_State a] and whose values are also lists of type [BFS_State a]. Given an element x of this monad, the function runWriter x gives you a pair (v,r) where v is the value and r is an element of the recording type. In this code writer (v,r) allows you to construct an element of the writer monad where v is the value and r is the record. The writer monad does all the bookkeeping of gluing together the record logs.
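A tiny example, unrelated to graphs, may make the roles of the value and the record clearer. This is a sketch with a made-up function halve, meant for non-negative arguments. It uses writer and runWriter as in the code in this post, plus tell from the same module (Control.Monad.Trans.Writer in the transformers package), which appends to the record without producing an interesting value.

```haskell
import Control.Monad.Trans.Writer (Writer, runWriter, tell, writer)

-- Halve a non-negative number until it reaches zero. The value is the
-- number of steps taken; the record lists every number seen on the way.
halve :: Int -> Writer [Int] Int
halve 0 = writer (0, []) -- value 0, empty record (same as return 0)
halve n = do
  tell [n] -- append n to the record
  steps <- halve (n `div` 2)
  return (steps + 1)
```

Here runWriter (halve 10) gives (4,[10,5,2,1]): the value comes first and the accumulated record second.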
import Control.Monad.Trans.Writer
import Control.Monad

bfs_all_paths_loop_writer :: (Ord a) =>
    [BFS_State a] -> Writer [BFS_State a] [BFS_State a]
bfs_all_paths_loop_writer state_list =
    if (null state_list) then writer ([],[])
    else
        let next_paths =
                state_list >>= all_next_path
            (empty_queues, non_empty_queues) =
                partition state_queue_empty next_paths
            (new_states, history) =
                runWriter (bfs_all_paths_loop_writer non_empty_queues)
        in
          writer (new_states, history `mappend` empty_queues)
The mappend function in this case is a fancy generic way of writing ++ for lists. There are lots of ways of improving the above code, and it could probably be written in more idiomatic Haskell. The expression writer ([],[]) could be rewritten using return. Using ++ on lists can be inefficient, and you might want to replace the recording type [BFS_State a] with difference lists.
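For reference, a difference list represents a list as a function that prepends it to whatever comes next, so appending becomes function composition. The following is a minimal hand-rolled sketch rather than a recommendation of a particular library; the names DList, dlFromList and dlToList are chosen here for illustration. Because it has a Monoid instance, a type like this could in principle be used as the recording type of a Writer.

```haskell
-- A list represented by the function that prepends it.
newtype DList a = DList ([a] -> [a])

dlFromList :: [a] -> DList a
dlFromList xs = DList (xs ++)

dlToList :: DList a -> [a]
dlToList (DList f) = f []

-- Appending two difference lists is just function composition.
instance Semigroup (DList a) where
  DList f <> DList g = DList (f . g)

instance Monoid (DList a) where
  mempty = DList id
```

For example, dlToList (dlFromList [1,2] <> dlFromList [3]) gives back [1,2,3].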
Again you want to wrap this up in something more user friendly.
bfs_with_all_paths_writer g start_node =
    let
        initial_queue = Q.enqueue Q.empty start_node
        initial_path = empty_path
        neighbour_fun node =
            G.neighbours g node
        state_list =
            snd (runWriter (bfs_all_paths_loop_writer [(neighbour_fun, initial_queue, initial_path)]))
    in
      map (\(_, _, c) -> c) state_list
Again I tried to come up with something elegant using do blocks and guards, but failed. The problem with monad transformers is that you often require nested do blocks to get the types right. There is a function lift that allows you to get down inside your tower of monad transformers.
Extra Libraries
These are not meant to be efficient. I just wanted to check that I still understood how modules worked.
First Graph_Simple.hs
module Graph_Simple (fromEdges, neighbours, Graph (..)) where

import qualified Data.Set as S

data Graph a = G (S.Set a, S.Set (a,a)) deriving (Show)

allnodes [] = S.empty
allnodes ((v,w) : xs) =
    let nodes_of_rest = (allnodes xs)
    in
      S.insert w (S.insert v nodes_of_rest)

fromEdges :: (Ord a) => [(a,a)] -> Graph a
fromEdges edge_list =
    let nodes = allnodes edge_list
        edges = S.fromList edge_list
    in
      G (nodes, edges)

neighbours :: (Ord a) => Graph a -> a -> [a]
neighbours (G (nodes, edges)) n =
    if n `S.member` nodes
    then
        [ y | (x,y) <- S.toList (edges), x == n]
    else
        error "Node not a member of the graph."
Then Queue_Seq.hs, which implements queues using Haskell's sequence data type.
module Queue_Seq (empty,
                  singelton,
                  enqueue,
                  dequeue,
                  isempty,
                  enqueue_fromList,
                  Queue (..)) where

import qualified Data.Sequence as Seq

--- More efficient Queue data type.
data Queue a = Q (Seq.Seq a) deriving (Eq,Show)

empty :: Queue a
empty = Q Seq.empty

isempty :: Queue a -> Bool
isempty (Q q) = case q of
    Seq.Empty -> True
    _ -> False

singelton :: a -> Queue a
singelton e = Q (Seq.singleton e)

-- Elements are added at the front of the sequence ...
enqueue :: Queue a -> a -> Queue a
enqueue (Q q) a =
    Q (a Seq.<| q)

-- ... and removed from the back. Partial: fails on an empty queue.
dequeue :: Queue a -> (a, Queue a)
dequeue (Q (new_q Seq.:|> elem)) = (elem, (Q new_q))

-- Note that the tail of the list is enqueued before its head.
enqueue_fromList :: Queue a -> [a] -> Queue a
enqueue_fromList q [] = q
enqueue_fromList q (x : xs) =
    (enqueue qs x)
  where
    qs = enqueue_fromList q xs
You probably want to rewrite enqueue_fromList using foldl or one of its strict versions.
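A sketch of what that rewrite might look like, using foldl' from Data.List. It is written against a stand-in queue type so that it can be run on its own; the names SketchQueue, enqueueS, enqueueFromListS and dequeueAll are made up for this illustration. One thing to be aware of: the recursive definition above enqueues the last element of the list first, whereas the fold enqueues the head of the list first, so switching would change the order in which neighbours are visited.

```haskell
import Data.Foldable (toList)
import Data.List (foldl')
import qualified Data.Sequence as Seq

-- Stand-in for Queue: elements enter at the front and leave at the back.
newtype SketchQueue a = SQ (Seq.Seq a)

enqueueS :: SketchQueue a -> a -> SketchQueue a
enqueueS (SQ q) a = SQ (a Seq.<| q)

-- Enqueue a whole list with a strict left fold, head of the list first.
enqueueFromListS :: SketchQueue a -> [a] -> SketchQueue a
enqueueFromListS = foldl' enqueueS

-- The elements in the order in which they would be dequeued.
dequeueAll :: SketchQueue a -> [a]
dequeueAll (SQ q) = reverse (toList q)
```

With this version dequeueAll (enqueueFromListS (SQ Seq.empty) [1,2,3]) is [1,2,3], so the list comes out in the order it went in.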