|
1 #!/usr/bin/env python |
|
2 # |
|
3 # Copyright 2007 Google Inc. |
|
4 # |
|
5 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 # you may not use this file except in compliance with the License. |
|
7 # You may obtain a copy of the License at |
|
8 # |
|
9 # http://www.apache.org/licenses/LICENSE-2.0 |
|
10 # |
|
11 # Unless required by applicable law or agreed to in writing, software |
|
12 # distributed under the License is distributed on an "AS IS" BASIS, |
|
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 # See the License for the specific language governing permissions and |
|
15 # limitations under the License. |
|
16 # |
|
17 |
|
18 """An apiproxy stub that calls a remote handler via HTTP. |
|
19 |
|
20 This allows easy remote access to the App Engine datastore, and potentially any |
|
21 of the other App Engine APIs, using the same interface you use when accessing |
|
22 the service locally. |
|
23 |
|
24 An example Python script: |
|
25 --- |
|
26 from google.appengine.ext import db |
|
27 from google.appengine.ext.remote_api import remote_api_stub |
|
28 from myapp import models |
|
29 import getpass |
|
30 |
|
31 def auth_func(): |
|
32 return (raw_input('Username:'), getpass.getpass('Password:')) |
|
33 |
|
34 remote_api_stub.ConfigureRemoteDatastore('my-app', '/remote_api', auth_func) |
|
35 |
|
36 # Now you can access the remote datastore just as if your code was running on |
|
37 # App Engine! |
|
38 |
|
39 houses = models.House.all().fetch(100) |
|
40 for a_house in houses: |
|
41 a_house.doors += 1 |
|
42 db.put(houses) |
|
43 --- |
|
44 |
|
45 A few caveats: |
|
46 - Where possible, avoid iterating over queries directly. Fetching as many |
|
47 results as you will need is faster and more efficient. |
|
48 - If you need to iterate, consider instead fetching items in batches with a sort |
|
49 order and constructing a new query starting from where the previous one left |
|
50 off. The __key__ pseudo-property can be used as a sort key for this purpose, |
|
51 and does not even require a custom index if you are iterating over all |
|
52 entities of a given type. |
|
53 - Likewise, it's a good idea to put entities in batches. Instead of calling put |
|
54 for each individual entity, accumulate them and put them in batches using |
|
55 db.put(), if you can. |
|
56 - Requests and responses are still limited to 1MB each, so if you have large |
|
57 entities or try and fetch or put many of them at once, your requests may fail. |
|
58 """ |
|
59 |
|
60 |
|
61 |
|
62 |
|
63 |
|
64 import os |
|
65 import pickle |
|
66 import sha |
|
67 import sys |
|
68 import thread |
|
69 import threading |
|
70 from google.appengine.api import apiproxy_stub_map |
|
71 from google.appengine.datastore import datastore_pb |
|
72 from google.appengine.ext.remote_api import remote_api_pb |
|
73 from google.appengine.runtime import apiproxy_errors |
|
74 from google.appengine.tools import appengine_rpc |
|
75 |
|
76 |
|
def GetUserAgent():
  """Determines the value of the 'User-agent' header to use for HTTP requests.

  Returns:
    String containing the 'user-agent' header value, which includes the SDK
    version, the platform information, and the version of Python;
    e.g., "remote_api/1.0.1 Darwin/9.2.0 Python/2.5.2".
  """
  python_version = ".".join(str(part) for part in sys.version_info)
  tokens = [
      "Google-remote_api/1.0",
      appengine_rpc.GetPlatformToken(),
      "Python/%s" % python_version,
  ]
  return " ".join(tokens)
|
95 |
|
96 |
|
def GetSourceName():
  """Returns the source-name token sent to identify this client library."""
  return "Google-remote_api-1.0"
|
99 |
|
100 |
|
class TransactionData(object):
  """Encapsulates data about an individual transaction."""

  def __init__(self, thread_id):
    # Transactions may only be used from the thread that began them.
    self.thread_id = thread_id
    # Encoded key -> (key_pb, entity_hash) snapshots taken at read time,
    # verified server-side when the transaction commits.
    self.preconditions = {}
    # Encoded key -> (key_pb, entity_pb or None) pending writes; a None
    # entity marks a deletion.
    self.entities = {}
