vl-db-agent 1.0.0

Provides an API for coordinated database writing.
Rationale

In some cases Metis (A) and DevProxy (B) both write data directly to the same calibration document. With this setup, conflicts can occur due to uncoordinated writing: A and B try to store the document with the same revision (rev 1 in the diagram), and only one of the two writes succeeds.
    ┌─────┐              ┌─────┐
    │     │              │     │
    │  A  │              │  B  │
    │     │              │     │
    └─┬──▲┘              └▲──┬─┘
      │  │                │  │
    rev 1│ ✓  ┌──────┐  ✓ │rev 1
      │  └────┤      ├────┘  │
      ✓       │  DB  │       x
      └───────►      ◄───────┘
              └──────┘

The workaround so far was sequencing related steps. vl-db-agent provides an endpoint for coordinated writing of vaclab style measurement results (data) to calibration documents. This is realized by means of a clojure agent.

        ┌─────┐          ┌─────┐
        │     │          │     │
        │  A  │          │  B  │
        │     │          │     │
        └┬────┘          └────┬┘
         │                    │
    data │  ┌──────────────┐  │ data
         │  │              │  │
         └──►   db-agent   ◄──┘
            │              │
            │ rev 1  rev 2 │
            └─▲──┬───▲──┬──┘
              │  │   │  │
              ✓  ✓   ✓  ✓
              │  │   │  │
            ┌─┴──▼───┴──▼──┐
            │              │
            │      DB      │
            │              │
            └──────────────┘

    (ns vl-db-agent.core
      ^{:author "Thomas Bock <thomas.bock@ptb.de>"}
      (:require [compojure.core :refer [POST]]
                [compojure.handler :as handler]
                [com.brunobonacci.mulog :as µ]
                [integrant.core :as ig]
                [libcdb.core :as db]
                [libcdb.configure :as cf]
                [org.httpkit.server :refer [run-server]]
                [ring.util.response :as res]
                [ring.middleware.json :as middleware]
                [vl-data-insert.core :as i])
      (:gen-class))
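The coordination idea from the rationale can be sketched with plain Clojure. This is only an illustration under stated assumptions: the atom `db`, the function `put-doc!` and the helper `store!` are hypothetical stand-ins (an in-memory document with a revision counter), not part of vl-db-agent or libcdb.

```clojure
;; Hypothetical in-memory stand-in for a revisioned database document.
(def db (atom {:rev 1 :values []}))

(defn put-doc!
  "Accepts `doc` only if it is based on the current revision."
  [doc]
  (let [cur @db]
    (if (and (= (:rev cur) (:rev doc))
             (compare-and-set! db cur (update doc :rev inc)))
      {:ok true}
      {:error "conflict"})))

;; Uncoordinated: A and B both fetch rev 1, then both try to write.
(def uncoordinated
  (let [a-copy @db
        b-copy @db]
    [(put-doc! (update a-copy :values conj :from-a))
     (put-doc! (update b-copy :values conj :from-b))]))

;; Coordinated: each fetch-modify-write cycle runs as one agent action,
;; and the actions of one agent are executed one after another.
(def writer (agent {}))

(defn store! [sender value]
  (send writer
        (fn [m]
          (assoc m sender (put-doc! (update @db :values conj value))))))

(store! :a :again-from-a)
(store! :b :again-from-b)
(await writer)
```

In the uncoordinated case the second write is rejected because it is based on an outdated revision. Routed through the agent, both writes are applied, since every action fetches the current revision before writing.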
Configuration

vl-db-agent uses integrant, which provides a controlled way to start and stop the system. The following is the integrant-style configuration map.

    (def config
      {:log/mulog {:type :multi
                   :log-context {:app-name "vl-db-agent"
                                 :facility (or (System/getenv "DEVPROXY_FACILITY")
                                               (System/getenv "DEVHUB_FACILITY")
                                               (System/getenv "METIS_FACILITY"))}
                   :publishers [{:type :elasticsearch
                                 :url "http://a75438:9200/"
                                 :els-version :v7.x
                                 :publish-delay 1000
                                 :data-stream "vl-log-stream"
                                 :name-mangling false}]}
       :db/couch {:prot "http",
                  :host "localhost",
                  :port 5984,
                  :usr (System/getenv "CAL_USR")
                  :pwd (System/getenv "CAL_PWD")
                  :name "vl_db_work"}
       :system/agent {:ini {}}
       :endpoint/results {:db (ig/ref :db/couch)
                          :agnt (ig/ref :system/agent)}
       :server/http-kit {:port 9992
                         :join? false
                         :json-opt {:keywords? true}
                         :handler (ig/ref :endpoint/results)}})
Database io functions

The three functions used here are simply passed through from the library libcdb.

    (defn db-config [opts] (cf/config opts))

    (defn get-doc [id db] (db/get-doc id db))

    (defn put-doc [doc db] (db/put-doc doc db))
Handler

The handler receives the data (Result and DocPath), fetches the document via get-fn, stores each result under its path and writes the document back via put-fn.

    (defn store-data [get-fn {:keys [DocPath Result] :as data} put-fn]
      (µ/trace ::store-data [:function "core/store-data"]
        (let [doc (get-fn)]
          (put-fn
           (reduce
            (fn [d [r p]] (i/store-results d [r] p))
            doc
            (zipmap Result (if (string? DocPath) (repeat DocPath) DocPath)))))))
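How the handler pairs results with paths can be tried out without a database. The sketch below is an assumption-laden illustration: `store-results` is a hypothetical stand-in for `i/store-results` that merely appends results under a dotted path (the real library function may behave differently), the tracing is left out, and the document and paths are made up.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-in for i/store-results: appends `results`
;; to a vector located at the dotted `path` inside `doc`.
(defn store-results [doc results path]
  (update-in doc (map keyword (str/split path #"\.")) (fnil into []) results))

;; Same pairing logic as the handler above, without tracing.
(defn store-data [get-fn {:keys [DocPath Result]} put-fn]
  (let [paths (if (string? DocPath) (repeat DocPath) DocPath)]
    (put-fn (reduce (fn [d [r p]] (store-results d [r] p))
                    (get-fn)
                    (zipmap Result paths)))))

;; Hypothetical get and put functions backed by an atom.
(def saved (atom nil))

(store-data (fn [] {:_id "cal-1"})
            {:DocPath ["Calibration.Measurement.Values.Pressure"
                       "Calibration.Measurement.Values.Temperature"]
             :Result  [{:Type "ind" :Value 1.0 :Unit "Pa"}
                       {:Type "ch1" :Value 23.1 :Unit "C"}]}
            (fn [doc] (reset! saved doc)))
```

One consequence of pairing with `zipmap` is that the results act as map keys, so two identical result maps in one request end up as a single entry.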
Checks

Some simple checks about the shape of the data. Result must be a vector without empty entries.

    (defn results-ok? [v]
      (and (vector? v)
           (empty? (filter empty? v))))

vl-db-agent provides the opportunity to store all results under one DocPath (a non-empty string) or each result under its own path (a vector of non-empty paths with the same length as Result).

    (defn docpath-ok? [p r]
      (if (string? p)
        (not (empty? p))
        (and (vector? p)
             (vector? r)
             (= (count p) (count r))
             (empty? (filterv empty? p)))))

    (defn data-ok? [{:keys [DocPath Result]}]
      (and (docpath-ok? DocPath Result)
           (results-ok? Result)))
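A few example inputs show what the checks accept and reject. The three functions are repeated so that the snippet runs on its own; the result maps and the path are hypothetical values chosen for illustration only.

```clojure
;; Copies of the three checks, repeated so the snippet is self-contained.
(defn results-ok? [v]
  (and (vector? v) (empty? (filter empty? v))))

(defn docpath-ok? [p r]
  (if (string? p)
    (not (empty? p))
    (and (vector? p) (vector? r)
         (= (count p) (count r))
         (empty? (filterv empty? p)))))

(defn data-ok? [{:keys [DocPath Result]}]
  (and (docpath-ok? DocPath Result) (results-ok? Result)))

;; Hypothetical result maps and path.
(def r1 {:Type "ind" :Value 1.0 :Unit "Pa"})
(def r2 {:Type "ind_offset" :Value 0.1 :Unit "Pa"})
(def path "Calibration.Measurement.Values.Pressure")

(def checks
  {:one-path-for-all    (data-ok? {:DocPath path :Result [r1 r2]})        ; true
   :one-path-per-result (data-ok? {:DocPath [path path] :Result [r1 r2]}) ; true
   :count-mismatch      (data-ok? {:DocPath [path] :Result [r1 r2]})      ; false
   :empty-path          (data-ok? {:DocPath "" :Result [r1]})             ; false
   :empty-result-map    (data-ok? {:DocPath path :Result [r1 {}]})        ; false
   :missing-result      (data-ok? {:DocPath path})})                      ; false
```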
Route and agent

vl-db-agent provides one POST route, /:id, where id is the id of the calibration document. Data that passes the checks is sent to the agent and the request is answered with status 202; otherwise an error is logged and status 412 is returned.

    (defn proc [db a]
      (POST "/:id" [id :as req]
        (let [data   (-> req :body)
              get-fn (fn [] (get-doc id db))
              put-fn (fn [doc] (put-doc doc db))]
          (if (data-ok? data)
            (do
              (µ/log ::proc :doc-id id :message "doc and data ok")
              (send a (fn [m] (assoc m id {:res (store-data get-fn data put-fn) :data data})))
              ;; build the response first, then set its status
              (-> (res/response {:ok true})
                  (res/status 202)))
            (let [msg "missing database doc or malformed request data"]
              (µ/log ::proc :error msg)
              (-> (res/response {:error msg})
                  (res/status 412)))))))
The first
System

The entire system is stored in an atom.

    (defonce system (atom nil))

System up multimethods

The ig/init-key multimethods describe how each component of the configuration is started.

    (defmethod ig/init-key :endpoint/results [_ {:keys [db agnt]}]
      (proc db agnt))

    (defmethod ig/init-key :log/mulog [_ opts]
      (µ/set-global-context! (:log-context opts))
      (µ/start-publisher! opts))

    (defmethod ig/init-key :db/couch [_ opts]
      (db-config opts))
Initialization of the agent. The error handler function logs the error and the state of the agent before the error occurred, so that the agent can be restarted by means of restart-agent.

    (defmethod ig/init-key :system/agent [_ {:keys [ini]}]
      (agent ini :error-handler (fn [a e]
                                  (µ/log ::agent-error-handler :error e :agent @a))))
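The behaviour of an agent with an error handler can be explored in isolation. In the following sketch the atom `error-log` is a hypothetical stand-in for the µ/log call, and the keys and the error message are made up. Note that in Clojure an agent created with an :error-handler uses the error mode :continue unless :error-mode is given, so a throwing action does not leave the agent in a failed state; restart-agent applies to agents that have failed, for example with :error-mode :fail.

```clojure
;; Hypothetical stand-in for the µ/log call of the error handler.
(def error-log (atom []))

(def a (agent {}
              :error-handler
              (fn [ag e]
                ;; @ag is the state before the failing action
                (swap! error-log conj {:error (ex-message e) :agent @ag}))))

(send a (fn [m] (assoc m "doc-1" {:ok true})))
(send a (fn [_] (throw (ex-info "db unreachable" {}))))
(send a (fn [m] (assoc m "doc-2" {:ok true})))
(await a)

;; The failing action left the state untouched and later actions still ran.
(def state-after @a)
(def failed? (some? (agent-error a)))
```

After the three actions, `error-log` holds one entry with the state as it was before the failing action, and the agent state contains both documents.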
    (defmethod ig/init-key :server/http-kit [_ {:keys [handler json-opt] :as opts}]
      (run-server (-> handler
                      (middleware/wrap-json-body json-opt)
                      (middleware/wrap-json-response))
                  (-> opts (dissoc :handler))))
System down multimethods

The ig/halt-key! multimethods describe how each component is stopped.

    (defmethod ig/halt-key! :server/http-kit [_ server]
      (server))

    (defmethod ig/halt-key! :log/mulog [_ logger]
      (logger))

    (defmethod ig/halt-key! :system/agent [_ a]
      (send a (fn [_] {})))
Start, stop and restart

The following functions are intended for REPL usage.

    (defn start []
      (keys (reset! system (ig/init config))))

    (defn stop []
      (µ/log ::stop :message "halt system")
      (ig/halt! @system)
      (reset! system {}))

    (defn restart []
      (stop)
      (Thread/sleep 1000)
      (start))
Main

The -main function starts the system.

    (defn -main [& args]
      (start))
Playground

    (comment
      (start)
      (stop)
      (restart)
      (:system/agent @system)
      (agent-error (:system/agent @system))
      (map (comp :ok :res) (vals @(:system/agent @system))))