RdKafka for F# Microservices

08 Dec 2016, Jonathan Leaver

Objective

Apache Kafka represents a powerful tool for linking microservices in a distributed system with a focus on processing and producing streams of interesting data.

If you're new, check out a quick introduction to the important concepts in Apache Kafka.

RdKafka is a C-native library for interacting with Apache Kafka that is used in a wide variety of systems and a nice C# wrapper is available for it.

Let's look at how to use these from F# to support coordinated, high-performance microservices.

Dependencies

  1. RdKafka from NuGet, or using Paket:

    ./.paket/paket.exe add nuget RdKafka project MyProject
    
  2. Pre-build event to transfer native libraries (64-bit): (see: DllNotFoundException)

    xcopy /y /d /f
         "$(ProjectDir)..\packages\RdKafka.Internal.librdkafka\runtimes\win7-x64\native\*.*"
         "$(TargetDir)"
    
  3. Reference and open the C# wrapper:

#r "./packages/RdKafka/lib/net451/RdKafka.dll"

open RdKafka

Terminology

Kafka

type Topic = string       // https://kafka.apache.org/intro#intro_topics
and Partition = int       //   partition of a topic
and Offset = int64        //   offset position within a partition of a topic
and BrokerCsv = string    // connection string of "broker1:9092,broker2:9092"
                          //   with protocol {http://,tcp://} removed
and ConsumerName = string // https://kafka.apache.org/intro#intro_consumers
and ErrorReason = string  

RdKafka Events

RdKafka provides much better transparency than other .NET kafka libraries by exposing a wide variety of events using callbacks. Here we define an F# union type for the various cases.

type Event =
  | Statistics of string
  | OffsetCommit of ErrorCode*List<Topic*Partition*Offset>
  | EndReached of Topic*Partition*Offset
  | PartitionsAssigned of List<Topic*Partition*Offset>
  | PartitionsRevoked of List<Topic*Partition*Offset>
  | ConsumerError of ErrorCode
  | Error of ErrorCode*string

Converting RdKafka Types

Following are some simple conversions to F# datatypes that are used throughput our samples:

let ofTopicPartitionOffset (tpo:TopicPartitionOffset) =
  tpo.Topic,tpo.Partition,tpo.Offset

let ofTopicPartitionOffsets =
  Seq.map ofTopicPartitionOffset >> Seq.toList

let ofCommit (oca:Consumer.OffsetCommitArgs) =
  oca.Error,ofTopicPartitionOffsets oca.Offsets

let ofError (ea:Handle.ErrorArgs) =
  ea.ErrorCode,ea.Reason

Logging RdKafka Events

Consider using an exhaustive pattern match on the Event union type to provide highly granular logging. As you're working with the library, these cases will give you valuable insight:

let toLog = function
  | Event.Error _ -> () 
//| ...

You can easily attach your toLog function to callbacks from both the producer and consumer by mapping the callbacks into cases of the Event type:

let fromConsumerToLog (c:EventConsumer) =
  c.OnStatistics.Add(Statistics >> toLog)
  c.OnOffsetCommit.Add(ofCommit >> OffsetCommit >> toLog)
  c.OnEndReached.Add(ofTopicPartitionOffset >> EndReached >> toLog)
  c.OnPartitionsAssigned.Add(ofTopicPartitionOffsets >> PartitionsAssigned >> toLog)
  c.OnPartitionsRevoked.Add(ofTopicPartitionOffsets >> PartitionsRevoked >> toLog)
  c.OnConsumerError.Add(ConsumerError >> toLog)
  c.OnError.Add(ofError >> Error >> toLog)

let fromProducerToLog (p:Producer) =
  p.OnError.Add(ofError >> Error >> toLog)
  p.OnStatistics.Add(Statistics >> toLog)

Configuration and Connection

When you connect to RdKafka, you can configure any of the defaults in the underlying native library. In particular, there are several settings you may want to consider:

  1. a consumer GroupId, shared by all cooperating instances of a microservice
    • note: for rdkafka 0.9.1 or earlier, setting GroupId on a producer may block Dispose()
  2. whether or not to EnableAutoCommit for tracking your current offset position
  3. whether to save the offsets on the broker for coordination
  4. if your Kafka cluster runs an idle connection reaper, disconnection messages will appear at even intervals when idle
  5. a metadata broker list workaround enables you to query additional metadata using the native wrapper
  6. where to start a brand new consumer group:
    • smallest starts processing from the earliest offsets in the topic
    • largest, the default, starts from the newest message offsets
let connect (brokerCsv:BrokerCsv) (group:ConsumerName) (autoCommit:bool) =
  let config = new Config()
  config.GroupId <- group                                // (1)
  config.StatisticsInterval <- TimeSpan.FromSeconds(1.0)
  config.EnableAutoCommit <- autoCommit                  // (2)
  config.["offset.store.method"] <- "broker"             // (3)
  config.["log.connection.close"] <- "false"             // (4)
  config.["metadata.broker.list"] <- brokerCsv           // (5)
  config.DefaultTopicConfig <-
    let topicConfig = new TopicConfig()
    topicConfig.["auto.offset.reset"] <- "smallest"      // (6)
    topicConfig

  new EventConsumer(config, brokerCsv),
  new Producer(config, brokerCsv)

Publishing

