cglsup.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. /*
  2. Tideland Common Go Library - Supervision
  3. Copyright (C) 2011 Frank Mueller / Oldenburg / Germany
  4. Redistribution and use in source and binary forms, with or
  5. modification, are permitted provided that the following conditions are
  6. met:
  7. Redistributions of source code must retain the above copyright notice, this
  8. list of conditions and the following disclaimer.
  9. Redistributions in binary form must reproduce the above copyright notice,
  10. this list of conditions and the following disclaimer in the documentation
  11. and/or other materials provided with the distribution.
  12. Neither the name of Tideland nor the names of its contributors may be
  13. used to endorse or promote products derived from this software without
  14. specific prior written permission.
  15. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  16. AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  17. IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  18. ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  19. LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  20. CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  21. SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  22. INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  23. CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  24. ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  25. THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. package cgl
  28. //--------------------
  29. // IMPORTS
  30. //--------------------
  31. import (
  32. "log"
  33. "os"
  34. "time"
  35. )
  36. //--------------------
  37. // GLOBAL VARIABLES
  38. //--------------------
  39. var supervisor *Supervisor
  40. //--------------------
  41. // INIT
  42. //--------------------
  43. func init() {
  44. supervisor = NewSupervisor(nil)
  45. }
  46. //--------------------
  47. // FUNCTIONS
  48. //--------------------
  49. // Return the global supervisor.
  50. func GlobalSupervisor() *Supervisor {
  51. return supervisor
  52. }
  53. //--------------------
  54. // RECOVERABLE
  55. //--------------------
  56. // The interface for recoverable types.
  57. type Recoverable interface {
  58. Supervisor() *Supervisor
  59. Recover(Recoverable, interface{})
  60. }
  61. //--------------------
  62. // SUPERVISOR
  63. //--------------------
  64. // Message: Add a recoverable for mass recovering.
  65. type addRecoverableMsg struct {
  66. id string
  67. r Recoverable
  68. }
  69. // Message: Cry for help after an error.
  70. type cryForHelpMsg struct {
  71. r Recoverable
  72. err interface{}
  73. }
  74. // The supervisor itself.
  75. type Supervisor struct {
  76. supervisor *Supervisor
  77. recoverables map[string]Recoverable
  78. addChan chan *addRecoverableMsg
  79. helpChan chan *cryForHelpMsg
  80. }
  81. // Create a new supervisor.
  82. func NewSupervisor(parent *Supervisor) *Supervisor {
  83. s := &Supervisor{
  84. supervisor: parent,
  85. recoverables: make(map[string]Recoverable),
  86. addChan: make(chan *addRecoverableMsg),
  87. helpChan: make(chan *cryForHelpMsg),
  88. }
  89. go s.backend()
  90. return s
  91. }
  92. // Add a recoverable for joint restart in case of an error.
  93. func (s *Supervisor) AddRecoverable(id string, r Recoverable) {
  94. s.addChan <- &addRecoverableMsg{id, r}
  95. }
  96. // Let a recoverable cry for help at its supervisor.
  97. func (s *Supervisor) Help(r Recoverable, err interface{}) {
  98. s.helpChan <- &cryForHelpMsg{r, err}
  99. }
  100. // Implement Supervisor() of the recoverable interface for the supervisor itself.
  101. func (s *Supervisor) Supervisor() *Supervisor {
  102. return s.supervisor
  103. }
  104. // Implement Recover() of the recoverable interface for the supervisor itself.
  105. func (s *Supervisor) Recover(r Recoverable, err interface{}) {
  106. if s == r {
  107. go s.backend()
  108. }
  109. }
  110. // Backend goroutine of the supervisor.
  111. func (s *Supervisor) backend() {
  112. defer func() {
  113. // Test for error and cry for help
  114. // if needed.
  115. HelpIfNeeded(s, recover())
  116. }()
  117. // Wait for cries for help.
  118. for {
  119. select {
  120. case add := <-s.addChan:
  121. s.recoverables[add.id] = add.r
  122. case cfh := <-s.helpChan:
  123. if len(s.recoverables) > 0 {
  124. // Recover all recoverables.
  125. done := false
  126. for _, recoverable := range s.recoverables {
  127. recoverable.Recover(recoverable, cfh.err)
  128. if recoverable == cfh.r {
  129. done = true
  130. }
  131. }
  132. // Erroreous recoverable is not registered.
  133. if !done {
  134. cfh.r.Recover(cfh.r, cfh.err)
  135. }
  136. } else {
  137. // Recover the erroreous recoverable.
  138. cfh.r.Recover(cfh.r, cfh.err)
  139. }
  140. }
  141. }
  142. }
  143. //--------------------
  144. // HEARTBEATABLE
  145. //--------------------
  146. // The interface for heartbeatable types.
  147. type Heartbeatable interface {
  148. Recoverable
  149. SetHearbeat(*Heartbeat)
  150. }
  151. //--------------------
  152. // HEARBEAT
  153. //--------------------
  154. // Heartbeat for one recoverable.
  155. type Heartbeat struct {
  156. recoverable Recoverable
  157. ticker *time.Ticker
  158. openTicks int64
  159. HeartbeatChan chan *Heartbeat
  160. ImAliveChan chan bool
  161. }
  162. // Create a new heartbeat.
  163. func NewHeartbeat(r Recoverable, ns int64) *Heartbeat {
  164. h := &Heartbeat{
  165. recoverable: r,
  166. ticker: time.NewTicker(ns),
  167. openTicks: 0,
  168. HeartbeatChan: make(chan *Heartbeat),
  169. ImAliveChan: make(chan bool),
  170. }
  171. go h.backend()
  172. return h
  173. }
  174. // Backend goroutine of the heartbeat.
  175. func (h *Heartbeat) backend() {
  176. for {
  177. select {
  178. case <-h.ticker.C:
  179. // Check open ticks.
  180. if h.openTicks > 0 {
  181. h.recoverBelated()
  182. } else {
  183. h.sendHeartbeat()
  184. }
  185. case <-h.ImAliveChan:
  186. // Reduce number of open ticks.
  187. if h.openTicks > 0 {
  188. h.openTicks--
  189. }
  190. }
  191. }
  192. }
  193. // Recover a belated recaverable.
  194. func (h *Heartbeat) recoverBelated() {
  195. err := os.NewError("Belated recoverable!")
  196. if h.recoverable.Supervisor() != nil {
  197. // Cry for help using the supervisor.
  198. h.recoverable.Supervisor().Help(h.recoverable, err)
  199. } else {
  200. // Recover directly.
  201. h.recoverable.Recover(h.recoverable, err)
  202. }
  203. h.openTicks = 0
  204. }
  205. // Send a heartbeat.
  206. func (h *Heartbeat) sendHeartbeat() {
  207. select {
  208. case h.HeartbeatChan <- h:
  209. break
  210. default:
  211. log.Printf("Heartbeat can't be sent!")
  212. }
  213. h.openTicks++
  214. }
  215. //--------------------
  216. // CONVENIENCE FUNCTIONS
  217. //--------------------
  218. // Tell the supervisor to help if
  219. // the passed error is not nil.
  220. func HelpIfNeeded(r Recoverable, err interface{}) {
  221. // Test for error.
  222. if err != nil {
  223. // Test for configured supervisor.
  224. if r.Supervisor() != nil {
  225. // Cry for help.
  226. r.Supervisor().Help(r, err)
  227. }
  228. }
  229. }
  230. // Send a heartbeat.
  231. func ImAlive(h *Heartbeat) {
  232. h.ImAliveChan <- true
  233. }
  234. /*
  235. EOF
  236. */