|
108 |
|
109 |
|
class RemoteStub(object):
  """A stub for calling services on a remote server over HTTP.

  You can use this to stub out any service that the remote server supports.
  """

  def __init__(self, server, path):
    """Constructs a new RemoteStub that communicates with the specified server.

    Args:
      server: An instance of a subclass of
        google.appengine.tools.appengine_rpc.AbstractRpcServer.
      path: The path to the handler this stub should send requests to.
    """
    self._server = server
    self._path = path

  def MakeSyncCall(self, service, call, request, response):
    """Sends one API call to the remote handler and decodes its reply.

    Args:
      service: Name of the remote service (e.g. 'datastore_v3').
      call: Name of the method to invoke on that service.
      request: A protocol buffer holding the request payload.
      response: A protocol buffer that is filled in with the reply.

    Raises:
      Whatever exception the remote handler pickled into its response.
    """
    wrapped_request = remote_api_pb.Request()
    wrapped_request.set_service_name(service)
    wrapped_request.set_method(call)
    wrapped_request.mutable_request().set_contents(request.Encode())

    encoded_reply = self._server.Send(self._path, wrapped_request.Encode())
    wrapped_response = remote_api_pb.Response()
    wrapped_response.ParseFromString(encoded_reply)

    if wrapped_response.has_exception():
      # NOTE(review): unpickling data from the server can execute arbitrary
      # code — only point this stub at a server you fully trust.
      raise pickle.loads(wrapped_response.exception().contents())
    response.ParseFromString(wrapped_response.response().contents())
