Build Status Coverage Status


Harvest is a messagebus, CQRS framework and eventstore for Dart with multiple backends. Features include

Message bus features

  • Synchronous and asynchronous delivery
  • Support for both "fire-and-forget" as well as delivery notication
  • Custom message callsbacks allowing subscribers to notify publishers of message status
  • Simple message bus API including standard Dart Stream/Sink interface
  • Extensive error handling support
  • Intercept dead messages (i.e. messages with no listeners)
  • Listen to all published events regardless of type
  • Enrich messages prior to their delivery

Event store features

  • Create, persist and manipulate streams of events independently of the event storage type
  • Simple API for supporting differnt storage backends for events
  • Supports file, IndexedDB and memory backends

CQRS support

  • Support for creating event sourced entities and aggregate roots
  • Support SAGA patterns for long running business process including compensating actions


You can use the message bus standalone or in conjuction with the event store. Message delivery can be synchronous or asynchronous

// Synchronous messagebus
var syncMessageBus = new MessageBus()
// Asynchronous messagebus
var asyncmessageBus = new MessageBus.async()

the message bus support both "fire-and-forget" as well as delivery notications

// subscribe to MyMessage
messageBus.subscribe(MyMessage, (MyMessage msg) => print(msg));
// Publish message and continue immediately
// Publish message and await until it has been handled by all subscribers  
int deliveredTo = await messageBus.publish(myMessage);

Messages can be programmatically completed by subscribers by using the CallbackCompleted message mixin

// Message that is completed by custom callback 
class CallbackMessage extends Message with CallbackCompleted {
// CallbackCompleted messages provide their own success callback
messageBus.subscribe(CallbackMessage, (CallbackMessage msg) {
  print("handled $msg");
  // complete callback successfully 
  msg.completed(true, "callback data"); 
// Publish message the result will contain "callback data"
var result = await messageBus.publish(myMessage);

// CallbackCompleted messages provide their own error callback
messageBus.subscribe(CallbackMessage, (CallbackMessage msg) {
  print("handled $msg");
  // complete message with an error
  msg.completed(false, "some error occured");

try {
  messageBus.publish(new CallbackMessage();
} catch(e} {
  // e = "some error occured";

If you prefer the Dart Stream/Sink interface HArvest supports this as well

// Get a stream for MyMessage
var stream =;
// Listen to stream
stream.listen((MyMessage myMessage) => print("recieved message $myMessage"));

// Get sink for MyMessage
var sink = messageBus.sink(MyMessage);
// Use sink to dispatch event
sink.add(new MyMessage("a message"));

Error handling can be done both through streams and message bus interface

// Using stream interface: **onError** function invoked when listener fails {
  // handled m
}, onError:(e) => print("error $e"));

// Using messagebus interface: **onError** function invoked when listener fails
messageBus.subscribe(MyMessage, (m) {
  // handled m
}, onError:(e) => print("error $e"));

Harvest exposes a hook for intercepting messages that has no listeners

// Handler invoked when a message with no subscribers is published
messageBus.deadMessageHandler = (Message msg) => print("no handler for ${msg.runtimeType}");

You can also listen to all events regardless of types

messageBus.everyMessage.listen((Message msg) => print("message ${msg.runtimeType} published");

If you want to intercept all messages prior to delivery you can use Harvest message enricher API

// Enricher that stores the current user id in the messages header
messageBus.enricher = (Message m) {
  m.headers["userId"] = StaticSessionData.userId;    

Event store

Harvest event store can be used for simply storing events which can later be retrieved

	import 'package:harvest/harvest.dart';
	main() async {
		var streamId = new Guid();
		var eventStore = new MemoryEventStore();
		// get a event stream for streamId 
		eventStream = await eventStore.openStream(streamId);
		// create some events
		var event1 = ...
		var event2 = ...
		// store them
		eventStream.addAll([event1, event2]);

Various backends are included in the standard Harvest distribution

  // Harvest FileEventStore that stores events in JSON files
  import 'package:harvest/harvest_file.dart';
  var eventStore = new FileEventStore("path/to/file.txt");
  // Harvest IdbEventStore that stores events in IndexdDB databases
  import 'package:harvest/harvest_idb.dart';
  var eventStore = new IdbEventStore("idb_database");

new EventStores can be created by implementing the EventStore and EventStream APIs

Event sourcing

Fully fletched event sourced applications are also supported. Event sourcing is the concept of saving and retriving objects by the events that occured on them rather than by their state. Consider the following bank account use case:

  1. User creates account
  2. User deposits 10$
  3. User withdraws 2$

In a CRUD application you would now have a BankAccount object with an amount property with the value 8. In a event sourced application you have a BankAccount object and 3 events for it

  1. AccountCreated
  2. AmountDeposited
  3. AmountWithdrawn

Where is this useful?

  • For certain applications the eventlog can be useful in itself such as a audit trail in a financial system.

  • It can help manage complexity in large applications by forcing programmers to make event types for every action that can occur.

  • It makes debugging easy since you can replay the event log to recreate any former system state where an error occurred.

  • It makes mobile app synchronization a breeze, since the offline app can just queue up events and replay them on the backend once it comes online.

  • In applications using the CQRS architecture pattern.

For more information, see the provided example application.




  • Rewrite example GUI app in Polymer
  • Document usage of SAGAs (process manager)
  • Ensure message headers are not serialized
  • Enable CQRS event reloading test
  • Create serilization interface that allows switching between mirror, transformation and manual serilizations



Harvest event store and CQRS API


Eventstore backed by a file system


Eventstore backed by IndexedDB


Sample application data used in the bundled example app and tests, usefull for getting to know how Harvest works