Build Status Coverage Status


Harvest is a messagebus, CQRS frame work and event store for Dart with multiple backends. Features include

  • Message bus
  • Synchronous and asynchronous delivery
  • Support for both "fire-and-forget" as well as delivery notication
  • Optional Dart Stream/Sink interface
  • Intercept dead messages (i.e. messages with no listeners)
  • Listen to all published events regardless of type
  • Enrich messages prior to their delivery
  • Event store
  • Create, persist and manipulate streams of events independently of the event storage type
  • Simple API for supporting differnt storage backends for events
  • Supports file, IndexedDB and memory backends
  • Support for event sourced entities
  • CQRS and SAGA support


You can use the message bus standalone or in conjuction with the event store. Message delivery can be synchronous or asynchronous

  // Synchronous messagebus
  var syncMessageBus = new MessageBus()
  // Asynchronous messagebus
  var asyncmessageBus = new MessageBus.async()

the message bus support both "fire-and-forget" as well as delivery notications

  // subcribe to MyMessage
  messageBus.subscribe(MyMessage, (MyMessage msg) => print(msg));
  // Publish message and continue immediately
  // Publish message and await until it has been handled by all subscribers  
  int deliveredTo = await messageBus.publish(myMessage);

If you prefer the Dart Stream/Sink interface HArvest supports this as well

  // Get a stream for MyMessage
  var stream =;
  // Listen to stream
  stream.listen((MyMessage myMessage) => print("recieved message $myMessage"));
  // Get sink for MyMessage
  var sink = messageBus.sink(MyMessage);
  // Use sink to dispatch event
  sink.add(new MyMessage("a message"));

Harvest exposes a hook for intercepting messages that has no listeners

  // Handler invoked when a message with no subscribers is published
  messageBus.deadMessageHandler = (Message msg) => print("no handler for ${msg.runtimeType}");

You can also listen to all events regardless of types

  messageBus.everyMessage.listen((Message msg) => print("message ${msg.runtimeType} published");

If you want to intercept all messages prior to delivery you can use Harvest message enricher API

  // Enricher that stores the current user id in the messages header
  messageBus.enricher = (Message m) {
    m.headers["userId"] = StaticSessionData.userId;    

Event store

Harvest event store can be used for simply storing events which can later be retrieved

	import 'package:harvest/harvest.dart';
	main() async {
		var streamId = new Guid();
		var eventStore = new MemoryEventStore();
		// get a event stream for streamId 
		eventStream = await eventStore.openStream(streamId);
		// create some events
		var event1 = ...
		var event2 = ...
		// store them
		eventStream.addAll([event1, event2]);

Various backends are included in the standard Harvest distribution

  // Harvest FileEventStore that stores events in JSON files
  import 'package:harvest/harvest_file.dart';
  var eventStore = new FileEventStore("path/to/file.txt");
  // Harvest IdbEventStore that stores events in IndexdDB databases
  import 'package:harvest/harvest_idb.dart';
  var eventStore = new IdbEventStore("idb_database");

new EventStores can be created by implementing the EventStore and EventStream APIs

Event sourcing

Fully fletched event sourced applications are also supported. Event sourcing is the concept of saving and retriving objects by the events that occured on them rather than by their state. Consider the following bank account use case:

  1. User creates account
  2. User deposits 10$
  3. User withdraws 2$

In a CRUD application you would now have a BankAccount object with an amount property with the value 8. In a event sourced application you have a BankAccount object and 3 events for it

  1. AccountCreated
  2. AmountDeposited
  3. AmountWithdrawn

Where is this useful?

  • For certain applications the eventlog can be useful in itself such as a audit trail in a financial system.

  • It can help manage complexity in large applications by forcing programmers to make event types for every action that can occur.

  • It makes debugging easy since you can replay the event log to recreate any former system state where an error occurred.

  • It makes mobile app synchronization a breeze, since the offline app can just queue up events and replay them on the backend once it comes online.

  • In applications using the CQRS architecture pattern.

For more information, see the provided example application.




  • Test and document usage of SAGAs (process manager)
  • Ensure message headers are not serialized
  • Enable CQRS event reloading test
  • Create serilization interface that allows switching between mirror, transformation and manual serilizations



Harvest event store and CQRS API


Eventstore backed by a file system


Eventstore backed by IndexedDB


Sample application data used in the bundled example app and tests, usefull for getting to know how Harvest works