4. Livesync

Livesync is an Indico plugin that allows information to be exported to other systems in a regular basis, and to keep track of what has been exported. It relies on “agents”, basically interfaces that convert Indico metadata into some format that can be read by the target system, and negociate the protocol for data delivery.

4.1. The Multipointer Track

The basic data structure used in livesync is the MultiPointerTrack (MPT). It keeps all the information concerning changes that have been done to the data storage, and the curent status of each agent.

class indico.ext.livesync.struct.MultiPointerTrack(elemContainer)

Bases: persistent.Persistent

A MultiPointerTrack is a kind of structure that is based on an IOBTree, where each entry contains an ordered set (or list, depending on the implementation) of elements. Then, several “pointers” can be created, which point to different positions of the track (very much like runners in a race track). This class is abstract, implementations should be derived.

add(intTS, value)

Adds a value to the container corresponding to a specific timestamp

addPointer(pid, startPos=None)

Registers a new pointer

getCurrentPosition(pid)

Returns the current entry (set/list) for a given pointer id

getPointerTimestamp(pid)

Gets the current ‘position’ of a pointer (id)

iterate(fromPos=None, till=None, func=<function <lambda> at 0x3d50488>)

Generator that iterates through the data structure

mostRecentTS(maximum=None)

Returns most recent timestamp in track (minimum key) If ‘maximum’ is provided, return it if less recent

movePointer(pid, pos)

Moves a given pointer (id) to a given timestamp

oldestTS()

Returns least recent timestamp in track (maximum key)

pointerIterItems(pid, till=None)

Iterates over the positions that are left (till the end of the track) for a given pointer (id) - iterates over key-value pairs (iteritems)

pointerIterValues(pid, till=None)

Iterates over the positions that are left (till the end of the track) for a given pointer (id) - iterates over values

prepareEntry(ts)

Creates an empty sub-structure (elemContainer) for a given timestamp

removePointer(pid)

Removes a pointer from the list

values(*args)

Return values or ranges (timestamps) of the structure

class indico.ext.livesync.struct.SetMultiPointerTrack

Bases: indico.ext.livesync.struct.MultiPointerTrack

OOSet-based MultiPointerTrack implementation. As OOSets are ordered, order is not lost. Order will depend on the __cmp__ method implemented by the contained objects.

4.2. SyncManager

SyncManager is basically a container for agents, and provides an interface for both agent management/querying and basic MPT manipulation.

class indico.ext.livesync.agent.SyncManager(granularity=100)

Bases: persistent.Persistent

Stores live sync configuration parameters and “agents”. It is basically an “Agent Manager”

Parameters:granularity – integer, number of seconds per MPT entry
add(timestamp, action)

Adds a specific action to the specified timestamp

advance(agentId, newLastTS)

Advances the agent “pointer” to the specified timestamp

getAllAgents()

Returns the agent dictionary

classmethod getDBInstance()

Returns the instance of SyncManager currently in the DB

getGranularity()

Returns the granularity that is set for the MPT

getTrack()

Rerturns the MPT

objectExcluded(obj)

Decides whether a particular object should be ignored or not

query(agentId=None, till=None)

Queries the agent for a given timespan

registerNewAgent(agent)

Registers the agent, placing it in a mapping structure

removeAgent(agent)

Removes an agent

reset(agentsOnly=False, trackOnly=False)

Resets database structures

Warning

This erases any agents and contents in the MPT

4.3. Agents

So far, PushSyncAgent is the only available agent type.

class indico.ext.livesync.agent.SyncAgent(aid, name, description, updateTime, access=None)

Bases: indico.util.fossilize.Fossilizable, persistent.Persistent

Represents an “agent” (service)

record_str((obj, objId, status))

Translates the objects/states to an easy to read textual representation

class indico.ext.livesync.agent.PushSyncAgent(aid, name, description, updateTime, access=None)

Bases: indico.ext.livesync.agent.SyncAgent

PushSyncAgents are agents that actively send data to remote services, instead of waiting to be queried.