A partition key and payload can then be published to a topic. The response includes a partition and offset position confirming the write. Consider Encoding.UTF8.GetBytes if your message is text.

let publish (brokerCsv:BrokerCsv) (group:ConsumerName) (topic:Topic) =
  let consumer,producer = connect brokerCsv group false
  let topic = producer.Topic(topic)
  fun (key:byte[], payload:byte[]) -> async {
    let! report = Async.AwaitTask(topic.Produce(payload=payload,key=key))
    return report.Partition, report.Offset
  }

Subscribing

To consume, on partition assignment we select Offset.Stored, which defaults to the value of auto.offset.reset if no stored offset exists. Messages are then sent to the onMessage callback once the topic subscription starts.

let subscribeCallback (brokerCsv) (group) (topic:Topic) (onMessage) =
  let autoCommit = true
  let consumer,producer = connect brokerCsv group autoCommit
  consumer.OnPartitionsAssigned.Add(
    ofTopicPartitionOffsets
    >> List.map (fun (t,p,o) -> t,p,Offset.Stored)
    >> List.map TopicPartitionOffset
    >> Collections.Generic.List<_>
    >> consumer.Assign)
  consumer.OnMessage.Add(onMessage)
  consumer.Subscribe(new Collections.Generic.List<string>([topic]))
  consumer.Start()

The above works quite well assuming you process the message to completion within the callback.

You may want to process larger batches of messages asynchronously, however. To use a sequence instead, a blocking collection can buffer incoming messages as they're received. A sequence generator yielding messages from this buffer is returned to the client:

let subscribeSeq brokerCsv group topic : seq<Partition*Offset*byte[]> =

Buffering more than 3000 messages will hold+block the callback until the client has consumed from the sequence.

  let buffer = 3000
  let messages =
    new Collections.Concurrent.BlockingCollection<Message>(
      new Collections.Concurrent.ConcurrentQueue<Message>(), buffer)
  consumer.OnMessage.Add(messages.Add) 
  consumer.Subscribe(new Collections.Generic.List<string>([topic]))
  consumer.Start()
  
  Seq.initInfinite(fun _ ->
    let message = messages.Take()
    message.Partition, message.Offset, message.Payload)  // or AsyncSeq :)

At this point, we make an important observation: the native client will autocommit offsets acknowledging that a message in the buffer has been processed even though it may not have been dequeued. From RdKafka's point of view, the callback for that message has completed!

This is when you'll want to manage offsets yourself.

Manual Offsets

For a given partition, we need to know which messages have started processing. These are the Active messages that we do not yet want to commit. When a message has completed we move it to Processed.

It's possible that processing may complete out of order! To account for this, the Next offset to commit must be less than the oldest Active message.

type Offsets =
  { Next : Offset option       // the next offset to be committed
    Active : Offset list       // offsets of active messages (started processing)
    Processed : Offset list }  // offsets of processed messages newer than (>) any
                               // active message, e.g.:
                               //   still working on 4L, but 5L & 6L are processed

In the following module:

  • start adds a message to the Active set
  • finish moves a message from the Active set to Processed
  • update adjusts the Next offset to commit (based on any changes above)
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Offsets =
  let empty = { Next=None; Processed=[]; Active=[] }

  let private update = function
    | { Processed=[] } as x -> x
    | { Active=[] } as x -> { x with Processed=[]; Next=List.max x.Processed |> Some }
    | x -> x.Processed
           |> List.partition ((>=) (List.min x.Active))
           |> function | [], _ -> x
                       | c, p -> { x with Processed=p; Next=List.max c |> Some }
  let start  (x:Offset) (xs:Offsets) = update { xs with Active = x :: xs.Active }
  let finish (x:Offset) (xs:Offsets) = update { xs with Active = List.filter ((<>) x) xs.Active
                                                        Processed = x :: xs.Processed }

Since the Offsets above apply to an individual partition, we want to be able to track all current partitions. This follows the same lifecycle we've seen so far:

  1. a partition is assigned to us
  2. a message starts processing
  3. a message finishes processing
  4. a partition may be revoked (and assigned to another instance)
type Partitions = Map<Partition, Offsets>

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Partitions =  
  let assign (p:Partition) (xs:Partitions) : Partitions =
    match Map.tryFind p xs with
    | Some _ -> xs
    | None -> Map.add p Offsets.empty xs

  let start (p:Partition, o:Offset) (xs:Partitions) : Partitions =
    match Map.tryFind p xs with 
    | Some offsets -> Map.add p (Offsets.start o offsets) xs
    | None -> xs

  let finish (p:Partition, o:Offset) (xs:Partitions) : Partitions =
    match Map.tryFind p xs with
    | Some offsets -> Map.add p (Offsets.finish o offsets) xs
    | None -> xs

  let revoke (p:Partition) (xs:Partitions) : Partitions =
    Map.remove p xs

Finally, we want the next offset to commit for each partition assigned to us. Our client can then commit a checkpoint to record completion of all messages up to this point.

  let checkpoint : Partitions -> List<Partition*Offset> =
    Map.toList >> List.choose (fun (p,o) -> o.Next |> Option.map(fun o -> p,1L+o))

Integrating & Committing Offsets

This example uses an F# mailbox processor to aggregate our progress from multiple threads. There are several ways to solve this problem! This particular solution can play nicely with asynchronous workflows and is fairly concise.

