public class FlowLogic<T>
A subclass of FlowLogic implements a flow using direct, straight-line blocking code. You can therefore write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after a node crash, how many instances of your flow are running, and so on.
Invoking the network causes the call stack to be suspended onto the heap and then serialized to a database using the Quasar fibers framework. Because of this, if you need access to data that might change over time, request it just in time via the FlowLogic.getServiceHub property. Don't try to keep data you got from a service across calls to send/receive/sendAndReceive, because the world might change in arbitrary ways underneath you, for instance if the node is restarted or reconfigured.
Additionally, be aware of what data you pin, either via the stack or in your FlowLogic implementation. Very large objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass it to the FlowLogic.subFlow method. It will return the result of that flow when it completes.
If your flow (whether a top-level flow or a subflow) is supposed to initiate a session with the counterparty and request that they start their counterpart flow, make sure it is annotated with InitiatingFlow. This annotation also has a version property that lets you version your flow and enables a node to restrict support for the flow to that particular version.
Functions that suspend the flow (including all functions on FlowSession) accept a maySkipCheckpoint parameter that defaults to false, meaning a checkpoint is always created on suspend. Setting it to true allows the implementation to optimise away the checkpoint, saving a round trip to the database.
This option comes with a big warning: setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow), in order to prepare for hard failures. Because suspending functions always commit the flow's database transaction regardless of this parameter, the flow must be prepared for scenarios where a previous run of the flow has already committed its relevant database transactions. Only set this option to true if you know what you're doing.
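The replay requirement attached to maySkipCheckpoint can be illustrated without any Corda types. The sketch below is plain Java, and every name in it (ReplaySketch, recordPayment, the in-memory set standing in for committed database rows) is hypothetical. It only shows the general idea that a step which may run a second time after a hard failure should detect work already committed by an earlier run.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a flow step that writes to a database.
// Nothing here is Corda API; the Set plays the role of committed rows.
class ReplaySketch {
    private final Set<String> committed = new HashSet<>();
    private int writes = 0;

    // Idempotent write: a replayed call with the same key does no extra work.
    boolean recordPayment(String paymentId) {
        if (!committed.add(paymentId)) {
            return false; // an earlier run already committed this row
        }
        writes++;
        return true;
    }

    // Number of writes that actually took effect.
    int writes() {
        return writes;
    }
}
```

Calling recordPayment twice with the same id performs a single write, which is the property replayed code would need before a checkpoint is skipped.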
Modifier and Type | Class and Description |
---|---|
static class | FlowLogic.Companion |
Modifier and Type | Field and Description |
---|---|
static FlowLogic.Companion | Companion |
Constructor and Description |
---|
FlowLogic() - A sub-class of FlowLogic implements a flow using direct, straight line blocking code. Thus you can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after a node crash, how many instances of your flow there are running and so on. |
Modifier and Type | Method and Description |
---|---|
T | call() - This is where you fill out your business logic. |
void | checkFlowPermission(java.lang.String permissionName, java.util.Map<java.lang.String,java.lang.String> extraAuditData) - Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown. |
FlowStackSnapshot | flowStackSnapshot() - Returns a shallow copy of the Quasar stack frames at the time of call to FlowLogic.flowStackSnapshot. Use this to inspect what objects would be serialised at the time of call to a suspending action (e.g. send/receive). Note: This logic is only available during tests and is not meant to be used in production deployments. Therefore the default implementation does nothing. |
static FlowLogic<?> | getCurrentTopLevel() - Return the outermost FlowLogic instance, or null if not in a flow. |
FlowInfo | getFlowInfo(Party otherParty) - Deprecated. |
org.slf4j.Logger | getLogger() - This is where you should log things to. |
Party | getOurIdentity() - Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node. This is the same as calling ourIdentityAndCert.party. |
PartyAndCertificate | getOurIdentityAndCert() - Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that belong to this node. |
ProgressTracker | getProgressTracker() - Override this to provide a ProgressTracker. If one is provided and stepped, the framework will do something helpful with the progress reports, e.g. record them to the audit service. If this flow is invoked as a subflow of another, then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track progress. |
StateMachineRunId | getRunId() - Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same identifier as their parents). |
ServiceHub | getServiceHub() - Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is only available once the flow has started, which means it cannot be accessed in the constructor. Either access this lazily or from inside FlowLogic.call. |
FlowSession | initiateFlow(Destination destination) - Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging is routed depends on the Destination type, including whether this call does any initial communication. |
FlowSession | initiateFlow(Party party) - Creates a communication session with party. Subsequently you may send/receive using this session object. Note that this function does not communicate by itself; the counter-flow is kicked off by the first send/receive. |
void | persistFlowStackSnapshot() - Persists a shallow copy of the Quasar stack frames at the time of call to FlowLogic.persistFlowStackSnapshot. Use this to track the evolution of the Quasar stack values during the flow execution. The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/ where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform. |
<R> UntrustworthyData<R> | receive(java.lang.Class<R> receiveType, Party otherParty) - Deprecated. |
<R> java.util.List<net.corda.core.utilities.UntrustworthyData> | receiveAll(java.lang.Class<R> receiveType, java.util.List<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint) - Suspends until a message has been received for each session in the specified sessions. |
<R> java.util.List<net.corda.core.utilities.UntrustworthyData> | receiveAll(java.lang.Class<R> receiveType, java.util.List<? extends net.corda.core.flows.FlowSession> sessions) - Suspends until a message has been received for each session in the specified sessions. |
java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> | receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions, boolean maySkipCheckpoint) - Suspends until a message has been received for each session in the specified sessions. |
java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> | receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions) - Suspends until a message has been received for each session in the specified sessions. |
void | recordAuditEvent(java.lang.String eventType, java.lang.String comment, java.util.Map<java.lang.String,java.lang.String> extraAuditData) - Flows can call this method to record application level flow audit events. |
void | send(Party otherParty, java.lang.Object payload) - Deprecated. |
<R> UntrustworthyData<R> | sendAndReceive(java.lang.Class<R> receiveType, Party otherParty, java.lang.Object payload) - Deprecated. |
static void | sleep(java.time.Duration duration, boolean maySkipCheckpoint) - If on a flow, suspends the flow and only wakes it up after at least duration time has passed. Otherwise, just sleeps for duration. This sleep function is not designed to aid scheduling, for which you should consider using SchedulableState. It is designed to help manage contention that you have not managed by other means. |
static void | sleep(java.time.Duration duration) - If on a flow, suspends the flow and only wakes it up after at least duration time has passed. Otherwise, just sleeps for duration. This sleep function is not designed to aid scheduling, for which you should consider using SchedulableState. It is designed to help manage contention that you have not managed by other means. |
<R> R | subFlow(FlowLogic<? extends R> subLogic) - Invokes the given subflow. This function returns once the subflow completes successfully with the result returned by that subflow's FlowLogic.call method. If the subflow has a progress tracker, it is attached to the current step in this flow's progress tracker. |
DataFeed<java.lang.String,java.lang.String> | track() - Returns a pair of the current progress step, as a string, and an observable of stringified changes to the FlowLogic.getProgressTracker. |
DataFeed<java.util.List,java.util.List> | trackStepsTree() - Returns a pair of the current steps tree of the current FlowLogic.getProgressTracker, as pairs of zero-based depth and stringified step label, and an observable of upcoming changes to the structure. |
DataFeed<java.lang.Integer,java.lang.Integer> | trackStepsTreeIndex() - Returns a pair of the current progress step index (as an integer) in the steps tree of the current FlowLogic.getProgressTracker, and an observable of its upcoming changes. |
SignedTransaction | waitForLedgerCommit(SecureHash hash, boolean maySkipCheckpoint) - Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant. |
SignedTransaction | waitForLedgerCommit(SecureHash hash) - Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant. |
void | waitForStateConsumption(java.util.Set<net.corda.core.contracts.StateRef> stateRefs) - Suspends the current flow until all the provided StateRefs have been consumed. |
public static FlowLogic.Companion Companion
public FlowLogic()
public org.slf4j.Logger getLogger()
This is where you should log things to.
public StateMachineRunId getRunId()
Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same identifier as their parents).
public ServiceHub getServiceHub()
Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is only available once the flow has started, which means it cannot be accessed in the constructor. Either access this lazily or from inside FlowLogic.call.
See Also: FlowLogic.call
public FlowSession initiateFlow(Destination destination)
Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging is routed depends on the Destination type, including whether this call does any initial communication.
See Also: Destination
public FlowSession initiateFlow(Party party)
Creates a communication session with party. Subsequently you may send/receive using this session object. Note that this function does not communicate by itself; the counter-flow is kicked off by the first send/receive.
public PartyAndCertificate getOurIdentityAndCert()
Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that belong to this node.
public Party getOurIdentity()
Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node. This is the same as calling ourIdentityAndCert.party.
See Also: NodeInfo.getLegalIdentities
public FlowInfo getFlowInfo(Party otherParty)
Returns a FlowInfo object describing the flow otherParty is using. With FlowInfo.flowVersion it provides the information needed for the evolution of flows and enabling backwards compatibility.
This method can be called before any send or receive has been done with otherParty. In such a case this will force them to start their flow.
See Also: FlowInfo
public <R> UntrustworthyData<R> sendAndReceive(java.lang.Class<R> receiveType, Party otherParty, java.lang.Object payload)
Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given receiveType. Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
Note that this function is not just a simple send+receive pair: it is more efficient and more correct to use this when you expect to do a message swap than to use FlowLogic.send and then receive in turn.
Returns: an UntrustworthyData wrapper around the received object.
See Also: FlowLogic.send
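The advice not to trust received data until it has been verified can be sketched with a small wrapper that only releases its payload through a validating function. This is a plain-Java stand-in, not the Corda UntrustworthyData class; the names UntrustedSketch, unwrap and checkedAmount, and the amount-range rule, are assumptions made for illustration.

```java
import java.util.function.Function;

// Hypothetical stand-in for a wrapper around data received from a peer.
// It is NOT the Corda class; it only mimics the "validate before use" idea.
final class UntrustedSketch<T> {
    private final T payload;

    UntrustedSketch(T payload) {
        this.payload = payload;
    }

    // The only way to reach the payload is through a validating function.
    <R> R unwrap(Function<? super T, ? extends R> validator) {
        return validator.apply(payload);
    }

    // Example validator: reject amounts outside an agreed range.
    static long checkedAmount(UntrustedSketch<Long> received, long max) {
        return received.<Long>unwrap(amount -> {
            if (amount == null || amount <= 0 || amount > max) {
                throw new IllegalArgumentException("Unexpected amount: " + amount);
            }
            return amount;
        });
    }
}
```

Keeping the payload private means a caller cannot skip the check by accident, which is the design point of wrapping received data at all.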
public <R> UntrustworthyData<R> receive(java.lang.Class<R> receiveType, Party otherParty)
Suspends until the specified otherParty sends us a message of type receiveType.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
Returns: an UntrustworthyData wrapper around the received object.
public void send(Party otherParty, java.lang.Object payload)
Queues the given payload for sending to the otherParty and continues without suspending.
Note that the other party may receive the message at some arbitrary later point or not at all: if otherParty is offline then message delivery will be retried until it comes back or until the message is older than the network's event horizon time.
public java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
Consider receiveAll(receiveType, sessions) when the same type is expected from all sessions.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions)
Suspends until a message has been received for each session in the specified sessions.
Consider receiveAll(receiveType, sessions) when the same type is expected from all sessions.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public <R> java.util.List<net.corda.core.utilities.UntrustworthyData> receiveAll(java.lang.Class<R> receiveType, java.util.List<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
Consider receiveAllMap(sessions) when sessions are expected to receive different types.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public <R> java.util.List<net.corda.core.utilities.UntrustworthyData> receiveAll(java.lang.Class<R> receiveType, java.util.List<? extends net.corda.core.flows.FlowSession> sessions)
Suspends until a message has been received for each session in the specified sessions.
Consider receiveAllMap(sessions) when sessions are expected to receive different types.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public <R> R subFlow(FlowLogic<? extends R> subLogic)
Invokes the given subflow. This function returns once the subflow completes successfully with the result returned by that subflow's FlowLogic.call method. If the subflow has a progress tracker, it is attached to the current step in this flow's progress tracker.
If the subflow is not an initiating flow (i.e. not annotated with InitiatingFlow) then it will continue to use the existing sessions this flow has created with its counterparties. This allows for subflows which can act as building blocks for other flows, for example removing the boilerplate of common sequences of sends and receives.
Throws: FlowException - This is either thrown by subLogic itself or propagated from any of the remote FlowLogics it communicated with. The subflow can be retried by catching this exception.
See Also: FlowLogic.call, InitiatingFlow
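The remark that a failed subflow can be retried by catching the exception suggests a bounded retry loop. The following is a minimal plain-Java sketch of that pattern. RetrySketch and withRetries are hypothetical names, and the Callable merely stands in for the subFlow invocation, so this shows the control flow only and not the Corda API.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper. The Callable stands in for "invoke the subflow";
// in a real flow the body would call subFlow(...) and the caught type would be
// the exception that the subflow propagates.
final class RetrySketch {
    static <R> R withRetries(Callable<R> attempt, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }
}
```

Bounding the number of attempts keeps a persistently failing counterparty from holding the calling flow in a loop indefinitely.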
public void checkFlowPermission(java.lang.String permissionName, java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown.
Parameters:
permissionName - is a string representing the desired permission. Each flow is given a distinct namespace for these permissions.
extraAuditData - in the audit log for this permission check these extra key value pairs will be recorded.
public void recordAuditEvent(java.lang.String eventType, java.lang.String comment, java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to record application level flow audit events.
Parameters:
eventType - is a string representing the type of event. Each flow is given a distinct namespace for these names.
comment - a general human readable summary of the event.
extraAuditData - in the audit log for this event these extra key value pairs will be recorded.
public ProgressTracker getProgressTracker()
Override this to provide a ProgressTracker. If one is provided and stepped, the framework will do something helpful with the progress reports, e.g. record them to the audit service. If this flow is invoked as a subflow of another, then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track progress.
Note that this has to return a tracker before the flow is invoked. You can't change your mind halfway through.
See Also: ProgressTracker
public T call()
This is where you fill out your business logic.
public DataFeed<java.lang.String,java.lang.String> track()
Returns a pair of the current progress step, as a string, and an observable of stringified changes to the FlowLogic.getProgressTracker.
See Also: FlowLogic.getProgressTracker
public DataFeed<java.lang.Integer,java.lang.Integer> trackStepsTreeIndex()
Returns a pair of the current progress step index (as an integer) in the steps tree of the current FlowLogic.getProgressTracker, and an observable of its upcoming changes.
See Also: FlowLogic.getProgressTracker
public DataFeed<java.util.List,java.util.List> trackStepsTree()
Returns a pair of the current steps tree of the current FlowLogic.getProgressTracker, as pairs of zero-based depth and stringified step label, and an observable of upcoming changes to the structure.
See Also: FlowLogic.getProgressTracker
public SignedTransaction waitForLedgerCommit(SecureHash hash, boolean maySkipCheckpoint)
Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.
public SignedTransaction waitForLedgerCommit(SecureHash hash)
Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.
public void waitForStateConsumption(java.util.Set<net.corda.core.contracts.StateRef> stateRefs)
Suspends the current flow until all the provided StateRefs have been consumed.
WARNING! Remember that the flow which uses this async operation will NOT wake up until all the supplied StateRefs have been consumed. If the node isn't aware of the supplied StateRefs or if the StateRefs are never consumed, then the calling flow will remain suspended FOREVER!
Parameters:
stateRefs - the StateRefs which will be consumed in the future.
See Also: StateRef
public FlowStackSnapshot flowStackSnapshot()
Returns a shallow copy of the Quasar stack frames at the time of call to FlowLogic.flowStackSnapshot. Use this to inspect what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
Note: This logic is only available during tests and is not meant to be used in production deployments. Therefore the default implementation does nothing.
See Also: FlowLogic.flowStackSnapshot
public void persistFlowStackSnapshot()
Persists a shallow copy of the Quasar stack frames at the time of call to FlowLogic.persistFlowStackSnapshot. Use this to track the evolution of the Quasar stack values during the flow execution. The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/ where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
Note: Compared with FlowLogic.flowStackSnapshot, the snapshot persisted by this method is partial, meaning that only flow-relevant traces and local variables are persisted. Also, this logic is only available during tests and is not meant to be used in production deployments. Therefore the default implementation does nothing.
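When looking for persisted snapshots in a test, it can help to compute the directory from the pattern {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/. The sketch below does this with the standard library only. SnapshotPathSketch and snapshotDir are hypothetical names, and it assumes the date segment is an ISO-8601 local date, which the YYYY-MM-DD pattern suggests but the text does not state explicitly.

```java
import java.nio.file.Path;
import java.time.LocalDate;

// Builds the snapshot directory for a given node directory, date and flow id.
// Assumption: the date segment is the ISO-8601 local date (YYYY-MM-DD).
final class SnapshotPathSketch {
    static Path snapshotDir(Path baseDir, LocalDate date, String flowId) {
        return baseDir
                .resolve("flowStackSnapshots")
                .resolve(date.toString()) // LocalDate.toString() yields e.g. 2024-03-09
                .resolve(flowId);
    }
}
```

The flowId argument would be the run identifier of the flow under test, passed here as a plain string.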
public static FlowLogic<?> getCurrentTopLevel()
Return the outermost FlowLogic instance, or null if not in a flow.
See Also: FlowLogic
public static void sleep(java.time.Duration duration, boolean maySkipCheckpoint)
If on a flow, suspends the flow and only wakes it up after at least duration time has passed. Otherwise, just sleeps for duration. This sleep function is not designed to aid scheduling, for which you should consider using SchedulableState. It is designed to help manage contention that you have not managed by other means.
Warning: long sleeps, and long-running flows in general, are highly discouraged, as there is currently no support for flow migration! This method will throw an exception if you attempt to sleep for longer than 5 minutes.
See Also: SchedulableState
public static void sleep(java.time.Duration duration)
If on a flow, suspends the flow and only wakes it up after at least duration time has passed. Otherwise, just sleeps for duration. This sleep function is not designed to aid scheduling, for which you should consider using SchedulableState. It is designed to help manage contention that you have not managed by other means.
Warning: long sleeps, and long-running flows in general, are highly discouraged, as there is currently no support for flow migration! This method will throw an exception if you attempt to sleep for longer than 5 minutes.
See Also: SchedulableState
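Since sleep is intended for managing contention and rejects durations longer than 5 minutes, a caller might compute a capped back-off before sleeping. The helper below is a plain-Java sketch of that idea. BackoffSketch, backoff and the doubling strategy are assumptions for illustration, not part of the API described here.

```java
import java.time.Duration;

// Hypothetical helper for contention back-off. It doubles the delay per attempt
// and caps it at the five-minute limit mentioned in the warning.
final class BackoffSketch {
    static final Duration MAX_SLEEP = Duration.ofMinutes(5);

    static Duration backoff(Duration base, int attempt) {
        if (base.isNegative() || base.isZero() || attempt < 0) {
            throw new IllegalArgumentException("base must be positive and attempt non-negative");
        }
        Duration delay = base;
        // Stop doubling once the cap is reached to avoid needless growth.
        for (int i = 0; i < attempt && delay.compareTo(MAX_SLEEP) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(MAX_SLEEP) > 0 ? MAX_SLEEP : delay;
    }
}
```

The returned Duration would then be passed to sleep(duration); because the result never exceeds five minutes, it stays within the limit stated in the warning.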