Java Flow Code Walkthrough

This section describes the Java code for the flows and supporting classes in the workflows/src/main/java/com/r3/developers/cordapptemplate/flowexample/workflows folder in the CorDapp template repo. The folder contains the following files:

// Message.java
package com.r3.developers.cordapptemplate.flowexample.workflows;

import net.corda.v5.base.annotations.CordaSerializable;
import net.corda.v5.base.types.MemberX500Name;

// A class which contains a message. It must be marked with @CordaSerializable for Corda
// to be able to send it from one virtual node to another.
@CordaSerializable
public class Message {
    public Message(MemberX500Name sender, String message) {
        this.sender = sender;
        this.message = message;
    }

    public MemberX500Name getSender() {
        return sender;
    }

    public String getMessage() {
        return message;
    }

    public MemberX500Name sender;
    public String message;
}

// MyFirstFlow.java
package com.r3.developers.cordapptemplate.flowexample.workflows;

import net.corda.v5.application.flows.*;
import net.corda.v5.application.marshalling.JsonMarshallingService;
import net.corda.v5.application.membership.MemberLookup;
import net.corda.v5.application.messaging.FlowMessaging;
import net.corda.v5.application.messaging.FlowSession;
import net.corda.v5.base.annotations.Suspendable;
import net.corda.v5.base.types.MemberX500Name;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


// MyFirstFlow is an initiating flow. Its corresponding responder flow is called MyFirstFlowResponder (defined below).
// To link the two sides of the flow together, they need to have the same protocol.
@InitiatingFlow(protocol = "my-first-flow")
// MyFirstFlow implements ClientStartableFlow, which tells Corda it can be started via a REST call.
public class MyFirstFlow implements ClientStartableFlow {

  // Log messages from the flows for debugging.
  private final static Logger log = LoggerFactory.getLogger(MyFirstFlow.class);

  // Corda has a set of injectable services which are injected into the flow at runtime.
  // Flows declare them with @CordaInject, then the flows have access to their services.

  // JsonMarshallingService provides a service for manipulating JSON.
  @CordaInject
  public JsonMarshallingService jsonMarshallingService;

  // FlowMessaging provides a service that establishes flow sessions between virtual nodes
  // that send and receive payloads between them.
  @CordaInject
  public FlowMessaging flowMessaging;

  // MemberLookup provides a service for looking up information about members of the virtual network which
  // this CorDapp operates in.
  @CordaInject
  public MemberLookup memberLookup;

  public MyFirstFlow() {}

  // When a flow is invoked, its call() method is called.
  // call() methods must be marked as @Suspendable. This allows Corda to pause mid-execution to wait
  // for a response from the other flows and services.

  @Suspendable
  @Override
  public String call(ClientRequestBody requestBody) {

    // Follow what happens in the console or logs.
    log.info("MFF: MyFirstFlow.call() called");

    // Show the requestBody in the logs - this can be used to help establish the format for starting a flow on Corda.
    log.info("MFF: requestBody: " + requestBody.getRequestBody());

    // Deserialize the JSON requestBody into the MyFirstFlowStartArgs class using the JsonMarshallingService.
    MyFirstFlowStartArgs flowArgs = requestBody.getRequestBodyAs(jsonMarshallingService, MyFirstFlowStartArgs.class);

    // Obtain the MemberX500Name of the counterparty.
    MemberX500Name otherMember = flowArgs.otherMember;

    // Get our identity from the MemberLookup service.
    MemberX500Name ourIdentity = memberLookup.myInfo().getName();

    // Create the message payload using the Message class we defined.
    Message message = new Message(otherMember, "Hello from " + ourIdentity + ".");

    // Log the message to be sent.
    log.info("MFF: message.message: " + message.getMessage());

    // Start a flow session with the otherMember using the FlowMessaging service.
    // The otherMember's virtual node will run the corresponding MyFirstFlowResponder responder flow.
    FlowSession session = flowMessaging.initiateFlow(otherMember);

    // Send the payload using the send method on the session to the MyFirstFlowResponder responder flow.
    session.send(message);

    // Receive a response from the responder flow.
    Message response = session.receive(Message.class);

    // The return value of a ClientStartableFlow must always be a String. This will be passed
    // back as the REST response when the status of the flow is queried on Corda, or as the return
    // value from the flow when testing using the simulator.
    return response.getMessage();
  }
}

/*
RequestBody for triggering the flow via REST:
{
    "clientRequestId": "r1",
    "flowClassName": "com.r3.developers.cordapptemplate.flowexample.workflows.MyFirstFlow",
    "requestData": {
        "otherMember":"CN=Bob, OU=Test Dept, O=R3, L=London, C=GB"
        }
}
 */

// MyFirstFlowResponder.java
package com.r3.developers.cordapptemplate.flowexample.workflows;

import net.corda.v5.application.flows.CordaInject;
import net.corda.v5.application.flows.InitiatedBy;
import net.corda.v5.application.flows.ResponderFlow;
import net.corda.v5.application.membership.MemberLookup;
import net.corda.v5.application.messaging.FlowSession;
import net.corda.v5.base.annotations.Suspendable;
import net.corda.v5.base.types.MemberX500Name;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


// MyFirstFlowResponder is a responder flow. Its corresponding initiating flow is called MyFirstFlow (defined in MyFirstFlow.java).
// To link the two sides of the flow together, they need to have the same protocol.
@InitiatedBy(protocol = "my-first-flow")
// Responder flows must implement ResponderFlow.
public class MyFirstFlowResponder implements ResponderFlow {

    // Log messages from the flows for debugging.
    private final static Logger log = LoggerFactory.getLogger(MyFirstFlowResponder.class);

    // MemberLookup looks for information about members of the virtual network which
    // this CorDapp operates in.
    @CordaInject
    public MemberLookup memberLookup;

    public MyFirstFlowResponder() {}

    // Responder flows are invoked when an initiating flow makes a call via a session set up with the virtual
    // node hosting the responder flow. When a responder flow is invoked, its call() method is called.
    // call() methods must be marked as @Suspendable. This allows Corda to pause mid-execution to wait
    // for a response from the other flows and services.
    // Corda passes the flow session into the call() method as a parameter, so the session is available to
    // responder flow code and you don't need to inject the FlowMessaging service.
    @Suspendable
    @Override
    public void call(FlowSession session) {

        // Follow what happens in the console or logs.
        log.info("MFF: MyFirstResponderFlow.call() called");

        // Receive the payload and deserialize it into a message class.
        Message receivedMessage = session.receive(Message.class);

        // Log the message as a proxy for performing some useful operation on it.
        log.info("MFF: Message received from " + receivedMessage.getSender() + ":" + receivedMessage.getMessage());

        // Get our identity from the MemberLookup service.
        MemberX500Name ourIdentity = memberLookup.myInfo().getName();

        // Create a message to greet the sender.
        Message response = new Message(ourIdentity,
                "Hello " + session.getCounterparty().getCommonName() + ", best wishes from " + ourIdentity.getCommonName());

        // Log the response to be sent.
        log.info("MFF: response.message: " + response.getMessage());

        // Send the response via the send method on the flow session
        session.send(response);
    }
}

// MyFirstFlowStartArgs.java
package com.r3.developers.cordapptemplate.flowexample.workflows;

import net.corda.v5.base.types.MemberX500Name;

// A class to hold the arguments required to start the flow
public class MyFirstFlowStartArgs {
    public MemberX500Name otherMember;

    public MyFirstFlowStartArgs(MemberX500Name otherMember) {
        this.otherMember = otherMember;
    }

    // The JSON Marshalling Service, which handles serialisation, needs this constructor.
    public MyFirstFlowStartArgs() {}
}

There are two helper classes:

  • MyFirstFlowStartArgs — provides a wrapper around the single argument that needs to be passed into the flow, namely the other member of the application network to whom the message should be sent:

    public class MyFirstFlowStartArgs {
        public MemberX500Name otherMember;

        public MyFirstFlowStartArgs(MemberX500Name otherMember) {
            this.otherMember = otherMember;
        }

        // The JSON Marshalling Service, which handles serialisation, needs this constructor.
        public MyFirstFlowStartArgs() {}
    }
    
  • Message — specifies the sender and the message. This class is used both for the message sent from the initiator to the responder and for the message subsequently sent back from the responder to the initiator. Because this class is defined in a CorDapp and is sent ‘down the wire’ between two virtual nodes, it requires the @CordaSerializable annotation.

    @CordaSerializable
    public class Message {
        public Message(MemberX500Name sender, String message) {
            this.sender = sender;
            this.message = message;
        }
    
        public MemberX500Name getSender() {
            return sender;
        }
    
        public String getMessage() {
            return message;
        }
    
        public MemberX500Name sender;
        public String message;
    }
    

To trigger a flow from REST, the flow must implement ClientStartableFlow. Most flows come in pairs: one initiating flow and a corresponding responder flow. The responder flow must implement ResponderFlow. The two flows are linked by the @InitiatingFlow and @InitiatedBy annotations, which both specify the same protocol, in this case “my-first-flow”:

@InitiatingFlow(protocol = "my-first-flow")
public class MyFirstFlow implements ClientStartableFlow  { ... }
@InitiatedBy(protocol = "my-first-flow")
public class MyFirstFlowResponder implements ResponderFlow  { ... }
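
The pairing rule can be illustrated without Corda on the classpath. The following is a minimal sketch, not Corda's implementation: DemoInitiatingFlow, DemoInitiatedBy, the three demo classes, and the isPaired helper are all hypothetical names introduced for this example. The helper compares the two protocol values by reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-ins for Corda's @InitiatingFlow and @InitiatedBy annotations,
// declared here only so that the sketch compiles without the Corda API.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoInitiatingFlow { String protocol(); }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoInitiatedBy { String protocol(); }

@DemoInitiatingFlow(protocol = "my-first-flow")
class DemoInitiator {}

@DemoInitiatedBy(protocol = "my-first-flow")
class DemoResponder {}

@DemoInitiatedBy(protocol = "some-other-flow")
class UnrelatedResponder {}

class ProtocolPairing {
    // Returns true when both annotations are present and declare the same protocol name.
    static boolean isPaired(Class<?> initiator, Class<?> responder) {
        DemoInitiatingFlow initiating = initiator.getAnnotation(DemoInitiatingFlow.class);
        DemoInitiatedBy initiatedBy = responder.getAnnotation(DemoInitiatedBy.class);
        return initiating != null
                && initiatedBy != null
                && initiating.protocol().equals(initiatedBy.protocol());
    }

    public static void main(String[] args) {
        System.out.println(isPaired(DemoInitiator.class, DemoResponder.class));
        System.out.println(isPaired(DemoInitiator.class, UnrelatedResponder.class));
    }
}
```

In this sketch, DemoInitiator pairs with DemoResponder because both declare "my-first-flow", while UnrelatedResponder does not pair because its protocol string differs.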

It is useful to add logging statements to both the initiating and responder flows. To do this, add a logger to each class. When running tests locally, the console displays log entries. When running on Corda, the log files are updated.

    private final static Logger log = LoggerFactory.getLogger(MyFirstFlowResponder.class);

The log files for the CorDapp template are located in the logs folder in the root of the project. The CorDapp template starts a new log file each time a new instance of Corda is created or when the log file grows too large. The logging can be configured by editing the config/static-network-config.json file. Because the Corda combined worker runs all of the Corda processes in one JVM process, there are a lot of log entries. We recommend adding an easily searchable tag to each log message. For example:

        log.info("MFF: MyFirstFlow.call() called");
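
One way to use such a tag is to filter the log output down to the lines written by your own flows. The sketch below is illustrative only: TaggedLogFilter, linesWithTag, and the sample log lines are made up for this example, and it filters an in-memory list, not the real Corda log files.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TaggedLogFilter {
    // Keeps only the log lines that contain the given tag, preserving their order.
    static List<String> linesWithTag(List<String> logLines, String tag) {
        return logLines.stream()
                .filter(line -> line.contains(tag))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample lines; real Corda log lines have a different layout.
        List<String> sample = Arrays.asList(
                "INFO MFF: MyFirstFlow.call() called",
                "DEBUG some unrelated worker message",
                "INFO MFF: message.message: Hello from Alice.");
        linesWithTag(sample, "MFF:").forEach(System.out::println);
    }
}
```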

Each flow has a call() method. Corda calls this method when the flow is invoked.

When a flow is started via REST, the requestBody from the HTTP request is passed into the call method as the requestBody parameter, giving the rest of the call method access to the parameters passed in via HTTP.

When a responder flow is invoked as a result of an initiator flow on another node, the flow session with the initiating node is passed in as the parameter session.

The call() method in both flows must be marked as @Suspendable. This is an indicator to Corda that this method can be paused and persisted to the database whilst the flow waits for an asynchronous response. It will be rehydrated and continue to execute once the appropriate response is received. It is this mechanism that allows a CorDapp Developer to write what appears to be synchronous, blocking code that executes asynchronously and does not block the Corda cluster.

In the initiating flow:

   @Suspendable
   @Override
   public String call(ClientRequestBody requestBody)  { ... }

In the responder flow:

   @Suspendable
   @Override
   public void call(FlowSession session) { ... }

Corda 5 requires a CorDapp Developer to explicitly specify which Corda services are required by the flow. In this simple example we use three services:

  • JsonMarshallingService — a service that CorDapps and other services may use to marshal arbitrary content in and out of JSON format using standard approved mappers.
  • FlowMessaging — a service that CorDapps can use to create communication sessions between two virtual nodes. Once set up, you can send and receive using the session object.
  • MemberLookup — a service that CorDapps can use to retrieve information about virtual nodes on the network.

There are other services, such as the Persistence and Serialization services, which are beyond the scope of this Getting Started example.

Services are declared as properties in the flow class with the @CordaInject annotation:

public class MyFirstFlow implements ClientStartableFlow {

    ...

    @CordaInject
    public JsonMarshallingService jsonMarshallingService;

    @CordaInject
    public FlowMessaging flowMessaging;

    @CordaInject
    public MemberLookup memberLookup;

    ...

    }

The services are then available in the call() method. For example, to initiate a FlowSession with the other party:

    FlowSession session = flowMessaging.initiateFlow(otherMember);

The first thing that the MyFirstFlow.call() method does is convert the requestBody parameters into a Java class. It does this using the getRequestBodyAs() method, which takes the jsonMarshallingService and the class that the requestBody parameters should be parsed into. The flowArgs variable has the type MyFirstFlowStartArgs, the helper class described earlier.

    @Suspendable
    @Override
    public String call(ClientRequestBody requestBody) {

        ...

        MyFirstFlowStartArgs flowArgs = requestBody.getRequestBodyAs(jsonMarshallingService, MyFirstFlowStartArgs.class);

        ...

    }

We can now obtain the X500 name of the other virtual node that we want to send the message to from flowArgs, which we will need later to set up the FlowSession:

        MemberX500Name otherMember = flowArgs.otherMember;

To create the message, you must know the identity of the initiator. Remember that this flow could run on any node, so the identity cannot be hard-coded. To find out the identity of the virtual node running the initiator flow, use the injected memberLookup service:

MemberX500Name otherMember = flowArgs.otherMember;
MemberX500Name ourIdentity = memberLookup.myInfo().getName();
Message message = new Message(otherMember, "Hello from " + ourIdentity + ".");
log.info("MFF: message.message: " + message.message);

We can now start sending messages to the responder:

  1. Set up a FlowSession between the initiator and responder node:

         FlowSession session = flowMessaging.initiateFlow(otherMember);
    
  2. Simply call send() on the session and pass a payload:

         session.send(message);
    

    The code continues to execute until it reaches the session.receive() method. At that point, the flow checkpoints and persists its state to the database. It resumes when it receives a message back from the responder. This frees up the Corda cluster flow workers to perform other tasks.

            Message response = session.receive(Message.class);
    

    When the message is sent to the responder via the send() method, Corda invokes the call() method of the MyFirstFlowResponder responder flow on the responder's virtual node. The session.receive() call in that method retrieves the message that was sent by the initiator's send() method.

    @Suspendable
    @Override
    public void call(FlowSession session) {
      log.info("MFF: MyFirstResponderFlow.call() called");
      Message receivedMessage = session.receive(Message.class);
      ...
    }
    

    The responder flow then obtains its own identity from its injected memberLookup service, creates a response, and sends the response back using session.send():

    MemberX500Name ourIdentity = memberLookup.myInfo().getName();
    Message response = new Message(ourIdentity,
            "Hello " + session.getCounterparty().getCommonName() + ", best wishes from " + ourIdentity.getCommonName());
    log.info("MFF: response.message: " + response.message);
    session.send(response);
    

    There is no return value for a ResponderFlow.

    Back on the initiating virtual node, Corda detects the message sent by the responder, rehydrates the initiating flow from the database, and resumes it from where it was checkpointed.

    The response is received and response.message is returned by the flow.

       Message response = session.receive(Message.class);
       return response.message;
    

    The response from the initiating flow is always a string, which can be returned when the flow status is queried by REST.

It is important that the sends and receives in the initiator and responder flows match. If the initiator sends a Foo and the responder expects a Bar, the flow hangs and likely results in a timeout error.
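
The matching requirement can be tried out with a small in-memory model. This is a sketch under stated assumptions, not Corda code: DemoSession is a hypothetical stand-in for FlowSession backed by two queues, and MatchedSessionDemo plays both sides in one JVM. Unlike Corda, this model gives up after a two-second wait and reports a type mismatch as a ClassCastException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for one end of a flow session. It is NOT Corda's FlowSession.
class DemoSession {
    private final BlockingQueue<Object> outbound;
    private final BlockingQueue<Object> inbound;

    DemoSession(BlockingQueue<Object> outbound, BlockingQueue<Object> inbound) {
        this.outbound = outbound;
        this.inbound = inbound;
    }

    // Queues a payload for the other side.
    void send(Object payload) {
        outbound.add(payload);
    }

    // Waits up to two seconds for the next payload and checks that it has the expected type.
    <T> T receive(Class<T> expectedType) {
        try {
            Object payload = inbound.poll(2, TimeUnit.SECONDS);
            if (payload == null) {
                throw new IllegalStateException("Timed out waiting for " + expectedType.getSimpleName());
            }
            return expectedType.cast(payload); // throws ClassCastException on a type mismatch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }
}

class MatchedSessionDemo {
    static String run() {
        BlockingQueue<Object> toResponder = new LinkedBlockingQueue<>();
        BlockingQueue<Object> toInitiator = new LinkedBlockingQueue<>();
        DemoSession initiatorSide = new DemoSession(toResponder, toInitiator);
        DemoSession responderSide = new DemoSession(toInitiator, toResponder);

        // Responder: receive a String, then send a String back.
        CompletableFuture<Void> responder = CompletableFuture.runAsync(() -> {
            String greeting = responderSide.receive(String.class);
            responderSide.send("Reply to: " + greeting);
        });

        // Initiator: send a String, then receive a String. This mirrors the responder exactly.
        initiatorSide.send("Hello from Alice");
        String reply = initiatorSide.receive(String.class);
        responder.join();
        return reply;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each send on one side lines up with a receive of the same type on the other side. If both sides called receive first, or if one side expected a different type, this model would fail with the timeout or the cast error respectively.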

The sendAndReceive method on FlowSession sends a payload, checkpoints the flow, and then waits for a response to be received:

<ReceiveType> response = myFlowSession.sendAndReceive(<ReceiveType>.class, payload);

In Corda 4, when payloads were received they were wrapped in an UntrustworthyData class which required unwrapping:

// (Corda 4)
<ReceiveType> corda4Response = myFlowSession.sendAndReceive(<ReceiveType>.class, payload).unwrap( it->{
   <validationcode>
});

This has been removed in Corda 5 because CorDapp Developers usually use other methods to validate the data.
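
As one possible illustration of validating a payload straight after receiving it, the sketch below checks a received message with ordinary Java code. It is an assumption about what such validation might look like, not a Corda API: ReceivedMessage is a simplified stand-in for the Message class above, and MessageValidation.requireValid and its two checks are hypothetical.

```java
import java.util.Objects;

// Simplified stand-in for the walkthrough's Message class. The sender is a plain String here,
// rather than a MemberX500Name, so that the sketch has no Corda dependency.
class ReceivedMessage {
    final String sender;
    final String message;

    ReceivedMessage(String sender, String message) {
        this.sender = sender;
        this.message = message;
    }
}

class MessageValidation {
    // Hypothetical checks: the payload must name the expected counterparty as its sender
    // and carry a non-blank message. Returns the payload unchanged when both checks pass.
    static ReceivedMessage requireValid(ReceivedMessage received, String expectedSender) {
        Objects.requireNonNull(received, "received");
        if (!expectedSender.equals(received.sender)) {
            throw new IllegalArgumentException("Unexpected sender: " + received.sender);
        }
        if (received.message == null || received.message.trim().isEmpty()) {
            throw new IllegalArgumentException("Message text must not be blank");
        }
        return received;
    }
}
```

In a flow, a check of this kind would sit directly after the receive call, so that the rest of the flow only ever works with a payload that has passed it.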
