|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """Stub version of the memcache API, keeping all data in process memory.""" |
|
19 |
|
20 |
|
21 |
|
22 import logging |
|
23 import time |
|
24 |
|
25 from google.appengine.api import memcache |
|
26 from google.appengine.api.memcache import memcache_service_pb |
|
27 |
|
# Module-level shorthands for the protocol buffer classes that the stub
# below refers to by their bare names.
MemcacheDeleteResponse = memcache_service_pb.MemcacheDeleteResponse
MemcacheIncrementRequest = memcache_service_pb.MemcacheIncrementRequest
MemcacheSetRequest = memcache_service_pb.MemcacheSetRequest
MemcacheSetResponse = memcache_service_pb.MemcacheSetResponse
|
32 |
|
33 |
|
class CacheEntry(object):
  """One cached value together with its expiration and lock state."""

  def __init__(self, value, expiration, flags, gettime):
    """Builds an entry and works out when it expires.

    Args:
      value: String payload for this entry; asserted to be no longer than
        memcache.MAX_VALUE_SIZE.
      expiration: Integer expiration time or offset in seconds (see
        _SetExpiration). A value of 0 means the entry never expires.
      flags: Opaque flags kept alongside the value for the memcache
        implementation.
      gettime: Function that works like time.time(); injectable for tests.
    """
    assert isinstance(value, basestring)
    assert len(value) <= memcache.MAX_VALUE_SIZE
    assert isinstance(expiration, (int, long))

    self._gettime = gettime
    self.value, self.flags = value, flags
    self.created_time = gettime()
    self.locked = False
    # An expiration of exactly zero marks the entry as never expiring.
    self.will_expire = expiration != 0
    self._SetExpiration(expiration)

  def _SetExpiration(self, expiration):
    """Records the absolute time at which this entry expires.

    Args:
      expiration: Number of seconds. Values above one month (86400 * 30)
        are taken as an absolute time since the UNIX epoch; anything else
        is an offset from the current time.
    """
    is_absolute = expiration > (86400 * 30)
    # The clock is only consulted for relative offsets.
    self.expiration_time = (
        expiration if is_absolute else self._gettime() + expiration)

  def CheckExpired(self):
    """Returns True if this entry can expire and its time has passed."""
    if not self.will_expire:
      return False
    return self._gettime() >= self.expiration_time

  def ExpireAndLock(self, timeout):
    """Flags this entry as deleted and locks it until the timeout passes.

    This is how memcache's delete timeout behavior is implemented.

    Args:
      timeout: The timeout originally given to memcache.delete or
        memcache.delete_multi; interpreted like any other expiration.
    """
    self.locked = True
    self.will_expire = True
    self._SetExpiration(timeout)

  def CheckLocked(self):
    """Returns True while a deleted entry is still inside its timeout."""
    if not self.locked:
      return False
    return not self.CheckExpired()
