org.openadaptor.adaptor.transporter
Class AbstractTransporterSink

java.lang.Object
  extended by org.openadaptor.adaptor.SimpleComponent
      extended by org.openadaptor.adaptor.AbstractWriter
          extended by org.openadaptor.adaptor.AbstractSimpleSink
              extended by org.openadaptor.adaptor.transporter.AbstractTransporterSink
All Implemented Interfaces:
CallbackProvider, Component, Configurable, IbafTransactionalResource, Sink, DOMessageReaderDelegate
Direct Known Subclasses:
JMSSink

public class AbstractTransporterSink
extends AbstractSimpleSink

AbstractTransporterSink supports Snks with multiple transporters. For example, JMSSink can support publication on multiple Topics or Queues.

Much of the work is delegated to a transporter manager which responsible for managing the transporters.

If the transporter manager is null, then AbstractTransporterSink behaves similary to AbstractSimpleSink, except that default txnXXX() methods are implemented to throw an exception rather than doing nothing.

Author:
Colin Prosser

Nested Class Summary
 
Nested classes/interfaces inherited from class org.openadaptor.adaptor.SimpleComponent
SimpleComponent.StateHolder
 
Field Summary
(package private) static org.apache.log4j.Logger log
           
 
Fields inherited from class org.openadaptor.adaptor.AbstractSimpleSink
_continue_on_exception
 
Fields inherited from class org.openadaptor.adaptor.AbstractWriter
_packetName, _packetSize, _record_delimiter, _string_writer, _writeBatchAsRecord, _writer, PACKET_NAME, PACKET_SIZE
 
Fields inherited from class org.openadaptor.adaptor.SimpleComponent
_controller, _formatter, _propsPrefix, _readerDelegate, _textEncoding, MESSAGE_WRITER_PREFIX, MESSAGEID_ATTR, TEXT_ENCODING_ATTR
 
Constructor Summary
AbstractTransporterSink()
           
 
Method Summary
 void cleanUp()
          Closes all conncectors being managed.
 java.lang.String customControl(DataObject[] customArgs)
          Handle IBAF transporter control messages.
 Controller getController()
          Returns the controller.
 java.lang.String getMessageType()
          Returns the current message type - Text or Object.
 AdaptorTransporterManager getTransporterManager()
          Returns the transporter manager delegate.
 void init(java.lang.String name, java.util.Properties properties, java.lang.String prefix, Controller controller)
          Multi-transporter sinks must implement init.
 void pause()
          Pauses all conncectors being managed.
 void processMessage(Message message)
          Sends Dealbus DataObject[] messages to a JMS Queue/Topic.
 void resume()
          Resumes all conncectors being managed.
 void setFromProperties(java.util.Properties properties, java.lang.String[] prefixList)
          Sets parameters from a property list.
 void setMessageType(java.lang.String messageType)
          Set the message type - valid values are Text or Object.
 void setTransporterManager(AdaptorTransporterManager manager)
          Sets the transporter manager delegate.
 boolean sinkHasTransactionRecovery(java.lang.String subject, IbafException ibe)
          Called if a commit or rollback fails to determine whether a recovery attempt should be allowed.
 void startUp()
          startUp closes any existing transporters, re-establishes fresh connections and starts the associated Transporters.
 void terminate()
          Stops all conncectors being managed.
 void txnBegin()
          Called by a controller when a txnBegin is requested by the Source of a message.
 void txnCommit()
          Called by a controller when a txnCommit is requested by the Source of a message.
 void txnRollback()
          Called by a controller when a txnRollback is requested by the Source of a message.
 
Methods inherited from class org.openadaptor.adaptor.AbstractSimpleSink
processHospitalException, writerHospitalException
 
Methods inherited from class org.openadaptor.adaptor.AbstractWriter
getDOStringWriter, getRecordDelimiter, getWriter, setDOStringWriter, setRecordDelimiter, setWriter, writeDataObjects, writeMessage, writerCleanUp, writeRecord, writerStartUp
 
Methods inherited from class org.openadaptor.adaptor.SimpleComponent
addCallback, canUpdateWhileRunning, didReceiveMessage, getAsProperties, getCallbackManager, getCurrentState, getCustomControlProperties, getLastUID, getName, getProperty, getProperty, getPropsPrefix, getRequestedState, getSecurityManager, getStatus, getTextEncoding, getXMLFormatter, notifyEvent, removeCallback, setCallbackManager, setCallbackManager, setCurrentState, setFromProperties, setFromResource, setName, setPropsPrefix, setRequestedState, txnCommitWithExceptions, txnRollback, waitForRequestedStateChange
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.openadaptor.adaptor.Component
getName, getPropsPrefix, getRequestedState, getStatus, setName, setPropsPrefix
 
