Introduction to machinate
The primitive machinate provides is the event. The fundamental operation on an event is synchronizing on it, which means waiting for that event to occur. When an event occurs, a value is communicated to anyone synchronizing on the event. Events can be combined into a new event that represents an unordered choice among those events (select, alts, etc.). Events can be guarded to add pre-synchronization actions, and wrapped to add post-synchronization actions. There is also a nack mechanism for registering actions to take if an event is not the one selected in an unordered choice.
Machinate also provides channels (queues) for communicating values between threads of control. Sending and receiving data over channels is done by constructing events that represent the send and the receive, and then synchronizing on those events.
The following examples all assume:
(require '[com.manigfeald.machinate :as m])
Sync
m/sync! does not block, but takes a callback to be run after an event occurs. m/sync!! blocks until the given event occurs and returns the value of the event.
Never and always
nil is the never event. It never occurs, so synchronizing on it waits forever. com.manigfeald.machinate/always is an event that has always already happened. Synchronizing on always succeeds immediately.
(m/sync!! nil) ; blocks forever
(m/sync!! m/always) ; never blocks
Clojure reference types
m/watch-reference creates an event that occurs when some Clojure reference type (atom, ref, agent, really anything add-watch works on) is observed to have some given value.
(def a (atom 0))
(def e (m/watch-reference a 1))
(m/sync! (m/wrap e (fn [_] (println "yo")))) ; "yo" doesn't print
(swap! a inc) ; e occurs and "yo" prints
Wrapping
m/wrap and m/wrap-handler take an event and create a new one that applies the given function to the original event's value and uses the result as the value of the new event. m/wrap is for normal values; m/wrap-handler is for handling errors.
(m/sync!! (m/wrap m/always (constantly :hello))) ; results in :hello
Timeouts
com.manigfeald.machinate/timeout returns an event that occurs after a given delay in milliseconds.
(m/sync!! (m/timeout 100)) ; blocks for 100 milliseconds
Create a rendezvous channel
A rendezvous channel is unbuffered. Senders and receivers must stop and wait for each other.
(def a-channel (m/channel))
Send over a channel
(m/sync!! (m/send a-channel :some-message))
Receive over a channel
(m/sync!! (m/receive a-channel))
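Because a rendezvous channel has no buffer, a blocking send with m/sync!! waits until a receiver arrives. The following is a minimal sketch of a complete exchange on a single thread. It assumes, as the core.async port later in this document does, that m/sync! registers the send without blocking. The name ping-channel is made up for illustration.

```clojure
(require '[com.manigfeald.machinate :as m])

;; Hypothetical channel name, used only for this sketch.
(def ping-channel (m/channel))

;; Register the send without blocking the current thread.
(m/sync! (m/send ping-channel :ping))

;; Block until the pending send is matched by this receive.
;; The value of the receive event should be the message that was sent.
(def received (m/sync!! (m/receive ping-channel)))
```

If both sides used m/sync!! on the same thread, the send would wait for a receiver that never gets a chance to run.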
Receive from a channel or timeout
(m/sync!! (m/choice [(m/receive a-channel) (m/timeout 1000)]))
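With the choice above, the result alone may not reveal whether a message arrived or the timeout fired. One way to tell the two apart, sketched here using only m/wrap and m/choice as they appear elsewhere in this document, is to tag each branch. The tags :received and :timed-out and the name quiet-channel are arbitrary choices for this example.

```clojure
(require '[com.manigfeald.machinate :as m])

;; Hypothetical channel that nothing ever sends on.
(def quiet-channel (m/channel))

(def outcome
  (m/sync!!
   (m/choice [;; Tag a received message with its value.
              (m/wrap (m/receive quiet-channel) (fn [v] [:received v]))
              ;; Replace the timeout's value with a marker.
              (m/wrap (m/timeout 100) (constantly [:timed-out]))])))
```

Nothing sends on quiet-channel here, so the timeout branch is the only one that can occur.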
Close a channel
(m/close! a-channel)
A channel of 1s
Publishes 1s to channel c until it is closed.
(def c (m/channel))
((fn this-fn [] (m/sync! (m/wrap (m/send c 1) (fn [was-sent] (when was-sent (this-fn)))))))
A port of a core.async example
This example sends 1000 messages and then receives them all.
(let [n 1000
      cs (repeatedly n m/channel)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (m/sync! (m/send c "hi")))
  (loop [receives (for [c cs] (m/receive c))]
    (when (seq receives)
      (let [[the-val the-evt] (m/sync!! (m/alts! receives))]
        (assert (= "hi" the-val))
        (recur (remove #{the-evt} receives)))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
Dining philosophers
(let [philosopher-count 10
      hunger 100
      eating-seconds (/ 1 100)
      done (atom 0)
      left-forks (vec (repeatedly philosopher-count #(m/buffered-channel 1)))
      announcements (m/channel)
      log (fn [& args]
            (m/sync!! (m/send announcements args)))
      yield-virtual-thread (fn []
                             (m/sync!! (m/timeout 0)))]
  ((fn l []
     (m/sync!
      (m/wrap (m/receive announcements)
              (fn [args]
                (apply println args)
                (l))))))
  (dotimes [i philosopher-count]
    (let [left-fork (nth left-forks i)
          right-fork (nth left-forks (mod (inc i) philosopher-count))]
      (Thread/startVirtualThread
       (fn []
         (try
           (log i 'seated)
           (loop [h hunger
                  is-thinking false]
             (yield-virtual-thread)
             (when-not (neg? h)
               (if (m/sync!! (m/ordered-choice [(m/receive left-fork)
                                                (m/wrap m/always
                                                        (constantly false))]))
                 (if (m/sync!! (m/ordered-choice [(m/receive right-fork)
                                                  (m/wrap m/always
                                                          (constantly false))]))
                   (do
                     (log i 'eating h)
                     (m/sync!! (m/timeout (* (rand) 1000 eating-seconds)))
                     (m/sync!! (m/send left-fork true))
                     (m/sync!! (m/send right-fork true))
                     (recur (dec h) true))
                   (do
                     (m/sync!! (m/send left-fork true))
                     (recur h is-thinking)))
                 (do
                   (when-not is-thinking
                     (log i 'thinking))
                   (recur h true)))))
           (log i 'satisfied)
           (swap! done inc)
           (log i 'left)
           (catch Throwable t
             (prn t)))))))
  (doseq [fork left-forks] (m/sync! (m/send fork true)))
  (m/sync!! (m/watch-reference done philosopher-count))
  (log 'table-empty))