The actor observes the following:

  1. processing started or finished on a message at Offset of Partition
  2. active Partitions have been assigned to us, or revoked from us (i.e. assigned to another consumer in our group)
type OffsetMonitor = OffsetMonitorMessage->unit
and OffsetMonitorMessage =
  | Start of Partition * Offset | Finish of Partition * Offset // (1)
  | Assign of Partition list    | Revoke of Partition list     // (2)

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module OffsetMonitor =

    let assign m = List.map (fun (_,p,_) -> p) >> Assign >> m 
    let revoke m = List.map (fun (_,p,_) -> p) >> Revoke >> m
    let start  m (x:Message) = (x.Partition, x.Offset) |> Start |> m
    let finish m (x:Message) = (x.Partition, x.Offset) |> Finish |> m

    let create (consumer:EventConsumer) (topic:Topic) =

      let integrate = function
        | Start  (p,o) -> Partitions.start  (p,o)
        | Finish (p,o) -> Partitions.finish (p,o)
        | Assign (ps)  -> List.foldBack Partitions.assign (ps)
        | Revoke (ps)  -> List.foldBack Partitions.revoke (ps)

Here, every 45 seconds our offset monitor will commit outstanding checkpoints. Note that a Kafka cluster may reassign your partitions if you wait too long to report progress.

      MailboxProcessor<OffsetMonitorMessage>.Start(fun inbox ->
        let rec loop(watch:Stopwatch, partitions:Partitions) = async {
            if watch.Elapsed.TotalSeconds > 45. then
              let! result =
                Partitions.checkpoint partitions
                |> List.map (fun (p,o) -> TopicPartitionOffset(topic,p,o))
                |> Collections.Generic.List
                |> consumer.Commit
                |> Async.AwaitTask
                |> Async.Catch
              return! loop(Stopwatch.StartNew(),partitions)
            else
              let! message = inbox.TryReceive(1000)
              return!
                match message with
                | None -> loop(watch, partitions)
                | Some m -> loop(watch, partitions |> integrate m)
          }
        loop(Stopwatch.StartNew(), Map.empty)).Post

Subscribing (Parallelism + Manual Offset Management)

Let's suppose we want to process multiple messages at a time on multiple threads.

To tie all of this together, we create a hybrid of subscribeCallback and subscribeSeq functions above. It accepts an onMessage callback, tracks all offsets, and executes using some degree of concurrency.

let subscribeParallel concurrency brokerCsv group topic onMessage =
  1. when a partition is assigned, we start tracking its offsets
  2. before business logic, offsets are marked as active
  3. after business logic, offsets are marked as processed
  4. when a partition is revoked, we stop tracking its offsets
  let monitor = OffsetMonitor.create consumer topic

  consumer.OnPartitionsAssigned.Add(
    ofTopicPartitionOffsets >> OffsetMonitor.assign monitor) // (1)
  consumer.OnPartitionsRevoked.Add(
    ofTopicPartitionOffsets >> OffsetMonitor.revoke monitor) // (4)

  let onMessage(message:Message) = async {
      OffsetMonitor.start monitor message                    // (2)
      do! onMessage(message)
      OffsetMonitor.finish monitor message }                 // (3)

  consumer.Subscribe(new Collections.Generic.List<string>([topic]))
  consumer.Start()

Finally, take the messageSeq from subscribeSeq, and apply onMessage with the specified degree of concurrency. (using AsyncSeq, this time :) )

  messageSeq
  |> AsyncSeq.ofSeq
  |> AsyncSeq.iterAsyncParThrottled concurrency onMessage

Supervision

Supervising progress of a microservice running RdKafka depends on monitoring two things:

  1. the range of messages available in a topic (i.e. its Watermark offsets), and
  2. the current Checkpoint offsets for a consumer group relative to those Watermarks

Watermarks

A line drawn on the side of an empty ship is its low water line. When you put cargo on the ship, it sits lower in water. The line drawn on the side of a fully loaded ship is its high water line. The same terminology is used here:

type Watermark =     // watermark high/low offset for each partition of a topic
  { Topic : string   // topic
    Partition : int  // partition of the topic
    High : int64     // highest offset available in this partition (newest message)
    Low : int64 }    // lowest offset available in this partition (oldest message)

To query these watermarks:

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Watermark =
  let queryWithTimeout (timeout) (producer:Producer) (consumer:Consumer) (topic:Topic) =
    let queryWatermark(t, p) =
      async {
        let! w =
          consumer.QueryWatermarkOffsets(TopicPartition(t, p), timeout)
          |> Async.AwaitTask
        return { Topic=t; Partition=p; High=w.High; Low=w.Low }
      }
    async {
      let topic = producer.Topic(topic)
      let! metadata =
        producer.Metadata(onlyForTopic=topic, timeout=timeout)
        |> Async.AwaitTask
      return!
        metadata.Topics
        |> Seq.collect(fun t -> [for p in t.Partitions -> t.Topic, p.PartitionId])
        |> Seq.sort
        |> Seq.map queryWatermark
        |> Async.Parallel // again, consider AsyncSeq instead :)
    }

Checkpoints

Your consumer group's current position is composed of an offset position within each partition of the topic:

type Checkpoint = List<Partition*Offset>

