Sending/Receiving Messages
Recall RPC
Dictionary:
Protobuf:
Synchronous vs. Asynchronous Communication
Synchronous
Both parties have to participate at the same time
Examples: phone call, RPC call
Asynchronous
One party can send any time, the other can receive later
Examples: email, streaming
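The decoupling in the asynchronous case can be sketched with an in-memory queue (a stand-in for a real messaging system; the names here are illustrative):

```python
from queue import Queue

# A queue decouples sender and receiver: the producer can enqueue
# messages even when no consumer is actively reading.
mailbox = Queue()

# Sender: puts messages at its own pace.
mailbox.put("message 1")
mailbox.put("message 2")

# Receiver: drains the queue later, in the order sent.
received = []
while not mailbox.empty():
    received.append(mailbox.get())
```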
ETL (Extract Transform Load)
Have multiple OLTP databases
ETL Code
Problem: if we have X OLTP databases and Y derivative stores, how many ETL programs must we write? $X \cdot Y$ (Too Much!)
Solution: Unified Log (Centralize changes in a distributed logging service)
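The fan-out reduction can be checked with simple arithmetic (the counts below are illustrative):

```python
# Without a unified log: one ETL program per (source, destination) pair.
X = 3  # OLTP databases (example count)
Y = 4  # derivative stores (example count)
pairwise_etl = X * Y   # one program per pair

# With a unified log: each source writes once, each destination reads once.
unified_etl = X + Y
```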
Kafka Design
Topics (managed by servers called brokers)
pip install kafka-python

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(...)
admin.create_topics([NewTopic("sports", ...)])
Producers, Consumers
Producers Publish (pub/sub)
from kafka import KafkaProducer

producer3 = KafkaProducer(...)
producer3.send("sports", ...)
Consumers Subscribe (pub/sub)
from kafka import KafkaConsumer

consumer3 = KafkaConsumer(...)
consumer3.subscribe(["sports"])
Receiving Messages
poll()
loop
consumer3 = KafkaConsumer(...)
while True:
    batch = consumer3.poll(????)  # timeout in milliseconds
    for tp, messages in batch.items():  # keys are TopicPartition objects
        for msg in messages:
            ...
poll()
(ideally) returns some messages the consumer hasn't seen before, from any subscribed topic
poll()
leaves messages intact on brokers (for other consumers), unlike many prior streaming systems
What's in a Message?
key (optional): some bytes. The key is used for partitioning and is usually one of the entries in the value structure.
producer.send("topic", value=????, key=????)
value (required): some bytes. The value is usually some kind of structure with many values.
producer.send("topic", value=????)
Python dict => bytes
value = bytes(json.dumps(dic), "utf-8")
Protobuf => bytes:
msg = mymod_pb2.MyMessage(...)
value = msg.SerializeToString()
# actually bytes, not str
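The dict route can be exercised end to end with only the standard library (protobuf needs a generated `_pb2` module, so it is omitted here):

```python
import json

# Python dict -> bytes, suitable as a Kafka message value.
dic = {"sport": "hockey", "score": 3}
value = bytes(json.dumps(dic), "utf-8")
assert isinstance(value, bytes)

# A consumer reverses the process: bytes -> str -> dict.
decoded = json.loads(value.decode("utf-8"))
```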
Reason for Partitioning
Some topics might have too many messages for one machine (or set of machines with replicas) to keep up
Topics can be created with N partitions:
Changing Partitions
Selecting Partitions
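Kafka's default partitioner hashes the key (murmur2) and takes it modulo the partition count. The sketch below substitutes CRC32 for murmur2 to stay stdlib-only, so actual Kafka partition numbers will differ, but the key property is the same:

```python
import zlib

def select_partition(key: bytes, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's murmur2-based default partitioner:
    # hash the key, then take it modulo the number of partitions.
    return zlib.crc32(key) % num_partitions

# Messages with the same key always land in the same partition,
# which is what makes per-key ordering possible.
p1 = select_partition(b"user-42", 4)
p2 = select_partition(b"user-42", 4)
```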
Consumers: Read Offsets
Ordering Kafka Messages
Partially vs. Totally Ordered
Some things are totally ordered, like integers: for any x and y, either x <= y or y <= x.
Other things are partially ordered, like Git commits. Sometimes you can compare, sometimes you can't!
Kafka Messages are partially ordered. Messages are consumed from a partition in the order they were written to that partition (no guarantees across topics or across partitions).
If A and B share the same topic and key, and B was produced after A, then A and B land in the same partition, so any consumer reading that partition sees A before B.
Seek to an Offset
from kafka import TopicPartition

part = TopicPartition("clicks", 3)
offset = 6
consumer.seek(part, offset)
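What seek does can be illustrated with a toy in-memory log (a model of the behavior, not the kafka-python implementation):

```python
# A partition is an append-only list; a consumer's position is just an index.
partition_log = ["m0", "m1", "m2", "m3", "m4", "m5", "m6", "m7"]

class ToyConsumer:
    def __init__(self):
        self.position = 0  # next offset to read

    def seek(self, offset):
        # Jump the read position; older messages stay on the "broker".
        self.position = offset

    def poll(self, max_records=2):
        msgs = partition_log[self.position:self.position + max_records]
        self.position += len(msgs)
        return msgs

c = ToyConsumer()
c.seek(6)
first_batch = c.poll()
```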
Read pattern
Consumer Groups
c = KafkaConsumer("clicks", group_id="g1", ...)
different applications might operate independently
they should ALL get a chance to consume messages
need offsets for each topic/partition/consumer group combination
Partition Assignment: Manual
tp0 = TopicPartition("clicks", 0)
...
consumer2.assign([tp0, tp1])
consumer3.assign([tp2, tp3])
Partition Assignment: Automatic
while True:
    batch = consumer.poll(1000)
    for tp, msgs in batch.items():
        for msg in msgs:
            ...
consumer.close()  # leaving the group triggers a partition rebalance
Segment Files: Log Rollover and Deletion