First stab at 'actors': using threads.

This commit is contained in:
Scott Richmond 2022-05-31 14:21:19 -04:00
parent 09696c212a
commit 1e28556baa
3 changed files with 238 additions and 52 deletions

View File

@ -11,6 +11,8 @@
[clojure.pprint :as pp]
[clojure.set]))
(def ^:dynamic self 1001)
;; right now this is not very efficient:
;; it's got runtime checking
;; we should be able to do these checks statically
@ -25,6 +27,62 @@
(declare interpret-ast match interpret)
(def processes (atom {}))
(def current-pid (atom 1001))
(defn new-process []
(let [pid @current-pid
process (atom {:pid pid
:queue clojure.lang.PersistentQueue/EMPTY
:in nil
:status :occupied
})]
(swap! processes #(assoc % pid process))
(swap! current-pid inc)
process))
(def vm-state (atom :stopped))
(defn- values [m] (into [] (map (fn [[_ v]] v)) m))
(defn- process-msg [process]
;;(println "processing message" self)
(let [q (:queue process)
in (:in process)]
(when (not (realized? in))
;;(println "delivering message in" self)
(deliver in (peek q))
(assoc process :queue (pop q) :in nil))))
(defn- run-process [process-atom]
(let [process @process-atom
status (:status process)
q (:queue process)
in (:in process)]
;;(println "running process" self ":" (into [] q))
(when (and (= status :idle) (not-empty q) in)
(swap! process-atom process-msg))))
(defn- start-vm []
;; (println "Starting Ludus VM")
(when (= @vm-state :stopped)
(future
(reset! vm-state :running)
(loop []
(if (= @vm-state :running)
(do
(run! run-process (values @processes))
(recur))
;; (println "Ludus VM shutting down")
)))))
(defn- stop-vm []
(reset! vm-state :stopped)
(reset! processes {})
(reset! current-pid 1001)
nil)
(defn- match-tuple [pattern value ctx-vol]
(cond
(not (vector? value)) {:success false :reason "Could not match non-tuple value to tuple"}
@ -425,8 +483,61 @@
(let [members (:members ast)]
(assoc (reduce (dict-term ctx) {} members) ::data/dict true)))
(defn- interpret-receive [ast ctx]
(let [process-atom (get @processes self)
in (promise)
clauses (:clauses ast)]
;; (println "receiving in" self)
(swap! process-atom #(assoc % :in in :status :idle))
(let [msg @in]
(swap! process-atom #(assoc % :status :occupied))
;; (println "message received by" self ":" msg)
(loop [clause (first clauses)
clauses (rest clauses)]
(if clause
(let [pattern (:pattern clause)
body (:body clause)
new-ctx (volatile! {::parent ctx})
match? (match pattern msg new-ctx)
success (:success match?)
clause-ctx (:ctx match?)]
(if success
(do
(vswap! new-ctx #(merge % clause-ctx))
(let [result (interpret-ast body new-ctx)]
(swap! process-atom #(assoc % :status :idle))
result))
(recur (first clauses) (rest clauses))))
(throw (ex-info "Match Error: No match found" {:ast ast})))))))
(defn- interpret-send [ast ctx]
(let [msg (interpret-ast (:msg ast) ctx)
pid (interpret-ast (:pid ast) ctx)
process-atom (get @processes pid)
process @process-atom
q (:queue process)
status (:status process)]
(when (not (= :dead status))
(swap! process-atom #(assoc % :queue (conj q msg)))
;;(println "sent" msg "to" (:pid process))
)
msg))
(defn- interpret-spawn [ast ctx]
(let [expr (:expr ast)
process (new-process)
pid (:pid @process)]
(with-bindings {#'self pid}
(future
(try (interpret-ast expr ctx)
(catch Exception e
(println "Panic in Ludus process" (str self ":") (ex-message e))))
(swap! @process #(assoc % :status :dead))))
pid))
(defn interpret-ast [ast ctx]
(case (::ast/type ast)
::ast/self self
::ast/atom (:value ast)
@ -456,6 +567,12 @@
::ast/panic (panic ast ctx)
::ast/spawn (interpret-spawn ast ctx)
::ast/send (interpret-send ast ctx)
::ast/receive (interpret-receive ast ctx)
::ast/recur
{::data/recur true :tuple (interpret-ast (:tuple ast) ctx)}
@ -500,20 +617,32 @@
(defn interpret [parsed file]
(try
(let [base-ctx (volatile! (merge {:file file} prelude/prelude))]
(interpret-ast (::parser/ast parsed) base-ctx))
(let [base-ctx (volatile! (merge {:file file} prelude/prelude))
process (new-process)]
(start-vm)
(with-bindings {#'self (:pid @process)}
(let [result (interpret-ast (::parser/ast parsed) base-ctx)]
(swap! process #(assoc % :status :dead))
(stop-vm)
result)))
(catch clojure.lang.ExceptionInfo e
(println "Ludus panicked in" file)
(println "On line" (get-in (ex-data e) [:ast :token ::token/line]))
(println (ex-message e))
;;(pp/pprint (ex-data e))
(System/exit 67))))
(defn interpret-safe [parsed]
(try
(let [base-ctx (volatile! (merge {} prelude/prelude))]
(interpret-ast (::parser/ast parsed) base-ctx))
(let [base-ctx (volatile! (merge {} prelude/prelude))
process (new-process)]
(start-vm)
(with-bindings {#'self (:pid @process)}
(let [result (interpret-ast (::parser/ast parsed) base-ctx)]
(swap! process #(assoc % :status :dead))
result))
(stop-vm))
(catch clojure.lang.ExceptionInfo e
(stop-vm)
(println "Ludus panicked!")
(println "On line" (get-in (ex-data e) [:ast :token ::token/line]))
(println (ex-message e))
@ -539,9 +668,31 @@
(println (ex-message e))
{:result ::error :ctx (volatile! orig-ctx)})))))
(comment
(def source "panic! :oops
(comment
(start-vm)
(def source "
fn echo () -> {
& print (self, :echoing)
loop () with () -> {
& print (self, :looping)
receive {
msg -> {
print (\"from \", self, \": \", msg)
recur ()
}
}
}
}
let echoer = spawn echo ()
send :hello to echoer
send :foo to echoer
send :bar to echoer
send :baz to echoer
sleep (1000)
")
@ -550,13 +701,13 @@
(println "*** *** NEW INTERPRETATION *** ***")
(println "")
(-> source
(let [result (-> source
(scanner/scan)
(parser/parse)
(interpret-safe)
(show/show)
;;(println)
))
)]
result))
(comment "

View File

@ -953,11 +953,7 @@
(defn- parse-spawn [parser]
(let [expr (parse-expr (advance parser))]
(case (node-type expr)
(::ast/word ::ast/fn ::ast/synthetic)
(assoc expr ::ast {::ast/type ::ast/spawn :token (current parser) :expr (::ast expr)})
(panic parser "Expected function literal, word, or synthetic expression after spawn."))))
(assoc expr ::ast {::ast/type ::ast/spawn :expr (::ast expr) :token (current parser)})))
(defn- parse-send [parser]
(let [msg (parse-expr (advance parser))
@ -1000,9 +996,12 @@
(::token/lparen ::token/keyword) (parse-synthetic parser)
(parse-word parser)))
(::token/nil ::token/true ::token/false ::token/self)
(::token/nil ::token/true ::token/false)
(parse-atomic-word parser)
(::token/self)
(assoc (advance parser) ::ast {::ast/type ::ast/self :token token})
::token/lparen (parse-tuple parser)
::token/lbracket (parse-list parser)
@ -1067,24 +1066,54 @@
(comment
(def pp pp/pprint)
(def source "
cond { x -> x
y -> y }
(def source1 "
fn echo () -> {
loop () with () -> {
receive {
msg -> {
print (msg)
recur ()
}
}
}
}
& let my = spawn echo ()
")
(def lexed (scanner/scan source))
(def tokens (:tokens lexed))
(def p (parser tokens))
(def source2 "
fn echo () -> {
loop () with () -> {
receive {
msg -> {
print (msg)
recur ()
}
}
}
}
")
(time (do (def lexed2 (scanner/scan source2))
(def tokens2 (:tokens lexed2))
(def p2 (parser tokens2))
(def ast2 (::ast (parse-script p2)))))
(time (do (def lexed1 (scanner/scan source1))
(def tokens1 (:tokens lexed1))
(def p1 (parser tokens1))
(def ast1 (::ast (parse-script p1)))))
(println "")
(println "")
(println "******************************************************")
(println "")
(println "*** *** NEW PARSE *** ***")
(println "*** *** TEST PARSE *** ***")
(-> p
(parse-script)
(::ast)
(pp)))
(println "asts are the same?" (= ast1 ast2))
(pp ast1)
(pp ast2))
(comment "
Further thoughts/still to do:

View File

@ -75,6 +75,10 @@
::data/type ::data/clj
:body ludus.show/show})
(def sleep- {:name "sleep"
::data/type ::data/clj
:body (fn [ms] (Thread/sleep ms))})
(def prelude {"eq" eq
"add" add
;;"panic!" panic!
@ -89,4 +93,6 @@
"deref" deref-
"set!" set!-
"and" and-
"or" or-})
"or" or-
"sleep" sleep-
})