If you're monitoring within an active consumer, you have access to the absolute latest offsets completed for each partition. When multiple consumers are working together in a group, however, each has only a partial view of the overall progress.

It's possible to query the broker for the latest committed checkpoint for a consumer group across all partitions of the topic:

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Checkpoint =
  let queryWithTimeout (timeout) (producer:Producer) (consumer:Consumer) (topic:Topic) =
    async {
      let! metadata = 
        producer.Metadata(onlyForTopic=producer.Topic(topic), timeout=timeout)
        |> Async.AwaitTask
      let partitions =
        metadata.Topics
        |> Seq.collect(fun t -> [for p in t.Partitions -> t.Topic, p.PartitionId])
        |> Seq.sort
        |> Seq.map (fun (t,p) -> new TopicPartition(t,p))
      let! committed =
        consumer.Committed(new Collections.Generic.List<_>(partitions), timeout)
        |> Async.AwaitTask
      let checkpoint : Checkpoint =
        committed
        |> Seq.map (fun tpo -> tpo.Partition, tpo.Offset)
        |> Seq.sortBy fst
        |> Seq.toList
      return checkpoint
    }

Over time, your current offset in each partition should increase as your consumer group processes messages. Similarly, the high watermark will also increase as new messages are added. The difference between your high watermark and your current position is your lag.

Using these figures, you can measure your performance relative to any service level agreement in effect for your microservice, and potentially take corrective action - such as scaling the number of consumer instances or size of machines.

Summary

With an eye to building top-tier microservices, we looked at:

  • logging callbacks in RdKafka to achieve better visibility
  • configuring the client for flexibility in several useful scenarios
  • publishing and subscribing, including:
    • scaling the client as partitions are redistributed
    • maintaining an at-least-once guarantee for message processing at scale
  • monitoring overall progress using watermarks and committed checkpoints
  • and we barely even scratched the surface ;)

If you enjoy working with F# and Kafka, I also encourage you to check out Kafunk - an open source client (written entirely in F# !) under development at Jet.

Thanks for visiting ~ have fun !

namespace RdKafka
namespace System
namespace System.Collections
namespace System.Collections.Concurrent
namespace System.Diagnostics
type Topic = string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Topic
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
type Partition = int

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partition
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type Offset = int64

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Offset
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int64

--------------------
type int64 = Int64

Full name: Microsoft.FSharp.Core.int64

--------------------
type int64<'Measure> = int64

Full name: Microsoft.FSharp.Core.int64<_>
type BrokerCsv = string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.BrokerCsv
type ConsumerName = string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ConsumerName
type ErrorReason = string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ErrorReason
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event =
  | Statistics of string
  | OffsetCommit of ErrorCode * List<Topic * Partition * Offset>
  | EndReached of Topic * Partition * Offset
  | PartitionsAssigned of List<Topic * Partition * Offset>
  | PartitionsRevoked of List<Topic * Partition * Offset>
  | ConsumerError of ErrorCode
  | Error of ErrorCode * string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Event

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
union case Event.Statistics: string -> Event
union case Event.OffsetCommit: ErrorCode * List<Topic * Partition * Offset> -> Event
type ErrorCode =
  | _BEGIN = -200
  | _BAD_MSG = -199
  | _BAD_COMPRESSION = -198
  | _DESTROY = -197
  | _FAIL = -196
  | _TRANSPORT = -195
  | _CRIT_SYS_RESOURCE = -194
  | _RESOLVE = -193
  | _MSG_TIMED_OUT = -192
  | _PARTITION_EOF = -191
  ...

Full name: RdKafka.ErrorCode
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member GetSlice : startIndex:int option * endIndex:int option -> 'T list
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
union case Event.EndReached: Topic * Partition * Offset -> Event
union case Event.PartitionsAssigned: List<Topic * Partition * Offset> -> Event
union case Event.PartitionsRevoked: List<Topic * Partition * Offset> -> Event
union case Event.ConsumerError: ErrorCode -> Event
union case Event.Error: ErrorCode * string -> Event
val ofTopicPartitionOffset : tpo:TopicPartitionOffset -> string * int * int64

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofTopicPartitionOffset
val tpo : TopicPartitionOffset
Multiple items
type TopicPartitionOffset =
  struct
    new : topic:string * partition:int * offset:int64 -> TopicPartitionOffset
    member Offset : int64 with get, set
    member Partition : int with get, set
    member ToString : unit -> string
    member Topic : string with get, set
  end

Full name: RdKafka.TopicPartitionOffset

--------------------
TopicPartitionOffset()
TopicPartitionOffset(topic: string, partition: int, offset: int64) : unit
property TopicPartitionOffset.Topic: string
property TopicPartitionOffset.Partition: int
property TopicPartitionOffset.Offset: int64
val ofTopicPartitionOffsets : (Collections.Generic.IList<TopicPartitionOffset> -> (string * int * int64) list)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofTopicPartitionOffsets
module Seq

from Microsoft.FSharp.Collections
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.map
val toList : source:seq<'T> -> 'T list

