cglmon.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. /*
  2. Tideland Common Go Library - Monitoring
  3. Copyright (C) 2009-2011 Frank Mueller / Oldenburg / Germany
  4. Redistribution and use in source and binary forms, with or without
  5. modification, are permitted provided that the following conditions are
  6. met:
  7. Redistributions of source code must retain the above copyright notice, this
  8. list of conditions and the following disclaimer.
  9. Redistributions in binary form must reproduce the above copyright notice,
  10. this list of conditions and the following disclaimer in the documentation
  11. and/or other materials provided with the distribution.
  12. Neither the name of Tideland nor the names of its contributors may be
  13. used to endorse or promote products derived from this software without
  14. specific prior written permission.
  15. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  16. AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  17. IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  18. ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  19. LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  20. CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  21. SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  22. INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  23. CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  24. ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  25. THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. package cgl
  28. //--------------------
  29. // IMPORTS
  30. //--------------------
  31. import (
  32. "fmt"
  33. "io"
  34. "log"
  35. "os"
  36. "time"
  37. )
  38. //--------------------
  39. // GLOBAL VARIABLES
  40. //--------------------
// monitor holds the package-wide system monitor instance. It stays nil
// until Monitor creates the instance on first use.
var monitor *SystemMonitor
  42. //--------------------
  43. // CONSTANTS
  44. //--------------------
  45. const (
  46. etmTLine = "+------------------------------------------+-----------+-----------+-----------+-----------+---------------+-----------+\n"
  47. etmHeader = "| Name | Count | Min Time | Max Time | Avg Time | Total Time | Op/Sec |\n"
  48. etmFormat = "| %-40s | %9d | %9.3f | %9.3f | %9.3f | %13.3f | %9d |\n"
  49. etmFooter = "| All times in milliseconds. |\n"
  50. etmELine = "+----------------------------------------------------------------------------------------------------------------------+\n"
  51. ssiTLine = "+------------------------------------------+-----------+---------------+---------------+---------------+---------------+\n"
  52. ssiHeader = "| Name | Count | Act Value | Min Value | Max Value | Avg Value |\n"
  53. ssiFormat = "| %-40s | %9d | %13d | %13d | %13d | %13d |\n"
  54. dsrTLine = "+------------------------------------------+---------------------------------------------------------------------------+\n"
  55. dsrHeader = "| Name | Value |\n"
  56. dsrFormat = "| %-40s | %-73s |\n"
  57. )
// Operation codes of the commands sent to the backend. processCommand
// dispatches on these values.
const (
	cmdMeasuringPointsMap = iota // map a function over all measuring points
	cmdMeasuringPointsDo // call a function for all measuring points
	cmdStaySetVariablesMap // map a function over all stay-set variables
	cmdStaySetVariablesDo // call a function for all stay-set variables
	cmdDynamicStatusRetrieversMap // map a function over all dynamic status values
	cmdDynamicStatusRetrieversDo // call a function for all dynamic status values
)
  66. //--------------------
  67. // MONITORING
  68. //--------------------
// command encapsulates the data of one command sent to the backend.
type command struct {
	// opCode selects the operation; one of the cmd* constants.
	opCode int
	// args holds the function to apply; processCommand type-asserts it
	// according to opCode.
	args interface{}
	// respChan receives the result of the map commands. The do commands
	// leave it nil and get no response.
	respChan chan interface{}
}
// SystemMonitor collects execution time measurings, stay-set variables
// and dynamic status retrievers. The maps are read and written by the
// backend goroutine, which is fed through the channels.
type SystemMonitor struct {
	// etmData maps a measuring id to its aggregated measuring point.
	etmData map[string]*MeasuringPoint
	// ssiData maps a variable id to its aggregated stay-set variable.
	ssiData map[string]*StaySetVariable
	// dsrData maps an id to its registered status retriever function.
	dsrData map[string]DynamicStatusRetriever
	// measuringChan carries finished measurings to the backend.
	measuringChan chan *Measuring
	// valueChan carries new stay-set variable values to the backend.
	valueChan chan *value
	// retrieverRegistrationChan carries retriever registrations and
	// deregistrations to the backend.
	retrieverRegistrationChan chan *retrieverRegistration
	// commandChan carries map/do commands to the backend.
	commandChan chan *command
}
  85. // Monitor returns the system monitor if it exists.
  86. // Otherwise it creates it first.
  87. func Monitor() *SystemMonitor {
  88. if monitor == nil {
  89. // Create system monitor.
  90. monitor = &SystemMonitor{
  91. etmData: make(map[string]*MeasuringPoint),
  92. ssiData: make(map[string]*StaySetVariable),
  93. dsrData: make(map[string]DynamicStatusRetriever),
  94. measuringChan: make(chan *Measuring, 1000),
  95. valueChan: make(chan *value, 1000),
  96. retrieverRegistrationChan: make(chan *retrieverRegistration, 10),
  97. commandChan: make(chan *command),
  98. }
  99. go monitor.backend()
  100. }
  101. return monitor
  102. }
  103. // BeginMeasuring starts a new measuring with a given id.
  104. // All measurings with the same id will be aggregated.
  105. func (sm *SystemMonitor) BeginMeasuring(id string) *Measuring {
  106. return &Measuring{sm, id, time.Nanoseconds(), 0}
  107. }
  108. // Measure the execution of a function.
  109. func (sm *SystemMonitor) Measure(id string, f func()) {
  110. m := sm.BeginMeasuring(id)
  111. f()
  112. m.EndMeasuring()
  113. }
  114. // MeasuringPointsMap performs the function f for all measuring points
  115. // and returns a slice with the return values of the function that are
  116. // not nil.
  117. func (sm *SystemMonitor) MeasuringPointsMap(f func(*MeasuringPoint) interface{}) []interface{} {
  118. cmd := &command{cmdMeasuringPointsMap, f, make(chan interface{})}
  119. sm.commandChan <- cmd
  120. resp := <-cmd.respChan
  121. return resp.([]interface{})
  122. }
  123. // MeasuringPointsDo performs the function f for
  124. // all measuring points.
  125. func (sm *SystemMonitor) MeasuringPointsDo(f func(*MeasuringPoint)) {
  126. cmd := &command{cmdMeasuringPointsDo, f, nil}
  127. sm.commandChan <- cmd
  128. }
  129. // MeasuringPointsWrite prints the measuring points for which
  130. // the passed function returns true to the passed writer.
  131. func (sm *SystemMonitor) MeasuringPointsWrite(w io.Writer, ff func(*MeasuringPoint) bool) {
  132. pf := func(t int64) float64 { return float64(t) / 1000000.0 }
  133. fmt.Fprint(w, etmTLine)
  134. fmt.Fprint(w, etmHeader)
  135. fmt.Fprint(w, etmTLine)
  136. lines := sm.MeasuringPointsMap(func(mp *MeasuringPoint) interface{} {
  137. if ff(mp) {
  138. ops := 1e9 / mp.AvgTime
  139. return fmt.Sprintf(etmFormat, mp.Id, mp.Count, pf(mp.MinTime), pf(mp.MaxTime), pf(mp.AvgTime), pf(mp.TtlTime), ops)
  140. }
  141. return nil
  142. })
  143. for _, line := range lines {
  144. fmt.Fprint(w, line)
  145. }
  146. fmt.Fprint(w, etmTLine)
  147. fmt.Fprint(w, etmFooter)
  148. fmt.Fprint(w, etmELine)
  149. }
  150. // MeasuringPointsPrintAll prints all measuring points
  151. // to STDOUT.
  152. func (sm *SystemMonitor) MeasuringPointsPrintAll() {
  153. sm.MeasuringPointsWrite(os.Stdout, func(mp *MeasuringPoint) bool { return true })
  154. }
  155. // SetValue sets a value of a stay-set variable.
  156. func (sm *SystemMonitor) SetValue(id string, v int64) {
  157. sm.valueChan <- &value{id, v}
  158. }
  159. // StaySetVariablesMap performs the function f for all variables
  160. // and returns a slice with the return values of the function that are
  161. // not nil.
  162. func (sm *SystemMonitor) StaySetVariablesMap(f func(*StaySetVariable) interface{}) []interface{} {
  163. cmd := &command{cmdStaySetVariablesMap, f, make(chan interface{})}
  164. sm.commandChan <- cmd
  165. resp := <-cmd.respChan
  166. return resp.([]interface{})
  167. }
  168. // StaySetVariablesDo performs the function f for all
  169. // variables.
  170. func (sm *SystemMonitor) StaySetVariablesDo(f func(*StaySetVariable)) {
  171. cmd := &command{cmdStaySetVariablesDo, f, nil}
  172. sm.commandChan <- cmd
  173. }
  174. // StaySetVariablesWrite prints the stay-set variables for which
  175. // the passed function returns true to the passed writer.
  176. func (sm *SystemMonitor) StaySetVariablesWrite(w io.Writer, ff func(*StaySetVariable) bool) {
  177. fmt.Fprint(w, ssiTLine)
  178. fmt.Fprint(w, ssiHeader)
  179. fmt.Fprint(w, ssiTLine)
  180. lines := sm.StaySetVariablesMap(func(ssv *StaySetVariable) interface{} {
  181. if ff(ssv) {
  182. return fmt.Sprintf(ssiFormat, ssv.Id, ssv.Count, ssv.ActValue, ssv.MinValue, ssv.MaxValue, ssv.AvgValue)
  183. }
  184. return nil
  185. })
  186. for _, line := range lines {
  187. fmt.Fprint(w, line)
  188. }
  189. fmt.Fprint(w, ssiTLine)
  190. }
  191. // StaySetVariablesPrintAll prints all stay-set variables
  192. // to STDOUT.
  193. func (sm *SystemMonitor) StaySetVariablesPrintAll() {
  194. sm.StaySetVariablesWrite(os.Stdout, func(ssv *StaySetVariable) bool { return true })
  195. }
  196. // Register registers a new dynamic status retriever function.
  197. func (sm *SystemMonitor) Register(id string, rf DynamicStatusRetriever) {
  198. sm.retrieverRegistrationChan <- &retrieverRegistration{id, rf}
  199. }
  200. // Unregister unregisters a dynamic status retriever function.
  201. func (sm *SystemMonitor) Unregister(id string) {
  202. sm.retrieverRegistrationChan <- &retrieverRegistration{id, nil}
  203. }
  204. // DynamicStatusValuesMap performs the function f for all status values
  205. // and returns a slice with the return values of the function that are
  206. // not nil.
  207. func (sm *SystemMonitor) DynamicStatusValuesMap(f func(string, string) interface{}) []interface{} {
  208. cmd := &command{cmdDynamicStatusRetrieversMap, f, make(chan interface{})}
  209. sm.commandChan <- cmd
  210. resp := <-cmd.respChan
  211. return resp.([]interface{})
  212. }
  213. // DynamicStatusValuesDo performs the function f for all
  214. // status values.
  215. func (sm *SystemMonitor) DynamicStatusValuesDo(f func(string, string)) {
  216. cmd := &command{cmdDynamicStatusRetrieversDo, f, nil}
  217. sm.commandChan <- cmd
  218. }
  219. // DynamicStatusValuesWrite prints the status values for which
  220. // the passed function returns true to the passed writer.
  221. func (sm *SystemMonitor) DynamicStatusValuesWrite(w io.Writer, ff func(string, string) bool) {
  222. fmt.Fprint(w, dsrTLine)
  223. fmt.Fprint(w, dsrHeader)
  224. fmt.Fprint(w, dsrTLine)
  225. lines := sm.DynamicStatusValuesMap(func(id, dsv string) interface{} {
  226. if ff(id, dsv) {
  227. return fmt.Sprintf(dsrFormat, id, dsv)
  228. }
  229. return nil
  230. })
  231. for _, line := range lines {
  232. fmt.Fprint(w, line)
  233. }
  234. fmt.Fprint(w, dsrTLine)
  235. }
  236. // DynamicStatusValuesPrintAll prints all status values to STDOUT.
  237. func (sm *SystemMonitor) DynamicStatusValuesPrintAll() {
  238. sm.DynamicStatusValuesWrite(os.Stdout, func(id, dsv string) bool { return true })
  239. }
  240. // Return the supervisor.
  241. func (sm *SystemMonitor) Supervisor() *Supervisor {
  242. return GlobalSupervisor()
  243. }
  244. // Recover after an error.
  245. func (sm *SystemMonitor) Recover(recoverable Recoverable, err interface{}) {
  246. log.Printf("[cgl] recovering system monitor backend after error '%v'!", err)
  247. go sm.backend()
  248. }
  249. // Backend of the system monitor.
  250. func (sm *SystemMonitor) backend() {
  251. defer func() {
  252. HelpIfNeeded(sm, recover())
  253. }()
  254. for {
  255. select {
  256. case measuring := <-sm.measuringChan:
  257. // Received a new measuring.
  258. if mp, ok := sm.etmData[measuring.id]; ok {
  259. // Measuring point found.
  260. mp.update(measuring)
  261. } else {
  262. // New measuring point.
  263. sm.etmData[measuring.id] = newMeasuringPoint(measuring)
  264. }
  265. case value := <-sm.valueChan:
  266. // Received a new value.
  267. if ssv, ok := sm.ssiData[value.id]; ok {
  268. // Variable found.
  269. ssv.update(value)
  270. } else {
  271. // New stay-set variable.
  272. sm.ssiData[value.id] = newStaySetVariable(value)
  273. }
  274. case registration := <-sm.retrieverRegistrationChan:
  275. // Received a new retriever for registration.
  276. if registration.dsr != nil {
  277. // Register a new retriever.
  278. sm.dsrData[registration.id] = registration.dsr
  279. } else {
  280. // Deregister a retriever.
  281. if dsr, ok := sm.dsrData[registration.id]; ok {
  282. sm.dsrData[registration.id] = dsr, false
  283. }
  284. }
  285. case cmd := <-sm.commandChan:
  286. // Receivedd a command to process.
  287. sm.processCommand(cmd)
  288. }
  289. }
  290. }
  291. // Process a command.
  292. func (sm *SystemMonitor) processCommand(cmd *command) {
  293. switch cmd.opCode {
  294. case cmdMeasuringPointsMap:
  295. // Map the measuring points.
  296. var resp []interface{}
  297. f := cmd.args.(func(*MeasuringPoint) interface{})
  298. for _, mp := range sm.etmData {
  299. v := f(mp)
  300. if v != nil {
  301. resp = append(resp, v)
  302. }
  303. }
  304. cmd.respChan <- resp
  305. case cmdMeasuringPointsDo:
  306. // Iterate over the measurings.
  307. f := cmd.args.(func(*MeasuringPoint))
  308. for _, mp := range sm.etmData {
  309. f(mp)
  310. }
  311. case cmdStaySetVariablesMap:
  312. // Map the stay-set variables.
  313. var resp []interface{}
  314. f := cmd.args.(func(*StaySetVariable) interface{})
  315. for _, ssv := range sm.ssiData {
  316. v := f(ssv)
  317. if v != nil {
  318. resp = append(resp, v)
  319. }
  320. }
  321. cmd.respChan <- resp
  322. case cmdStaySetVariablesDo:
  323. // Iterate over the stay-set variables.
  324. f := cmd.args.(func(*StaySetVariable))
  325. for _, ssv := range sm.ssiData {
  326. f(ssv)
  327. }
  328. case cmdDynamicStatusRetrieversMap:
  329. // Map the return values of the dynamic status
  330. // retriever functions.
  331. var resp []interface{}
  332. f := cmd.args.(func(string, string) interface{})
  333. for id, dsr := range sm.dsrData {
  334. dsv := dsr()
  335. v := f(id, dsv)
  336. if v != nil {
  337. resp = append(resp, v)
  338. }
  339. }
  340. cmd.respChan <- resp
  341. case cmdDynamicStatusRetrieversDo:
  342. // Iterate over the return values of the
  343. // dynamic status retriever functions.
  344. f := cmd.args.(func(string, string))
  345. for id, dsr := range sm.dsrData {
  346. dsv := dsr()
  347. f(id, dsv)
  348. }
  349. }
  350. }
  351. //--------------------
  352. // ADDITIONAL MEASURING TYPES
  353. //--------------------
// Measuring contains one measuring.
type Measuring struct {
	// systemMonitor is the monitor the finished measuring is sent to.
	systemMonitor *SystemMonitor
	// id names the measuring point this measuring is aggregated into.
	id string
	// startTime is the time.Nanoseconds() reading taken by BeginMeasuring.
	startTime int64
	// endTime is the time.Nanoseconds() reading taken by EndMeasuring;
	// it is 0 until then.
	endTime int64
}
  361. // EndMEasuring ends a measuring and passes it to the
  362. // measuring server in the background.
  363. func (m *Measuring) EndMeasuring() int64 {
  364. m.endTime = time.Nanoseconds()
  365. m.systemMonitor.measuringChan <- m
  366. return m.endTime - m.startTime
  367. }
// MeasuringPoint contains the cumulated measuring
// data of one measuring point. All times are differences of
// time.Nanoseconds() readings.
type MeasuringPoint struct {
	// Id is the id shared by the aggregated measurings.
	Id string
	// Count is the number of aggregated measurings.
	Count int64
	// MinTime is the shortest measured time.
	MinTime int64
	// MaxTime is the longest measured time.
	MaxTime int64
	// TtlTime is the sum of all measured times.
	TtlTime int64
	// AvgTime is TtlTime divided by Count (integer division).
	AvgTime int64
}
  378. // Create a new measuring point out of a measuring.
  379. func newMeasuringPoint(m *Measuring) *MeasuringPoint {
  380. time := m.endTime - m.startTime
  381. mp := &MeasuringPoint{
  382. Id: m.id,
  383. Count: 1,
  384. MinTime: time,
  385. MaxTime: time,
  386. TtlTime: time,
  387. AvgTime: time,
  388. }
  389. return mp
  390. }
  391. // Update a measuring point with a measuring.
  392. func (mp *MeasuringPoint) update(m *Measuring) {
  393. time := m.endTime - m.startTime
  394. mp.Count++
  395. if mp.MinTime > time {
  396. mp.MinTime = time
  397. }
  398. if mp.MaxTime < time {
  399. mp.MaxTime = time
  400. }
  401. mp.TtlTime += time
  402. mp.AvgTime = mp.TtlTime / mp.Count
  403. }
  404. // New value for a stay-set variable.
  405. type value struct {
  406. id string
  407. value int64
  408. }
  409. // StaySetVariable contains the cumulated values
  410. // for one stay-set variable.
  411. type StaySetVariable struct {
  412. Id string
  413. Count int64
  414. ActValue int64
  415. MinValue int64
  416. MaxValue int64
  417. AvgValue int64
  418. total int64
  419. }
  420. // Create a new stay-set variable out of a value.
  421. func newStaySetVariable(v *value) *StaySetVariable {
  422. ssv := &StaySetVariable{
  423. Id: v.id,
  424. Count: 1,
  425. ActValue: v.value,
  426. MinValue: v.value,
  427. MaxValue: v.value,
  428. AvgValue: v.value,
  429. }
  430. return ssv
  431. }
  432. // Update a stay-set variable with a value.
  433. func (ssv *StaySetVariable) update(v *value) {
  434. ssv.Count++
  435. ssv.ActValue = v.value
  436. ssv.total += v.value
  437. if ssv.MinValue > ssv.ActValue {
  438. ssv.MinValue = ssv.ActValue
  439. }
  440. if ssv.MaxValue < ssv.ActValue {
  441. ssv.MaxValue = ssv.ActValue
  442. }
  443. ssv.AvgValue = ssv.total / ssv.Count
  444. }
// DynamicStatusRetriever is a function registered with the system
// monitor. The backend calls it whenever the dynamic status values
// are mapped or iterated and uses the returned string as the
// current status.
type DynamicStatusRetriever func() string
// retrieverRegistration is the message for registering or
// deregistering a retriever function at the backend.
type retrieverRegistration struct {
	// id is the id the retriever is stored under.
	id string
	// dsr is the retriever to register; nil requests the deregistration
	// of the id.
	dsr DynamicStatusRetriever
}
  453. /*
  454. EOF
  455. */