|
42 | 42 | import threading
|
43 | 43 | import time
|
44 | 44 |
|
45 |
| -from collections.abc import Collection |
| 45 | +from collections.abc import Callable, Collection |
46 | 46 | from dataclasses import dataclass, field
|
47 | 47 | from datetime import datetime
|
48 | 48 | from enum import Enum
|
49 | 49 | from typing import Any, Final
|
50 | 50 |
|
| 51 | +import requests |
51 | 52 | import web
|
52 | 53 |
|
53 | 54 | import _init_path # noqa: F401 Imported for its side effect of setting PYTHONPATH
|
|
61 | 62 | from openlibrary.utils.dateutil import WEEK_SECS
|
62 | 63 | from openlibrary.utils.isbn import (
|
63 | 64 | normalize_identifier,
|
64 |
| - normalize_isbn, |
65 |
| - isbn_13_to_isbn_10, |
66 | 65 | isbn_10_to_isbn_13,
|
67 | 66 | )
|
68 | 67 |
|
@@ -160,6 +159,175 @@ def to_dict(self):
|
160 | 159 | }
|
161 | 160 |
|
162 | 161 |
|
class BaseLookupWorker(threading.Thread):
    """
    A base class for creating API look up workers on their own threads.

    The worker repeatedly takes an item from ``queue`` (waiting up to
    API_MAX_WAIT_SECONDS for one) and passes it to ``process_item``. Any
    exception raised while processing is logged and counted via
    ``stats_client``; the loop then carries on with the next item.
    """

    def __init__(
        self,
        queue: queue.PriorityQueue,
        process_item: Callable,
        stats_client: stats.StatsClient,
        logger: logging.Logger,
        name: str,
    ) -> None:
        # threading.Thread must be initialised before any of its attributes
        # (``name`` included) are assigned and before start() can be called.
        super().__init__(name=name)
        self.queue = queue
        self.process_item = process_item
        self.stats_client = stats_client
        self.logger = logger

    def run(self):
        """Thread body: process queued items forever."""
        while True:
            try:
                # Blocks until an item is available or the wait times out.
                item = self.queue.get(timeout=API_MAX_WAIT_SECONDS)
                self.logger.info(f"{self.name} lookup: processing item {item}")
                self.process_item(item)
            except queue.Empty:
                # Nothing arrived within the wait window; poll again.
                continue
            except Exception as e:
                # Record the failure but keep the loop running.
                self.logger.exception(f"{self.name} Lookup Thread died: {e}")
                self.stats_client.incr(f"ol.affiliate.{self.name}.lookup_thread_died")
| 193 | + |
class AmazonLookupWorker(BaseLookupWorker):
    """
    A look up worker for the Amazon Products API.

    A separate thread of execution that uses the time up to API_MAX_WAIT_SECONDS to
    create a list of isbn_10s that is not larger than API_MAX_ITEMS_PER_CALL and then
    passes them to process_amazon_batch()
    """

    def run(self):
        """Thread body: collect a batch from the queue, then look it up, forever."""
        while True:
            start_time = time.time()
            asins: set[PrioritizedIdentifier] = set()  # no duplicates in the batch
            while len(asins) < API_MAX_ITEMS_PER_CALL and self._seconds_remaining(
                start_time
            ):
                try:  # queue.get() will block (sleep) until successful or it times out
                    asins.add(
                        self.queue.get(timeout=self._seconds_remaining(start_time))
                    )
                except queue.Empty:
                    pass

            self.logger.info(f"Before amazon_lookup(): {len(asins)} items")
            if asins:
                # Wait out whatever is left of the window before calling the API,
                # using this worker's own timer like the rest of the method.
                time.sleep(self._seconds_remaining(start_time))
                try:
                    process_amazon_batch(asins)
                    self.logger.info(f"After amazon_lookup(): {len(asins)} items")
                except Exception:
                    # Record the failure but keep the loop running.
                    self.logger.exception("Amazon Lookup Thread died")
                    self.stats_client.incr("ol.affiliate.amazon.lookup_thread_died")

    def _seconds_remaining(self, start_time: float) -> float:
        """Return the seconds left of API_MAX_WAIT_SECONDS since start_time, never below 0."""
        return max(API_MAX_WAIT_SECONDS - (time.time() - start_time), 0)
| 229 | + |
| 230 | + |
def fetch_google_book(isbn: str, timeout: float = 10.0) -> dict | None:
    """
    Get Google Books metadata, if it exists.

    :param isbn: the ISBN to search Google Books for.
    :param timeout: seconds to wait for Google Books before giving up, so a
        stalled connection cannot block the caller indefinitely.
    :return: the decoded JSON response on HTTP 200; None on any other status or
        if the request or decoding fails (the failure is logged).
    """
    url = f"https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn}"
    try:
        r = requests.get(url, timeout=timeout)
        if r.status_code == 200:
            # Response.json() already returns the decoded body; passing that
            # to json.loads() would raise TypeError and lose the result.
            return r.json()

    except Exception as e:
        logger.exception(f"Error processing ISBN {isbn} on Google Books: {e!s}")

    return None