Full name: Microsoft.FSharp.Collections.Seq.toList
val ofCommit : oca:Consumer.OffsetCommitArgs -> ErrorCode * (string * int * int64) list

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofCommit
val oca : Consumer.OffsetCommitArgs
Multiple items
type Consumer =
  inherit Handle
  new : config:Config * ?brokerList:string -> Consumer
  member Assign : partitions:List<TopicPartitionOffset> -> unit
  member Assignment : List<TopicPartition>
  member Commit : unit -> Task + 2 overloads
  member Committed : partitions:List<TopicPartition> * timeout:TimeSpan -> Task<List<TopicPartitionOffset>>
  member Consume : timeout:TimeSpan -> Nullable<MessageAndError>
  member Dispose : unit -> unit
  member GetWatermarkOffsets : topicPartition:TopicPartition -> Offsets
  member Position : partitions:List<TopicPartition> -> List<TopicPartitionOffset>
  member Subscribe : topics:List<string> -> unit
  ...
  nested type OffsetCommitArgs

Full name: RdKafka.Consumer

--------------------
Consumer(config: Config, ?brokerList: string) : unit
type OffsetCommitArgs =
  struct
    member Error : ErrorCode with get, set
    member Offsets : IList<TopicPartitionOffset> with get, set
  end

Full name: RdKafka.Consumer.OffsetCommitArgs
property Consumer.OffsetCommitArgs.Error: ErrorCode
property Consumer.OffsetCommitArgs.Offsets: Collections.Generic.IList<TopicPartitionOffset>
val ofError : ea:Handle.ErrorArgs -> ErrorCode * string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofError
val ea : Handle.ErrorArgs
Multiple items
type Handle =
  new : unit -> Handle
  member Dispose : unit -> unit
  member ListGroup : group:string * timeout:TimeSpan -> Task<GroupInfo>
  member ListGroups : timeout:TimeSpan -> Task<List<GroupInfo>>
  member LogLevel : -> int with set
  member MemberId : string
  member Metadata : ?allTopics:bool * ?onlyForTopic:Topic * ?includeInternal:bool * ?timeout:TimeSpan -> Task<Metadata>
  member Name : string
  member OutQueueLength : int64
  member QueryWatermarkOffsets : topicPartition:TopicPartition * ?timeout:TimeSpan -> Task<Offsets>
  ...
  nested type ErrorArgs

Full name: RdKafka.Handle

--------------------
Handle() : unit
type ErrorArgs =
  struct
    member ErrorCode : ErrorCode with get, set
    member Reason : string with get, set
  end

Full name: RdKafka.Handle.ErrorArgs
property Handle.ErrorArgs.ErrorCode: ErrorCode
property Handle.ErrorArgs.Reason: string
val toLog : _arg1:Event -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.toLog
val fromConsumerToLog : c:EventConsumer -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.fromConsumerToLog
val c : EventConsumer
Multiple items
type EventConsumer =
  inherit Consumer
  new : config:Config * ?brokerList:string -> EventConsumer
  member Dispose : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> Task
  event OnMessage : EventHandler<Message>
  event OnConsumerError : EventHandler<ErrorCode>
  event OnEndReached : EventHandler<TopicPartitionOffset>

Full name: RdKafka.EventConsumer