Parameters:
  • aid – agent ID
  • name – agent name
  • description – a description of the agent
  • access – an Indico user/group that has equivalent access
_generateRecords(data, lastTS, dbi=None)
Parameters:
  • data – iterable containing data to be converted
  • lastTS

Takes the raw data (i.e. “event created”, etc) and transforms it into a sequence of ‘record/action’ pairs.

Basically, this function reduces actions to remove server “commands”

i.e. modified 1234, deleted 1234 becomes just delete 1234

Overloaded by agents

_run(data, logger=None, monitor=None, dbi=None, task=None)

Overloaded - will contain the specific agent code

acknowledge()

Called to signal that the information has been correctly processed (usually called by periodic task)

run(currentTS, logger=None, monitor=None, dbi=None, task=None)

Main method, called when agent needs to be run

4.4. CLI

There is a Command-line interface available for livesync. It can be easily invoked:

jdoe $ indico_livesync
usage: indico_livesync [-h] {destroy,list,agent} ...
indico_livesync: error: too few arguments
jdoe $

4.4.1. Listing the MPT

It is easy to obtain a listing of what is currently stored in the MPT:

jdoe $ indico_livesync list
12970820 <ActionWrapper@0x8df65ac (<MaKaC.conference.Contribution object at 0x8df65ec>) [data_changed,title_changed] 1297082086>
12970819 <ActionWrapper@0x8df662c (<MaKaC.conference.Contribution object at 0x8df65ec>) [data_changed,title_changed] 1297081920>
12970815 <ActionWrapper@0x8df666c (<MaKaC.conference.Contribution object at 0x8df65ec>) [data_changed] 1297081537>
12970815 <ActionWrapper@0x8df66ac (<Conference 100994@0x8df67ac>) [data_changed] 1297081528>
12970815 <ActionWrapper@0x8df66ec (<MaKaC.conference.Contribution object at 0x8df65ec>) [data_changed,created] 1297081528>
12970815 <ActionWrapper@0x8df672c (<MaKaC.conference.Category object at 0x8e48dac>) [data_changed] 1297081517>
12970815 <ActionWrapper@0x8df676c (<Conference 100994@0x8df67ac>) [data_changed,title_changed,created] 1297081517>

A query interval can also be specified (]from, to]):

jdoe $ indico_livesync list --from 12970816 --to 12970819
12970819 <ActionWrapper@0x8db65ac (<MaKaC.conference.Contribution object at 0x8db65ec>) [data_changed,title_changed] 1297081920>
jdoe $

4.5. Development Tools

In order to make life easier for plugin developers, we have included a few tools that make things simpler:

class indico.ext.livesync.agent.RecordUploader(logger, agent, batchSize=1000)

Bases: object

Encapsulates record uploading behavior.

_uploadBatch(batch)
Parameters:batch – list of records

To be overloaded by uploaders. Does the actual upload.

iterateOver(iterator, dbi=None)

Consumes an iterator, uploading the records that are returned dbi can be passed, so that the cache is cleared once in a while

4.6. Internals

Here are included the listeners and other components that are part of the livesync plugin type.

class indico.ext.livesync.components.ObjectChangeListener

Bases: indico.core.extpoint.base.Component

This component listens for events and directs them to the MPT. Implements IAccessControlListener, IObjectLifeCycleListener and IMetadataChangeListener

class indico.ext.livesync.components.RequestListener

Bases: indico.core.extpoint.base.Component

This component manages the ContextManager area that stores livesync actions

class indico.ext.livesync.components.RequestListenerContext

Bases: object

This class is mainly useful for testing or CLI scripting

class indico.ext.livesync.tasks.LiveSyncUpdateTask(frequency, **kwargs)

Bases: indico.modules.scheduler.tasks.periodic.PeriodicTask

A task that periodically checks which sources need to be “pushed”

Parameters:frequency – a valid dateutil frequency specifier (DAILY, HOURLY, etc...)