Methods inherited from interface org.openadaptor.adaptor.IbafTransactionalResource
txnCommitWithExceptions, txnRollback
 
Methods inherited from interface org.openadaptor.adaptor.Configurable
canUpdateWhileRunning, getAsProperties, setFromProperties
 
Methods inherited from interface org.openadaptor.adaptor.CallbackProvider
addCallback, getCallbackManager, notifyEvent, removeCallback, setCallbackManager, setCallbackManager
 

Field Detail

log

static org.apache.log4j.Logger log
Constructor Detail

AbstractTransporterSink

public AbstractTransporterSink()
Method Detail

setFromProperties

public void setFromProperties(java.util.Properties properties,
                              java.lang.String[] prefixList)
                       throws IbafException
Sets parameters from a property list.

The parameters supported are:

 MessageType
 TransporterManagerClassName
 

Equivalent to calling the set method for each of the supported parameters.

An implementation can add support for further parameters.

Only properties matching supported parameters are set.

Specified by:
setFromProperties in interface Configurable
Overrides:
setFromProperties in class SimpleComponent
Parameters:
properties - The properties list. Can be a SuperProperties object.
prefixList - If non-null, property names are looked up with prefix from prefixList prepended. The prefixList is used in index order until a match is found. For properties requiring a strict match, omly propertyList[0] is checked.

Throws:
IbafException - If problem setting any properties.

init

public void init(java.lang.String name,
                 java.util.Properties properties,
                 java.lang.String prefix,
                 Controller controller)
          throws IbafException
Multi-transporter sinks must implement init. Call super.init(...), and read adaptor properties.

The following properties are supported:

 xxx.MessageType If a valid DOStringWriter has been specified, then
                 messages are sent as text strings otherwise they are sent
                 as Objects. If a DOStringWriter has not been
                 set, and you set MessageType to Text, then messages
                 are sent as text strings using a built-in DataObject
                 XML formatter. Valid values are Text or Object.
 xxx.TransporterManagerClassName
                 If set, use the specified class as the transporter mamager.
                 A concrete implementation of a TransporterSink may ovveride the TransporterManagerClass
                 property and use a fixed AdaptorTransporterManager class.
 

The following properties from AbstractWriter are also available when processing text messages

 DOStringWriter       Writing of record formats is delegated to the specified DOStringWriter class.
                      [default is built-in DO XML writer]
 MessageWriterHook    An XMLFormatter DOMessageWriterDelegate class.
 RecordDelimiter                String that will be written after each message.
 

Other property values are passed to the associated transporter manager and the transporters it manages.

Specified by:
init in interface Component
Overrides:
init in class AbstractSimpleSink
Parameters:
name - The name to be given to the component
properties - Properties object
prefix - Prefix string to search for properties
controller - The component's controller object
Throws:
IbafException - If initialization fails.
See Also:
Controller, SimpleController

getController

public Controller getController()
Returns the controller.


getTransporterManager

public AdaptorTransporterManager getTransporterManager()
Returns the transporter manager delegate.


setTransporterManager

public void setTransporterManager(AdaptorTransporterManager manager)
                           throws IbafException
Sets the transporter manager delegate.

Throws:
IbafException

getMessageType

public java.lang.String getMessageType()
Returns the current message type - Text or Object.


setMessageType

public void setMessageType(java.lang.String messageType)
                    throws IbafException
Set the message type - valid values are Text or Object.

If you set MessageType to Text, then messages are sent as text strings using using any specified DOStringWriter or if no DOStringWriter is set, using a built-in DataObject XML formatter.

Throws:
IbafException

startUp

public void startUp()
             throws IbafException
startUp closes any existing transporters, re-establishes fresh connections and starts the associated Transporters.

Throws:
IbafException - if problem reconnecting or starting any transporter

processMessage

public void processMessage(Message message)
                    throws PipelineException
Sends Dealbus DataObject[] messages to a JMS Queue/Topic.

Specified by:
processMessage in interface Sink
Specified by:
processMessage in class AbstractSimpleSink
Throws:
PipelineException - If message processing fails

cleanUp

public void cleanUp()
             throws IbafException
Closes all conncectors being managed.

Specified by:
cleanUp in interface Component
Overrides:
cleanUp in class AbstractSimpleSink
Throws:
IbafException - if error closing any transporters

terminate

public void terminate()
               throws IbafException
Stops all conncectors being managed.

Specified by:
terminate in interface Component
Overrides:
terminate in class SimpleComponent
Throws:
IbafException - if error stopping any transporters
See Also:
Controller.terminate()

