Get Hands-on with Event Sourcing and CQRS
Follow the instructions at https://github.com/SuchSoftware/practical-microservices
Go through the slides up until "Let's Build It!"
docker-compose rm -sf
docker-compose up
- View Data DB: postgres://postgres@localhost:5432/practical_microservices
- Message Store: postgres://postgres@localhost:5433/message_store
git checkout step-01
- Open exercises/01-write-a-message.js
- If you try to run it, it will go boom! So run it and watch the boom.
- Uncomment the // id: uuid(), line.
- Run it again.
- Talk about the fields in the message
- Check out the result in the database viewer. Point out the connection information is in the README
- SLIDE - anatomy of a message
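The fields discussed in this step can be sketched as a plain object. This is only an illustration, not the workshop's code: the type name and field values are made up, `crypto.randomUUID()` stands in for whatever `uuid()` the exercise imports, and `validateMessage` is a hypothetical helper that shows why a message without an `id` is rejected.

```javascript
const { randomUUID } = require('crypto')

// Hypothetical message, shaped like the ones written in the exercises.
const message = {
  id: randomUUID(), // unique per message; the first run fails without it
  type: 'VideoUploaded', // hypothetical type name
  metadata: { traceId: randomUUID() }, // data about the message itself
  data: { videoId: randomUUID(), uri: 'https://example.com/video.mp4' }
}

// Hypothetical check: refuse messages that lack the required fields.
function validateMessage (candidate) {
  const missing = ['id', 'type'].filter(key => !candidate[key])
  if (missing.length > 0) {
    throw new Error(`Message is missing: ${missing.join(', ')}`)
  }
  return true
}

console.log(validateMessage(message))
```

Calling `validateMessage({ type: 'VideoUploaded' })` throws, which mirrors the "boom" seen before the `id` line is uncommented.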
git checkout step-02
- Lots of dependency injection
- SLIDES - Showing how it works from src/index.js
- All wired together in src/config.js. Be sure to peek into this file.
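A rough sketch of the wiring idea, assuming nothing about the real contents of src/config.js: `createMessageStore`, `buildTranscribeComponent`, and `createConfig` are hypothetical names. The point is that one function constructs every dependency and passes it in, so components never require each other directly.

```javascript
// Hypothetical stand-in for the real message store factory.
function createMessageStore () {
  const messages = []
  return {
    messages,
    write (streamName, message) {
      messages.push({ streamName, ...message })
    }
  }
}

// A component receives its dependencies as arguments.
function buildTranscribeComponent ({ messageStore }) {
  function start () {
    console.log('Starting transcribe component')
  }
  return { messageStore, start }
}

// One place builds everything and hands it out.
function createConfig () {
  const messageStore = createMessageStore()
  const transcribeComponent = buildTranscribeComponent({ messageStore })
  return {
    messageStore,
    transcribeComponent,
    components: [transcribeComponent]
  }
}

const config = createConfig()
config.components.forEach(component => component.start())
```

Because the store is injected, an exercise or test can pass in a fake one without touching the component.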
git checkout step-03
- Exercise 02-handle-transcribe-command.js
- Run node exercises/02-handle-transcribe-command.js
- Show src/config.js - We’re passing the message store now
- Show src/transcribe-component/index.js - we’re receiving the message store now
- Back to src/transcribe-component/index.js - Show the transcribeVideo function
- Running the exercise gave the error TypeError: Cannot read property 'Transcribe' of undefined
- The code in the exercise was:
config.transcribeComponent.handlers
  .Transcribe(transcribe)
- We need something called handlers on the transcribe component with a function named Transcribe
- Instantiate handlers, passing messageStore, and have createHandlers receive messageStore:
function createHandlers ({ messageStore }) {
  return {}
}

function build ({ messageStore }) {
  const handlers = createHandlers({ messageStore })
  // ...
  return {
    handlers,
    start
  }
}
- Show that the handler key name matches the message type we’re handling
- Live code the solution
function createHandlers ({ messageStore }) {
  return {
    // The key matches the message type being handled
    async Transcribe (transcribe) {
      const { transcribeId, uri } = transcribe.data
      const transcription = transcribeVideo(uri)
      const transcribed = {
        id: uuid(),
        type: 'Transcribed',
        metadata: {
          traceId: transcribe.metadata.traceId,
          originStreamName: transcribe.metadata.originStreamName
        },
        data: {
          transcribeId,
          uri,
          transcription
        }
      }
      // The entity stream is keyed by the transcribeId from the command
      const streamName = `transcribe-${transcribeId}`
      return messageStore.write(streamName, transcribed)
    }
  }
}
git checkout step-04
- Get clean database before running the exercise
- Exercise 03-double-handle-transcribe-command.js
- What is different about this exercise? (Double calling the handler)
- Could this ever happen? (Crash. Redeployment. Restart.)
- How the message store code stores position
- We don’t want to reprocess messages. (Process vs. handle)
- Anyone know what we’re missing? (idempotence)
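The failure mode behind these questions can be sketched in memory. This is not the message store's actual position-tracking code, which is not shown here; `tick`, `savedPosition`, and the `crashBeforeSave` flag are assumptions made for illustration. It shows how a crash between handling a message and saving the position causes the same command to be handled twice.

```javascript
// In-memory stand-ins for a command stream, an event stream, and a stored position.
const commands = [{ id: 'c1', type: 'Transcribe' }]
const written = []
let savedPosition = 0

// A handler with no idempotence check: it always writes an event.
function handle (command) {
  written.push({ type: 'Transcribed', causedBy: command.id })
}

// Handle everything after the saved position. `crashBeforeSave` simulates the
// process dying after the handler ran but before the position was stored.
function tick ({ crashBeforeSave }) {
  for (let i = savedPosition; i < commands.length; i++) {
    handle(commands[i])
    if (crashBeforeSave) return
    savedPosition = i + 1
  }
}

tick({ crashBeforeSave: true }) // handler ran, position was not saved
tick({ crashBeforeSave: false }) // after the restart, c1 is handled again

console.log(written.length) // 2
```

The command was processed once from the reader's point of view but handled twice, which is why the handler itself has to be idempotent.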
git checkout step-05
- Slides on Idempotence and projections
- Exercise exercises/04-projecting-the-transcription.js
- Let’s fill out the projection
- Projections use keys that match the message types as well. What message type do I need?
module.exports = {
  $init: () => ({ id: null, isTranscribed: false }),
  Transcribed (transcribeJob, transcribed) {
    transcribeJob.id = transcribed.data.transcribeId
    transcribeJob.isTranscribed = true
    transcribeJob.uri = transcribed.data.uri
    transcribeJob.transcription = transcribed.data.transcription
    return transcribeJob
  }
}
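To make the idea concrete, here is a minimal sketch of how a projection like this could be applied to a list of events. The `project` function below is an assumption about the general shape of such a fold, not a copy of the one in src/message-store/read.js, and the sample events are made up.

```javascript
// A trimmed-down copy of the projection, inlined so the sketch runs on its own.
const projection = {
  $init: () => ({ id: null, isTranscribed: false }),
  Transcribed (transcribeJob, transcribed) {
    transcribeJob.id = transcribed.data.transcribeId
    transcribeJob.isTranscribed = true
    return transcribeJob
  }
}

// Fold the events through the projection, starting from $init().
// Event types the projection has no key for are skipped.
function project (events, projection) {
  return events.reduce((entity, event) => {
    const apply = projection[event.type]
    return apply ? apply(entity, event) : entity
  }, projection.$init())
}

const events = [
  { type: 'SomethingElse', data: {} },
  { type: 'Transcribed', data: { transcribeId: 'abc' } }
]

console.log(project([], projection)) // { id: null, isTranscribed: false }
console.log(project(events, projection)) // { id: 'abc', isTranscribed: true }
```

An empty stream yields the `$init` state, which is what lets a handler ask "has this already happened?" before doing any work.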
git checkout step-06
- Notice in src/transcribe-component/index.js that we’re requiring the projection
- Show project function in src/message-store/read.js
- Show where in the handler it needs to be used and make the change to use it.
if (transcription.isTranscribed) {
  console.log(`[${transcribe.id}]: Already transcribed. Skipping.`)
  return true
}
- Re-run exercises/03-double-handle-transcribe-command.js, see how only 1 event gets written.
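The effect of the guard can be reproduced with an in-memory stand-in. In this sketch `messageStore` is a hypothetical synchronous fake (the real `fetch` and `write` are awaited), and `handleTranscribe` is a simplified version of the handler that omits metadata and the transcription itself.

```javascript
const streams = new Map() // streamName -> array of events

// Hypothetical in-memory message store; the real one talks to Postgres.
const messageStore = {
  write (streamName, message) {
    const stream = streams.get(streamName) || []
    stream.push(message)
    streams.set(streamName, stream)
  },
  fetch (streamName, projection) {
    const events = streams.get(streamName) || []
    return events.reduce((entity, event) => {
      const apply = projection[event.type]
      return apply ? apply(entity, event) : entity
    }, projection.$init())
  }
}

const projection = {
  $init: () => ({ id: null, isTranscribed: false }),
  Transcribed (job, transcribed) {
    job.id = transcribed.data.transcribeId
    job.isTranscribed = true
    return job
  }
}

// Project the stream first; only write if the work has not been done yet.
function handleTranscribe (transcribe) {
  const { transcribeId, uri } = transcribe.data
  const streamName = `transcribe-${transcribeId}`
  const transcription = messageStore.fetch(streamName, projection)
  if (transcription.isTranscribed) return false // already done, skip
  messageStore.write(streamName, { type: 'Transcribed', data: { transcribeId, uri } })
  return true
}

const command = { type: 'Transcribe', data: { transcribeId: 't-1', uri: 'https://example.com/v.mp4' } }
handleTranscribe(command)
handleTranscribe(command)

console.log(streams.get('transcribe-t-1').length) // 1
```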
git checkout step-07
- Clean database first
- Exercise exercises/05-write-transcribe-command.js
- Run it, see the command
- Write subscription code
function build ({ messageStore }) {
  const handlers = createHandlers({ messageStore })
  const commandSubscription = messageStore.createSubscription({
    streamName: 'transcribe:command',
    handlers,
    subscriberId: 'transcribeCommandConsumer'
  })
  // ...
  function start () {
    console.log('Starting transcribe component')
    commandSubscription.start()
  }
  // ...
}
- Start the server
- Look in Message Store and see the transcribed event
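What the subscription in this step does with `handlers` can be sketched as follows. This is an assumption about the general mechanism, not the message store's `createSubscription`: the version below takes a plain `messages` array instead of a `streamName`, and exposes a hypothetical `tick` instead of polling on a timer.

```javascript
// Hypothetical in-memory subscription: tracks a position and dispatches by type.
function createSubscription ({ messages, handlers, subscriberId }) {
  let position = 0

  // Handle every message not yet seen, looking up the handler by message.type.
  function tick () {
    while (position < messages.length) {
      const message = messages[position]
      const handler = handlers[message.type]
      if (handler) handler(message)
      position++
    }
    return position
  }

  return { tick, subscriberId }
}

const handled = []
const handlers = {
  Transcribe (message) {
    handled.push(message.id)
  }
}
const messages = [
  { id: 'm1', type: 'Transcribe' },
  { id: 'm2', type: 'Ignored' }
]

const subscription = createSubscription({
  messages,
  handlers,
  subscriberId: 'transcribeCommandConsumer'
})

subscription.tick()
messages.push({ id: 'm3', type: 'Transcribe' })
subscription.tick()

console.log(handled) // [ 'm1', 'm3' ]
```

Messages with no matching handler key are skipped, and the second `tick` picks up only what arrived after the first.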
git checkout step-08
- Exercise exercises/06-handle-transcode-command.js
- We’re setting aside the transcribe component now (take them back to the event model)
- Group codes this whole component
- Fill out the projection in src/transcode-component/projection.js
module.exports = {
  $init: () => ({
    id: null,
    uri: null,
    transcodedUri: null,
    isTranscoded: false
  }),
  Transcoded (transcoding, transcoded) {
    transcoding.id = transcoded.data.transcodeId
    transcoding.uri = transcoded.data.uri
    transcoding.transcodedUri = transcoded.data.transcodedUri
    transcoding.isTranscoded = true
    return transcoding
  }
}
- Fill out the handler in src/transcode-component/index.js
function createHandlers ({ messageStore }) {
  return {
    async Transcode (transcode) {
      const transcodeId = transcode.data.transcodeId
      const streamName = `transcode-${transcodeId}`
      const transcoding = await messageStore.fetch(streamName, projection)
      if (transcoding.isTranscoded) {
        console.log(`(${transcodeId}): Already transcoded. Skipping.`)
        return true
      }
      const transcodedUri = transcodeFile(transcode.data.uri)
      const transcoded = {
        id: uuid(),
        type: 'Transcoded',
        metadata: {
          traceId: transcode.metadata.traceId,
          originStreamName: transcode.metadata.originStreamName
        },
        data: {
          transcodeId,
          uri: transcode.data.uri,
          transcodedUri
        }
      }
      return messageStore.write(streamName, transcoded)
    }
  }
}
git checkout step-09
- Exercise exercises/06-handle-catalog-command.js
- The catalog component needs to get the other 2 to do work. How does it do it?
- We want to advance the process based off of our own events
- The projection is already filled out
- Get a Catalog command transformed into a Started event in src/catalog-component/index.js
function createCommandHandlers ({ messageStore }) {
  return {
    async Catalog (catalog) {
      const videoId = catalog.data.videoId
      const videoStreamName = `catalog-${videoId}`
      const video = await messageStore.fetch(videoStreamName, projection)
      if (video.isStarted) {
        console.log(`(${catalog.id}) Video already started. Skipping`)
        return true
      }
      const started = {
        id: uuid(),
        type: 'Started',
        metadata: {
          traceId: catalog.metadata.traceId
        },
        data: {
          videoId: catalog.data.videoId,
          uri: catalog.data.uri
        }
      }
      return messageStore.write(videoStreamName, started)
    }
  }
}
git checkout step-10
- Exercise exercises/08-handle-started-event.js
- Respond to our own event
- It's in a new set of handlers
- Make sure to set the originStreamName
- We expect to see more than 1 command. Why? Why does it not matter?
function createEventHandlers ({ messageStore }) {
  return {
    async Started (started) {
      const videoId = started.data.videoId
      const streamName = `catalog-${videoId}`
      const video = await messageStore.fetch(streamName, projection)
      if (video.isTranscoded) {
        console.log(`(${started.id}) Video already transcoded. Skipping`)
        return true
      }
      const transcodeId = uuid()
      const transcode = {
        id: uuid(),
        type: 'Transcode',
        metadata: {
          traceId: started.metadata.traceId,
          originStreamName: streamName
        },
        data: {
          transcodeId,
          uri: started.data.uri
        }
      }
      const commandStream = `transcode:command-${transcodeId}`
      return messageStore.write(commandStream, transcode)
    }
  }
}
git checkout step-11
- Exercise exercises/09-handle-transcoded-event-caused-by-catalog.js
- The catalog component will drive the process off of its own events. It shouldn’t rely on other streams for its own state
- Idempotently copy the Transcoded event to the catalog stream
- Talk about how we get the catalog stream from the metadata on an event in a transcode stream.
- Call out originStreamName in subscription
function createTranscodeEventHandlers ({ messageStore }) {
  return {
    async Transcoded (transcoded) {
      const streamName = transcoded.metadata.originStreamName
      const video = await messageStore.fetch(streamName, projection)
      if (video.isTranscoded) {
        console.log(`(${transcoded.id}) Video already transcoded. Skipping`)
        return true
      }
      const videoTranscoded = {
        id: uuid(),
        type: 'Transcoded',
        metadata: {
          traceId: transcoded.metadata.traceId
        },
        data: {
          videoId: video.id,
          transcodeId: transcoded.data.transcodeId,
          uri: transcoded.data.uri,
          transcodedUri: transcoded.data.transcodedUri
        }
      }
      return messageStore.write(streamName, videoTranscoded)
    }
  }
}
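The role of `originStreamName` in a subscription can be illustrated with a small filter. How the real subscription applies it is not shown in these notes, so `category` and `originatedFrom` below are hypothetical helpers. They assume the category is the part of a stream name before the first dash, which matches names such as `catalog-${videoId}` used in the handlers.

```javascript
// Category is everything before the first dash: 'catalog-123' -> 'catalog'.
function category (streamName) {
  return streamName.split('-')[0]
}

// True when the message says it was caused by a stream in the given category.
function originatedFrom (message, originCategory) {
  const origin = message.metadata && message.metadata.originStreamName
  return Boolean(origin) && category(origin) === originCategory
}

const transcodedEvents = [
  { id: 'e1', type: 'Transcoded', metadata: { originStreamName: 'catalog-7f3a-11' } },
  { id: 'e2', type: 'Transcoded', metadata: { originStreamName: 'somethingElse-42' } },
  { id: 'e3', type: 'Transcoded', metadata: {} }
]

// Only events that the catalog component caused should reach its handlers.
const forCatalog = transcodedEvents.filter(event => originatedFrom(event, 'catalog'))

console.log(forCatalog.map(event => event.id)) // [ 'e1' ]
```

Without a filter like this, the catalog component would try to react to every Transcoded event, including ones triggered by other parts of the system.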
git checkout step-12
- Exercise exercises/10-handle-transcoded-event-in-catalog-stream.js
- The handler function for this has not been scaffolded
- Where will we put the handler?
- What is a handler?
function createEventHandlers ({ messageStore }) {
  return {
    // ...
    async Transcoded (transcoded) {
      const streamName = transcoded.streamName
      const video = await messageStore.fetch(streamName, projection)
      if (video.isTranscribed) {
        console.log(`(${transcoded.id}) Video already transcribed. Skipping`)
        return true
      }
      // We generate the transcribeId ourselves
      const transcribeId = uuid()
      const transcribe = {
        id: uuid(),
        type: 'Transcribe',
        metadata: {
          traceId: transcoded.metadata.traceId,
          originStreamName: streamName
        },
        data: {
          transcribeId,
          uri: video.uri
        }
      }
      const commandStreamName = `transcribe:command-${transcribeId}`
      return messageStore.write(commandStreamName, transcribe)
    }
  }
}
git checkout step-13
- No exercise
- We're not doing anything new, so we're not doing this as an exercise
- Show the code - It's the same template as what we did for Transcoded
- Then we also handle catalog's Transcribed event to write a Cataloged event
git checkout step-014
- Keeping it simple. Anyone can upload a video. Can’t foresee any problem with that!
- Videos are named after their id. Can’t foresee any problems with that!
- Notice that the Application’s job here is to just get the command to the message store
- That’s all it has available at the moment
- That’s why the view video route has the interstitial
- Notice that the reads are now just like any other HTTP handler you’ve worked with before. What we’ve done is decouple our write model from our read model
- Database migrations in migrations folder
- Wipe the databases (docker-compose rm -sf)
- Fire up the databases (docker-compose up)
- Start the server (npm start)
- Run the exercise (node exercises/13-whole-process.js)
- Inspect the message store
git checkout step-015
- It’s just a component, but we call them out as aggregators to make the distinction
- The query needs to be idempotent. Upserting gives us that.
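Why upserting gives idempotence can be shown with an in-memory comparison. This is a sketch under assumptions: the `videos` map stands in for a View Data table, and the event's data fields are hypothetical. In Postgres, the same effect usually comes from an `INSERT ... ON CONFLICT ... DO UPDATE` statement.

```javascript
const rows = [] // naive insert target
const videos = new Map() // upsert target, keyed by video id

// Naive insert: handling the same event twice creates two rows.
function insertVideo (event) {
  rows.push({ id: event.data.videoId, transcodedUri: event.data.transcodedUri })
}

// Upsert: create the row if missing, otherwise overwrite it with the same values.
function upsertVideo (event) {
  const { videoId, transcodedUri } = event.data
  videos.set(videoId, { id: videoId, transcodedUri })
}

// Hypothetical event; the field names are assumptions for this sketch.
const cataloged = {
  type: 'Cataloged',
  data: { videoId: 'v-1', transcodedUri: 'https://example.com/v-1.mp4' }
}

// Simulate the aggregator seeing the same event twice.
insertVideo(cataloged)
insertVideo(cataloged)
upsertVideo(cataloged)
upsertVideo(cataloged)

console.log(rows.length) // 2
console.log(videos.size) // 1
```

The aggregator can therefore reprocess an event after a restart without corrupting the read model.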