> {-# OPTIONS -fglasgow-exts -fth -fno-monomorphism-restriction #-}
> module QuicQuid.Router(
>   -- * End Points/Addresses Creation
>   newEndPoint,newNamedEndPoint
>   ,Address -- ,LocalAddress,subAddress
>   -- * Send/Receive Messages
>   -- The suffix "Msg" indicate a message in Term format.
>   -- The suffix "Message" indicate a message in Message format.
>   ,write,writeMsgs,readMessage,readBody,readMessages,readMsgs
>   -- * Types
>   ,Message,ReadChan
>   ) where
> import System.IO.Unsafe(unsafePerformIO)
> import System.FilePath.Posix
> import Control.Concurrent.Chan.Closeable
> import Data.List
> import Control.Monad
> import Control.Concurrent.STM
> import qualified Data.Map as M
> import Data.Maybe
> import QuicQuid.Random
> import QuicQuid.Log
> import QuicQuid.Term

Agents communicate by sending messages, in 'Term' format, to other agents' endpoints.

Endpoints are identified by globally unique addresses.

The global addressing space is a hierarchical federation of addressing spaces where super spaces "own" their subspaces, controlling the traffic addressed to them.

The addressing spaces that are currently used:

/temp
   Used for temporary spaces, such as the ones created by Web clients connecting via HTTP.

/dns/org/quicquid
   Used for agents provided by org.quicquid, such as dns.org.quicquid.broker
   This will be made into a generic 'dns' subspace for owners of dns domains

Possible future subspaces:

/email  owners of a given email
/key    owners of a given public key
/openid owners of a given OpenID

Addresses are represented as #ABSOLUTE or #RELATIVE where:
- ABSOLUTE has the form /topspace/subspace/.. ("/" is the space/domain separator)
- RELATIVE has the form ABSOLUTE but without a leading /

Agents connect to the system by registering endpoints with their local router, of the form:

A few key agents have well-known addresses (currently: "/dns/org/quicquid/broker").

Agents' addresses cannot be more persistent than that of their local router.

Implementation:

The QuicQuid messaging layer is composed of a set of connected routers.
There is a router in every server and every user's browser.

There is no special inter-router protocol: a router connects to another router as a normal agent.
Therefore every router has its own address.
The root router has the null length address "".

A router receives messages at its own address and will dispatch messages to all its subaddresses.

For example, sub router "5623." will receive from the root router all messages sent to "*5623.".
It will directly receive messages to "5623." and dispatch all the others to its own local endpoints.

TODO:
- add: delEndPoint
- fix function names (same as in JS API).
- Write tests.
- Distributed router implementation (probably based on spread/hspread or amazon messaging).

> {-# NOINLINE router #-}
> -- |The default router.
> --
> -- This is process-wide mutable state created with 'unsafePerformIO'.
> -- The NOINLINE pragma above is what keeps this sound: it stops GHC from
> -- inlining the definition and thereby creating more than one 'TVar'.
> router :: TVar Router
> router = unsafePerformIO $ newTVarIO M.empty

> -- | A router maps the addresses of its endpoints (and their subdomains) to a channel.
> type Router = M.Map Address WriteChan

> -- | A message: the address it was sent to, paired with its body.
> type Message = (Address,Body)

> -- |A globally unique address with a hierarchical structure (e.g.: "1234.3456." "broker.").
> --
> -- NOTE(review): the compiled code in this module treats addresses as
> -- absolute '/'-separated paths ('newNamedEndPoint' checks 'isAbsolute',
> -- 'write' resolves them with 'splitDirectories'); the dotted examples
> -- above appear to predate that notation.
> type Address = String

> -- |A local address, a one-level address that identifies a local endpoint.
> -- A string that does not contain the domain separator character '.'
> -- (see the note on 'Address' about the separator actually used by the code).
> type LocalAddress = String

> -- | The payload of a message, in 'Term' format.
> type Body = Term

> -- | Reading end of an endpoint's channel.
> type ReadChan = Chan R Message

> -- | Writing end of an endpoint's channel, as stored in the 'Router' map.
> type WriteChan = Chan W Message

NOTE(review): the helpers below are not bird-tracked (no leading '>'), so they are not part of the compiled program; they still build addresses as 'Term' values rather than as the String that 'Address' now is.

-- |Convert a topdomain name (e.g. "broker") to the corresponding hierarchical address
localAddress :: String -> Address
localAddress domain = App (Str "#") (Arr [Str domain])

-- |Return a new address obtained by adding a subdomain to an existing address.
subAddress :: Address -> LocalAddress -> Address
subAddress (App (Str "#") (Arr domain)) subdomain = App (Str "#") (Arr $ domain ++ [Str subdomain])

-- |Given an address returns the part that corresponds to the local endpoint.
-- localKey "22.c1.router." == "c1" (if router's address == "router.")
localKey = reverse . takeWhile ((/=) '.') . tail .
reverse --(App (Str "#") (Arr (Str domain:subdomains))) = domain

NOTE(review): the sketch of 'R' and 'newRouter' below is not bird-tracked (no leading '>'), so it is not part of the compiled program.

data R = R {inChan:ReadChan,addr::Address}

newRouter baseRouter relativePath = do
  (readCh,addr) <- newNamedEndPoint baseRouter relativePath
  return R readCh,addr

> -- |Create an end point with a unique address rooted under the given address.
> --
> -- The new address is the base address extended (with '</>') by a fresh
> -- identifier obtained from 'uuid'. Returns the channel to read incoming
> -- messages from, together with the address that was registered.
> newEndPoint :: Address -> IO (ReadChan,Address)
> newEndPoint baseAddress = do
>   uid <- uuid
>   -- BUG: will fail if address is duplicated ('newNamedEndPoint' throws).
>   newNamedEndPoint (baseAddress </> uid)

> -- |Create an end point with the given address.
> --
> -- The address must be absolute, otherwise an exception is thrown.
> -- An exception will also be thrown if the address is already in use.
> -- On success the write end of a new channel is registered in the default
> -- 'router' and the read end is returned together with the address.
> newNamedEndPoint :: Address -> IO (ReadChan,Address)
> newNamedEndPoint addr
>   | not (isAbsolute addr) = error $ "address must be absolute, was:" ++ addr
>   | otherwise = do
>       (readCh,writeCh) <- newChan
>       -- Check and insert in one transaction, so that two concurrent
>       -- registrations of the same address cannot both succeed.
>       atomically $ do
>         key2ch <- readTVar router
>         when (M.member addr key2ch) $ error $ "Duplicated address: " ++ show addr
>         writeTVar router $ M.insert addr writeCh key2ch
>       return (readCh,addr)

> -- |Send a message to a given address.
> --
> -- The message is delivered to the endpoint registered under the longest
> -- prefix of the address (if we have /x and /x/y, /x/y/z will be delivered
> -- to /x/y). If no prefix is registered the problem is reported via 'err'.
> write :: Address -> Body -> IO ()
> write to body = do
>   debugM "Router.writeMsg" $ show to ++ " " ++ show body
>   key2ch <- atomically $ readTVar router
>   -- Candidate keys, longest prefix first; the last one is the empty path.
>   let prefixes = map joinPath $ reverse $ inits $ splitDirectories to
>   case listToMaybe $ mapMaybe (\prefix -> M.lookup prefix key2ch) prefixes of
>     Just writeCh -> writeChan writeCh (to,body) >> return ()
>     Nothing      -> err "Router.writeMsg" $ "Unknown address: " ++ to

> -- |Send a Term array of messages.
> --
> -- Every element is expected to be a two-element array holding the
> -- destination address (a 'Str') and the body. Anything else is reported
> -- via 'err' instead of aborting with a pattern match failure, and the
> -- remaining messages are still delivered.
> writeMsgs :: Term -> IO ()
> writeMsgs (Arr msgs) = mapM_ deliver msgs
>   where
>     deliver (Arr [Str to,msg]) = write to msg
>     deliver other = err "Router.writeMsgs" $ "Malformed message: " ++ show other
> writeMsgs other = err "Router.writeMsgs" $ "Expected an array of messages, was: " ++ show other

> -- |Read a message from a channel.
> --
> -- Fails with an IO user error when 'readChan' yields 'Nothing'
> -- (presumably because the channel has been closed -- TODO confirm).
> readMessage :: ReadChan -> IO Message
> readMessage ch = do
>   received <- readChan ch
>   case received of
>     Just msg -> do
>       debugM "Router.readCh" $ show msg
>       return msg
>     Nothing -> fail "Router.readMessage: no message available (channel closed?)"

> -- |Read a message body from a channel, discarding the destination address.
> readBody :: ReadChan -> IO Body
> readBody ch = do
>   (_,body) <- readMessage ch
>   return body

> -- |Returns all the messages that are currently available on a channel in a (Haskell) array.
> --
> -- NOTE: the emptiness check and the read are separate steps, so if another
> -- thread reads from the same channel in between, this call may block.
> readMessages :: ReadChan -> IO [Message]
> readMessages ch = do
>   drained <- isEmptyChan ch
>   if drained
>     then return []
>     else liftM2 (:) (readMessage ch) (readMessages ch)

> -- |Returns all the messages that are currently available on a channel in a Term array.
> --
> -- Each message is encoded as a two-element array: address, then body
> -- (the same shape that 'writeMsgs' accepts).
> readMsgs :: ReadChan -> IO Term
> readMsgs ch = fmap (Arr . map encode) (readMessages ch)
>   where
>     encode (to,body) = Arr [Str to,body]