|
92 |
|
93 |
|
class MemcacheServiceStub(object):
  """Python only memcache service stub.

  This stub keeps all data in the local process' memory, not in any
  external servers.
  """

  def __init__(self, gettime=time.time):
    """Initializer.

    Args:
      gettime: time.time()-like function used for testing. It is handed to
        every CacheEntry this stub creates.
    """
    self._gettime = gettime
    self._ResetStats()

    # Maps each key to its CacheEntry.
    self._the_cache = {}

  def _ResetStats(self):
    """Resets the hit, miss and byte-hit counters to zero."""
    self._hits = 0
    self._misses = 0
    self._byte_hits = 0

  def MakeSyncCall(self, service, call, request, response):
    """The main RPC entry point.

    Dispatches to the method named '_Dynamic_' + call.

    Args:
      service: Must be 'memcache'.
      call: A string representing the rpc to make. Must be part of
        MemcacheService.
      request: A protocol buffer of the type corresponding to 'call'.
      response: A protocol buffer of the type corresponding to 'call';
        filled in by the handler.

    Raises:
      AssertionError: If service is not 'memcache' or request is not
        initialized.
      AttributeError: If there is no handler for 'call'.
    """
    assert service == 'memcache'
    assert request.IsInitialized()

    attr = getattr(self, '_Dynamic_' + call)
    attr(request, response)

  def _GetKey(self, key):
    """Retrieves a CacheEntry from the cache if it hasn't expired.

    Does not take deletion timeout into account. An entry found to be
    expired is removed from the cache as a side effect.

    Args:
      key: The key to retrieve from the cache.

    Returns:
      The corresponding CacheEntry instance, or None if it was not found or
      has already expired.
    """
    entry = self._the_cache.get(key)
    if entry is None:
      return None
    if entry.CheckExpired():
      del self._the_cache[key]
      return None
    return entry

  def _Dynamic_Get(self, request, response):
    """Implementation of MemcacheService::Get().

    Updates the hit, miss and byte-hit counters. Keys that are missing,
    expired or locked by a pending delete count as misses and produce no
    item in the response.

    Args:
      request: A MemcacheGetRequest.
      response: A MemcacheGetResponse.
    """
    # Duplicate keys in the request are looked up (and counted) only once.
    keys = set(request.key_list())
    for key in keys:
      entry = self._GetKey(key)
      if entry is None or entry.CheckLocked():
        self._misses += 1
        continue
      self._hits += 1
      self._byte_hits += len(entry.value)
      item = response.add_item()
      item.set_key(key)
      item.set_value(entry.value)
      item.set_flags(entry.flags)

  def _Dynamic_Set(self, request, response):
    """Implementation of MemcacheService::Set().

    Adds one status per request item to the response, in request order.

    Args:
      request: A MemcacheSetRequest.
      response: A MemcacheSetResponse.
    """
    for item in request.item_list():
      key = item.key()
      set_policy = item.set_policy()
      old_entry = self._GetKey(key)

      if set_policy == MemcacheSetRequest.SET:
        # SET always stores, even over an entry locked by a delete timeout.
        may_store = True
      elif set_policy == MemcacheSetRequest.ADD:
        # A locked entry is still present, so ADD is refused while it lasts.
        may_store = old_entry is None
      elif set_policy == MemcacheSetRequest.REPLACE:
        may_store = old_entry is not None and not old_entry.CheckLocked()
      else:
        may_store = False

      set_status = MemcacheSetResponse.NOT_STORED
      if may_store:
        self._the_cache[key] = CacheEntry(item.value(),
                                          item.expiration_time(),
                                          item.flags(),
                                          gettime=self._gettime)
        set_status = MemcacheSetResponse.STORED

      response.add_set_status(set_status)

  def _Dynamic_Delete(self, request, response):
    """Implementation of MemcacheService::Delete().

    Adds one status per request item to the response, in request order.

    Args:
      request: A MemcacheDeleteRequest.
      response: A MemcacheDeleteResponse.
    """
    for item in request.item_list():
      key = item.key()
      entry = self._GetKey(key)

      delete_status = MemcacheDeleteResponse.DELETED
      if entry is None:
        delete_status = MemcacheDeleteResponse.NOT_FOUND
      elif item.delete_time() == 0:
        # No timeout requested: drop the entry immediately. delete_time is
        # an accessor and has to be called; comparing the method object
        # itself to 0 is never true.
        del self._the_cache[key]
      else:
        # Keep the entry around, locked, until the delete timeout passes.
        entry.ExpireAndLock(item.delete_time())

      response.add_delete_status(delete_status)

  def _Dynamic_Increment(self, request, response):
    """Implementation of MemcacheService::Increment().

    If the key is missing, or its value cannot be read as an integer, the
    response is left without a new value. A result outside the range
    [0, 2**64) is replaced by 0.

    Args:
      request: A MemcacheIncrementRequest.
      response: A MemcacheIncrementResponse.
    """
    key = request.key()
    entry = self._GetKey(key)
    if entry is None:
      return

    try:
      old_value = long(entry.value)
    except ValueError:
      logging.error('Increment/decrement failed: Could not interpret '
                    'value for key = "%s" as an integer.', key)
      return

    delta = request.delta()
    if request.direction() == MemcacheIncrementRequest.DECREMENT:
      delta = -delta

    new_value = old_value + delta
    if not (0 <= new_value < 2**64):
      new_value = 0

    # Values are stored as strings, so write the new number back as one.
    entry.value = str(new_value)
    response.set_new_value(new_value)

  def _Dynamic_FlushAll(self, request, response):
    """Implementation of MemcacheService::FlushAll().

    Removes every entry and resets the statistics counters.

    Args:
      request: A MemcacheFlushRequest.
      response: A MemcacheFlushResponse.
    """
    self._the_cache.clear()
    self._ResetStats()

  def _Dynamic_Stats(self, request, response):
    """Implementation of MemcacheService::Stats().

    Item and byte totals cover everything currently held in the cache
    dictionary, including entries that have expired but have not yet been
    purged by a lookup.

    Args:
      request: A MemcacheStatsRequest.
      response: A MemcacheStatsResponse.
    """
    stats = response.mutable_stats()
    stats.set_hits(self._hits)
    stats.set_misses(self._misses)
    stats.set_byte_hits(self._byte_hits)
    stats.set_items(len(self._the_cache))

    total_bytes = sum(len(entry.value) for entry in self._the_cache.values())
    stats.set_bytes(total_bytes)

    # Reported as a fixed value of 1800; it is not computed from the entries.
    stats.set_oldest_item_age(1800)