| 246 | + |
| 247 | + |
| 248 | +# TODO: See clean_amazon_metadata_for_load(). This function needs modification. |
| 249 | +def process_google_book(google_book_data: dict[str, Any]) -> dict[str, Any] | None: |
| 250 | + """ |
| 251 | + Returns a dict-edition record suitable for import via /api/import |
| 252 | +
|
| 253 | + Processing https://www.googleapis.com/books/v1/volumes?q=isbn:9785699350131: |
| 254 | + {'isbn_10': ['5699350136'], |
| 255 | + 'isbn_13': ['9785699350131'], |
| 256 | + 'title': 'Бал моей мечты', |
| 257 | + 'subtitle': '[для сред. шк. возраста]', |
| 258 | + 'authors': [{'name': 'Светлана Лубенец'}], |
| 259 | + 'source_records': ['google_books:YJ1uQwAACAAJ'], |
| 260 | + 'publishers': [], |
| 261 | + 'publish_date': '2009', |
| 262 | + 'number_of_pages': 153} |
| 263 | + """ |
| 264 | + result = {} |
| 265 | + isbn_10 = [] |
| 266 | + isbn_13 = [] |
| 267 | + |
| 268 | + if not (data := google_book_data.get("items", [])): |
| 269 | + return None |
| 270 | + |
| 271 | + if len(data) != 1: |
| 272 | + logger.warning("Google Books had more than one result for an ISBN.") |
| 273 | + return None |
| 274 | + |
| 275 | + # Permanent URL: https://www.googleapis.com/books/v1/volumes/{id} |
| 276 | + google_books_identifier = data[0].get("id") |
| 277 | + if not (book := data[0].get("volumeInfo", {})): |
| 278 | + return None |
| 279 | + |
| 280 | + # Extract ISBNs, if any. |
| 281 | + for identifier in book.get("industryIdentifiers", []): |
| 282 | + if identifier.get("type") == "ISBN_10": |
| 283 | + isbn_10.append(identifier.get("identifier")) |
| 284 | + elif identifier.get("type") == "ISBN_13": |
| 285 | + isbn_13.append(identifier.get("identifier")) |
| 286 | + |
| 287 | + result["isbn_10"] = isbn_10 if isbn_10 else [] |
| 288 | + result["isbn_13"] = isbn_13 if isbn_13 else [] |
| 289 | + |
| 290 | + result["title"] = book.get("title", "") |
| 291 | + result["subtitle"] = book.get("subtitle") |
| 292 | + result["authors"] = ( |
| 293 | + [{"name": author} for author in book.get("authors", [])] |
| 294 | + if book.get("authors") |
| 295 | + else [] |
| 296 | + ) |
| 297 | + # TODO: Needs promise time source record also, when applicable? |
| 298 | + result["source_records"] = [f"google_books:{google_books_identifier}"] |
| 299 | + # has publisher: https://www.googleapis.com/books/v1/volumes/YJ1uQwAACAAJ |
| 300 | + # does not have publisher: https://www.googleapis.com/books/v1/volumes?q=isbn:9785699350131 |
| 301 | + result["publishers"] = [book.get("publisher")] if book.get("publisher") else [] |
| 302 | + result["publish_date"] = book.get("publishedDate", "") |
| 303 | + # Language needs converting. 2 character code -> 3 character. |
| 304 | + # result["languages"] = [book.get("language")] if book.get("language") else [] |
| 305 | + result["number_of_pages"] = book.get("pageCount", None) |
| 306 | + |
| 307 | + return result |
| 308 | + |
| 309 | + |
def process_isbn(isbn: str) -> None:
    """
    Look an ISBN up on Google Books and stage the result for import.

    The ISBN is fetched with fetch_google_book(); if a response comes back and
    process_google_book() can turn it into an edition record, that record is
    added to the current batch as a 'staged' item keyed by its first source
    record. Otherwise nothing happens.
    TODO: explain/process overview.
    """
    google_book_data = fetch_google_book(isbn)
    if not google_book_data:
        # TODO: Try AMZ if nothing from Google Books? Or make async requests 'everywhere'?
        return

    google_book = process_google_book(google_book_data=google_book_data)
    if not google_book:
        return

    stage_items = get_current_amazon_batch().add_items
    staged_item = {
        'ia_id': google_book['source_records'][0],
        'status': 'staged',
        'data': json.dumps(google_book),
    }
    stage_items([staged_item])
| 329 | + |
| 330 | + |
163 | 331 | def get_current_amazon_batch() -> Batch:
|
164 | 332 | """
|
165 | 333 | At startup, get the Amazon openlibrary.core.imports.Batch() for global use.
|
@@ -338,6 +506,7 @@ def amazon_lookup(site, stats_client, logger) -> None:
|
338 | 506 | asins.add(web.amazon_queue.get(timeout=seconds_remaining(start_time)))
|
339 | 507 | except queue.Empty:
|
340 | 508 | pass
|
| 509 | + |
341 | 510 | logger.info(f"Before amazon_lookup(): {len(asins)} items")
|
342 | 511 | if asins:
|
343 | 512 | time.sleep(seconds_remaining(start_time))
|
@@ -420,6 +589,8 @@ def GET(self, identifier: str) -> str:
|
420 | 589 | if not web.amazon_api:
|
421 | 590 | return json.dumps({"error": "not_configured"})
|
422 | 591 |
|
| 592 | + # TODO: Google Books can take ISBN 10 or ISBN 13. |
| 593 | + |
423 | 594 | b_asin, isbn_10, isbn_13 = normalize_identifier(identifier)
|
424 | 595 | if not (key := isbn_10 or b_asin):
|
425 | 596 | return json.dumps({"error": "rejected_isbn", "identifier": identifier})
|
|
0 commit comments