--------------------
EventConsumer(config: Config, ?brokerList: string) : unit
event Handle.OnStatistics: IEvent<EventHandler<string>,string>
member IObservable.Add : callback:('T -> unit) -> unit
event Consumer.OnOffsetCommit: IEvent<EventHandler<Consumer.OffsetCommitArgs>,Consumer.OffsetCommitArgs>
event EventConsumer.OnEndReached: IEvent<EventHandler<TopicPartitionOffset>,TopicPartitionOffset>
event Consumer.OnPartitionsAssigned: IEvent<EventHandler<Collections.Generic.List<TopicPartitionOffset>>,Collections.Generic.List<TopicPartitionOffset>>
event Consumer.OnPartitionsRevoked: IEvent<EventHandler<Collections.Generic.List<TopicPartitionOffset>>,Collections.Generic.List<TopicPartitionOffset>>
event EventConsumer.OnConsumerError: IEvent<EventHandler<ErrorCode>,ErrorCode>
event Handle.OnError: IEvent<EventHandler<Handle.ErrorArgs>,Handle.ErrorArgs>
val fromProducerToLog : p:Producer -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.fromProducerToLog
val p : Producer
Multiple items
type Producer =
  inherit Handle
  new : brokerList:string -> Producer + 1 overload
  member Topic : topic:string * ?config:TopicConfig -> Topic

Full name: RdKafka.Producer

--------------------
Producer(brokerList: string) : unit
Producer(config: Config, ?brokerList: string) : unit
val connect : brokerCsv:BrokerCsv -> group:ConsumerName -> autoCommit:bool -> EventConsumer * Producer

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.connect
val brokerCsv : BrokerCsv
val group : ConsumerName
val autoCommit : bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
val config : Config
Multiple items
type Config =
  new : unit -> Config
  member DefaultTopicConfig : TopicConfig with get, set
  member Dump : unit -> Dictionary<string, string>
  member EnableAutoCommit : bool with get, set
  member GroupId : string with get, set
  member Item : string -> string with get, set
  member Logger : LogCallback with get, set
  member StatisticsInterval : TimeSpan with get, set
  nested type LogCallback

Full name: RdKafka.Config

--------------------
Config() : unit
property Config.GroupId: string
property Config.StatisticsInterval: TimeSpan
Multiple items
type TimeSpan =
  struct
    new : ticks:int64 -> TimeSpan + 3 overloads
    member Add : ts:TimeSpan -> TimeSpan
    member CompareTo : value:obj -> int + 1 overload
    member Days : int
    member Duration : unit -> TimeSpan
    member Equals : value:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Hours : int
    member Milliseconds : int
    member Minutes : int
    ...
  end

Full name: System.TimeSpan

--------------------
TimeSpan()
TimeSpan(ticks: int64) : unit
TimeSpan(hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : unit
TimeSpan.FromSeconds(value: float) : TimeSpan
property Config.EnableAutoCommit: bool
property Config.DefaultTopicConfig: TopicConfig
val topicConfig : TopicConfig
Multiple items
type TopicConfig =
  new : unit -> TopicConfig
  member CustomPartitioner : Partitioner with get, set
  member Dump : unit -> Dictionary<string, string>
  member Item : string -> string with get, set
  nested type Partitioner

Full name: RdKafka.TopicConfig

--------------------
TopicConfig() : unit
val publish : brokerCsv:BrokerCsv -> group:ConsumerName -> topic:Topic -> (byte [] * byte [] -> Async<int * int64>)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.publish
val topic : Topic
val consumer : EventConsumer
val producer : Producer
Producer.Topic(topic: string, ?config: TopicConfig) : Topic
val key : byte []
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
val payload : byte []
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val report : DeliveryReport
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitTask : task:Threading.Tasks.Task -> Async<unit>
static member Async.AwaitTask : task:Threading.Tasks.Task<'T> -> Async<'T>
Topic.Produce(payload: byte [], ?key: byte [], ?partition: int) : Threading.Tasks.Task<DeliveryReport>
field DeliveryReport.Partition
field DeliveryReport.Offset
val subscribeCallback : brokerCsv:BrokerCsv -> group:ConsumerName -> topic:Topic -> onMessage:(Message -> unit) -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeCallback
val onMessage : (Message -> unit)
val map : mapping:('T -> 'U) -> list:'T list -> 'U list

Full name: Microsoft.FSharp.Collections.List.map
val t : string
val p : int
val o : int64
field Offset.Stored = -1000L
Multiple items
namespace System.Collections

--------------------
namespace Microsoft.FSharp.Collections
namespace System.Collections.Generic
Multiple items
type List<'T> =
  new : unit -> List<'T> + 2 overloads
  member Add : item:'T -> unit
  member AddRange : collection:IEnumerable<'T> -> unit
  member AsReadOnly : unit -> ReadOnlyCollection<'T>
  member BinarySearch : item:'T -> int + 2 overloads
  member Capacity : int with get, set
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
  member CopyTo : array:'T[] -> unit + 2 overloads
  ...
  nested type Enumerator

Full name: System.Collections.Generic.List<_>

--------------------
Collections.Generic.List() : unit
Collections.Generic.List(capacity: int) : unit
Collections.Generic.List(collection: Collections.Generic.IEnumerable<'T>) : unit
Consumer.Assign(partitions: Collections.Generic.List<TopicPartitionOffset>) : unit
event EventConsumer.OnMessage: IEvent<EventHandler<Message>,Message>
Consumer.Subscribe(topics: Collections.Generic.List<string>) : unit
EventConsumer.Start() : unit
val subscribeSeq : brokerCsv:BrokerCsv -> group:ConsumerName -> topic:string -> seq<Partition * Offset * byte []>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeSeq
val topic : string
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val buffer : int
val messages : BlockingCollection<Message>
Multiple items
type BlockingCollection<'T> =
  new : unit -> BlockingCollection<'T> + 3 overloads
  member Add : item:'T -> unit + 1 overload
  member BoundedCapacity : int
  member CompleteAdding : unit -> unit
  member CopyTo : array:'T[] * index:int -> unit
  member Count : int
  member Dispose : unit -> unit
  member GetConsumingEnumerable : unit -> IEnumerable<'T> + 1 overload
  member IsAddingCompleted : bool
  member IsCompleted : bool
  ...

Full name: System.Collections.Concurrent.BlockingCollection<_>

--------------------
BlockingCollection() : unit
BlockingCollection(boundedCapacity: int) : unit
BlockingCollection(collection: IProducerConsumerCollection<'T>) : unit
BlockingCollection(collection: IProducerConsumerCollection<'T>, boundedCapacity: int) : unit
type Message =
  struct
    member Key : byte[] with get, set
    member Offset : int64 with get, set
    member Partition : int with get, set
    member Payload : byte[] with get, set
    member Topic : string with get, set
    member TopicPartitionOffset : TopicPartitionOffset
  end

Full name: RdKafka.Message
Multiple items
type ConcurrentQueue<'T> =
  new : unit -> ConcurrentQueue<'T> + 1 overload
  member CopyTo : array:'T[] * index:int -> unit
  member Count : int
  member Enqueue : item:'T -> unit
  member GetEnumerator : unit -> IEnumerator<'T>
  member IsEmpty : bool
  member ToArray : unit -> 'T[]
  member TryDequeue : result:'T -> bool
  member TryPeek : result:'T -> bool

Full name: System.Collections.Concurrent.ConcurrentQueue<_>

--------------------
ConcurrentQueue() : unit
ConcurrentQueue(collection: Collections.Generic.IEnumerable<'T>) : unit
BlockingCollection.Add(item: Message) : unit
BlockingCollection.Add(item: Message, cancellationToken: Threading.CancellationToken) : unit
val initInfinite : initializer:(int -> 'T) -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.initInfinite
val message : Message
BlockingCollection.Take() : Message
BlockingCollection.Take(cancellationToken: Threading.CancellationToken) : Message
property Message.Partition: int
property Message.Offset: int64
property Message.Payload: byte []
type Offsets =
  {Next: Offset option;
   Active: Offset list;
   Processed: Offset list;}

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Offsets
Offsets.Next: Offset option
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
Offsets.Active: Offset list
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
Offsets.Processed: Offset list
Multiple items
type CompilationRepresentationAttribute =
  inherit Attribute
  new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
  member Flags : CompilationRepresentationFlags

Full name: Microsoft.FSharp.Core.CompilationRepresentationAttribute

--------------------
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
type CompilationRepresentationFlags =
  | None = 0
  | Static = 1
  | Instance = 2
  | ModuleSuffix = 4
  | UseNullAsTrueValue = 8
  | Event = 16

Full name: Microsoft.FSharp.Core.CompilationRepresentationFlags
CompilationRepresentationFlags.ModuleSuffix: CompilationRepresentationFlags = 4
val empty : Offsets

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.empty
union case Option.None: Option<'T>
val private update : _arg1:Offsets -> Offsets

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.update
val x : Offsets
val max : list:'T list -> 'T (requires comparison)

Full name: Microsoft.FSharp.Collections.List.max
union case Option.Some: Value: 'T -> Option<'T>
val partition : predicate:('T -> bool) -> list:'T list -> 'T list * 'T list

Full name: Microsoft.FSharp.Collections.List.partition
val min : list:'T list -> 'T (requires comparison)

Full name: Microsoft.FSharp.Collections.List.min
val c : Offset list
val p : Offset list
val start : x:Offset -> xs:Offsets -> Offsets

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.start
val x : Offset
val xs : Offsets
val finish : x:Offset -> xs:Offsets -> Offsets

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.finish
val filter : predicate:('T -> bool) -> list:'T list -> 'T list

Full name: Microsoft.FSharp.Collections.List.filter
type Partitions = Map<Partition,Offsets>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partitions
Multiple items
module Map

from Microsoft.FSharp.Collections

--------------------
type Map<'Key,'Value (requires comparison)> =
  interface IEnumerable
  interface IComparable
  interface IEnumerable<KeyValuePair<'Key,'Value>>
  interface ICollection<KeyValuePair<'Key,'Value>>
  interface IDictionary<'Key,'Value>
  new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
  member Add : key:'Key * value:'Value -> Map<'Key,'Value>
  member ContainsKey : key:'Key -> bool
  override Equals : obj -> bool
  member Remove : key:'Key -> Map<'Key,'Value>
  ...

Full name: Microsoft.FSharp.Collections.Map<_,_>

--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val assign : p:Partition -> xs:Partitions -> Partitions

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.assign
val p : Partition
val xs : Partitions
val tryFind : key:'Key -> table:Map<'Key,'T> -> 'T option (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.tryFind
val add : key:'Key -> value:'T -> table:Map<'Key,'T> -> Map<'Key,'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.add
val start : p:Partition * o:Offset -> xs:Partitions -> Partitions

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.start
val o : Offset
val offsets : Offsets
val finish : p:Partition * o:Offset -> xs:Partitions -> Partitions

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.finish
val revoke : p:Partition -> xs:Partitions -> Partitions

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.revoke
val remove : key:'Key -> table:Map<'Key,'T> -> Map<'Key,'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.remove
val checkpoint : (Partitions -> List<Partition * Offset>)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.checkpoint
val toList : table:Map<'Key,'T> -> ('Key * 'T) list (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.toList
val choose : chooser:('T -> 'U option) -> list:'T list -> 'U list

Full name: Microsoft.FSharp.Collections.List.choose
val o : Offsets
module Option

from Microsoft.FSharp.Core
val map : mapping:('T -> 'U) -> option:'T option -> 'U option

Full name: Microsoft.FSharp.Core.Option.map
type OffsetMonitor = OffsetMonitorMessage -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitor
type OffsetMonitorMessage =
  | Start of Partition * Offset
  | Finish of Partition * Offset
  | Assign of Partition list
  | Revoke of Partition list

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorMessage
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
union case OffsetMonitorMessage.Start: Partition * Offset -> OffsetMonitorMessage
union case OffsetMonitorMessage.Finish: Partition * Offset -> OffsetMonitorMessage
union case OffsetMonitorMessage.Assign: Partition list -> OffsetMonitorMessage
union case OffsetMonitorMessage.Revoke: Partition list -> OffsetMonitorMessage
val assign : m:(OffsetMonitorMessage -> 'a) -> (('b * Partition * 'c) list -> 'a)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.assign
val m : (OffsetMonitorMessage -> 'a)
val revoke : m:(OffsetMonitorMessage -> 'a) -> (('b * Partition * 'c) list -> 'a)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.revoke
val start : m:(OffsetMonitorMessage -> 'a) -> x:Message -> 'a

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.start
val x : Message
val finish : m:(OffsetMonitorMessage -> 'a) -> x:Message -> 'a

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.finish
val create : consumer:EventConsumer -> topic:Topic -> (OffsetMonitorMessage -> unit)

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.create
val integrate : (OffsetMonitorMessage -> Partitions -> Partitions)
Multiple items
module Partitions

from 2016-12-08-rdkafka-for-fsharp-microservices

--------------------
type Partitions = Map<Partition,Offsets>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partitions
val ps : Partition list
val foldBack : folder:('T -> 'State -> 'State) -> list:'T list -> state:'State -> 'State

Full name: Microsoft.FSharp.Collections.List.foldBack
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<OffsetMonitorMessage>
val loop : (Stopwatch * Partitions -> Async<'a>)
val watch : Stopwatch
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
val partitions : Partitions
property Stopwatch.Elapsed: TimeSpan
property TimeSpan.TotalSeconds: float
val result : Choice<unit,exn>
Consumer.Commit() : Threading.Tasks.Task
Consumer.Commit(offsets: Collections.Generic.List<TopicPartitionOffset>) : Threading.Tasks.Task
Consumer.Commit(message: Message) : Threading.Tasks.Task
static member Async.Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
Stopwatch.StartNew() : Stopwatch
val message : OffsetMonitorMessage option
member MailboxProcessor.TryReceive : ?timeout:int -> Async<'Msg option>
val m : OffsetMonitorMessage
val empty<'Key,'T (requires comparison)> : Map<'Key,'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.empty
val ofSeq : xs:seq<'a> -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.AsyncSeq.ofSeq
val xs : seq<'a>
val iterAsyncParThrottled : 'a -> 'b -> 'c -> Async<unit>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.AsyncSeq.iterAsyncParThrottled
member AsyncBuilder.Return : value:'T -> Async<'T>
val subscribeParallel : concurrency:'a -> brokerCsv:BrokerCsv -> group:ConsumerName -> topic:Topic -> onMessage:(Message -> Async<unit>) -> Async<unit>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeParallel
val concurrency : 'a
val onMessage : (Message -> Async<unit>)
val messageSeq : seq<Message>
val monitor : (OffsetMonitorMessage -> unit)
Multiple items
module OffsetMonitor

from 2016-12-08-rdkafka-for-fsharp-microservices

--------------------
type OffsetMonitor = OffsetMonitorMessage -> unit

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitor
module AsyncSeq

from 2016-12-08-rdkafka-for-fsharp-microservices
type Watermark =
  {Topic: string;
   Partition: int;
   High: int64;
   Low: int64;}

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Watermark
Multiple items
Watermark.Topic: string

--------------------
type Topic = string

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Topic
Multiple items
Watermark.Partition: int

--------------------
type Partition = int

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partition
Watermark.High: int64
Watermark.Low: int64
val queryWithTimeout : timeout:TimeSpan -> producer:Producer -> consumer:Consumer -> topic:Topic -> Async<Watermark []>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.WatermarkModule.queryWithTimeout
val timeout : TimeSpan
val consumer : Consumer
val queryWatermark : (string * int -> Async<Watermark>)
val w : Offsets
Handle.QueryWatermarkOffsets(topicPartition: TopicPartition, ?timeout: TimeSpan) : Threading.Tasks.Task<Offsets>
Multiple items
type TopicPartition =
  struct
    new : topic:string * partition:int -> TopicPartition
    member Partition : int with get, set
    member ToString : unit -> string
    member Topic : string with get, set
  end

Full name: RdKafka.TopicPartition

--------------------
TopicPartition()
TopicPartition(topic: string, partition: int) : unit
property Offsets.High: int64
property Offsets.Low: int64
val metadata : Metadata
Handle.Metadata(?allTopics: bool, ?onlyForTopic: Topic, ?includeInternal: bool, ?timeout: TimeSpan) : Threading.Tasks.Task<Metadata>
property Metadata.Topics: Collections.Generic.List<TopicMetadata>
val collect : mapping:('T -> #seq<'U>) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.collect
val t : TopicMetadata
val p : PartitionMetadata
property TopicMetadata.Partitions: Collections.Generic.List<PartitionMetadata>
property TopicMetadata.Topic: string
property PartitionMetadata.PartitionId: int
val sort : source:seq<'T> -> seq<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Seq.sort
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
type Checkpoint = List<Partition * Offset>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Checkpoint
Multiple items
module Checkpoint

from 2016-12-08-rdkafka-for-fsharp-microservices

--------------------
type Checkpoint = List<Partition * Offset>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Checkpoint
val queryWithTimeout : timeout:TimeSpan -> producer:Producer -> consumer:Consumer -> topic:Topic -> Async<Checkpoint>

Full name: 2016-12-08-rdkafka-for-fsharp-microservices.CheckpointModule.queryWithTimeout
val partitions : seq<TopicPartition>
val committed : Collections.Generic.List<TopicPartitionOffset>
Consumer.Committed(partitions: Collections.Generic.List<TopicPartition>, timeout: TimeSpan) : Threading.Tasks.Task<Collections.Generic.List<TopicPartitionOffset>>
val checkpoint : Checkpoint
val sortBy : projection:('T -> 'Key) -> source:seq<'T> -> seq<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Seq.sortBy
val fst : tuple:('T1 * 'T2) -> 'T1

Full name: Microsoft.FSharp.Core.Operators.fst