store.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package main
  2. import (
  3. "encoding/gob"
  4. "errors"
  5. "io"
  6. "log"
  7. "os"
  8. "net/rpc"
  9. "sync"
  10. )
  11. const saveQueueLength = 1000
  12. type Store interface {
  13. Put(url, key *string) error
  14. Get(key, url *string) error
  15. }
  16. type ProxyStore struct {
  17. urls *URLStore // local cache
  18. client *rpc.Client
  19. }
  20. type URLStore struct {
  21. urls map[string]string
  22. mu sync.RWMutex
  23. save chan record
  24. }
  25. type record struct {
  26. Key, URL string
  27. }
  28. func NewURLStore(filename string) *URLStore {
  29. s := &URLStore{urls: make(map[string]string)}
  30. if filename != "" {
  31. s.save = make(chan record, saveQueueLength)
  32. if err := s.load(filename); err != nil {
  33. log.Println("Error loading URLStore: ", err)
  34. }
  35. go s.saveLoop(filename)
  36. }
  37. return s
  38. }
  39. func (s *URLStore) Get(key, url *string) error {
  40. s.mu.RLock()
  41. defer s.mu.RUnlock()
  42. if u, ok := s.urls[*key]; ok {
  43. *url = u
  44. return nil
  45. }
  46. return errors.New("key not found")
  47. }
  48. func (s *URLStore) Set(key, url *string) error {
  49. s.mu.Lock()
  50. defer s.mu.Unlock()
  51. if _, present := s.urls[*key]; present {
  52. return errors.New("key already exists")
  53. }
  54. s.urls[*key] = *url
  55. return nil
  56. }
  57. func (s *URLStore) Count() int {
  58. s.mu.RLock()
  59. defer s.mu.RUnlock()
  60. return len(s.urls)
  61. }
  62. func (s *URLStore) Put(url, key *string) error {
  63. for {
  64. *key = genKey(s.Count())
  65. if err := s.Set(key, url); err == nil {
  66. break
  67. }
  68. }
  69. if s.save != nil {
  70. s.save <- record{*key, *url}
  71. }
  72. return nil
  73. }
  74. func (s *URLStore) load(filename string) error {
  75. f, err := os.Open(filename)
  76. if err != nil {
  77. return err
  78. }
  79. defer f.Close()
  80. d := gob.NewDecoder(f)
  81. for err == nil {
  82. var r record
  83. if err = d.Decode(&r); err == nil {
  84. s.Set(&r.Key, &r.URL)
  85. }
  86. }
  87. if err == io.EOF {
  88. return nil
  89. }
  90. return err
  91. }
  92. func (s *URLStore) saveLoop(filename string) {
  93. f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
  94. if err != nil {
  95. log.Fatal("Error opening URLStore: ", err)
  96. }
  97. e := gob.NewEncoder(f)
  98. for {
  99. r := <-s.save
  100. if err := e.Encode(r); err != nil {
  101. log.Println("Error saving to URLStore: ", err)
  102. }
  103. }
  104. }
  105. func NewProxyStore(addr string) *ProxyStore {
  106. client, err := rpc.DialHTTP("tcp", addr)
  107. if err != nil {
  108. log.Println("Error constructing ProxyStore: ", err)
  109. // return // ?
  110. }
  111. return &ProxyStore{urls: NewURLStore(""), client: client}
  112. }
  113. func (s *ProxyStore) Get(key, url *string) error {
  114. if err := s.urls.Get(key, url); err == nil {
  115. return nil
  116. }
  117. // rpc call to master:
  118. if err := s.client.Call("Store.Get", key, url); err != nil {
  119. return err
  120. }
  121. s.urls.Set(key, url) // update local cache
  122. return nil
  123. }
  124. func (s *ProxyStore) Put(url, key *string) error {
  125. // rpc call to master:
  126. if err := s.client.Call("Store.Put", url, key); err != nil {
  127. return err
  128. }
  129. s.urls.Set(key, url) // update local cache
  130. return nil
  131. }