pause

public void pause()
           throws IbafException
Pauses all conncectors being managed.

Specified by:
pause in interface Component
Overrides:
pause in class SimpleComponent
Throws:
IbafException - if error pausing any transporters
See Also:
Controller.pause()

resume

public void resume()
            throws IbafException
Resumes all conncectors being managed.

Specified by:
resume in interface Component
Overrides:
resume in class SimpleComponent
Throws:
IbafException - if error resuming any transporters
See Also:
Controller.resume()

txnBegin

public void txnBegin()
              throws IbafException
Called by a controller when a txnBegin is requested by the Source of a message.

Specified by:
txnBegin in interface IbafTransactionalResource
Overrides:
txnBegin in class SimpleComponent
Throws:
IbafException - If fails to begin the transaction
See Also:
Controller.txnBegin(Source)

txnRollback

public void txnRollback()
                 throws IbafException
Called by a controller when a txnRollback is requested by the Source of a message.

Specified by:
txnRollback in interface IbafTransactionalResource
Overrides:
txnRollback in class SimpleComponent
Throws:
IbafException - If fails to rollback the transaction
See Also:
Controller.txnRollback(Source)

txnCommit

public void txnCommit()
               throws IbafException
Called by a controller when a txnCommit is requested by the Source of a message.

Specified by:
txnCommit in interface IbafTransactionalResource
Overrides:
txnCommit in class SimpleComponent
Throws:
IbafException - If fails to commit the transaction
See Also:
Controller.txnCommit(Source)

sinkHasTransactionRecovery

public boolean sinkHasTransactionRecovery(java.lang.String subject,
                                          IbafException ibe)
Called if a commit or rollback fails to determine whether a recovery attempt should be allowed. Not yet supported for Sinks.

Note that this method is called after txnCommit() or txnRollback() failed for this Source. Since the failure could be due to network outage or server outage, determining the necessary status should not involve a query to the server.

Parameters:
subject - For sources supporting multiple subject subscriptions to indicate which subject requires recovery.
ibe - IbafException raised by failing transaction method

customControl

public java.lang.String customControl(DataObject[] customArgs)
                               throws IbafException
Handle IBAF transporter control messages.

Supported transporter control messages allow dynamic control over transporters managed by a AdaptorTransporterManager.

Handling of transporter control messages is enabled by setting the

  xxx.Controller.RemoteControl.ClassName
 

in the properties file. For example, set

   xxx.Controller.RemoteControl.ClassName = org.openadaptor.adaptor.standard.HTTPRemoteControl
 

and then use a url like the following

   http://localhost/?name=xxx.Controller&method=transporterControl&arg1=ComponentName&arg2=TransporterID&arg3=TransporterControlMethodName&arg4=TransporterControlMethodParam1&passwordPassword
 

where:

  localhost is the name of the host running the HTTP remote controller (assumes port 80),
  name is the controller name (here xxx.Controller)
  method is set to transporterControl
  password is set to the remote control pasword value specified in the properties file
  arg1 is the name of the component (here ComponentName, but set to match name in properties file)
  arg2 is the id (or subject, or alias) of the transporter
  arg3 is the specific transporter control method; one of
          pause   - temporarily pause an existing transporter subscribing/publishing to a subject
          resume  - resume an existing transporter subscribing/publishing to a subject
          stop    - stop an existing transporter subscribing/publishing to a subject (e.g. so that parameters can be updated)
          start   - start, or re-start, an existing transporter subscribing/publishing to a subject;
                    if the transporter is already running it is stopped before re-staring it
          remove  - stop an existing transporter subscribing/publishing to a subject and delete the transporter
          add     - add a new transporter, but do not start, subscribing/publishing to the specified subject
          set     - set transporter parameters
  arg4 .. argN are set to any required arguments for the transporter control method (curently add or set)
               These (i.e. each TransporterControlMethodParam) should be a property string in the form name=value.
               This supports a varable order and number of parameters.
 

Result string is returned to invoker, so send meaningful diagnostics on error.

Note: This method is designed to be called via the IBAF remote control interface. A remote contoller can enquire the supported properties and current values for a transporter using the getAsProperties() method. It can therefore dynamically display transporter properties and values, and return them in a matching format

Specified by:
customControl in interface Component
Overrides:
customControl in class SimpleComponent
Parameters:
customArgs - DataObject[] array with one element in format defined by IBAF remote control interface.
Returns:
Control specific string on success. Otherwise a meaningful diagnostics message on error.
Throws:
IbafException - if the control request fails
See Also:
SimpleController