Using Neo4j.Driver? Now you can EXTEND it!

Some Code

Hot on the heels of Neo4jClient 4.0.0, I was doing some work with the Neo4j.Driver (the official client for Neo4j), and in doing so, I realised I was writing a lot of boiler plate code.

So I started adding extension methods to help me, and as my extension methods became more involved, I moved them to another project, and then… well… decided to release them!

TL;DR; you can get the Neo4j.Driver.Extensions package on NuGet, and the source is on GitHub.


The Problem

Let’s first look at the problem. Neo4j.Driver is quite verbose, and you end up with lots of ‘magic strings’ throughout the codebase. Those can lead to problems at runtime, and one of the reasons we’re using .NET is to get compilation errors instead of runtime errors wherever we can.

Let’s take a look at a ‘standard’ read query. Here we’re executing the following Cypher to get a movie with a given title.

MATCH (m:Movie)
WHERE m.title = $title
RETURN m

We MATCH a Movie based on its title and return it. Easy. Code-wise we have this:

public async Task<Movie> GetMovieByTitle(string title)
{
    var session = _driver.AsyncSession();
    var results = await session.ReadTransactionAsync(async tx =>
    {
        var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m";
        var cursor = await tx.RunAsync(query, new {title});
        var fetched = await cursor.FetchAsync();

        while (fetched)
        {
            var node = cursor.Current["m"].As<INode>();
            var movie = new Movie
            {
                Title = node.Properties["title"].As<string>(),
                Tagline = node.Properties["tagline"].As<string>(),
                Released = node.Properties["released"].As<int?>()
            };
            return movie;
        }

        return null;
    });

    return results;
}

We’re using transaction functions to future-proof the code should we decide to connect to a cluster: the function will retry the query if the cluster member we’re connected to takes the unfortunate decision to ‘move on’.
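
To make the retry idea concrete, here is a minimal, driver-free sketch of what a transaction function does conceptually: run the unit of work and, if it fails with a transient error, back off and run it again. TransientFailureException, RetrySketch.RetryAsync and the attempt/delay numbers are all hypothetical; the real driver has its own exception types and retry configuration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a "the cluster member went away" style error.
public class TransientFailureException : Exception
{
    public TransientFailureException(string message) : base(message) { }
}

public static class RetrySketch
{
    // Runs 'work', retrying on TransientFailureException with a doubling delay.
    // Any other exception, or running out of attempts, propagates to the caller.
    public static async Task<T> RetryAsync<T>(Func<Task<T>> work, int maxAttempts = 3, int initialDelayMs = 10)
    {
        var delay = initialDelayMs;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await work();
            }
            catch (TransientFailureException) when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
                delay *= 2;
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var calls = 0;
        // Fails twice, then succeeds: simulates a cluster member 'moving on' mid-query.
        var title = await RetrySketch.RetryAsync(async () =>
        {
            calls++;
            await Task.Yield();
            if (calls < 3) throw new TransientFailureException("member moved on");
            return "The Matrix";
        });
        Console.WriteLine($"{title} after {calls} attempts");
    }
}
```

Because the work may run more than once, whatever you put inside a transaction function should be safe to repeat, which is a good reason to keep it to just the query and the mapping.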

Let’s take a closer look at the creation of the Movie object:

var node = cursor.Current["m"].As<INode>();
var movie = new Movie
{
    Title = node.Properties["title"].As<string>(),
    Tagline = node.Properties["tagline"].As<string>(),
    Released = node.Properties["released"].As<int?>()
};
return movie;

In the first line, we pull the value for the identifier m from the Current IRecord on the IResultCursor and attempt to get it as an INode. That’s OK, as I know from my query that’s what I’ve asked for (RETURN m). Then I go through the properties of my Movie, assigning each one from the node into the right place.

For the string properties (title and tagline) it’s just an As<string>() call, and As<string>() on a null value simply gives us null. One catch: Properties is a dictionary, so if a key isn’t on the node at all, the indexer throws a KeyNotFoundException rather than returning null. The released property is more complex: I’ve used .As<int?>() (nullable int) so the model can represent a missing value, as I happen to know the default movies database has some nodes without the released property (we’re schema-free here).

What I could have done would be:

var released = node.Properties["released"].As<int?>();
if(released.HasValue)
    Released = released.Value;
else { /* ?!?!? */ }

Which would make my Movie class slightly tighter – but I guess, if my data can have nodes without the property – then my models should too… 🙂

GetValue

Aaaaanyways. The first step in the extension methods was to create a ‘GetValue’ method:

var node = cursor.Current["m"].As<INode>();
var movie = new Movie
{
    Title = node.GetValue<string>("title"),
    Tagline = node.GetValue<string>("tagline"),
    Released = node.GetValue<int?>("released")
};
return movie;

This method returns default if the Properties dictionary doesn’t contain the given key; otherwise it calls As<T>, so the usual exceptions from that conversion (FormatException and InvalidCastException) can still be thrown.
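
For the curious, here’s a minimal sketch of that behaviour. It’s written against a plain IReadOnlyDictionary<string, object> (the type INode.Properties exposes) so it runs without a database, and it isn’t the package’s actual code: GetValueSketch is a hypothetical name, and Convert.ChangeType (with Nullable<T> unwrapped) stands in for the driver’s As<T>().

```csharp
using System;
using System.Collections.Generic;

public static class PropertyExtensions
{
    // Returns default(T) when the key is missing (or the stored value is null);
    // otherwise converts, letting FormatException / InvalidCastException surface.
    public static T GetValueSketch<T>(this IReadOnlyDictionary<string, object> properties, string key)
    {
        if (!properties.TryGetValue(key, out var raw) || raw == null)
            return default;

        if (raw is T typed)
            return typed;

        // Unwrap Nullable<T> so that "int?" converts via int.
        var target = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
        return (T)Convert.ChangeType(raw, target);
    }
}

public static class Program
{
    public static void Main()
    {
        // This node has no "tagline" property at all.
        var props = new Dictionary<string, object>
        {
            ["title"] = "The Matrix",
            ["released"] = 1999L
        };

        var title = props.GetValueSketch<string>("title");
        var tagline = props.GetValueSketch<string>("tagline");   // missing key: null, no exception
        var released = props.GetValueSketch<int?>("released");   // long converted to int?
        Console.WriteLine($"{title} | {tagline ?? "(none)"} | {released}");
    }
}
```

Neo4j integers come back to .NET as long (Int64), which is why the sketch converts rather than just casting.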

I mean, if that was it, you’d be right to say that this was a waste of time. But it does simplify the call a bit… onwards!

GetContent (NetStandard 2.1 only)

I’d like it to be a bit simpler, so let’s try to sort out the while loop. Remember we have this:

var cursor = await tx.RunAsync(query, new {title});
var fetched = await cursor.FetchAsync();

while (fetched)
{
    var node = cursor.Current["m"].As<INode>();
    /* object creation code here */
}

We call FetchAsync on the cursor, then while that returns true we convert our Current into an INode and create our object.

Instead of the Fetch/While loop, we can do this instead:

var cursor = await tx.RunAsync(query, new {title});
await foreach(var node in cursor.GetContent<INode>("m")) 
{
    /* object creation code here */
}

This removes the cursor.Current["m"].As<INode>() line, and simplifies the while into a pleasing foreach.

NB. This is NetStandard 2.1 only, as it uses the IAsyncEnumerable<T> interface, which isn’t in NetStandard 2.0.
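
Under the hood, turning a fetch loop into something await foreach can consume only needs a C# 8 async iterator (async plus yield return, returning IAsyncEnumerable<T>). The sketch below uses a hypothetical IRowCursor interface with FetchAsync() and Current, loosely modelled on IResultCursor, plus an in-memory ListCursor so it runs without a database. It illustrates the technique rather than reproducing the package’s GetContent, and needs .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical cursor shape, loosely modelled on IResultCursor.
public interface IRowCursor
{
    Task<bool> FetchAsync();
    IReadOnlyDictionary<string, object> Current { get; }
}

// In-memory cursor over pre-built rows, for the demo only.
public class ListCursor : IRowCursor
{
    private readonly IReadOnlyList<IReadOnlyDictionary<string, object>> _rows;
    private int _index = -1;

    public ListCursor(IReadOnlyList<IReadOnlyDictionary<string, object>> rows) => _rows = rows;

    public IReadOnlyDictionary<string, object> Current => _rows[_index];

    public Task<bool> FetchAsync() => Task.FromResult(++_index < _rows.Count);
}

public static class CursorExtensions
{
    // Async iterator: fetches again on every pass and yields the value stored under 'identifier'.
    public static async IAsyncEnumerable<T> GetContentSketch<T>(this IRowCursor cursor, string identifier)
    {
        while (await cursor.FetchAsync())
            yield return (T)cursor.Current[identifier];
    }
}

public static class Program
{
    public static async Task Main()
    {
        var cursor = new ListCursor(new List<IReadOnlyDictionary<string, object>>
        {
            new Dictionary<string, object> { ["m"] = "The Matrix" },
            new Dictionary<string, object> { ["m"] = "Top Gun" }
        });

        await foreach (var title in cursor.GetContentSketch<string>("m"))
            Console.WriteLine(title);
    }
}
```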

ToObject

OK, so we’re starting to look a bit better, some of the boiler plate is going, is there anything else we can do?

OF COURSE!

Instead of all that object creation, which requires you to go through every property (and what if you add one later in development and forget to update this code!?!), we can use ToObject.

This works on an INode (and, as we’ll see later, other things) and lets you pass a type as a generic parameter; ToObject creates an instance of that type and returns it to you, filled in from the node where possible:

var cursor = await tx.RunAsync(query, new {title});

await foreach (var node in cursor.GetContent<INode>("m")) 
    return node.ToObject<Movie>();

Neo4jProperty

We should probably pause here to talk about Neo4jPropertyAttribute. So far, the properties from Neo4j have all been lowercase, but the observant among you will have noticed that the Movie class uses upper camel case (PascalCase) naming, as .NET typically does.

When we’re using the GetValue approach it’s not such an issue, as we define the key ourselves (GetValue<string>("title")). I think it’s probably pretty obvious that ToObject uses reflection to work out which property to put where, but OH NOES my properties are all upper camel case, and the data is all lowercase. WHAT TO DO?

This is how a property is normally defined:

public string Title {get;set;}

Basic stuff. But we can add the Neo4jProperty attribute:

[Neo4jProperty(Name = "title")]
public string Title {get;set;}

And lo and behold, the properties will be reflected properly!
As an added extra, you can also tell the mapping to ignore a property if you want:

[Neo4jProperty(Ignore = true)]
public string Title {get;set;}

So the full Movie class looks like:

public class Movie
{
    [Neo4jProperty(Name = "title")]
    public string Title { get; set; }

    [Neo4jProperty(Name = "released")]
    public int? Released { get; set; }

    [Neo4jProperty(Name = "tagline")]
    public string Tagline { get; set; }
}
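
To give a rough idea of how an attribute like this can steer a reflection-based mapper, here’s a minimal sketch. It isn’t the package’s ToObject: PropNameAttribute and ToObjectSketch are hypothetical, the input is a plain property dictionary (the shape of INode.Properties), and the type conversion is deliberately naive.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical attribute mirroring the idea of Neo4jProperty(Name, Ignore).
[AttributeUsage(AttributeTargets.Property)]
public class PropNameAttribute : Attribute
{
    public string Name { get; set; }
    public bool Ignore { get; set; }
}

public class Movie
{
    [PropName(Name = "title")] public string Title { get; set; }
    [PropName(Name = "released")] public int? Released { get; set; }
    [PropName(Name = "tagline")] public string Tagline { get; set; }
    [PropName(Ignore = true)] public string Notes { get; set; }
}

public static class MappingExtensions
{
    public static T ToObjectSketch<T>(this IReadOnlyDictionary<string, object> source) where T : new()
    {
        var result = new T();
        foreach (var prop in typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            var attr = prop.GetCustomAttribute<PropNameAttribute>();
            if (!prop.CanWrite || (attr?.Ignore ?? false))
                continue;

            // Use the attribute's Name if given, otherwise the .NET property name.
            var key = attr?.Name ?? prop.Name;
            if (!source.TryGetValue(key, out var raw) || raw == null)
                continue; // not in the data: leave the property at its default

            var target = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;
            prop.SetValue(result, Convert.ChangeType(raw, target));
        }
        return result;
    }
}

public static class Program
{
    public static void Main()
    {
        var node = new Dictionary<string, object>
        {
            ["title"] = "The Matrix",
            ["released"] = 1999L,
            ["Notes"] = "never mapped, the property is ignored"
        };
        var movie = node.ToObjectSketch<Movie>();
        Console.WriteLine($"{movie.Title} ({movie.Released}) notes={movie.Notes ?? "null"}");
    }
}
```

The same approach works for a record whose keys are the aliases from the Cypher, as long as the names line up.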

GetRecords with ToObject (NetStandard 2.1 only)

So, we’ve seen it work on an INode, but what if we want to return something other than a node, such as the properties themselves? Let’s change our query to:

var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m.title AS title, m.tagline AS tagline, m.released AS released";

Well. No worries! We’ve still got a working prospect:

await foreach (var record in cursor.GetRecords())
    return record.ToObject<Movie>();

Here, I’m using the GetRecords extension method to get each IRecord, and then ToObject to convert it to a Movie. This works because the Neo4jProperty names on Movie (title, tagline and released) match the aliases in the Cypher.

RunReadTransactionForObjects

In the previous examples, for simplification, I’ve not shown the code around the outside, but if we take the GetContent/ToObject example, it actually looks like this:

var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m";
var session = _driver.AsyncSession();
var results = await session.ReadTransactionAsync(async x =>
{
    var cursor = await x.RunAsync(query, new {title});

    await foreach (var node in cursor.GetContent<INode>("m")) 
        return node.ToObject<Movie>();

    return null;
});

return results;

And we’ll do that for almost all the queries we’re going to run, so let’s look at how we can reduce that code as well…

var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m";
var session = _driver.AsyncSession();

var movie = 
    (await session.RunReadTransactionForObjects<Movie>(query, new {title}, "m"))
    .Single();
return movie;

I’ve added some newlines to make it a bit more readable, but now we take the session and call RunReadTransactionForObjects<T> on it to get the results of the query as T (in this case Movie), with "m" telling it which identifier in each record to convert.

The RunReadTransactionForObjects<T> method returns an IEnumerable<T>, hence I can use .Single() (or indeed any of the LINQ methods).
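
Stripped of the driver, a helper in the style of RunReadTransactionForObjects<T> boils down to: run the query, take the value for one identifier from each row, map it to T, and return a materialised list so LINQ works on it afterwards. In this sketch, RunForObjectsSketch, the runQuery delegate (standing in for opening a read transaction and running the Cypher) and the map function (standing in for ToObject<T>()) are all hypothetical; the package’s real signature is the one used above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TransactionHelpers
{
    // Hypothetical helper: 'runQuery' stands in for "open a read transaction and run the Cypher",
    // returning rows as dictionaries keyed by the RETURN identifiers.
    public static async Task<IReadOnlyList<T>> RunForObjectsSketch<T>(
        Func<string, object, Task<IEnumerable<IReadOnlyDictionary<string, object>>>> runQuery,
        string query,
        object parameters,
        string identifier,
        Func<object, T> map)
    {
        var rows = await runQuery(query, parameters);
        // Materialise here, so callers can use Single(), First() etc. after the work is done.
        return rows.Select(row => map(row[identifier])).ToList();
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Fake runner that ignores the query and returns a single row.
        Func<string, object, Task<IEnumerable<IReadOnlyDictionary<string, object>>>> fakeRun =
            (query, parameters) => Task.FromResult<IEnumerable<IReadOnlyDictionary<string, object>>>(
                new[] { new Dictionary<string, object> { ["m"] = "The Matrix" } });

        var movie = (await TransactionHelpers.RunForObjectsSketch(
                fakeRun,
                "MATCH (m:Movie) WHERE m.title = $title RETURN m",
                new { title = "The Matrix" },
                "m",
                value => (string)value))
            .Single();
        Console.WriteLine(movie);
    }
}
```

One reason to materialise inside a helper like this is that results generally need to be consumed before the transaction completes; a lazily evaluated sequence would try to read from a cursor that has already gone away.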

Put it all together

There are other extension methods in there, and no doubt some that are missing (PR away!), but I’m quite pleased that I can get from this code:

public async Task<Movie> GetMovieByTitle(string title)
{
    var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m";
    var session = _driver.AsyncSession();
    var results = await session.ReadTransactionAsync(async tx =>
    {        
        var cursor = await tx.RunAsync(query, new {title});
        var fetched = await cursor.FetchAsync();

        while (fetched)
        {
            var node = cursor.Current["m"].As<INode>();
            var movie = new Movie
            {
                Title = node.Properties["title"].As<string>(),
                Tagline = node.Properties["tagline"].As<string>(),
                Released = node.Properties["released"].As<int?>()
            };
            return movie;
        }

        return null;
    });

    return results;
}

To:

public async Task<Movie> GetMovieByTitle(string title)
{
    var query = "MATCH (m:Movie) WHERE m.title = $title RETURN m";
    var session = _driver.AsyncSession();

    var results = 
        (await session.RunReadTransactionForObjects<Movie>(query, new {title}, "m"))
        .Single();
    return results;
}

PHEW!

Long post eh?

If you got this far, well done! Please try it, and log bug reports on GitHub (comments on here are easy to miss).

Neo4jClient 4.0

After what probably seems like ages to you (and indeed me) Neo4jClient 4.0 has finally left the pre-release stages and is now in a stable release.

Being a major version change, there are breaking changes, so you should test your code against it before you switch over. Having said that, the changes hopefully make sense and will make the client better in the long run.

It should be noted that some of the changes only apply to Neo4j 4.x servers, and if you’re not using .NET Core, or not using transactions, staying with the 3.2.x release of the client will be fine for now.

OK. Onto the changes. There are 4 broad categories: Breaking changes, General changes, Additions and Removals. I’ll try to demo as many of them as possible with code, but for the self-explanatory ones, well, I probably won’t 😮

Breaking

These are ones which will require you to do some code changes, how much will vary depending on your codebase.

Async Only

There are no ‘Sync’ methods anymore. Decision-wise, .NET code has increasingly been moving towards async, and the client has always supported it. If there is enough clamour, I will look into re-adding the sync wrappers, but at the moment, less code = less to maintain.

No MSDTC (TransactionScope) Support

The 3.x versions of the client used TransactionScope to provide transaction support, which had the nice benefit of supporting MSDTC. But, as I’m sure many know, it also prevented transaction support on .NET Core, because TransactionScope and its supporting classes weren’t available there. NetStandard 2.0 added TransactionScope but not the supporting classes, and whilst those are now available in NetStandard 2.1, I don’t want to push the minimum requirement for the client up to NetStandard 2.1. As a consequence, the decision has been made to remove support for TransactionScope.

Long term, the aim is to have an ITransactionManager that you can inject into the client, so you can provide a home-rolled TransactionScope manager if you really want one. This doesn’t exist yet, and won’t for a while, because as far as I’m aware only one company was using it, and even they said they were moving away from it.

Neo4j Server 3.5+ only

Neo4j don’t support server versions lower than 3.5, and whilst the GraphClient should work with any of the 3.x servers (which support transactions), the BoltGraphClient will only work back to 3.5.

Personally, I’m not too worried about this: aside from transactions, none of the additions to the client would work on older server versions anyhow, as the features didn’t exist. Basically, if you use an older version of the server, use the older client!

Other

There are other breaking changes, but they’re covered in the ‘Removals’ section below!

General Things

So some general changes here about the client, you might find it interesting, you might not :/

NetStandard 2.0

Thanks to a PR from tobymiller1 (https://github.com/tobymiller1), the client is back to being just one project, and targets NetStandard 2.0.

URI Schemes

The client now supports all the schemes Neo4j does: neo4j, neo4j+s and neo4j+ssc, plus the bolt equivalents (bolt, bolt+s and bolt+ssc).

Transactions

I wasn’t sure if this should be Addition or Removal or … so I’ve gone with ‘General’. Transactions needed to be changed, as I noted above, and as part of the ability to target Neo4j 4.x we needed to support multi-tenancy, that’s multiple databases within one server.

Let’s have a look at some examples:

using(var tx = gc.BeginTransaction(TransactionScopeOption.Join, null, "neo4jclient"))
{
    await gc.Cypher.Create("(n:Node {Id:1})").ExecuteWithoutResultsAsync();
    var insideResults = await gc.Cypher.Match("(n:Node)").Return(n => n.As<Node>()).ResultsAsync;
    insideResults.Dump("In the Transaction");

    await tx.RollbackAsync();
}

var outsideResults = await gc.Cypher.WithDatabase("neo4jclient").Match("(n:Node)").Return(n => n.As<Node>()).ResultsAsync;
outsideResults.Dump("Out of the Transaction");

Running that, the first Dump() shows the node we created inside the transaction, and the second shows nothing.

Let’s take the code apart a bit.

using(var tx = gc.BeginTransaction(TransactionScopeOption.Join, null, "neo4jclient"))

Transactions are IDisposable, so ideally you should be using them with a using statement, but if not, remember to call Dispose()! In this version, to be able to specify the database ("neo4jclient"), you also have to supply the TransactionScopeOption and a bookmark (the null in this case). That’ll probably be simplified later…

Next, we’re just executing Cypher, as we’ve done plenty of times before, and as I’m using LINQPad I can call .Dump() on any output to display it. This shows that, within the transaction, I can read back the stuff I’ve just put in the database.

I then call .RollbackAsync() on the transaction, the effect of which can be seen in the second .Dump(), which shows nothing in the database. Also note that the second query has to use WithDatabase, else I would be querying the default database.

You HAVE to call .CommitAsync() for a transaction to be committed. If I didn’t have the .RollbackAsync() call and just let tx be Dispose()-d when the using statement closes, it would automatically be rolled back. So, in the code above, the .RollbackAsync() call is redundant.
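
The rule above (nothing is kept unless you commit, and disposing an uncommitted transaction rolls it back) is the classic disposable-transaction pattern. Here’s a minimal sketch of just that pattern; SketchTransaction is hypothetical and only records what happened, it is not Neo4jClient’s transaction class.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical transaction that only tracks its own state.
public sealed class SketchTransaction : IDisposable
{
    public bool Committed { get; private set; }
    public bool RolledBack { get; private set; }

    public Task CommitAsync() { EnsureOpen(); Committed = true; return Task.CompletedTask; }

    public Task RollbackAsync() { EnsureOpen(); RolledBack = true; return Task.CompletedTask; }

    // Disposing without a commit rolls back, which is why an explicit
    // RollbackAsync() right at the end of a using block is redundant.
    public void Dispose()
    {
        if (!Committed && !RolledBack) RolledBack = true;
    }

    private void EnsureOpen()
    {
        if (Committed || RolledBack)
            throw new InvalidOperationException("Transaction already finished.");
    }
}

public static class Program
{
    public static async Task Main()
    {
        var forgotten = new SketchTransaction();
        using (forgotten)
        {
            // ... queries would run here, but nobody calls CommitAsync() ...
        }

        var kept = new SketchTransaction();
        using (kept)
        {
            await kept.CommitAsync();
        }

        Console.WriteLine($"forgotten rolled back: {forgotten.RolledBack}, kept committed: {kept.Committed}");
    }
}
```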

Write transactions are on one database only: an attempt to write to another database (using WithDatabase or Use) will result in a ClientException. NB. you can use Use to read from another database within a transaction.

Additions

New stuff for you to use! After each heading will be a list of the versions of Neo4j Server that the additions will work on.

DefaultDatabase [4.x]

Multi-tenancy brings a load of new things to the Neo4j world, but how can you use them from Neo4jClient?? First off let’s talk about DefaultDatabase. This is a property of the GraphClient/BoltGraphClient itself. By default it’s set to ‘neo4j’ (which is the default the server has), but you can set it to any other database, and every query will be against that one.

var gc = new GraphClient(...){DefaultDatabase = "neo4jclient"};

.WithDatabase() [4.x]

But what if you want to run just one of your queries (or more!) against another database? That’s where WithDatabase comes in. WithDatabase is a per query setting to choose the database a query will run on:

gc.Cypher.WithDatabase("neo4jclient").Match(...)

CreateDatabase (system) [4.x]

Want to create a database? Sure, you can do that! This needs to be executed against the system database, and you have to use the WithDatabase call to do that:

await gc.Cypher.WithDatabase("system").CreateDatabase("neo4jclient", true).ExecuteWithoutResultsAsync();

Wait, what’s this true parameter? It’s the ifNotExists parameter, which means that we will only create the database if it doesn’t already exist.

StartDatabase (system) [4.x]

Well, now we’ve created our database, we need to start it:

await gc.Cypher.WithDatabase("system").StartDatabase("neo4jclient").ExecuteWithoutResultsAsync();

If you run this on an already started database, it’ll do nothing!

StopDatabase (system) [4.x]

We’ve started it, now let’s stop it. No surprises here:

await gc.Cypher.WithDatabase("system").StopDatabase("neo4jclient").ExecuteWithoutResultsAsync();

Again, as per the StartDatabase method, stopping a stopped database does nothing.

DropDatabase (system) [4.x]

Dropping a database is the quickest way to ‘truncate’ your entire database without resorting to stopping the server. As with CreateDatabase, we have two parameters here: the first is the database name, the second is dumpData (default false). If dumpData is false, calling DropDatabase will delete all the data files; if it’s true, the data will first be dumped to the location the server has specified (see the documentation for more information).

await gc.Cypher.WithDatabase("system").DropDatabase("neo4jclient", true).ExecuteWithoutResultsAsync();

If you run this on a database that has already been dropped, you will get a FatalDiscoveryException thrown. If you want to avoid that, you need to use:

DropDatabaseIfExists (system) [4.x]

Pleasingly this is the same as DropDatabase just that you won’t get the exception if the database doesn’t exist. You have the same option to dumpData or not. It’s all up to you!

await gc.Cypher.WithDatabase("system").DropDatabaseIfExists("neo4jclient", true).ExecuteWithoutResultsAsync();

Use [4.x]

Use() allows you to use a database within a query or query part. It means you can avoid having to use WithDatabase, or you can use it within a query that is using another database. Whaaaat?! Confusing?! Yes!

gc.Cypher.Use("neo4jclient").Match("(n)").Return(n => n.As<Node>())

Gives us:

USE neo4jclient
MATCH (n)
RETURN n

BUT you can’t do this for things like the system database calls above; you have to use WithDatabase for those. Use is particularly Useful (ha!) for Fabric use cases.

WithQueryStats [3.5,4.x]

With 3.x, when you executed a query you couldn’t tell what that query had done; in fact, all you could know was that you had executed a query (especially if it was a ‘non results’ one). Now you can add WithQueryStats to the query and use the OperationCompleted event to get its stats:

void OnGraphClientOnOperationCompleted(object o, OperationCompletedEventArgs e)
{        
    e.QueryStats.Dump();
}
gc.OperationCompleted += OnGraphClientOnOperationCompleted;
await gc.Cypher.WithQueryStats.Create("(n:Node {Id: 10, Db: 'neo4j'})").ExecuteWithoutResultsAsync();
gc.OperationCompleted -=OnGraphClientOnOperationCompleted;

Which will give you a QueryStats object describing what the query did.

Check that out! I added a new Label, 1 Node and 2 Properties!

This isn’t on by default, as it sends back more data over the wire, and not everyone needs it (so far, 100% of people haven’t!), so it’s optional.
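
Mechanically, this is the standard .NET event pattern: subscribe a handler, run the query, unsubscribe. The sketch below reproduces only that pattern, using hypothetical FakeClient, FakeQueryStats and FakeCompletedEventArgs types so it runs without Neo4jClient; the real types are the GraphClient, OperationCompletedEventArgs and its QueryStats used above, and the counts here are made up to match the example.

```csharp
using System;

// Hypothetical stats shape; the real QueryStats has its own set of counters.
public class FakeQueryStats
{
    public int NodesCreated { get; set; }
    public int PropertiesSet { get; set; }
    public int LabelsAdded { get; set; }
}

public class FakeCompletedEventArgs : EventArgs
{
    public FakeQueryStats QueryStats { get; set; }
}

public class FakeClient
{
    public event EventHandler<FakeCompletedEventArgs> OperationCompleted;

    // Pretend to run a CREATE, and only report stats when they were asked for.
    public void Execute(bool withQueryStats)
    {
        var stats = withQueryStats
            ? new FakeQueryStats { NodesCreated = 1, PropertiesSet = 2, LabelsAdded = 1 }
            : null;
        OperationCompleted?.Invoke(this, new FakeCompletedEventArgs { QueryStats = stats });
    }
}

public static class Program
{
    public static void Main()
    {
        var client = new FakeClient();

        void OnCompleted(object sender, FakeCompletedEventArgs e) =>
            Console.WriteLine(e.QueryStats == null
                ? "no stats requested"
                : $"nodes={e.QueryStats.NodesCreated}, props={e.QueryStats.PropertiesSet}, labels={e.QueryStats.LabelsAdded}");

        client.OperationCompleted += OnCompleted;
        client.Execute(withQueryStats: true);
        client.Execute(withQueryStats: false);
        client.OperationCompleted -= OnCompleted; // unsubscribe so the handler doesn't linger
        client.Execute(withQueryStats: true);     // no output: nobody is listening any more
    }
}
```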

Neo4jIgnoreAttribute [3.5,4.x]

(From a PR by @Clooney24.) This provides the ability to ignore properties for the BoltGraphClient as well as the GraphClient.

So, let’s have our class:

public class Node 
{
    public string Db {get;set;}
    public int Id {get;set;}

    [Neo4jIgnore]
    public string Ignored {get;set;}
}

We’ve defined the Ignored property with the [Neo4jIgnore] attribute, so now when we insert the data:

var node = new Node { 
        Id = 11, 
        Db = gc.DefaultDatabase, 
        Ignored = "You won't see this!" 
    };
await gc.Cypher.WithQueryStats.Create("(n:Node $newNode)").WithParam("newNode", node).ExecuteWithoutResultsAsync();

and then pull it back:

(await gc.Cypher.Use("neo4j").Match("(n)").Return(n => n.As<Node>()).ResultsAsync).Dump("Ignored");

We can see that Ignored is null.

And this isn’t just because we’re ignoring it when bringing it back: it’s not in the database either.
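
Conceptually, honouring an ignore attribute on the way in means leaving the marked property out when the object is turned into the parameter map sent with the query, so the value never reaches the database. A minimal reflection sketch, using a hypothetical SkipAttribute and ParameterSketch.ToParameterMap rather than Neo4jClient’s real serializer:

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical marker attribute, standing in for [Neo4jIgnore].
[AttributeUsage(AttributeTargets.Property)]
public class SkipAttribute : Attribute { }

public class Node
{
    public string Db { get; set; }
    public int Id { get; set; }

    [Skip]
    public string Ignored { get; set; }
}

public static class ParameterSketch
{
    // Builds the property map for a parameter like $newNode, leaving out [Skip] properties.
    public static Dictionary<string, object> ToParameterMap(object value)
    {
        var map = new Dictionary<string, object>();
        foreach (var prop in value.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            if (!prop.CanRead || prop.IsDefined(typeof(SkipAttribute), inherit: true))
                continue;
            map[prop.Name] = prop.GetValue(value);
        }
        return map;
    }
}

public static class Program
{
    public static void Main()
    {
        var node = new Node { Id = 11, Db = "neo4j", Ignored = "You won't see this!" };
        // Prints Db and Id only (property order from reflection isn't guaranteed).
        foreach (var pair in ParameterSketch.ToParameterMap(node))
            Console.WriteLine($"{pair.Key} = {pair.Value}");
    }
}
```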

Removals

These are all things that have been removed, largely, they were marked as [Obsolete] so you can’t say you weren’t warned! If you are using these, then you need to stay on a 3.x release of the client.

Start

This hasn’t been around since, well, 3.0 I think, and has certainly been deprecated for a long time. As it wouldn’t work in 3.5 onwards anyway, I’m content to remove it.

Create(string, params object[])

Again, marked as [Obsolete] and you should be using the alternative Create options instead.

Return<T>(string, CypherResultMode)

This was accidentally made public; it was never intended to be, and as it has had at least 1 major version of Obsolete-ness, it’s gone.

StartBit

This pretty much comes with the Start code above.

Gremlin support

If you’re using Gremlin – there are no benefits to this version of the client, so stick with what you have. You should progress to Cypher if you can, it’s actively developed and is very closely linked to the new GQL standards that ISO have started to work on.

From a ‘career’ point of view, this means that learning Cypher is like learning GQL but with maybe a different accent, but not a different language. As GQL becomes a standard, other Graph databases will start to use it, and you’ll be ahead of the game.

Finally

Some other bits of tidying up!

URIs

You may (or may not) have noticed that the URI for the client has changed from: https://github.com/Readify/Neo4jClient to https://github.com/DotNet4Neo4j/Neo4jClient. This means that the project is now part of an Organisation (DotNet4Neo4j) which is focused on things that link Neo4j and .NET together.

Err.

I think that’s about it.

༼ つ ◕_◕ ༽つ

Neo4j 4.0 is around the corner and I want to use SSIS with it. Can I?

Yes!

That’s the TL;DR; out of the way 🙂 The slightly longer version is that you need to use the 1.4.0.0 version of the Neo4j SSIS Components.

This is the main change to the components for 1.4.0.0 – there are a couple of things tidied up and hopefully a smoother experience with the syntax highlighting in the Cypher editor screens.

If you want to get your hands on the components – please visit: http://bit.ly/neo4jssis register and you’ll be sent out the download link. Registration is only used to let you know of updates to the tools, no marketing!

Neo4j – SSIS – Connection Manager Love

I’ve not written about the SSIS components for a bit, but I have been working on them – adding things, improving things. One area that was always very basic was the Connection interface. Or lack thereof – indeed the interface was basically just the properties pane of Visual Studio – mmmm Functional.

A quick reminder

Now though, you can double click on the Connection Manager and you’ll get a better interface – one that makes it easier to read!

I’ve also added two new properties to help with connections, IPv6 and Use Encryption, which hopefully are pretty obvious as to what they mean.

This is a short post as there’s not much to add, those of you on the mailing list will have found out about these updates last year, and if you want to get your hands on the toolset – please visit: http://bit.ly/neo4jssis register and you’ll be sent out the download link. Registration is only used to let you know of updates to the tools, no marketing!

Neo4j & NiFi – Getting NiFi Running

Neo4j and Apache NiFi

Another day, another ETL tool, this time Apache NiFi which is described as:

An easy to use, powerful, and reliable system to process and distribute data.

I’ve used SSIS and Kettle in the past, so I figured I’d be able to get this bad boy running easy enough – I mean – it’s ‘easy to use’ right? That’s not a descriptive sentence often used to describe SSIS or Kettle, so should be safe.

Unfortunately – as with politics, bare faced lies are apparently acceptable in the software world as well. I’d probably replace ‘easy to use’ with ‘usable’ – as to the other keywords – powerful / reliable – I can’t confirm or deny the truth of these.

Obviously – my experience is very very small – so take it with a pinch of salt – what I will say is that the documentation is OK, but doesn’t really describe just what the ____ is going on.

Complaints over – let’s crack on.

Important information

This is using NiFi 1.10.0. There are no graph bundles for lower versions, and in this version they are not packaged by default.

Running NiFi

I tried 4 ways to run NiFi. Initially, I went with running it off my local machine, which (yes) runs Windows 10. But I also have Java installed, so that saved me doing any extra leg work.

Windows 10 – Local

I downloaded the zip, unzipped to a folder – ran bin\run-nifi.bat – it started, showed me a lot of Java output – but that’s cool – I’m used to that. I connected to the URI – (http://localhost:8080/nifi) and – good news! It’s there.

Stop it (CTRL+C), restart, and it never ever succeeds in starting again. For why? I have no idea. I extracted it into a fresh directory and ran it: no dice. I rebooted: no dice. Nothing I could do would sort it out. I had no instances of Java running, and netstat said the ports were not in use. Nada.

Windows Server 2019 – VM

On second thoughts, and with much frustration from the local attempts, I decided that maybe, just maybe, running this away from my work machine would make more sense. I had a Server 2019 ISO lying around, so why not?

Long story shorter: same deal, though without the initial success. OK, so the obvious link here is Windows; let’s go Ubuntu.

Ubuntu 18.04 VM

Install. Run. Fail.

There’s a pattern. I followed the instructions to the letter – but no joy. At this point there are 2 options, but I wanted to be able to use my laptop later – so had to put the hammer back and go with Docker.

Docker for Windows Desktop

Back on the main work machine O/S – I install docker – wait what? You didn’t have Docker installed already? Well – no – I’d used it a while ago – but found it messed up my machine way too much. But hey – after 3 fails, it’s time for a win – and I need a win.

Crack open PowerShell and run:

docker run --name nifi-run `
-p 8080:8080 `
-d `
apache/nifi:latest

Open up Chrome and go to the NiFi homepage (with the expectation of failure) – and… we have a winner!

Having been caught out by this before – I stop and start it again, and it’s still working! MAGIC.

Adding the Graph Bundles

Now that we have NiFi actually running, we need to get the Graph Bundles in place. First, we need to download them, so go and get them from the releases page on GitHub.

Download

You only need the first 3 for connecting to Neo4j, and why would you want to connect to another GraphDB eh? Oh yeah – Sadomasochism – forgot.

Stick these files into something simple to remember, in my case, I went with D:\Docker as we’ll need to reference them for the Docker container to be made.

Also – I’m adding an ‘Import’ volume to the Docker container – to allow me to pass data into NiFi – my initial intention was (and in many ways still is) to be able to read a CSV file from this folder – and insert that into Neo4j.

Create the Docker Container

Pretty much the same command as before, only this time I’m adding 4 -v parameters to the call. 3 of them are putting the .nar files (downloaded above) into the container, the last is the ‘Import’ folder.

docker run --name nifi `
-p 8080:8080 `
-v D:/Docker/nifi-graph-client-service-api-nar-1.10.0.nar:/opt/nifi/nifi-1.10.0/lib/nifi-graph-client-service-api-nar-1.10.0.nar `
-v D:/Docker/nifi-graph-nar-1.10.0.nar:/opt/nifi/nifi-1.10.0/lib/nifi-graph-nar-1.10.0.nar `
-v D:/Docker/nifi-neo4j-cypher-service-nar-1.10.0.nar:/opt/nifi/nifi-1.10.0/lib/nifi-neo4j-cypher-service-nar-1.10.0.nar `
-v D:/Docker/Import:/opt/nifi/nifi-1.10.0/data-in `
-d `
apache/nifi:1.10.0

Aaaand,

Boom!

This seems like a good place to pause – we have NiFi running with the Graph bundles there, next time we’ll execute some queries against it.

Or. try.

Reactive Neo4j using .NET

Version 4.0 of Neo4j is being actively worked on, and aside from the new things in the database itself, the drivers get an update as well – and one of the big updates is the addition of a Reactive way to develop against the DB.

Now – I’ve not done reactive programming for a long time, I think I did play around with it when .NET 4 was first released, but I have no idea where that blog post has gone – so I may as well start as new.

I found it! Not the post, but the application – MousePath – which is now on GitHub: MousePath – aside from it ‘working’ it’s not performant in any way.

What is Rx/Reactive?

Reactive in .NET is all about the IObservable<T>/IObserver<T> interfaces. They’ve been around since .NET 4, but personally I’ve never really used them. They allow application code to react to data being pushed to it, rather than the more traditional way of requesting the data.

There’s a good book (Intro to Rx), which I have been using to work this out, and which is freely available online: http://introtorx.com/

Starting off

For this project, we’re going to need the NuGet package, which in this case isn’t Neo4j.Driver but Neo4j.Driver.Reactive. When we add this to our project and create a driver in the normal way, we can see we now have an ‘RxSession’ extension method on IDriver.

So let’s create a reactive session and see what we can see.

Comparing the two, the RxSession methods give us IObservables, as opposed to the AsyncSession giving us Tasks.

Doing a Run-ner

So, back to our RxSession, let’s do a basic version, just using Run:

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>())
    );

In here, we’re hooking up to the ol’ classic Movies database, and simply writing the titles to the screen. NB – Driver is a static property of type IDriver I have defined elsewhere.

The first two lines look pretty much like our normal code, the only real difference being the use of ‘RxSession’ as opposed to just ‘Session’.

Run on an RxSession returns an IRxStatementResult, which has 3 methods we’re interested in (well, actually only 1 at the moment): Records(), Consume() and Keys().

Records() gets us the records from the database, i.e. the stuff we want to do things with; Consume() whips through those records so we can get an IResultSummary telling us what went on; and Keys() gets us the keys that are returned. In the simple statement I’ve used, ‘m.title’ is the only key.

Records() is what we’re using, as we want to deal with the data. Records() returns an IObservable<IRecord>, and being an IObservable, we need to Subscribe() to it to get the data. Subscribing means we provide an IObserver (here, just a lambda for each new record) that is notified whenever an IRecord arrives.

In this case, we have the contents being written to the console. Aces.
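
To see what Subscribe() is doing without the driver (or the System.Reactive package, which is where the lambda overload of Subscribe() comes from), here’s a minimal hand-written IObservable<T>. TitleSource and ConsoleObserver are hypothetical, and TitleSource pushes synchronously, whereas the driver’s Records() pushes IRecords as they arrive from the server.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: calls OnNext for each title, then OnCompleted.
public class TitleSource : IObservable<string>
{
    private readonly IEnumerable<string> _titles;
    public TitleSource(IEnumerable<string> titles) => _titles = titles;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        foreach (var title in _titles)
            observer.OnNext(title);
        observer.OnCompleted();
        return NoopDisposable.Instance; // nothing left to cancel: everything was pushed already
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// The observer only reacts to what it is given; it never asks for data.
public class ConsoleObserver : IObserver<string>
{
    public List<string> Seen { get; } = new List<string>();
    public bool Completed { get; private set; }

    public void OnNext(string value) { Seen.Add(value); Console.WriteLine("Got: " + value); }
    public void OnError(Exception error) => Console.WriteLine("Error: " + error.Message);
    public void OnCompleted() { Completed = true; Console.WriteLine("Done"); }
}

public static class Program
{
    public static void Main()
    {
        var source = new TitleSource(new[] { "The Matrix", "Top Gun", "Apollo 13" });
        source.Subscribe(new ConsoleObserver());
    }
}
```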

Quitting

Being a console app – doing tiny amounts of work – I largely don’t need to worry about disposing of my resources, but let’s imagine resource usage is something we do care about. How do you go about disposing of your resources?

IDisposable? INosable! The IRxSession doesn’t implement IDisposable; instead we have to Close<T>() it, and this is where things have got a little fuzzy for me, as I’m not entirely sure I’m closing it correctly.

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));

session.Close<IRecord>();

Now, I’d expect to get either no results or a smaller subset (depending on how fast the code runs). What I actually get is Close being called, but still the full set of data, so I suspect I misunderstand what is going on here.

Let’s say I do want a smaller subset, or to quit early: how do I do it? Well, the Subscribe() method returns an IDisposable, so we ‘unsubscribe’ by disposing of our subscription:

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
var subscription = rxStatementResult
    .Records()
    .Subscribe(record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));

await Task.Delay(220);
subscription.Dispose();
session.Close<IRecord>();

The ‘delay’ magic number there is enough time to get some records, but not all, anything less than that gave no results, anything more – all the results. ¯\_(ツ)_/¯
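Disposing the subscription is part of the general IObservable<T> contract: Subscribe() returns an IDisposable, and disposing it tells the source to stop calling that observer. The sketch below shows just that contract, using a hypothetical ManualSource<T> and ActionObserver<T> (not driver or Rx types). Items are pushed by hand rather than arriving from Neo4j, so there’s no timing guesswork like the Task.Delay above. Whether disposing also stops the driver fetching further records from the server is a driver detail this sketch doesn’t model.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled source: items are pushed one at a time via Emit,
// standing in for records arriving from the server over time.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        // Disposing the returned object removes this observer.
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Emit(T value)
    {
        // Copy first so an observer may unsubscribe during OnNext.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _onDispose;

        public Unsubscriber(Action onDispose) => _onDispose = onDispose;

        public void Dispose()
        {
            _onDispose?.Invoke();
            _onDispose = null; // make Dispose safe to call twice
        }
    }
}

// Minimal observer wrapping an OnNext callback (what Rx's lambda overload gives you).
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;

    public ActionObserver(Action<T> onNext) => _onNext = onNext;

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static List<string> Run()
    {
        var source = new ManualSource<string>();
        var received = new List<string>();
        IDisposable subscription = source.Subscribe(new ActionObserver<string>(received.Add));

        source.Emit("The Matrix");
        source.Emit("Cloud Atlas");
        subscription.Dispose(); // unsubscribe, as with subscription.Dispose() above
        source.Emit("Top Gun"); // no longer delivered to our observer

        return received;
    }

    public static void Main()
    {
        foreach (var title in Run())
            Console.WriteLine("Got: " + title);
    }
}
```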

Ooook So – Why Rx?

It seems more complex right? Subscribe(), unsubscribe – no foreach in sight! What’s the point?

My understanding – and this could be/probably is wrong – is that by using Rx – we’re reducing our overheads – i.e. instead of streaming everything, we can just stream what we’re consuming at the time.

The other key benefits come from operators like ‘.Buffer’ and friends (Skip, Last, etc.), which let you shape the stream, for example batching records up or skipping ones you don’t need, rather than handling every record by hand.
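As a rough illustration of what a count-based Buffer does: with System.Reactive referenced, something like rxStatementResult.Records().Buffer(10) gives you an IObservable<IList<IRecord>> of batches instead of single records. To keep this runnable without any packages, the sketch below hand-rolls the same idea as an observer. BufferingObserver<T> and BatchCollector<T> are hypothetical names, not Rx or driver types, and integers stand in for records.

```csharp
using System;
using System.Collections.Generic;

// Hand-rolled, count-based buffering observer. It collects items into
// batches of `size` and forwards each full batch; any partial batch is
// flushed on completion, in the spirit of Rx's Buffer(count).
public sealed class BufferingObserver<T> : IObserver<T>
{
    private readonly int _size;
    private readonly IObserver<IList<T>> _downstream;
    private List<T> _current = new List<T>();

    public BufferingObserver(int size, IObserver<IList<T>> downstream)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        _size = size;
        _downstream = downstream;
    }

    public void OnNext(T value)
    {
        _current.Add(value);
        if (_current.Count == _size)
        {
            _downstream.OnNext(_current);
            _current = new List<T>(); // start a fresh batch
        }
    }

    public void OnError(Exception error) => _downstream.OnError(error);

    public void OnCompleted()
    {
        if (_current.Count > 0)
            _downstream.OnNext(_current); // flush the final, partial batch
        _downstream.OnCompleted();
    }
}

// Collects whatever batches arrive.
public sealed class BatchCollector<T> : IObserver<IList<T>>
{
    public List<IList<T>> Batches { get; } = new List<IList<T>>();

    public void OnNext(IList<T> batch) => Batches.Add(batch);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static List<IList<int>> Run()
    {
        var collector = new BatchCollector<int>();
        var buffer = new BufferingObserver<int>(3, collector);

        // Stand-in for records arriving one by one.
        for (var i = 1; i <= 7; i++)
            buffer.OnNext(i);
        buffer.OnCompleted();

        return collector.Batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine(string.Join(", ", batch));
    }
}
```

Seven items with a batch size of three come out as two full batches and one batch holding the leftover item, which is the kind of chunking you might want when writing records onwards in groups.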

One nice thing about Rx in .NET is that it’s not like async: you don’t need your entire stack to be Rx to get the benefits. You can use it in bits and pieces where you need to; if a given query returns a lot of data, maybe it makes sense there.
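One way to drop Rx into an otherwise async codebase is to collect an observable into a Task that the surrounding code simply awaits. With System.Reactive, await observable.ToList() does much the same; the sketch below hand-rolls it with TaskCompletionSource so it needs nothing beyond the BCL. ObservableBridge and ListSource<T> are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hand-rolled bridge: collect everything an IObservable<T> pushes and
// expose it as a Task<List<T>> that async code can await.
public static class ObservableBridge
{
    public static Task<List<T>> ToListAsync<T>(IObservable<T> source)
    {
        var tcs = new TaskCompletionSource<List<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
        var items = new List<T>();
        // The subscription is not kept: this assumes the source completes (or errors).
        source.Subscribe(new CollectingObserver<T>(items, tcs));
        return tcs.Task;
    }

    private sealed class CollectingObserver<T> : IObserver<T>
    {
        private readonly List<T> _items;
        private readonly TaskCompletionSource<List<T>> _tcs;

        public CollectingObserver(List<T> items, TaskCompletionSource<List<T>> tcs)
        {
            _items = items;
            _tcs = tcs;
        }

        public void OnNext(T value) => _items.Add(value);
        public void OnError(Exception error) => _tcs.TrySetException(error);
        public void OnCompleted() => _tcs.TrySetResult(_items);
    }
}

// Hypothetical synchronous source standing in for Records().
public sealed class ListSource<T> : IObservable<T>
{
    private readonly IEnumerable<T> _items;

    public ListSource(IEnumerable<T> items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items)
            observer.OnNext(item);
        observer.OnCompleted();
        return new NoopSubscription();
    }

    private sealed class NoopSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

public static class Program
{
    public static async Task<List<string>> RunAsync()
    {
        var source = new ListSource<string>(new[] { "The Matrix", "Cloud Atlas" });
        return await ObservableBridge.ToListAsync(source);
    }

    public static async Task Main()
    {
        foreach (var title in await RunAsync())
            Console.WriteLine("Got: " + title);
    }
}
```

Collecting everything into a list gives up the streaming benefit, so this only makes sense where the rest of the code genuinely wants the whole result.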

Execute Cypher Task Updates

Last week, Anabranch released version 1.1 of the tools for Neo4j – which included a very welcome addition to the toolset – being able to pull data from a Neo4j instance.

After doing the demo post, I noticed a peculiarity (a quirk, if you will) in how the ‘Execute Cypher Task’ worked when multiple Neo4j Connection Managers were defined: it would execute the Cypher against all the Connection Managers, not a specific one. This makes some sense, as a Control Flow Task, unlike a Data Flow Task, doesn’t have a ‘Connection’ selector.

Version 1.2 of the tools fixes this and tidies up the Execute Cypher Task to have a better user interface as well.

Adding Cypher

The Cypher box still has highlighting, but now lines up to the edges properly.

Choosing A Connection

You can choose which connection you want to use from a drop-down. Only Neo4j Connection Managers are listed here, under the names you gave them.

Validation

You will get a red cross on your task if you’re missing things (in this case the connection). If you look in the ‘Error List’ you will be able to see all the errors:

Getting 1.2

To get version 1.2, please visit http://bit.ly/neo4jssis and register, and you’ll be sent the download link. Registration is only used to let you know about updates to the tools, no marketing!

Using a Data Flow to move data **from** Neo4j in SSIS

That’s right everyone! We’re going from Neo4j this time. This is a new release: the old version (1.0.0.0) didn’t have a ‘Neo4j as a Source’ component, but 1.1.0.0 does.

In the last post we took data from a file and ingested it into Neo4j. So far so good, but one of the things we were missing was the ability to pull from Neo4j as well. Now the circle is complete, and in this post I’m going to show you how to pull from one Neo4j instance into another. That’s right: Neo4j to Neo4j!

As always – the video below shows the moving version of this post – but not everyone wants that.

The Setup

OK, this setup is more complex than normal, as we need multiple instances of Neo4j running. Whilst that’s not rocket science, I don’t want to go into it in detail here, but hey! I would run one DB in Neo4j Desktop, then download one of the server editions (Community will be just fine for this!).

Ports!

You need to change the ports on your new server, as the Desktop instance will already be using 7474/7687 etc., so open up the neo4j.conf file and change the following settings:

These are the ports I’m using – but go crazy and pick whatever you want – it’s your database after all. Aaaanyhews – I’m going to assume you know how to start your server version of the database. If not – there’s loads of stuff online about how to do it – and if it becomes clear that we’re in a world of pain here – I’ll write one 🙂

Clear the DBs

WARNING!!! – which I don’t think we need – but here you go – make sure you know which DB you are doing this on! Don’t delete your production DB by mistake!!! (•_•)

We’re going to clear both DBs, and then add the ‘Movies’ demo set to one of them. So open a browser window to each instance (http://localhost:7474 and http://localhost:7676 in my case) and execute:

MATCH (n) DETACH DELETE n

Then in one of the databases (and I will be using my 7474 database) execute:

:play movies

Then move on to the second step of the guide and run its Cypher to put the movie data into your database. You can check the data is all there by running:

MATCH (n) RETURN COUNT(n)

You should get 171 nodes. OK, now we’re all set up and ready to go!

Let’s SSIS

As another assumption – I’m going with the fact that you know how to start up Visual Studio and create a new Package.

Let’s first add one connection:

And, obvs pick Neo4j:

Oooh – note the ‘version’ there as well – if yours says lower than that – then bad times 🙁

Rename it to something like ‘The Source’ or whatever you find memorable:

Make sure the user / pass and server are all correct:

Looking good! Now – repeat – for the other server – remembering the port will be different – and choosing a different name, something like ‘The Destination’ for example, and you should end up with this state of affairs:

Let’s add a ‘Data Flow’ to our package now. Again, you can rename it if you want; I did, but don’t let that force you into doing anything:

Double click on it, and we’re into Data Flow design heaven!

Add the Source

Drag the ‘Execute Cypher Source’ component from the toolbox onto the page:

Double click on it to enter the ‘Edit’ page:

The Cypher we’re going to execute is:

MATCH (m:Movie) RETURN m.title AS title

Now, some TOP TIPS. This works best if you RETURN specific columns, as SSIS doesn’t know what to do with a full node, and using AS there makes the output columns easier to use.

Once you’ve got the Cypher – you need to select the Connection to use (see the picture) – which is why naming them nicely is SUPER useful.

Once you’ve done that, hit ‘Refresh’ to get the Output Columns populated:

Job done. Good work!

Add the Destination

No surprises for guessing this involves dragging the Destination to the page.

Next, join up the Source to the Destination:

The UI for this isn’t as fully fledged as the Source’s, so unfortunately we need to head into the Advanced Editor. Right-click on the Destination and open the Advanced Editor:

First we want to set the connection:

Again – naming!!

Then we’re going to go to the ‘Input’ tab and select our input from the Source:

Press OK to save all that, and then double click on the Destination item and go to the Cypher Editor:

First off, you can see ‘title’ listed in the parameters, so that’s good. Cypher-wise we’re doing a MERGE, so we only get one ‘Cloud Atlas’ (because no-one needs more than one of those).

MERGE (:Movie {title: $title})

At this point, we have our two things and no red crosses or errors anywhere, so let’s run it!

Run it!

No surprises – we press ‘Start’ and get the ‘liney’ version of the page which hopefully you see as:

38 rows (hahaha Rows!) and if you go to your ‘Destination’ database you should see the movies there.

I want it

Of course you do – these controls are currently in an open beta, to register to get the controls, please go to: http://bit.ly/neo4jssis

Using a Data Flow to move data from who knows where to Neo4j in SSIS

In what is rapidly becoming a series of posts, we look into another of the components in the Anabranch SSIS Components for Neo4j package. The last post looked at using the “Execute Cypher Task” from within a Control Flow, but that’s not so useful for moving data around. I mean, it’s great for things like deleting a DB or adding indexes, but when we want to get data from one source to another, we gotta go all Data Flowy.

I’m working on the assumption that you’ve gone through the last post, as I’m going to pick up from where we left off, and I make no apologies for my assumptions.

Clear the DB

I should mention – please check which DB instance you are connected to – nothing says ‘problem’ quite like deleting your production database.

Let’s first clear the Neo4j instance back to an empty state, run:

MATCH (n) DETACH DELETE n

In the browser.

Clear Package

We don’t want the Execute Cypher Task any more, so select it and press Delete, or go all Mousey and right-click. The choice is yours.

Deleting the mouse way

Let’s Data Flow (Task)!

Drag a Data Flow Task onto the Control Flow workspace:

Double click on the Task to be taken to the Data Flow workspace, which will be empty. So let’s drag a ‘Flat File Source’ to the space:

Double click on the Flat File Source, and the editor will pop up. We need to add a new Connection Manager, so press ‘New…’

Now, we want to use a CSV file. You can use the one I use by downloading it from this link; it’s not very exciting I’m afraid, just some names 🙂 Anyhews, fill in the details that match your file (the ones in this picture match my file; the only thing I’ve changed from the defaults is the Code page, set to 65001 (UTF-8)).

Then click on the ‘Columns’ bit on the left hand side, to make sure it all looks ok, and press OK. You’ll be back to the ‘Flat File Source Editor’ – and you should now click on the ‘Columns’ bit here too:

Make sure at least the First/Last names are checked here – obviously if you’re using your own file – pick your columns! Press OK and go back to the workspace.

Now drag an ‘Execute Cypher Destination’ task to the workspace:

Drag the ‘Blue arrow’ from the Flat File Source, and attach it to the Execute Cypher task:

Then right-click on the Execute Cypher Destination and select ‘Show Advanced Editor…’

First, set the connection manager, we want to use our existing Neo4j Connection Manager

Then we want to select the ‘Input Columns’, just pick them all for now:

Press OK, and then double-click on the Execute Cypher Destination to get the Cypher Editor.

Add the following Cypher:

CREATE (:User {First: $FirstName, Last: $LastName})

And press OK.

Do some SSISing!

Now, all that’s left to do is press Start (or Right-click – Execute Task) whichever is your preference!

It’ll run, and give you the following:

Which you can check in your DB by running:

MATCH (n) RETURN COUNT(n)

Things are a bit more interesting now, as we’re pulling from a different source and putting into the database, obviously SSIS supports loads of sources – with

These controls are currently in an open beta, to register to get the controls, please go to: http://bit.ly/neo4jssis

Neo4j & SSIS – Connecting and executing Cypher in a Control Flow

Last Friday, Anabranch released the first beta version of its connector to Neo4j from SSIS. Aside from a post saying that it existed, I didn’t go into detail, so this is going to be a series of posts on how you can use your existing SSIS infrastructure with Neo4j.

Today we’re going to look at 2 parts of the connector, the Neo4j Connection Manager (CM) and the Execute Cypher Task (ECT). The CM is fundamental to all the controls, without it, you can’t connect to the database. I’ll go into what it does, settings etc in another post, but for now – it’s enough to know that it provides the connection. The ECT allows us to execute Cypher against a given connection manager.

** NOTE **
In version 1.0.0(beta) – the ECT will only work with the first CM you add to the package

This video covers the same topic as the text version below:

I’m going to develop this in Visual Studio 2017. At the time of writing, I found the 2019 SSIS packages to be a bit flaky, whereas 2017 has been sturdy so far. From a ‘demo’ point of view, though, the 2019 process is exactly the same once you have it all installed.

SETUP
If you’ve never developed against SSIS before, you’ll need a couple of things. Firstly, SSDT (specifically the Integration Services bits) and Visual Studio; I think the Community edition should work, but I can’t confirm. You’ll also need the Anabranch SSIS Controls for Neo4j: assuming you’ve registered (http://anabranch.co.uk/Projects/Neo4jSsis) and have the download link, you’ll want the 2017 x86 version of the controls (for VS2019 as well!).

Download and install the controls. NB. You want to install these when Visual Studio isn’t running – as we’re in the heady world of the GAC here, and VS won’t find them unless it’s started with them there.

To do this example yourself, you’ll also need a running Neo4j database instance. I’d recommend using Neo4j Desktop, as it makes the process easier to manage.

Create your first package

1. Start up Visual Studio
2. Create a new Integration Services project

New Project…

3. In the new Package.dtsx file, we need to add a Connection Manager. Right click on the bottom ‘Connection Managers’ bar and add a Neo4j connection – if you don’t see it – you might have to restart Visual Studio, or possibly your machine.

Then select the Neo4j Connection:

You’ll now see it in the ‘Connection Managers’ section:

Select it – and change the connection properties to ones that match your database instance – at the moment this is done via the properties window:

At this stage, we have a connection – but we’re not using it, so let’s add a task to execute:

Drag the ‘Execute Cypher Task’ to the Control Flow, and double click on it. Then add the following Cypher:

CREATE (:Node {Id:1})

Press OK

Then we can execute the task:

Once that’s done:

If we go to our Neo4j Database, we can run:

MATCH (n:Node) RETURN n

If we look at the ‘Id’ property – we can see it is ‘1’

So. Now we have an SSIS integration package executing against a Neo4j database.

These controls are currently in an open beta, to register to get the controls, please go to: http://bit.ly/neo4jssis