Data Pipeline Architecture (Bookish)

Posting this for posterity: the system was built around 2012-2013. Bookish, the startup (a joint venture between Simon & Schuster, Penguin and Hachette), is no longer around, but we built out some reasonable book data processing infrastructure, along with a search and recommendation engine. I ran engineering and architected the overall solution.

Business Objectives

  • Readers hub: Become the destination of choice for book-lovers.
  • Book discovery: Help readers find their next great book through a best-in-class recommendation engine.
  • Establish a new sales channel: direct-to-consumer sales through a new eCommerce portal.

Conceptual View

Data Ingest

  • Book catalog data: XML files from 20 publishers and 3 data aggregators, in the ONIX standard (see the parsing sketch after this list).
  • 30 million+ books (most with multiple ISBNs and often multiple contributors).
  • 100 million images (per-ISBN covers and author photos).
  • 600k ePubs.
  • Other enrichment data: editorial reviews, bestseller lists, comp titles, news articles, etc.
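
To give a flavour of the ingest side, here is a minimal sketch of pulling the basics out of a simplified ONIX 2.1-style product record with scala.xml. The fragment and the CatalogRecord type are illustrative, not the production parser or a complete ONIX message.

```scala
// Parse a trimmed, illustrative ONIX 2.1-style <Product> fragment with scala.xml.
import scala.xml.XML

object OnixSketch {
  final case class CatalogRecord(isbn13: String, title: String, contributors: Seq[String])

  def parseProduct(xml: String): CatalogRecord = {
    val product = XML.loadString(xml)
    // ProductIDType 15 = ISBN-13 in the ONIX code lists.
    val isbn13 = (product \ "ProductIdentifier")
      .find(id => (id \ "ProductIDType").text.trim == "15")
      .map(id => (id \ "IDValue").text.trim)
      .getOrElse("")
    val title        = (product \ "Title" \ "TitleText").text.trim
    val contributors = (product \ "Contributor" \ "PersonName").map(_.text.trim)
    CatalogRecord(isbn13, title, contributors)
  }

  def main(args: Array[String]): Unit = {
    val sample =
      """<Product>
        |  <RecordReference>example-0001</RecordReference>
        |  <ProductIdentifier><ProductIDType>15</ProductIDType><IDValue>9780000000001</IDValue></ProductIdentifier>
        |  <Title><TitleType>01</TitleType><TitleText>An Example Title</TitleText></Title>
        |  <Contributor><ContributorRole>A01</ContributorRole><PersonName>Jane Author</PersonName></Contributor>
        |</Product>""".stripMargin
    println(parseProduct(sample))
  }
}
```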

Challenges and Risks

  • Poor-quality data from decades-old systems.
  • ONIX 2.1, released in 2003 and already outdated.
  • No common identifiers for books or authors.
  • Computationally expensive graph clustering solution for deduping.
  • Undocumented business rules about field and source precedence.
  • Complicated legacy architecture. Inherited components built in multiple languages (Scala, Ruby, JavaScript), multiple application servers and two different databases (Postgres and MongoDB).
  • Big-bang launch amidst significant press and media build-up.
  • High reputational risk for the publishers (the investors) from incorrect data or poor site response times, availability or reliability.

Architecturally Significant Requirements

  • 30-40 feeds, 1 million book records.
  • Batch data ingest jobs must execute between 2am and 7am ET.
  • Manual data changes must be reflected immediately.
  • ISBNs belonging to the same ‘Book’ must be aggregated, and authors across ISBNs rolled up to the ‘Book’ (see the concordance sketch after this list).
  • All edge (user-facing) APIs should respond in < 100ms.
  • Support peak load of 100k user sessions with no more than a 20% degradation in average response time.
  • Business rules to select the canonical representation of a ‘Book’ depend on the history of data updates. Rules are updated regularly, must be re-runnable retroactively, and must not regress previous updates.
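
To make the ISBN-to-‘Book’ aggregation concrete, here is a minimal sketch that clusters ISBNs with union-find over “same work” links. The link data and names are hypothetical, and the production system used a heavier graph-clustering algorithm for concordance.

```scala
// Minimal concordance sketch: cluster ISBNs into a 'Book' via union-find over
// "same work" links (toy input; the real pipeline used graph clustering).
object ConcordanceSketch {
  def cluster(links: Seq[(String, String)]): Map[String, String] = {
    val parent = scala.collection.mutable.Map.empty[String, String]

    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else { val root = find(p); parent(x) = root; root } // path compression
    }

    def union(a: String, b: String): Unit = {
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    links.foreach { case (a, b) => union(a, b) }
    parent.keys.toList.map(isbn => isbn -> find(isbn)).toMap // ISBN -> canonical cluster id
  }

  def main(args: Array[String]): Unit = {
    // Hardcover, paperback and ebook ISBNs linked as the same work.
    val links = Seq("9780001" -> "9780002", "9780002" -> "9780003", "9780010" -> "9780011")
    println(cluster(links)) // two clusters: {9780001..03} and {9780010, 9780011}
  }
}
```

Every ISBN in a cluster maps to one canonical ‘Book’ id, which is what contributors and other rolled-up data hang off.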

Architectural Decision: Use CQRS Model

  • CQRS: Command Query Responsibility Segregation (Fowler).
  • CQRS: Decouple Read and Write modeling and execution.
  • ASRs addressed: near-real-time updates, low service latency.
  • Core domain modelled with Event Sourcing to retain history.
  • Multiple ‘views’: user-facing enriched catalog, search indexes, recommendation datasets.
  • Views kept in sync with the domain via an Event Bus (see the sketch below).
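
A minimal sketch of the write/read split. The command, event and view types below (IngestBookData, BookDataIngested, CatalogView) are illustrative names, not the production model.

```scala
// Minimal CQRS / Event Sourcing sketch (illustrative types).
object CqrsSketch {

  final case class IngestBookData(isbn: String, source: String, title: String)             // command (write side)
  final case class BookDataIngested(isbn: String, source: String, title: String, at: Long) // event (what we persist)

  // Write side: validate a command and emit an immutable event; nothing is mutated in place.
  def handle(cmd: IngestBookData): Either[String, BookDataIngested] =
    if (cmd.isbn.isEmpty) Left("missing ISBN")
    else Right(BookDataIngested(cmd.isbn, cmd.source, cmd.title, System.currentTimeMillis()))

  // Read side: a view is a fold over the event history, so changed precedence rules
  // can be applied retroactively by replaying the same events into a fresh view.
  final case class CatalogView(titleByIsbn: Map[String, String]) {
    def apply(e: BookDataIngested): CatalogView =
      copy(titleByIsbn = titleByIsbn.updated(e.isbn, e.title))
  }

  def rebuild(history: Seq[BookDataIngested]): CatalogView =
    history.foldLeft(CatalogView(Map.empty))(_ apply _)

  def main(args: Array[String]): Unit = {
    val events = Seq(
      BookDataIngested("9780000000001", "publisher-feed", "Example Title", 1L),
      BookDataIngested("9780000000001", "aggregator-feed", "Example Title (Revised)", 2L)
    )
    println(rebuild(events).titleByIsbn) // later event wins in this toy projection
  }
}
```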

Why Not Use N-Tier/CRUD

  • Very complex read queries.
  • Isolation: larger, more complicated jobs at high volume could impact database performance and response time. Isolating databases by use allows targeted optimization, e.g. the high-memory working set required for ingest jobs vs. site queries.
  • Scalability: objects are modelled from their history, and the calculation to create canonical views was complex and expensive.
  • Object history had to be retained.
  • Caching? The search and recommendation engines significantly expand the requested book-data footprint, so a cache layer alone would not cover it.
  • Read replicas? A batch load could touch millions of objects, producing long replication lag (and therefore stale, eventually consistent reads) on the site.

Trade-Offs: CQRS vs. N-Tier/CRUD

Pros

  • Deeper decoupling of the system into discrete services.
  • Cost efficiency: provisioning for discrete service datastores vs. a monolithic all-in-one database.
  • Auditability: tracking history vs. final result (think version control).
  • Flexibility: able to test and rollback new ingest rules without impacting the live site.
  • No large batch jobs to run and maintain to regenerate views.
  • Functional/immutable models applied at the code and architectural level (consistent mental model for devs).

Cons

  • Data consistency (still eventual).
  • Orchestration, deployment and monitoring complexity (pre-Kubernetes).
  • Harder to justify the solution (a less familiar pattern than N-tier/CRUD).
  • Additional infrastructure: Event Bus.

Logical View – Data Pipeline

Physical View

Architectural Decision: Implement Event Bus using ZeroMQ

  • Fulfils ASRs for: HA, scalability, throughput.
  • Functional/immutable approach to handling data and operations.
  • Trade-off: very high throughput, but low flexibility. The majority of data events were batch-triggered.
  • Idempotent messages, audit, accounting, replay (see the sketch below).
  • Message order guaranteed.
  • No broker persistence; recovery was manual (via accounting and replay).
  • Stateless.
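
A minimal sketch of the bus roles, assuming the JeroMQ Java bindings (org.zeromq) rather than whatever binding was actually used; the topic name, message-id scheme and payload are made up for illustration.

```scala
// PUB/SUB fan-out plus an idempotency check on the consumer, so replayed or
// duplicated events are applied only once.
import org.zeromq.{SocketType, ZContext, ZMQ}

object EventBusSketch {
  def main(args: Array[String]): Unit = {
    val ctx = new ZContext()
    try {
      // Publisher: the ingest pipeline emits catalog events, each with a stable message id.
      val pub = ctx.createSocket(SocketType.PUB)
      pub.bind("tcp://*:5556")

      // Subscriber: a view-update service (catalog, search index, recommendations).
      val sub = ctx.createSocket(SocketType.SUB)
      sub.connect("tcp://localhost:5556")
      sub.subscribe("catalog".getBytes(ZMQ.CHARSET))

      Thread.sleep(200) // let the subscription propagate (ZeroMQ "slow joiner")

      pub.send("catalog msg-0001 {\"isbn\":\"9780000000001\",\"op\":\"upsert\"}")

      // Idempotent consumption: record applied ids so replays and duplicates are no-ops.
      var applied = Set.empty[String]
      val parts = sub.recvStr().split(" ", 3) // topic, message id, payload
      val (id, payload) = (parts(1), parts(2))
      if (!applied.contains(id)) {
        applied += id
        println(s"applying $id: $payload")
      }
    } finally ctx.close()
  }
}
```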

Messaging Infrastructure – HA Binary Star (ZeroMQ)

Outcome

  • Met all delivery requirements.
  • Successful Launch – investors happy.
  • Peak load at launch: 2k concurrent user sessions, so we deprovisioned the surplus search/rec instances.
  • Supported batch and real-time data updates, and could apply new rules on the fly.
  • Data consistency: daily accounting to replay lost messages (see the sketch after this list).
  • Bottleneck: graph algorithm for concordance calculations.
  • Rule changes affecting 25% of our dataset would propagate through the system in minutes – faster than the view-update services could process them.
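
As a rough illustration of the daily accounting pass mentioned above: diff the message ids the producer logged against the ids a view acknowledged, and replay the difference. The stores and names here are hypothetical.

```scala
// Sketch of the daily accounting/replay pass (hypothetical inputs).
object AccountingSketch {
  def missing(published: Set[String], applied: Set[String]): Set[String] =
    published.diff(applied) // ids that never made it through the non-persistent bus

  def main(args: Array[String]): Unit = {
    val published = Set("msg-0001", "msg-0002", "msg-0003") // from the producer's audit log
    val applied   = Set("msg-0001", "msg-0003")             // acknowledged by, say, the search view
    missing(published, applied).foreach(id => println(s"replay $id"))
  }
}
```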

2018 Design?

  • Actors -> Streams.
  • ZeroMQ -> Kafka (MSK).
  • Custom concordance -> Spark + GraphX for parallel processing (see the sketch below).
  • Kubernetes (EKS) for service orchestration and deployment (or Fargate with ECR/ECS).
  • Services: custom EC2 instances -> Docker containers.
  • MongoDB: Atlas, or if self-managed: EBS -> SSD.
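
A sketch of what the Spark + GraphX concordance could look like: connected components over ISBN “same work” links. The data is toy (ISBNs hashed to vertex ids by parsing them as longs for brevity); this is a shape of the idea, not a production job.

```scala
// Connected components over ISBN "same work" links with Spark GraphX.
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph}

object ConcordanceGraphX {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("concordance").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Each link asserts that two ISBNs belong to the same 'Book'.
    val links = Seq(("9780001", "9780002"), ("9780002", "9780003"), ("9780010", "9780011"))
    def vid(isbn: String): Long = isbn.toLong // toy vertex id; use a stable hash in practice

    val vertices = sc.parallelize(links.flatMap { case (a, b) => Seq(vid(a) -> a, vid(b) -> b) })
    val edges    = sc.parallelize(links.map { case (a, b) => Edge(vid(a), vid(b), 1) })

    // Connected components: each ISBN is labelled with the smallest vertex id in its cluster.
    val clusters = Graph(vertices, edges).connectedComponents().vertices
    clusters.join(vertices)
      .map { case (_, (cluster, isbn)) => (isbn, cluster) }
      .collect()
      .foreach(println)

    spark.stop()
  }
}
```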