|
141 |
|
142 |
|
class RemoteDatastoreStub(RemoteStub):
  """A specialised stub for accessing the App Engine datastore remotely.

  A specialised stub is required because there are some datastore operations
  that preserve state between calls. This stub makes queries possible.
  Transactions on the remote datastore are unfortunately still impossible.
  """

  def __init__(self, server, path):
    """Constructs a new RemoteDatastoreStub.

    Args:
      server: An instance of a subclass of
        google.appengine.tools.appengine_rpc.AbstractRpcServer.
      path: The path to the handler this stub should send requests to.
    """
    super(RemoteDatastoreStub, self).__init__(server, path)
    # Local cursor id -> the datastore_pb.Query being iterated, or None once
    # the query has been exhausted.
    self.__queries = {}
    # Local transaction handle -> TransactionData.
    self.__transactions = {}

    # Cursor and transaction handles are allocated locally; the locks keep
    # allocation safe if multiple threads issue calls concurrently.
    self.__next_local_cursor = 1
    self.__local_cursor_lock = threading.Lock()
    self.__next_local_tx = 1
    self.__local_tx_lock = threading.Lock()

  def MakeSyncCall(self, service, call, request, response):
    """Routes a datastore call to a local _Dynamic_* handler if one exists.

    Calls without a specialised local handler are forwarded verbatim to the
    remote server via RemoteStub.MakeSyncCall.
    """
    assert service == 'datastore_v3'

    explanation = []
    assert request.IsInitialized(explanation), explanation

    handler = getattr(self, '_Dynamic_' + call, None)
    if handler:
      handler(request, response)
    else:
      super(RemoteDatastoreStub, self).MakeSyncCall(service, call, request,
                                                    response)

    assert response.IsInitialized(explanation), explanation

  def _Dynamic_RunQuery(self, query, query_result):
    """Registers the query under a local cursor; results come from Next()."""
    self.__local_cursor_lock.acquire()
    try:
      cursor_id = self.__next_local_cursor
      self.__next_local_cursor += 1
    finally:
      self.__local_cursor_lock.release()
    self.__queries[cursor_id] = query

    query_result.mutable_cursor().set_cursor(cursor_id)
    query_result.set_more_results(True)

  def _Dynamic_Next(self, next_request, query_result):
    """Fetches the next batch of results for a locally-registered cursor."""
    cursor = next_request.cursor().cursor()
    if cursor not in self.__queries:
      raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
                                             'Cursor %d not found' % cursor)
    query = self.__queries[cursor]

    if query is None:
      # The query was exhausted on a previous Next() call.
      query_result.set_more_results(False)
      return

    request = datastore_pb.Query()
    request.CopyFrom(query)
    if request.has_limit():
      request.set_limit(min(request.limit(), next_request.count()))
    else:
      request.set_limit(next_request.count())

    super(RemoteDatastoreStub, self).MakeSyncCall(
        'remote_datastore', 'RunQuery', request, query_result)

    # Advance the saved query past the results just returned so the next
    # Next() resumes where this one left off.
    query.set_offset(query.offset() + query_result.result_size())
    if query.has_limit():
      query.set_limit(query.limit() - query_result.result_size())
    if not query_result.more_results():
      self.__queries[cursor] = None

  def _Dynamic_Get(self, get_request, get_response):
    """Fetches entities, consulting transaction-local writes when present."""
    txid = None
    if get_request.has_transaction():
      txid = get_request.transaction().handle()
      txdata = self.__transactions[txid]
      # Bug fix: the original asserted a non-empty tuple
      # `assert (cond, "msg")`, which is always true; assert the condition.
      assert txdata.thread_id == thread.get_ident(), \
          "Transactions are single-threaded."

      keys = [(k, k.Encode()) for k in get_request.key_list()]

      # Only fetch keys not already read or written in this transaction;
      # cached values are merged back into the response below.
      new_request = datastore_pb.GetRequest()
      for key, enckey in keys:
        if enckey not in txdata.entities:
          new_request.add_key().CopyFrom(key)
    else:
      new_request = get_request

    if new_request.key_size() > 0:
      super(RemoteDatastoreStub, self).MakeSyncCall(
          'datastore_v3', 'Get', new_request, get_response)

    if txid is not None:
      # Record a hash of each entity read so Commit can verify it was not
      # modified concurrently.
      newkeys = new_request.key_list()
      entities = get_response.entity_list()
      for key, entity in zip(newkeys, entities):
        entity_hash = None
        if entity.has_entity():
          entity_hash = sha.new(entity.entity().Encode()).digest()
        txdata.preconditions[key.Encode()] = (key, entity_hash)

      # Rebuild the response in requested-key order, substituting
      # transaction-local writes for freshly fetched values.
      new_response = datastore_pb.GetResponse()
      it = iter(get_response.entity_list())
      for key, enckey in keys:
        if enckey in txdata.entities:
          cached_entity = txdata.entities[enckey][1]
          if cached_entity:
            new_response.add_entity().mutable_entity().CopyFrom(cached_entity)
          else:
            # Entity was deleted inside this transaction.
            new_response.add_entity()
        else:
          new_entity = it.next()
          if new_entity.has_entity():
            assert new_entity.entity().key() == key
            new_response.add_entity().CopyFrom(new_entity)
          else:
            new_response.add_entity()
      get_response.CopyFrom(new_response)

  def _Dynamic_Put(self, put_request, put_response):
    """Puts entities; inside a transaction, writes are buffered until Commit."""
    if put_request.has_transaction():
      entities = put_request.entity_list()

      # Entities with neither an id nor a name need ids assigned remotely
      # before they can be buffered locally.
      requires_id = lambda x: x.id() == 0 and not x.has_name()
      new_ents = [e for e in entities
                  if requires_id(e.key().path().element_list()[-1])]
      id_request = remote_api_pb.PutRequest()
      if new_ents:
        for ent in new_ents:
          e = id_request.add_entity()
          e.mutable_key().CopyFrom(ent.key())
          e.mutable_entity_group()
        id_response = datastore_pb.PutResponse()
        super(RemoteDatastoreStub, self).MakeSyncCall(
            'remote_datastore', 'GetIDs', id_request, id_response)
        assert id_request.entity_size() == id_response.key_size()
        for key, ent in zip(id_response.key_list(), new_ents):
          ent.mutable_key().CopyFrom(key)
          ent.mutable_entity_group().add_element().CopyFrom(
              key.path().element(0))

      txid = put_request.transaction().handle()
      txdata = self.__transactions[txid]
      # Bug fix: assert the condition, not an always-true tuple.
      assert txdata.thread_id == thread.get_ident(), \
          "Transactions are single-threaded."
      for entity in entities:
        txdata.entities[entity.key().Encode()] = (entity.key(), entity)
        put_response.add_key().CopyFrom(entity.key())
    else:
      super(RemoteDatastoreStub, self).MakeSyncCall(
          'datastore_v3', 'Put', put_request, put_response)

  def _Dynamic_Delete(self, delete_request, response):
    """Deletes entities; inside a transaction, deletes are buffered locally."""
    if delete_request.has_transaction():
      txid = delete_request.transaction().handle()
      txdata = self.__transactions[txid]
      # Bug fix: assert the condition, not an always-true tuple.
      assert txdata.thread_id == thread.get_ident(), \
          "Transactions are single-threaded."
      for key in delete_request.key_list():
        # A None entity marks a pending deletion.
        txdata.entities[key.Encode()] = (key, None)
    else:
      super(RemoteDatastoreStub, self).MakeSyncCall(
          'datastore_v3', 'Delete', delete_request, response)

  def _Dynamic_BeginTransaction(self, request, transaction):
    """Allocates a local transaction handle bound to the calling thread."""
    self.__local_tx_lock.acquire()
    try:
      txid = self.__next_local_tx
      self.__transactions[txid] = TransactionData(thread.get_ident())
      self.__next_local_tx += 1
    finally:
      self.__local_tx_lock.release()
    transaction.set_handle(txid)

  def _Dynamic_Commit(self, transaction, transaction_response):
    """Sends buffered writes and read preconditions to the remote handler."""
    txid = transaction.handle()
    if txid not in self.__transactions:
      raise apiproxy_errors.ApplicationError(
          datastore_pb.Error.BAD_REQUEST,
          'Transaction %d not found.' % (txid,))

    txdata = self.__transactions[txid]
    # Bug fix: assert the condition, not an always-true tuple.
    assert txdata.thread_id == thread.get_ident(), \
        "Transactions are single-threaded."
    del self.__transactions[txid]

    tx = remote_api_pb.TransactionRequest()
    # Renamed loop variable from 'hash' to avoid shadowing the builtin.
    for key, entity_hash in txdata.preconditions.values():
      precond = tx.add_precondition()
      precond.mutable_key().CopyFrom(key)
      if entity_hash:
        precond.set_hash(entity_hash)

    puts = tx.mutable_puts()
    deletes = tx.mutable_deletes()
    for key, entity in txdata.entities.values():
      if entity:
        puts.add_entity().CopyFrom(entity)
      else:
        deletes.add_key().CopyFrom(key)

    super(RemoteDatastoreStub, self).MakeSyncCall(
        'remote_datastore', 'Transaction',
        tx, datastore_pb.PutResponse())

  def _Dynamic_Rollback(self, transaction, transaction_response):
    """Discards all local state for the transaction."""
    txid = transaction.handle()
    self.__local_tx_lock.acquire()
    try:
      if txid not in self.__transactions:
        raise apiproxy_errors.ApplicationError(
            datastore_pb.Error.BAD_REQUEST,
            'Transaction %d not found.' % (txid,))

      # Bug fix: the original referenced the undefined name 'txdata'
      # (as 'txdata[txid]'), raising NameError on every rollback; it also
      # asserted an always-true tuple. Look up the transaction properly and
      # assert the condition itself.
      assert self.__transactions[txid].thread_id == thread.get_ident(), \
          "Transactions are single-threaded."
      del self.__transactions[txid]
    finally:
      self.__local_tx_lock.release()

  def _Dynamic_CreateIndex(self, index, id_response):
    """Index manipulation is not supported over remote_api."""
    raise apiproxy_errors.CapabilityDisabledError(
        'The remote datastore does not support index manipulation.')

  def _Dynamic_UpdateIndex(self, index, void):
    """Index manipulation is not supported over remote_api."""
    raise apiproxy_errors.CapabilityDisabledError(
        'The remote datastore does not support index manipulation.')

  def _Dynamic_DeleteIndex(self, index, void):
    """Index manipulation is not supported over remote_api."""
    raise apiproxy_errors.CapabilityDisabledError(
        'The remote datastore does not support index manipulation.')
|
375 |
|
376 |
|
def ConfigureRemoteDatastore(app_id,
                             path,
                             auth_func,
                             servername=None,
                             rpc_server_factory=appengine_rpc.HttpRpcServer):
  """Does necessary setup to allow easy remote access to an AppEngine datastore.

  Args:
    app_id: The app_id of your app, as declared in app.yaml.
    path: The path to the remote_api handler for your app
      (for example, '/remote_api').
    auth_func: A function that takes no arguments and returns a
      (username, password) tuple. This will be called if your application
      requires authentication to access the remote_api handler (it should!)
      and you do not already have a valid auth cookie.
    servername: The hostname your app is deployed on. Defaults to
      <app_id>.appspot.com.
    rpc_server_factory: A factory to construct the rpc server for the datastore.
  """
  servername = servername or '%s.appspot.com' % (app_id,)
  os.environ['APPLICATION_ID'] = app_id
  # Replace the process-wide stub map so all datastore calls go remote.
  apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
  rpc_server = rpc_server_factory(servername, auth_func, GetUserAgent(),
                                  GetSourceName())
  apiproxy_stub_map.apiproxy.RegisterStub(
      'datastore_v3', RemoteDatastoreStub(rpc_server, path))