| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569 |
- /*
- Tideland Common Go Library - Monitoring
- Copyright (C) 2009-2011 Frank Mueller / Oldenburg / Germany
- Redistribution and use in source and binary forms, with or
- modification, are permitted provided that the following conditions are
- met:
- Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- Neither the name of Tideland nor the names of its contributors may be
- used to endorse or promote products derived from this software without
- specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- THE POSSIBILITY OF SUCH DAMAGE.
- */
- package cgl
- //--------------------
- // IMPORTS
- //--------------------
- import (
- "fmt"
- "io"
- "log"
- "os"
- "time"
- )
- //--------------------
- // GLOBAL VARIABLES
- //--------------------
- var monitor *SystemMonitor
- //--------------------
- // CONSTANTS
- //--------------------
- const (
- etmTLine = "+------------------------------------------+-----------+-----------+-----------+-----------+---------------+-----------+\n"
- etmHeader = "| Name | Count | Min Time | Max Time | Avg Time | Total Time | Op/Sec |\n"
- etmFormat = "| %-40s | %9d | %9.3f | %9.3f | %9.3f | %13.3f | %9d |\n"
- etmFooter = "| All times in milliseconds. |\n"
- etmELine = "+----------------------------------------------------------------------------------------------------------------------+\n"
- ssiTLine = "+------------------------------------------+-----------+---------------+---------------+---------------+---------------+\n"
- ssiHeader = "| Name | Count | Act Value | Min Value | Max Value | Avg Value |\n"
- ssiFormat = "| %-40s | %9d | %13d | %13d | %13d | %13d |\n"
- dsrTLine = "+------------------------------------------+---------------------------------------------------------------------------+\n"
- dsrHeader = "| Name | Value |\n"
- dsrFormat = "| %-40s | %-73s |\n"
- )
- const (
- cmdMeasuringPointsMap = iota
- cmdMeasuringPointsDo
- cmdStaySetVariablesMap
- cmdStaySetVariablesDo
- cmdDynamicStatusRetrieversMap
- cmdDynamicStatusRetrieversDo
- )
- //--------------------
- // MONITORING
- //--------------------
- // Command encapsulated the data for any command.
- type command struct {
- opCode int
- args interface{}
- respChan chan interface{}
- }
- // The system monitor type.
- type SystemMonitor struct {
- etmData map[string]*MeasuringPoint
- ssiData map[string]*StaySetVariable
- dsrData map[string]DynamicStatusRetriever
- measuringChan chan *Measuring
- valueChan chan *value
- retrieverRegistrationChan chan *retrieverRegistration
- commandChan chan *command
- }
- // Monitor returns the system monitor if it exists.
- // Otherwise it creates it first.
- func Monitor() *SystemMonitor {
- if monitor == nil {
- // Create system monitor.
- monitor = &SystemMonitor{
- etmData: make(map[string]*MeasuringPoint),
- ssiData: make(map[string]*StaySetVariable),
- dsrData: make(map[string]DynamicStatusRetriever),
- measuringChan: make(chan *Measuring, 1000),
- valueChan: make(chan *value, 1000),
- retrieverRegistrationChan: make(chan *retrieverRegistration, 10),
- commandChan: make(chan *command),
- }
- go monitor.backend()
- }
- return monitor
- }
- // BeginMeasuring starts a new measuring with a given id.
- // All measurings with the same id will be aggregated.
- func (sm *SystemMonitor) BeginMeasuring(id string) *Measuring {
- return &Measuring{sm, id, time.Nanoseconds(), 0}
- }
- // Measure the execution of a function.
- func (sm *SystemMonitor) Measure(id string, f func()) {
- m := sm.BeginMeasuring(id)
- f()
- m.EndMeasuring()
- }
- // MeasuringPointsMap performs the function f for all measuring points
- // and returns a slice with the return values of the function that are
- // not nil.
- func (sm *SystemMonitor) MeasuringPointsMap(f func(*MeasuringPoint) interface{}) []interface{} {
- cmd := &command{cmdMeasuringPointsMap, f, make(chan interface{})}
- sm.commandChan <- cmd
- resp := <-cmd.respChan
- return resp.([]interface{})
- }
- // MeasuringPointsDo performs the function f for
- // all measuring points.
- func (sm *SystemMonitor) MeasuringPointsDo(f func(*MeasuringPoint)) {
- cmd := &command{cmdMeasuringPointsDo, f, nil}
- sm.commandChan <- cmd
- }
- // MeasuringPointsWrite prints the measuring points for which
- // the passed function returns true to the passed writer.
- func (sm *SystemMonitor) MeasuringPointsWrite(w io.Writer, ff func(*MeasuringPoint) bool) {
- pf := func(t int64) float64 { return float64(t) / 1000000.0 }
- fmt.Fprint(w, etmTLine)
- fmt.Fprint(w, etmHeader)
- fmt.Fprint(w, etmTLine)
- lines := sm.MeasuringPointsMap(func(mp *MeasuringPoint) interface{} {
- if ff(mp) {
- ops := 1e9 / mp.AvgTime
- return fmt.Sprintf(etmFormat, mp.Id, mp.Count, pf(mp.MinTime), pf(mp.MaxTime), pf(mp.AvgTime), pf(mp.TtlTime), ops)
- }
- return nil
- })
- for _, line := range lines {
- fmt.Fprint(w, line)
- }
- fmt.Fprint(w, etmTLine)
- fmt.Fprint(w, etmFooter)
- fmt.Fprint(w, etmELine)
- }
- // MeasuringPointsPrintAll prints all measuring points
- // to STDOUT.
- func (sm *SystemMonitor) MeasuringPointsPrintAll() {
- sm.MeasuringPointsWrite(os.Stdout, func(mp *MeasuringPoint) bool { return true })
- }
- // SetValue sets a value of a stay-set variable.
- func (sm *SystemMonitor) SetValue(id string, v int64) {
- sm.valueChan <- &value{id, v}
- }
- // StaySetVariablesMap performs the function f for all variables
- // and returns a slice with the return values of the function that are
- // not nil.
- func (sm *SystemMonitor) StaySetVariablesMap(f func(*StaySetVariable) interface{}) []interface{} {
- cmd := &command{cmdStaySetVariablesMap, f, make(chan interface{})}
- sm.commandChan <- cmd
- resp := <-cmd.respChan
- return resp.([]interface{})
- }
- // StaySetVariablesDo performs the function f for all
- // variables.
- func (sm *SystemMonitor) StaySetVariablesDo(f func(*StaySetVariable)) {
- cmd := &command{cmdStaySetVariablesDo, f, nil}
- sm.commandChan <- cmd
- }
- // StaySetVariablesWrite prints the stay-set variables for which
- // the passed function returns true to the passed writer.
- func (sm *SystemMonitor) StaySetVariablesWrite(w io.Writer, ff func(*StaySetVariable) bool) {
- fmt.Fprint(w, ssiTLine)
- fmt.Fprint(w, ssiHeader)
- fmt.Fprint(w, ssiTLine)
- lines := sm.StaySetVariablesMap(func(ssv *StaySetVariable) interface{} {
- if ff(ssv) {
- return fmt.Sprintf(ssiFormat, ssv.Id, ssv.Count, ssv.ActValue, ssv.MinValue, ssv.MaxValue, ssv.AvgValue)
- }
- return nil
- })
- for _, line := range lines {
- fmt.Fprint(w, line)
- }
- fmt.Fprint(w, ssiTLine)
- }
- // StaySetVariablesPrintAll prints all stay-set variables
- // to STDOUT.
- func (sm *SystemMonitor) StaySetVariablesPrintAll() {
- sm.StaySetVariablesWrite(os.Stdout, func(ssv *StaySetVariable) bool { return true })
- }
- // Register registers a new dynamic status retriever function.
- func (sm *SystemMonitor) Register(id string, rf DynamicStatusRetriever) {
- sm.retrieverRegistrationChan <- &retrieverRegistration{id, rf}
- }
- // Unregister unregisters a dynamic status retriever function.
- func (sm *SystemMonitor) Unregister(id string) {
- sm.retrieverRegistrationChan <- &retrieverRegistration{id, nil}
- }
- // DynamicStatusValuesMap performs the function f for all status values
- // and returns a slice with the return values of the function that are
- // not nil.
- func (sm *SystemMonitor) DynamicStatusValuesMap(f func(string, string) interface{}) []interface{} {
- cmd := &command{cmdDynamicStatusRetrieversMap, f, make(chan interface{})}
- sm.commandChan <- cmd
- resp := <-cmd.respChan
- return resp.([]interface{})
- }
- // DynamicStatusValuesDo performs the function f for all
- // status values.
- func (sm *SystemMonitor) DynamicStatusValuesDo(f func(string, string)) {
- cmd := &command{cmdDynamicStatusRetrieversDo, f, nil}
- sm.commandChan <- cmd
- }
- // DynamicStatusValuesWrite prints the status values for which
- // the passed function returns true to the passed writer.
- func (sm *SystemMonitor) DynamicStatusValuesWrite(w io.Writer, ff func(string, string) bool) {
- fmt.Fprint(w, dsrTLine)
- fmt.Fprint(w, dsrHeader)
- fmt.Fprint(w, dsrTLine)
- lines := sm.DynamicStatusValuesMap(func(id, dsv string) interface{} {
- if ff(id, dsv) {
- return fmt.Sprintf(dsrFormat, id, dsv)
- }
- return nil
- })
- for _, line := range lines {
- fmt.Fprint(w, line)
- }
- fmt.Fprint(w, dsrTLine)
- }
- // DynamicStatusValuesPrintAll prints all status values to STDOUT.
- func (sm *SystemMonitor) DynamicStatusValuesPrintAll() {
- sm.DynamicStatusValuesWrite(os.Stdout, func(id, dsv string) bool { return true })
- }
- // Return the supervisor.
- func (sm *SystemMonitor) Supervisor() *Supervisor {
- return GlobalSupervisor()
- }
- // Recover after an error.
- func (sm *SystemMonitor) Recover(recoverable Recoverable, err interface{}) {
- log.Printf("[cgl] recovering system monitor backend after error '%v'!", err)
- go sm.backend()
- }
- // Backend of the system monitor.
- func (sm *SystemMonitor) backend() {
- defer func() {
- HelpIfNeeded(sm, recover())
- }()
- for {
- select {
- case measuring := <-sm.measuringChan:
- // Received a new measuring.
- if mp, ok := sm.etmData[measuring.id]; ok {
- // Measuring point found.
- mp.update(measuring)
- } else {
- // New measuring point.
- sm.etmData[measuring.id] = newMeasuringPoint(measuring)
- }
- case value := <-sm.valueChan:
- // Received a new value.
- if ssv, ok := sm.ssiData[value.id]; ok {
- // Variable found.
- ssv.update(value)
- } else {
- // New stay-set variable.
- sm.ssiData[value.id] = newStaySetVariable(value)
- }
- case registration := <-sm.retrieverRegistrationChan:
- // Received a new retriever for registration.
- if registration.dsr != nil {
- // Register a new retriever.
- sm.dsrData[registration.id] = registration.dsr
- } else {
- // Deregister a retriever.
- if dsr, ok := sm.dsrData[registration.id]; ok {
- sm.dsrData[registration.id] = dsr, false
- }
- }
- case cmd := <-sm.commandChan:
- // Receivedd a command to process.
- sm.processCommand(cmd)
- }
- }
- }
- // Process a command.
- func (sm *SystemMonitor) processCommand(cmd *command) {
- switch cmd.opCode {
- case cmdMeasuringPointsMap:
- // Map the measuring points.
- var resp []interface{}
- f := cmd.args.(func(*MeasuringPoint) interface{})
- for _, mp := range sm.etmData {
- v := f(mp)
- if v != nil {
- resp = append(resp, v)
- }
- }
- cmd.respChan <- resp
- case cmdMeasuringPointsDo:
- // Iterate over the measurings.
- f := cmd.args.(func(*MeasuringPoint))
- for _, mp := range sm.etmData {
- f(mp)
- }
- case cmdStaySetVariablesMap:
- // Map the stay-set variables.
- var resp []interface{}
- f := cmd.args.(func(*StaySetVariable) interface{})
- for _, ssv := range sm.ssiData {
- v := f(ssv)
- if v != nil {
- resp = append(resp, v)
- }
- }
- cmd.respChan <- resp
- case cmdStaySetVariablesDo:
- // Iterate over the stay-set variables.
- f := cmd.args.(func(*StaySetVariable))
- for _, ssv := range sm.ssiData {
- f(ssv)
- }
- case cmdDynamicStatusRetrieversMap:
- // Map the return values of the dynamic status
- // retriever functions.
- var resp []interface{}
- f := cmd.args.(func(string, string) interface{})
- for id, dsr := range sm.dsrData {
- dsv := dsr()
- v := f(id, dsv)
- if v != nil {
- resp = append(resp, v)
- }
- }
- cmd.respChan <- resp
- case cmdDynamicStatusRetrieversDo:
- // Iterate over the return values of the
- // dynamic status retriever functions.
- f := cmd.args.(func(string, string))
- for id, dsr := range sm.dsrData {
- dsv := dsr()
- f(id, dsv)
- }
- }
- }
- //--------------------
- // ADDITIONAL MEASURING TYPES
- //--------------------
- // Measuring contains one measuring.
- type Measuring struct {
- systemMonitor *SystemMonitor
- id string
- startTime int64
- endTime int64
- }
- // EndMEasuring ends a measuring and passes it to the
- // measuring server in the background.
- func (m *Measuring) EndMeasuring() int64 {
- m.endTime = time.Nanoseconds()
- m.systemMonitor.measuringChan <- m
- return m.endTime - m.startTime
- }
- // MeasuringPoint contains the cumulated measuring
- // data of one measuring point.
- type MeasuringPoint struct {
- Id string
- Count int64
- MinTime int64
- MaxTime int64
- TtlTime int64
- AvgTime int64
- }
- // Create a new measuring point out of a measuring.
- func newMeasuringPoint(m *Measuring) *MeasuringPoint {
- time := m.endTime - m.startTime
- mp := &MeasuringPoint{
- Id: m.id,
- Count: 1,
- MinTime: time,
- MaxTime: time,
- TtlTime: time,
- AvgTime: time,
- }
- return mp
- }
- // Update a measuring point with a measuring.
- func (mp *MeasuringPoint) update(m *Measuring) {
- time := m.endTime - m.startTime
- mp.Count++
- if mp.MinTime > time {
- mp.MinTime = time
- }
- if mp.MaxTime < time {
- mp.MaxTime = time
- }
- mp.TtlTime += time
- mp.AvgTime = mp.TtlTime / mp.Count
- }
- // New value for a stay-set variable.
- type value struct {
- id string
- value int64
- }
- // StaySetVariable contains the cumulated values
- // for one stay-set variable.
- type StaySetVariable struct {
- Id string
- Count int64
- ActValue int64
- MinValue int64
- MaxValue int64
- AvgValue int64
- total int64
- }
- // Create a new stay-set variable out of a value.
- func newStaySetVariable(v *value) *StaySetVariable {
- ssv := &StaySetVariable{
- Id: v.id,
- Count: 1,
- ActValue: v.value,
- MinValue: v.value,
- MaxValue: v.value,
- AvgValue: v.value,
- }
- return ssv
- }
- // Update a stay-set variable with a value.
- func (ssv *StaySetVariable) update(v *value) {
- ssv.Count++
- ssv.ActValue = v.value
- ssv.total += v.value
- if ssv.MinValue > ssv.ActValue {
- ssv.MinValue = ssv.ActValue
- }
- if ssv.MaxValue < ssv.ActValue {
- ssv.MaxValue = ssv.ActValue
- }
- ssv.AvgValue = ssv.total / ssv.Count
- }
- // DynamicStatusRetriever is called by the server and
- // returns a current status as string.
- type DynamicStatusRetriever func() string
- // New registration of a retriever function.
- type retrieverRegistration struct {
- id string
- dsr DynamicStatusRetriever
- }
- /*
- EOF
- */
|