Skip to content

2.1 RabbitMQ Integration Guide

Márcio Azevedo edited this page May 5, 2016 · 9 revisions

NuGet Version

Source Code: FSharp.DataProcessingPipelines.RabbitMQ.

To start using it (assuming you already have a RabbitMQ instance — if you don't, use, for example, the CloudAMQP free plan or install your own instance), follow the steps below. This example assumes the following scenario:

  1. Install the FSharp.DataProcessingPipelines.RabbitMQ package and its dependencies (via NuGet or Paket)

  2. Add an F# module to your F# project (also available as a gist):

     open System
     open System.Collections.Generic
     open System.Globalization
     open System.Threading
     open RabbitMQ.Client
     open EasyNetQ
     open FSharp.DataProcessingPipelines.Core
     open FSharp.DataProcessingPipelines.Core.Messages
     open FSharp.DataProcessingPipelines.Core.Pipes
     open FSharp.DataProcessingPipelines.Core.Filters
     open FSharp.DataProcessingPipelines.Core.Runners
     open FSharp.DataProcessingPipelines.Infrastructure.RabbitMQ
     
     /// Message type carried by the pipes in this example: an integer id plus a
     /// list of (DateTime, String) event pairs.
     type BaseMessage (id:int, events:(DateTime * String) list) =
         /// The id given to the constructor.
         member self.Id = id
         /// The event list given to the constructor.
         member self.Events = events
    
     /// Input pipe for service B: a RabbitMQInputPipe specialised to BaseMessage.
     /// Every constructor argument is forwarded unchanged to the base pipe.
     type ServiceBInputPipe (serviceBus:IBus, subscriberId:String, topic:String, locale:String) =
         inherit RabbitMQInputPipe<BaseMessage> (serviceBus, subscriberId, topic, locale)
    
     /// Output pipe for service A: a RabbitMQOutputPipe specialised to BaseMessage.
     /// Both constructor arguments are forwarded unchanged to the base pipe.
     type ServiceAOutputPipe (serviceBus:IBus, topic:String) =
         inherit RabbitMQOutputPipe<BaseMessage> (serviceBus, topic)
    
     /// Data source for service A: builds one test BaseMessage and publishes it
     /// on the output pipe.
     type ServiceAFilter (pipe:ServiceAOutputPipe) =
         inherit DataSource<BaseMessage>(pipe)
         /// Creates the test message and publishes it. A failure is reported on
         /// stderr instead of being silently discarded; it is not rethrown, so
         /// the filter remains best-effort as before.
         override this.Execute () =
             try
                 try
                     let msg = BaseMessage(1, [(DateTime.Now, "Test Message created in F# by the ServiceAFilter!")])
                     printfn "--- Service A Publishes Msg ---"
                     this.OutputPipe.Publish msg
                 finally
                     // Dispose if needed
                     ()
             with
             | ex ->
                 // Surface the failure; %O prints the exception via ToString().
                 eprintfn "ServiceAFilter: failed to publish message: %O" ex
    
     /// Data sink for service B: prints every BaseMessage delivered by the
     /// input pipe.
     type ServiceBFilter (pipe:ServiceBInputPipe) =
         inherit DataSink<BaseMessage>(pipe)
         /// Subscribes a handler on the input pipe that prints the message id,
         /// then each event pair, then a separator line.
         override this.Execute () =
             let printMessage (received:BaseMessage) =
                 printfn "--- Service B Execute -> %d: " received.Id
                 received.Events
                 |> List.iter (fun (stamp, text) -> printfn "(%A, %s)" stamp text)
                 printfn "-----------------------------------------"
             this.InputPipe.Subscribe (printMessage)
    
     // Connection string handed to RabbitHutch.CreateBus, and the locale string
     // passed to the input pipe.
     let ServiceBusHost = "host=localhost" //TODO: set an existing RabbitMQ host!
     let Culture = "en-US"

     // Service A: subscriber id and topic names.
     let ServiceASubscriberId = "ServiceASubscriberId"
     let ServiceAInputPipeTopic = "ServiceAInputPipeTopic"
     let ServiceAOutputPipeTopic = "ServiceAOutputPipeTopic"

     // Service B: subscriber id and topic names. Its input topic has the same
     // value as service A's output topic.
     let ServiceBSubscriberId = "ServiceBSubscriberId"
     let ServiceBInputPipeTopic = "ServiceAOutputPipeTopic"
     let ServiceBOutputPipeTopic = "ServiceDInputPipeTopic"
    
     /// Bus created from ServiceBusHost via RabbitHutch.CreateBus. If creation
     /// throws, the exception message and its inner exception are printed and
     /// the original exception is rethrown.
     let serviceBus =
         try
             RabbitHutch.CreateBus(ServiceBusHost)
         with
         | ex ->
             let innerException = ex.InnerException
             printfn "%A %A" (ex.Message) (innerException)
             // reraise () rethrows the caught exception with its original stack
             // trace; `raise ex` would restart the trace from this line.
             reraise ()
    
     // Service A publishes on its output topic and service B subscribes on its
     // input topic. Both constants hold the same topic name, which is what
     // connects the two services.
     let outputPipe = new ServiceAOutputPipe(serviceBus, ServiceAOutputPipeTopic)
     let inputPipe = new ServiceBInputPipe(serviceBus, ServiceBSubscriberId, ServiceBInputPipeTopic, Culture)
    
     // Wrap each filter in a BaseRunner: A around the output pipe, B around the
     // input pipe.
     let myRunnerA = BaseRunner (ServiceAFilter outputPipe)
     let myRunnerB = BaseRunner (ServiceBFilter inputPipe)

     // Start runner A first, then runner B.
     myRunnerA.Start ()
     myRunnerB.Start ()
    
Clone this wiki locally