Top K problem(heavy hitters)


Functional requirements

  • Return top 10 hashtag, say in Instagram, within given time interval.


  • Accurate result, data sampling is not preferred.

  • Scalable, with data growth

  • Highly available

  • Low latency, response in 200ms


  • 1m posts every day, 3 hashtags per post

  • 36 qps -> peak: 100 qps


  • Single machine scenario

  • Partition on ID/Key

  • Even faster? Sacrifice accuracy: Count-min sketch, how it works?

  • Combination: fast path, slow path

Simple case

Maintain a HashTable, flush to storage every minute.

Higher scale scenario

Option 1: app servers send log to Message Queue

  • pros

    • Fast processing. Saving logs then query from DB incurs delays.

  • cons

    • High pressure on message queue

    • Server need to know about the queue, which consumes server resources and increase maintenance cost.

    • Not able to do MapReduce to produce accurate result for longer term, like 1 hour or 1 day.

Option 2: read from row log

  • Key workflow

    • APP servers persist logs, which is standard process.

    • Partitioners read from raw log. Each read and process small fraction of logs, so they need to hold exclusive lock(Chubby) of the frontier timestamp.

    • Partitioners aggregate data, then send to each MQ topic(partition on key+timestamp).

    • Aggregators listen to specific topics, aggregate for 1min, then persist data to DB.

    • We can have Ad Hoc aggregators to calculate results for 5/10/60 min and persist back to DB, so that readers can read result directly.

  • Pros

    • Decouple servers and data pipeline. Easy to maintain and scale.

    • Low complexity. We store raw logs for other purpose anyway.

    • MapReduce to produce accurate result for longer term, like 1 hour or 1 day.

    • Accurate result for both short-term and long-term result.

  • Cons

    • Slower process speed.

Option 3: probabilistic approach in single machine

  • Count-min Sketch (CMS) algorithm.

  • Pros

    • Super fast.

    • Less resources needed.

      • Constant memory: only maintain a maxHeap of K elements and a table of hash values.

    • Simple system.

  • Cons

    • Not accurate result.

Option 4: #3 + MapReduce, fast path and slow path


  • Hitters table

    • tables for minute, hour, day

    • Schema: start_time, key, count

    • Composite primary on <start_time, key>


  • "Buffer" is important in large scale data aggregation pipeline. E.g. we cache 1s data in Partitioner and cache 1 min data in Aggregator. So we substantially reduce the traffic in downstream. In this case, load on the DB is negligible.

  • Decouple app servers and data pipeline, i.e. read from log storage.

  • Partition the data. On key+timestamp to address hotspot issue.


Last updated